From d9b95f43e80139aba5368d387e129511a339bd9b Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Thu, 7 May 2026 14:30:04 +0200 Subject: [PATCH 01/22] wip --- objectstore-server/docs/architecture.md | 16 + objectstore-server/src/auth/service.rs | 79 ++++ objectstore-server/src/endpoints/mod.rs | 4 +- objectstore-server/src/endpoints/multipart.rs | 338 ++++++++++++++++++ objectstore-service/docs/architecture.md | 8 + objectstore-service/src/backend/common.rs | 9 + objectstore-service/src/backend/gcs.rs | 4 + objectstore-service/src/backend/in_memory.rs | 4 + objectstore-service/src/backend/local_fs.rs | 4 + objectstore-service/src/backend/testing.rs | 4 + objectstore-service/src/backend/tiered.rs | 4 + objectstore-service/src/service.rs | 119 ++++++ 12 files changed, 592 insertions(+), 1 deletion(-) create mode 100644 objectstore-server/src/endpoints/multipart.rs diff --git a/objectstore-server/docs/architecture.md b/objectstore-server/docs/architecture.md index b31f6d1f..5173ede4 100644 --- a/objectstore-server/docs/architecture.md +++ b/objectstore-server/docs/architecture.md @@ -18,6 +18,22 @@ All object operations live under the `/v1/` prefix: | `DELETE` | `/v1/objects/{usecase}/{scopes}/{key}` | Delete object | | `POST` | `/v1/objects:batch/{usecase}/{scopes}/` | Batch operations (multipart) | +### Multipart Upload Endpoints + +| Method | Path | Description | +|----------|--------------------------------------------------------------|--------------------------------------| +| `POST` | `/v1/objects:multipart:initiate/{usecase}/{scopes}/` | Initiate upload (server-generated key) | +| `PUT` | `/v1/objects:multipart:initiate/{usecase}/{scopes}/{key}` | Initiate upload (user-provided key) | +| `PUT` | `/v1/objects:multipart:parts/{usecase}/{scopes}/{key}` | Upload a part (`uploadId`, `partNumber` query params) | +| `GET` | `/v1/objects:multipart:parts/{usecase}/{scopes}/{key}` | List uploaded parts (`uploadId` query param) | +| 
`POST` | `/v1/objects:multipart:complete/{usecase}/{scopes}/{key}` | Complete upload (`uploadId` query param) | +| `DELETE` | `/v1/objects:multipart/{usecase}/{scopes}/{key}` | Abort upload (`uploadId` query param) | + +The complete endpoint returns `200 OK` immediately with chunked transfer +encoding, sends whitespace keepalive bytes while the backend assembles the +object, then writes the final JSON result (success or error) into the response +body. Clients must parse the body to determine the actual outcome. + Scopes are encoded in the URL path using Matrix URI syntax: `org=123;project=456`. An underscore (`_`) represents empty scopes. diff --git a/objectstore-server/src/auth/service.rs b/objectstore-server/src/auth/service.rs index 5d51fb9f..72d942fa 100644 --- a/objectstore-server/src/auth/service.rs +++ b/objectstore-server/src/auth/service.rs @@ -1,4 +1,8 @@ use objectstore_service::id::{ObjectContext, ObjectId}; +use objectstore_service::multipart::{ + AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse, + ListPartsResponse, PartNumber, UploadId, UploadPartResponse, +}; use objectstore_service::service::{DeleteResponse, GetResponse, InsertResponse, MetadataResponse}; use objectstore_service::{ClientStream, StorageService}; use objectstore_types::auth::Permission; @@ -116,4 +120,79 @@ impl AuthAwareService { self.assert_authorized(Permission::ObjectDelete, id.context())?; Ok(self.service.delete_object(id).await?) } + + // --- Multipart upload operations --- + + /// Auth-aware wrapper around [`StorageService::initiate_multipart`]. + pub async fn initiate_multipart( + &self, + id: ObjectId, + metadata: Metadata, + ) -> ApiResult { + self.assert_authorized(Permission::ObjectWrite, id.context())?; + Ok(self.service.initiate_multipart(id, metadata).await?) + } + + /// Auth-aware wrapper around [`StorageService::upload_part`]. 
+ pub async fn upload_part( + &self, + id: ObjectId, + upload_id: UploadId, + part_number: PartNumber, + content_length: u64, + content_md5: Option, + body: ClientStream, + ) -> ApiResult { + self.assert_authorized(Permission::ObjectWrite, id.context())?; + Ok(self + .service + .upload_part( + id, + upload_id, + part_number, + content_length, + content_md5, + body, + ) + .await?) + } + + /// Auth-aware wrapper around [`StorageService::list_parts`]. + pub async fn list_parts( + &self, + id: ObjectId, + upload_id: UploadId, + max_parts: Option, + part_number_marker: Option, + ) -> ApiResult { + self.assert_authorized(Permission::ObjectRead, id.context())?; + Ok(self + .service + .list_parts(id, upload_id, max_parts, part_number_marker) + .await?) + } + + /// Auth-aware wrapper around [`StorageService::abort_multipart`]. + pub async fn abort_multipart( + &self, + id: ObjectId, + upload_id: UploadId, + ) -> ApiResult { + self.assert_authorized(Permission::ObjectDelete, id.context())?; + Ok(self.service.abort_multipart(id, upload_id).await?) + } + + /// Auth-aware wrapper around [`StorageService::complete_multipart`]. + pub async fn complete_multipart( + &self, + id: ObjectId, + upload_id: UploadId, + parts: Vec, + ) -> ApiResult { + self.assert_authorized(Permission::ObjectWrite, id.context())?; + Ok(self + .service + .complete_multipart(id, upload_id, parts) + .await?) + } } diff --git a/objectstore-server/src/endpoints/mod.rs b/objectstore-server/src/endpoints/mod.rs index 7ab06cdd..a29f5060 100644 --- a/objectstore-server/src/endpoints/mod.rs +++ b/objectstore-server/src/endpoints/mod.rs @@ -10,6 +10,7 @@ mod batch; pub mod common; pub mod health; mod keda; +mod multipart; mod objects; /// Returns `true` for internal endpoints that are exempt from metrics and concurrency limits. 
@@ -24,7 +25,8 @@ pub fn is_internal_route(route: &str) -> bool { pub fn routes() -> Router { let routes_v1 = Router::new() .merge(objects::router()) - .merge(batch::router()); + .merge(batch::router()) + .merge(multipart::router()); Router::new() .merge(health::router()) diff --git a/objectstore-server/src/endpoints/multipart.rs b/objectstore-server/src/endpoints/multipart.rs new file mode 100644 index 00000000..41a91356 --- /dev/null +++ b/objectstore-server/src/endpoints/multipart.rs @@ -0,0 +1,338 @@ +use std::collections::BTreeMap; +use std::convert::Infallible; +use std::time::{Duration, SystemTime}; + +use axum::body::Body; +use axum::extract::{Query, State}; +use axum::http::{HeaderMap, StatusCode}; +use axum::response::{IntoResponse, Response}; +use axum::routing; +use axum::{Json, Router}; +use bytes::Bytes; +use objectstore_service::error::Error as ServiceError; +use objectstore_service::id::{ObjectContext, ObjectId}; +use objectstore_service::multipart::CompletedPart; +use objectstore_types::metadata::Metadata; +use serde::{Deserialize, Serialize}; + +use crate::auth::AuthAwareService; +use crate::endpoints::common::{ApiError, ApiResult}; +use crate::extractors::Xt; +use crate::extractors::body::MeteredBody; +use crate::state::ServiceState; + +pub fn router() -> Router { + let initiate_collection = routing::post(initiate_post); + let initiate_object = routing::put(initiate_put); + + let parts = routing::put(upload_part).get(list_parts); + + Router::new() + .route( + "/objects:multipart:initiate/{usecase}/{scopes}", + initiate_collection.clone(), + ) + .route( + "/objects:multipart:initiate/{usecase}/{scopes}/", + initiate_collection, + ) + .route( + "/objects:multipart:initiate/{usecase}/{scopes}/{*key}", + initiate_object, + ) + .route("/objects:multipart:parts/{usecase}/{scopes}/{*key}", parts) + .route( + "/objects:multipart:complete/{usecase}/{scopes}/{*key}", + routing::post(complete), + ) + .route( + "/objects:multipart/{usecase}/{scopes}/{*key}", 
+ routing::delete(abort), + ) +} + +// --- Query parameter types --- + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct UploadPartQuery { + upload_id: String, + part_number: u32, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct UploadIdQuery { + upload_id: String, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct ListPartsQuery { + upload_id: String, + max_parts: Option, + part_number_marker: Option, +} + +// --- Request/Response types --- + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +struct InitiateResponse { + upload_id: String, + #[serde(skip_serializing_if = "Option::is_none")] + key: Option, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +struct UploadPartResponse { + e_tag: String, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +struct PartInfo { + e_tag: String, + last_modified: u64, + size: u64, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +struct ListPartsResponse { + parts: BTreeMap, + is_truncated: bool, + #[serde(skip_serializing_if = "Option::is_none")] + next_part_number_marker: Option, +} + +#[derive(Debug, Deserialize)] +struct CompleteRequest { + parts: Vec, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +struct CompleteSuccessResponse { + key: String, +} + +#[derive(Debug, Serialize)] +struct CompleteErrorDetail { + code: String, + message: String, +} + +#[derive(Debug, Serialize)] +struct CompleteErrorResponse { + error: CompleteErrorDetail, +} + +// --- Handlers --- + +async fn initiate_post( + service: AuthAwareService, + State(state): State, + Xt(context): Xt, + headers: HeaderMap, +) -> ApiResult { + let mut metadata = Metadata::from_headers(&headers, "").map_err(ServiceError::from)?; + metadata.time_created = Some(SystemTime::now()); + + state + .config + .usecases + .validate(&context.usecase, &metadata) + .map_err(|e| 
ApiError::Client(e.to_string()))?; + + let id = ObjectId::optional(context, None); + let key = id.key().to_string(); + let upload_id = service.initiate_multipart(id, metadata).await?; + + Ok(( + StatusCode::OK, + Json(InitiateResponse { + upload_id, + key: Some(key), + }), + ) + .into_response()) +} + +async fn initiate_put( + service: AuthAwareService, + State(state): State, + Xt(id): Xt, + headers: HeaderMap, +) -> ApiResult { + let mut metadata = Metadata::from_headers(&headers, "").map_err(ServiceError::from)?; + metadata.time_created = Some(SystemTime::now()); + + state + .config + .usecases + .validate(&id.context().usecase, &metadata) + .map_err(|e| ApiError::Client(e.to_string()))?; + + let upload_id = service.initiate_multipart(id, metadata).await?; + + Ok(( + StatusCode::OK, + Json(InitiateResponse { + upload_id, + key: None, + }), + ) + .into_response()) +} + +async fn upload_part( + service: AuthAwareService, + State(_state): State, + Xt(id): Xt, + Query(params): Query, + headers: HeaderMap, + MeteredBody(body): MeteredBody, +) -> ApiResult { + let content_length = headers + .get(axum::http::header::CONTENT_LENGTH) + .and_then(|v| v.to_str().ok()) + .and_then(|v| v.parse::().ok()) + .ok_or_else(|| ApiError::Client("Content-Length header is required".into()))?; + + let content_md5 = headers + .get("content-md5") + .and_then(|v| v.to_str().ok()) + .map(String::from); + + let etag = service + .upload_part( + id, + params.upload_id, + params.part_number, + content_length, + content_md5, + body, + ) + .await?; + + Ok((StatusCode::OK, Json(UploadPartResponse { e_tag: etag })).into_response()) +} + +async fn list_parts( + service: AuthAwareService, + Xt(id): Xt, + Query(params): Query, +) -> ApiResult { + let response = service + .list_parts( + id, + params.upload_id, + params.max_parts, + params.part_number_marker, + ) + .await?; + + let parts = response + .parts + .into_iter() + .map(|p| { + let info = PartInfo { + e_tag: p.etag, + last_modified: p + 
.last_modified + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(), + size: p.size, + }; + (p.part_number, info) + }) + .collect(); + + Ok(Json(ListPartsResponse { + parts, + is_truncated: response.is_truncated, + next_part_number_marker: response.next_part_number_marker, + }) + .into_response()) +} + +async fn abort( + service: AuthAwareService, + Xt(id): Xt, + Query(params): Query, +) -> ApiResult { + service.abort_multipart(id, params.upload_id).await?; + Ok(StatusCode::NO_CONTENT) +} + +async fn complete( + service: AuthAwareService, + Xt(id): Xt, + Query(params): Query, + Json(body): Json, +) -> ApiResult { + let key = id.key().to_string(); + + let parts: Vec = body + .parts + .into_iter() + .enumerate() + .map(|(i, etag)| CompletedPart { + part_number: (i as u32) + 1, + etag, + }) + .collect(); + + let upload_id = params.upload_id; + + let body_stream = async_stream::stream! { + let mut keepalive = tokio::time::interval(Duration::from_secs(10)); + // Consume the first tick immediately (it fires at t=0). + keepalive.tick().await; + + let result_fut = service.complete_multipart(id, upload_id, parts); + tokio::pin!(result_fut); + + loop { + tokio::select! 
{ + result = &mut result_fut => { + let json = match result { + Ok(None) => serde_json::to_vec( + &CompleteSuccessResponse { key }, + ).unwrap(), + Ok(Some(err)) => serde_json::to_vec( + &CompleteErrorResponse { + error: CompleteErrorDetail { + code: err.code, + message: err.message, + }, + }, + ).unwrap(), + Err(e) => serde_json::to_vec( + &CompleteErrorResponse { + error: CompleteErrorDetail { + code: "internal".into(), + message: e.to_string(), + }, + }, + ).unwrap(), + }; + yield Ok::<_, Infallible>(Bytes::from(json)); + break; + } + _ = keepalive.tick() => { + yield Ok::<_, Infallible>(Bytes::from_static(b" ")); + } + } + } + }; + + Ok(Response::builder() + .status(StatusCode::OK) + .header("content-type", "application/json") + .body(Body::from_stream(body_stream)) + .unwrap()) +} diff --git a/objectstore-service/docs/architecture.md b/objectstore-service/docs/architecture.md index 568420da..0de1ea8c 100644 --- a/objectstore-service/docs/architecture.md +++ b/objectstore-service/docs/architecture.md @@ -166,6 +166,14 @@ with [`Error::AtCapacity`](error::Error::AtCapacity). The default limit is [`DEFAULT_CONCURRENCY_LIMIT`](service::DEFAULT_CONCURRENCY_LIMIT). Callers can override it via [`StorageService::with_concurrency_limit`]. +## Multipart Uploads + +When the configured backend supports it, [`StorageService`] exposes multipart +upload operations (initiate, upload part, list parts, complete, abort). These +delegate to the [`MultipartUploadBackend`](backend::common::MultipartUploadBackend) +trait, accessed via [`Backend::as_multipart_upload`](backend::common::Backend::as_multipart_upload). +Multipart operations share the same concurrency limiter as regular operations. 
+ ## Streaming Concurrency The [`streaming`](streaming) module provides [`StreamExecutor`](streaming::StreamExecutor) diff --git a/objectstore-service/src/backend/common.rs b/objectstore-service/src/backend/common.rs index 6b7a676a..f03afae4 100644 --- a/objectstore-service/src/backend/common.rs +++ b/objectstore-service/src/backend/common.rs @@ -62,6 +62,15 @@ pub trait Backend: fmt::Debug + Send + Sync + 'static { /// (such as [`TieredStorage`](super::tiered::TieredStorage)) should override this /// to wait for those tasks to complete. async fn join(&self) {} + + /// Returns a reference to the [`MultipartUploadBackend`] implementation, + /// if this backend supports multipart uploads. + /// + /// The default returns `None`. Backends that implement + /// [`MultipartUploadBackend`] should override this to return `Some(self)`. + fn as_multipart_upload(&self) -> Option<&dyn MultipartUploadBackend> { + None + } } /// Trait for backends that support our S3-style multipart upload protocol. diff --git a/objectstore-service/src/backend/gcs.rs b/objectstore-service/src/backend/gcs.rs index eddef040..24643fc8 100644 --- a/objectstore-service/src/backend/gcs.rs +++ b/objectstore-service/src/backend/gcs.rs @@ -599,6 +599,10 @@ impl Backend for GcsBackend { "gcs" } + fn as_multipart_upload(&self) -> Option<&dyn MultipartUploadBackend> { + Some(self) + } + #[tracing::instrument(level = "trace", fields(?id), skip_all)] async fn put_object( &self, diff --git a/objectstore-service/src/backend/in_memory.rs b/objectstore-service/src/backend/in_memory.rs index 253aafcd..841d3455 100644 --- a/objectstore-service/src/backend/in_memory.rs +++ b/objectstore-service/src/backend/in_memory.rs @@ -104,6 +104,10 @@ impl super::common::Backend for InMemoryBackend { self.name } + fn as_multipart_upload(&self) -> Option<&dyn super::common::MultipartUploadBackend> { + Some(self) + } + async fn put_object( &self, id: &ObjectId, diff --git a/objectstore-service/src/backend/local_fs.rs 
b/objectstore-service/src/backend/local_fs.rs index c4518a18..cd62ba5d 100644 --- a/objectstore-service/src/backend/local_fs.rs +++ b/objectstore-service/src/backend/local_fs.rs @@ -71,6 +71,10 @@ impl Backend for LocalFsBackend { "local-fs" } + fn as_multipart_upload(&self) -> Option<&dyn MultipartUploadBackend> { + Some(self) + } + #[tracing::instrument(level = "trace", fields(?id), skip_all)] async fn put_object( &self, diff --git a/objectstore-service/src/backend/testing.rs b/objectstore-service/src/backend/testing.rs index 4b767e21..4361556a 100644 --- a/objectstore-service/src/backend/testing.rs +++ b/objectstore-service/src/backend/testing.rs @@ -269,6 +269,10 @@ impl Backend for TestBackend { self.hooks.name() } + fn as_multipart_upload(&self) -> Option<&dyn MultipartUploadBackend> { + Some(self) + } + async fn put_object( &self, id: &ObjectId, diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index 4d329c3e..05898123 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -365,6 +365,10 @@ impl Backend for TieredStorage { "tiered" } + fn as_multipart_upload(&self) -> Option<&dyn MultipartUploadBackend> { + Some(self) + } + async fn put_object( &self, id: &ObjectId, diff --git a/objectstore-service/src/service.rs b/objectstore-service/src/service.rs index 9ec10fba..6329f28f 100644 --- a/objectstore-service/src/service.rs +++ b/objectstore-service/src/service.rs @@ -14,6 +14,10 @@ use crate::backend::common::Backend; use crate::concurrency::ConcurrencyLimiter; use crate::error::{Error, Result}; use crate::id::{ObjectContext, ObjectId}; +use crate::multipart::{ + AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse, + ListPartsResponse, PartNumber, UploadId, UploadPartResponse, +}; use crate::stream::{ClientStream, PayloadStream}; use crate::streaming::StreamExecutor; @@ -233,6 +237,121 @@ impl StorageService { pub async fn 
join(&self) { self.inner.join().await; } + + // --- Multipart upload operations --- + + fn require_multipart(&self) -> Result<()> { + if self.inner.as_multipart_upload().is_none() { + return Err(Error::generic( + "multipart uploads not supported by this storage backend", + )); + } + Ok(()) + } + + /// Initiates a new multipart upload. + pub async fn initiate_multipart( + &self, + id: ObjectId, + metadata: Metadata, + ) -> Result { + self.require_multipart()?; + let inner = Arc::clone(&self.inner); + self.spawn("initiate_multipart", async move { + inner + .as_multipart_upload() + .unwrap() + .initiate_multipart(&id, &metadata) + .await + }) + .await + } + + /// Uploads a single part. + pub async fn upload_part( + &self, + id: ObjectId, + upload_id: UploadId, + part_number: PartNumber, + content_length: u64, + content_md5: Option, + body: ClientStream, + ) -> Result { + self.require_multipart()?; + let inner = Arc::clone(&self.inner); + self.spawn("upload_part", async move { + inner + .as_multipart_upload() + .unwrap() + .upload_part( + &id, + &upload_id, + part_number, + content_length, + content_md5.as_deref(), + body, + ) + .await + }) + .await + } + + /// Lists the parts uploaded so far. + pub async fn list_parts( + &self, + id: ObjectId, + upload_id: UploadId, + max_parts: Option, + part_number_marker: Option, + ) -> Result { + self.require_multipart()?; + let inner = Arc::clone(&self.inner); + self.spawn("list_parts", async move { + inner + .as_multipart_upload() + .unwrap() + .list_parts(&id, &upload_id, max_parts, part_number_marker) + .await + }) + .await + } + + /// Aborts a multipart upload. + pub async fn abort_multipart( + &self, + id: ObjectId, + upload_id: UploadId, + ) -> Result { + self.require_multipart()?; + let inner = Arc::clone(&self.inner); + self.spawn("abort_multipart", async move { + inner + .as_multipart_upload() + .unwrap() + .abort_multipart(&id, &upload_id) + .await + }) + .await + } + + /// Finalizes a multipart upload. 
+ pub async fn complete_multipart( + &self, + id: ObjectId, + upload_id: UploadId, + parts: Vec, + ) -> Result { + self.require_multipart()?; + let inner = Arc::clone(&self.inner); + self.spawn("complete_multipart", async move { + inner + .as_multipart_upload() + .unwrap() + .complete_multipart(&id, &upload_id, parts) + .await + }) + .await + } } #[cfg(test)] From cbdb2beec4a0075ad3beaa6e4480a2b63f22cb7a Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Thu, 7 May 2026 14:50:08 +0200 Subject: [PATCH 02/22] wip --- objectstore-server/docs/architecture.md | 27 +++++++++--------- objectstore-server/src/endpoints/common.rs | 1 + objectstore-service/src/backend/common.rs | 2 +- objectstore-service/src/backend/gcs.rs | 2 +- objectstore-service/src/backend/in_memory.rs | 2 +- objectstore-service/src/backend/local_fs.rs | 2 +- objectstore-service/src/backend/testing.rs | 2 +- objectstore-service/src/backend/tiered.rs | 2 +- objectstore-service/src/error.rs | 5 ++++ objectstore-service/src/service.rs | 30 +++++++++----------- 10 files changed, 40 insertions(+), 35 deletions(-) diff --git a/objectstore-server/docs/architecture.md b/objectstore-server/docs/architecture.md index 5173ede4..08bb7496 100644 --- a/objectstore-server/docs/architecture.md +++ b/objectstore-server/docs/architecture.md @@ -20,19 +20,20 @@ All object operations live under the `/v1/` prefix: ### Multipart Upload Endpoints -| Method | Path | Description | -|----------|--------------------------------------------------------------|--------------------------------------| -| `POST` | `/v1/objects:multipart:initiate/{usecase}/{scopes}/` | Initiate upload (server-generated key) | -| `PUT` | `/v1/objects:multipart:initiate/{usecase}/{scopes}/{key}` | Initiate upload (user-provided key) | -| `PUT` | `/v1/objects:multipart:parts/{usecase}/{scopes}/{key}` | Upload a part (`uploadId`, `partNumber` query params) | -| `GET` | `/v1/objects:multipart:parts/{usecase}/{scopes}/{key}` | 
List uploaded parts (`uploadId` query param) | -| `POST` | `/v1/objects:multipart:complete/{usecase}/{scopes}/{key}` | Complete upload (`uploadId` query param) | -| `DELETE` | `/v1/objects:multipart/{usecase}/{scopes}/{key}` | Abort upload (`uploadId` query param) | - -The complete endpoint returns `200 OK` immediately with chunked transfer -encoding, sends whitespace keepalive bytes while the backend assembles the -object, then writes the final JSON result (success or error) into the response -body. Clients must parse the body to determine the actual outcome. +| Method | Path | Description | +|-----------|--------------------------------------------------------------|--------------------------------------| +| `POST` | `/v1/objects:multipart:initiate/{usecase}/{scopes}/` | Initiate upload (server-generated key) | +| `PUT` | `/v1/objects:multipart:initiate/{usecase}/{scopes}/{key}` | Initiate upload (user-provided key) | +| `PUT` | `/v1/objects:multipart:parts/{usecase}/{scopes}/{key}` | Upload a part (`uploadId`, `partNumber` query params) | +| `GET` | `/v1/objects:multipart:parts/{usecase}/{scopes}/{key}` | List uploaded parts (`uploadId` query param) | +| `POST` | `/v1/objects:multipart:complete/{usecase}/{scopes}/{key}` | Complete upload (`uploadId` query param) | +| `DELETE` | `/v1/objects:multipart/{usecase}/{scopes}/{key}` | Abort upload (`uploadId` query param) | + +The complete endpoint returns `200 OK` immediately, with a streaming body that +will contain the error (if any) as JSON. Whitespace is sent in the streaming body +to keep the connection open. +Clients must parse the body to determine the actual outcome, and not rely on the +status code. Scopes are encoded in the URL path using Matrix URI syntax: `org=123;project=456`. An underscore (`_`) represents empty scopes. 
diff --git a/objectstore-server/src/endpoints/common.rs b/objectstore-server/src/endpoints/common.rs index a9111d89..b44207e6 100644 --- a/objectstore-server/src/endpoints/common.rs +++ b/objectstore-server/src/endpoints/common.rs @@ -90,6 +90,7 @@ impl ApiError { ApiError::Service(ServiceError::Client(_)) => StatusCode::BAD_REQUEST, ApiError::Service(ServiceError::Metadata(_)) => StatusCode::BAD_REQUEST, ApiError::Service(ServiceError::AtCapacity) => StatusCode::TOO_MANY_REQUESTS, + ApiError::Service(ServiceError::NotImplemented) => StatusCode::NOT_IMPLEMENTED, ApiError::Service(_) => { objectstore_log::error!(!!self, "error handling request"); StatusCode::INTERNAL_SERVER_ERROR diff --git a/objectstore-service/src/backend/common.rs b/objectstore-service/src/backend/common.rs index f03afae4..700699bb 100644 --- a/objectstore-service/src/backend/common.rs +++ b/objectstore-service/src/backend/common.rs @@ -68,7 +68,7 @@ pub trait Backend: fmt::Debug + Send + Sync + 'static { /// /// The default returns `None`. Backends that implement /// [`MultipartUploadBackend`] should override this to return `Some(self)`. 
- fn as_multipart_upload(&self) -> Option<&dyn MultipartUploadBackend> { + fn as_multipart_upload_backend(&self) -> Option<&dyn MultipartUploadBackend> { None } } diff --git a/objectstore-service/src/backend/gcs.rs b/objectstore-service/src/backend/gcs.rs index 24643fc8..c89967d6 100644 --- a/objectstore-service/src/backend/gcs.rs +++ b/objectstore-service/src/backend/gcs.rs @@ -599,7 +599,7 @@ impl Backend for GcsBackend { "gcs" } - fn as_multipart_upload(&self) -> Option<&dyn MultipartUploadBackend> { + fn as_multipart_upload_backend(&self) -> Option<&dyn MultipartUploadBackend> { Some(self) } diff --git a/objectstore-service/src/backend/in_memory.rs b/objectstore-service/src/backend/in_memory.rs index 841d3455..07fda998 100644 --- a/objectstore-service/src/backend/in_memory.rs +++ b/objectstore-service/src/backend/in_memory.rs @@ -104,7 +104,7 @@ impl super::common::Backend for InMemoryBackend { self.name } - fn as_multipart_upload(&self) -> Option<&dyn super::common::MultipartUploadBackend> { + fn as_multipart_upload_backend(&self) -> Option<&dyn MultipartUploadBackend> { Some(self) } diff --git a/objectstore-service/src/backend/local_fs.rs b/objectstore-service/src/backend/local_fs.rs index cd62ba5d..3a0d2102 100644 --- a/objectstore-service/src/backend/local_fs.rs +++ b/objectstore-service/src/backend/local_fs.rs @@ -71,7 +71,7 @@ impl Backend for LocalFsBackend { "local-fs" } - fn as_multipart_upload(&self) -> Option<&dyn MultipartUploadBackend> { + fn as_multipart_upload_backend(&self) -> Option<&dyn MultipartUploadBackend> { Some(self) } diff --git a/objectstore-service/src/backend/testing.rs b/objectstore-service/src/backend/testing.rs index 4361556a..55330663 100644 --- a/objectstore-service/src/backend/testing.rs +++ b/objectstore-service/src/backend/testing.rs @@ -269,7 +269,7 @@ impl Backend for TestBackend { self.hooks.name() } - fn as_multipart_upload(&self) -> Option<&dyn MultipartUploadBackend> { + fn as_multipart_upload_backend(&self) -> 
Option<&dyn MultipartUploadBackend> { Some(self) } diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index 05898123..d3e8c8de 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -365,7 +365,7 @@ impl Backend for TieredStorage { "tiered" } - fn as_multipart_upload(&self) -> Option<&dyn MultipartUploadBackend> { + fn as_multipart_upload_backend(&self) -> Option<&dyn MultipartUploadBackend> { Some(self) } diff --git a/objectstore-service/src/error.rs b/objectstore-service/src/error.rs index 186a22d9..1169fa98 100644 --- a/objectstore-service/src/error.rs +++ b/objectstore-service/src/error.rs @@ -87,6 +87,10 @@ pub enum Error { #[source] cause: Option>, }, + + /// The functionality is not implemented by this instance of the service. + #[error("not implemented")] + NotImplemented, } impl Error { @@ -142,6 +146,7 @@ impl Error { Self::Panic(_) => Level::ERROR, Self::Dropped => Level::ERROR, Self::UnexpectedTombstone => Level::ERROR, + Self::NotImplemented => Level::WARN, Self::Generic { .. 
} => Level::ERROR, } } diff --git a/objectstore-service/src/service.rs b/objectstore-service/src/service.rs index 6329f28f..e964adcf 100644 --- a/objectstore-service/src/service.rs +++ b/objectstore-service/src/service.rs @@ -240,12 +240,10 @@ impl StorageService { // --- Multipart upload operations --- - fn require_multipart(&self) -> Result<()> { - if self.inner.as_multipart_upload().is_none() { - return Err(Error::generic( - "multipart uploads not supported by this storage backend", - )); - } + fn ensure_inner_multipart(&self) -> Result<()> { + self.inner + .as_multipart_upload_backend() + .ok_or(Error::NotImplemented)?; Ok(()) } @@ -255,11 +253,11 @@ impl StorageService { id: ObjectId, metadata: Metadata, ) -> Result { - self.require_multipart()?; + self.ensure_inner_multipart()?; let inner = Arc::clone(&self.inner); self.spawn("initiate_multipart", async move { inner - .as_multipart_upload() + .as_multipart_upload_backend() .unwrap() .initiate_multipart(&id, &metadata) .await @@ -277,11 +275,11 @@ impl StorageService { content_md5: Option, body: ClientStream, ) -> Result { - self.require_multipart()?; + self.ensure_inner_multipart()?; let inner = Arc::clone(&self.inner); self.spawn("upload_part", async move { inner - .as_multipart_upload() + .as_multipart_upload_backend() .unwrap() .upload_part( &id, @@ -304,11 +302,11 @@ impl StorageService { max_parts: Option, part_number_marker: Option, ) -> Result { - self.require_multipart()?; + self.ensure_inner_multipart()?; let inner = Arc::clone(&self.inner); self.spawn("list_parts", async move { inner - .as_multipart_upload() + .as_multipart_upload_backend() .unwrap() .list_parts(&id, &upload_id, max_parts, part_number_marker) .await @@ -322,11 +320,11 @@ impl StorageService { id: ObjectId, upload_id: UploadId, ) -> Result { - self.require_multipart()?; + self.ensure_inner_multipart()?; let inner = Arc::clone(&self.inner); self.spawn("abort_multipart", async move { inner - .as_multipart_upload() + 
.as_multipart_upload_backend() .unwrap() .abort_multipart(&id, &upload_id) .await @@ -341,11 +339,11 @@ impl StorageService { upload_id: UploadId, parts: Vec, ) -> Result { - self.require_multipart()?; + self.ensure_inner_multipart()?; let inner = Arc::clone(&self.inner); self.spawn("complete_multipart", async move { inner - .as_multipart_upload() + .as_multipart_upload_backend() .unwrap() .complete_multipart(&id, &upload_id, parts) .await From b5a448efcb2a3f61cf597ddb1c2b35a3217c4aee Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Thu, 7 May 2026 14:52:21 +0200 Subject: [PATCH 03/22] fix: update doc link to renamed as_multipart_upload_backend method --- objectstore-service/docs/architecture.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/objectstore-service/docs/architecture.md b/objectstore-service/docs/architecture.md index 0de1ea8c..f9e1e5f4 100644 --- a/objectstore-service/docs/architecture.md +++ b/objectstore-service/docs/architecture.md @@ -171,7 +171,7 @@ The default limit is [`DEFAULT_CONCURRENCY_LIMIT`](service::DEFAULT_CONCURRENCY_ When the configured backend supports it, [`StorageService`] exposes multipart upload operations (initiate, upload part, list parts, complete, abort). These delegate to the [`MultipartUploadBackend`](backend::common::MultipartUploadBackend) -trait, accessed via [`Backend::as_multipart_upload`](backend::common::Backend::as_multipart_upload). +trait, accessed via [`Backend::as_multipart_upload_backend`](backend::common::Backend::as_multipart_upload_backend). Multipart operations share the same concurrency limiter as regular operations. 
## Streaming Concurrency From 9af398bb2447a8f6e0349304d0b11f6d4e7333ad Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Thu, 7 May 2026 15:49:51 +0200 Subject: [PATCH 04/22] improve --- objectstore-server/src/auth/service.rs | 2 +- objectstore-server/src/endpoints/multipart.rs | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/objectstore-server/src/auth/service.rs b/objectstore-server/src/auth/service.rs index 72d942fa..a1a2c388 100644 --- a/objectstore-server/src/auth/service.rs +++ b/objectstore-server/src/auth/service.rs @@ -178,7 +178,7 @@ impl AuthAwareService { id: ObjectId, upload_id: UploadId, ) -> ApiResult { - self.assert_authorized(Permission::ObjectDelete, id.context())?; + self.assert_authorized(Permission::ObjectWrite, id.context())?; Ok(self.service.abort_multipart(id, upload_id).await?) } diff --git a/objectstore-server/src/endpoints/multipart.rs b/objectstore-server/src/endpoints/multipart.rs index 41a91356..9d76d7d5 100644 --- a/objectstore-server/src/endpoints/multipart.rs +++ b/objectstore-server/src/endpoints/multipart.rs @@ -15,6 +15,8 @@ use objectstore_service::multipart::CompletedPart; use objectstore_types::metadata::Metadata; use serde::{Deserialize, Serialize}; +use objectstore_types::auth::Permission; + use crate::auth::AuthAwareService; use crate::endpoints::common::{ApiError, ApiResult}; use crate::extractors::Xt; @@ -274,6 +276,8 @@ async fn complete( Query(params): Query, Json(body): Json, ) -> ApiResult { + service.check_permission(Permission::ObjectWrite, id.context())?; + let key = id.key().to_string(); let parts: Vec = body From bc0bef513753e4addfd1f07d8b6ff332a7fc5a17 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Thu, 7 May 2026 15:52:23 +0200 Subject: [PATCH 05/22] improve --- objectstore-server/src/endpoints/multipart.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git 
a/objectstore-server/src/endpoints/multipart.rs b/objectstore-server/src/endpoints/multipart.rs index 9d76d7d5..dfd626aa 100644 --- a/objectstore-server/src/endpoints/multipart.rs +++ b/objectstore-server/src/endpoints/multipart.rs @@ -109,9 +109,16 @@ struct ListPartsResponse { next_part_number_marker: Option, } +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct CompletePartRequest { + part_number: u32, + etag: String, +} + #[derive(Debug, Deserialize)] struct CompleteRequest { - parts: Vec, + parts: Vec, } #[derive(Debug, Serialize)] @@ -283,10 +290,9 @@ async fn complete( let parts: Vec = body .parts .into_iter() - .enumerate() - .map(|(i, etag)| CompletedPart { - part_number: (i as u32) + 1, - etag, + .map(|p| CompletedPart { + part_number: p.part_number, + etag: p.etag, }) .collect(); From 25ac7997c7e232939ce46cdec5ab6cffdfa01dd3 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Thu, 7 May 2026 16:12:14 +0200 Subject: [PATCH 06/22] improve --- objectstore-server/src/endpoints/multipart.rs | 75 ++++++------------- 1 file changed, 24 insertions(+), 51 deletions(-) diff --git a/objectstore-server/src/endpoints/multipart.rs b/objectstore-server/src/endpoints/multipart.rs index dfd626aa..fc93c867 100644 --- a/objectstore-server/src/endpoints/multipart.rs +++ b/objectstore-server/src/endpoints/multipart.rs @@ -24,25 +24,21 @@ use crate::extractors::body::MeteredBody; use crate::state::ServiceState; pub fn router() -> Router { - let initiate_collection = routing::post(initiate_post); - let initiate_object = routing::put(initiate_put); - - let parts = routing::put(upload_part).get(list_parts); - + let initiate_no_key = routing::post(initiate_post); Router::new() .route( - "/objects:multipart:initiate/{usecase}/{scopes}", - initiate_collection.clone(), + "/objects:multipart/{usecase}/{scopes}", + initiate_no_key.clone(), ) + .route("/objects:multipart/{usecase}/{scopes}/", initiate_no_key) .route( - 
"/objects:multipart:initiate/{usecase}/{scopes}/", - initiate_collection, + "/objects:multipart/{usecase}/{scopes}/{*key}", + routing::put(initiate_put), ) .route( - "/objects:multipart:initiate/{usecase}/{scopes}/{*key}", - initiate_object, + "/objects:multipart:parts/{usecase}/{scopes}/{*key}", + routing::get(list_parts).put(upload_part), ) - .route("/objects:multipart:parts/{usecase}/{scopes}/{*key}", parts) .route( "/objects:multipart:complete/{usecase}/{scopes}/{*key}", routing::post(complete), @@ -56,20 +52,17 @@ pub fn router() -> Router { // --- Query parameter types --- #[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] struct UploadPartQuery { upload_id: String, part_number: u32, } #[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] struct UploadIdQuery { upload_id: String, } #[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] struct ListPartsQuery { upload_id: String, max_parts: Option, @@ -79,7 +72,6 @@ struct ListPartsQuery { // --- Request/Response types --- #[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] struct InitiateResponse { upload_id: String, #[serde(skip_serializing_if = "Option::is_none")] @@ -87,13 +79,11 @@ struct InitiateResponse { } #[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] struct UploadPartResponse { e_tag: String, } #[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] struct PartInfo { e_tag: String, last_modified: u64, @@ -101,7 +91,6 @@ struct PartInfo { } #[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] struct ListPartsResponse { parts: BTreeMap, is_truncated: bool, @@ -110,7 +99,6 @@ struct ListPartsResponse { } #[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] struct CompletePartRequest { part_number: u32, etag: String, @@ -122,7 +110,6 @@ struct CompleteRequest { } #[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] struct CompleteSuccessResponse { key: String, } @@ -142,40 +129,33 @@ struct 
CompleteErrorResponse { async fn initiate_post( service: AuthAwareService, - State(state): State, + state: State, Xt(context): Xt, headers: HeaderMap, ) -> ApiResult { - let mut metadata = Metadata::from_headers(&headers, "").map_err(ServiceError::from)?; - metadata.time_created = Some(SystemTime::now()); - - state - .config - .usecases - .validate(&context.usecase, &metadata) - .map_err(|e| ApiError::Client(e.to_string()))?; - let id = ObjectId::optional(context, None); - let key = id.key().to_string(); - let upload_id = service.initiate_multipart(id, metadata).await?; - - Ok(( - StatusCode::OK, - Json(InitiateResponse { - upload_id, - key: Some(key), - }), - ) - .into_response()) + let key = Some(id.key().to_string()); + initiate_inner(service, state, id, key, headers).await } async fn initiate_put( service: AuthAwareService, - State(state): State, + state: State, Xt(id): Xt, headers: HeaderMap, +) -> ApiResult { + initiate_inner(service, state, id, None, headers).await +} + +async fn initiate_inner( + service: AuthAwareService, + State(state): State, + id: ObjectId, + key: Option, + headers: HeaderMap, ) -> ApiResult { let mut metadata = Metadata::from_headers(&headers, "").map_err(ServiceError::from)?; + // TODO: maybe do this on finalize? 
metadata.time_created = Some(SystemTime::now()); state @@ -186,14 +166,7 @@ async fn initiate_put( let upload_id = service.initiate_multipart(id, metadata).await?; - Ok(( - StatusCode::OK, - Json(InitiateResponse { - upload_id, - key: None, - }), - ) - .into_response()) + Ok((StatusCode::OK, Json(InitiateResponse { upload_id, key })).into_response()) } async fn upload_part( From 5374bb406b73ab70206e5655d2d7947a0d313a7b Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Thu, 7 May 2026 16:15:25 +0200 Subject: [PATCH 07/22] improve --- objectstore-server/src/endpoints/multipart.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/objectstore-server/src/endpoints/multipart.rs b/objectstore-server/src/endpoints/multipart.rs index fc93c867..0c1f7ba4 100644 --- a/objectstore-server/src/endpoints/multipart.rs +++ b/objectstore-server/src/endpoints/multipart.rs @@ -80,12 +80,12 @@ struct InitiateResponse { #[derive(Debug, Serialize)] struct UploadPartResponse { - e_tag: String, + etag: String, } #[derive(Debug, Serialize)] struct PartInfo { - e_tag: String, + etag: String, last_modified: u64, size: u64, } @@ -199,7 +199,7 @@ async fn upload_part( ) .await?; - Ok((StatusCode::OK, Json(UploadPartResponse { e_tag: etag })).into_response()) + Ok((StatusCode::OK, Json(UploadPartResponse { etag })).into_response()) } async fn list_parts( @@ -221,7 +221,7 @@ async fn list_parts( .into_iter() .map(|p| { let info = PartInfo { - e_tag: p.etag, + etag: p.etag, last_modified: p .last_modified .duration_since(SystemTime::UNIX_EPOCH) From cf2ad11a5431a5033db71f5d038a0bba6bc2d96b Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Thu, 7 May 2026 17:32:23 +0200 Subject: [PATCH 08/22] fix --- objectstore-server/src/endpoints/multipart.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/objectstore-server/src/endpoints/multipart.rs b/objectstore-server/src/endpoints/multipart.rs 
index 0c1f7ba4..bef9807a 100644 --- a/objectstore-server/src/endpoints/multipart.rs +++ b/objectstore-server/src/endpoints/multipart.rs @@ -33,7 +33,7 @@ pub fn router() -> Router { .route("/objects:multipart/{usecase}/{scopes}/", initiate_no_key) .route( "/objects:multipart/{usecase}/{scopes}/{*key}", - routing::put(initiate_put), + routing::put(initiate_put).delete(abort), ) .route( "/objects:multipart:parts/{usecase}/{scopes}/{*key}", @@ -43,10 +43,6 @@ pub fn router() -> Router { "/objects:multipart:complete/{usecase}/{scopes}/{*key}", routing::post(complete), ) - .route( - "/objects:multipart/{usecase}/{scopes}/{*key}", - routing::delete(abort), - ) } // --- Query parameter types --- From 241dcfc609376565973dc587ce54d0a9b9f991ff Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 8 May 2026 14:32:40 +0200 Subject: [PATCH 09/22] improve --- objectstore-server/src/endpoints/common.rs | 4 + objectstore-server/src/endpoints/multipart.rs | 103 ++++++++++-------- 2 files changed, 62 insertions(+), 45 deletions(-) diff --git a/objectstore-server/src/endpoints/common.rs b/objectstore-server/src/endpoints/common.rs index b44207e6..c3521a88 100644 --- a/objectstore-server/src/endpoints/common.rs +++ b/objectstore-server/src/endpoints/common.rs @@ -30,6 +30,10 @@ pub enum ApiError { /// Errors encountered when parsing or executing a batch request. #[error("batch error: {0}")] Batch(#[from] BatchError), + + /// Internal server errors. + #[error("internal error: {0}")] + Internal(String), } /// Result type for API operations. 
diff --git a/objectstore-server/src/endpoints/multipart.rs b/objectstore-server/src/endpoints/multipart.rs index bef9807a..968fffb6 100644 --- a/objectstore-server/src/endpoints/multipart.rs +++ b/objectstore-server/src/endpoints/multipart.rs @@ -9,6 +9,9 @@ use axum::response::{IntoResponse, Response}; use axum::routing; use axum::{Json, Router}; use bytes::Bytes; +use futures::StreamExt; +use http::HeaderValue; +use http::header; use objectstore_service::error::Error as ServiceError; use objectstore_service::id::{ObjectContext, ObjectId}; use objectstore_service::multipart::CompletedPart; @@ -62,16 +65,15 @@ struct UploadIdQuery { struct ListPartsQuery { upload_id: String, max_parts: Option, - part_number_marker: Option, + next_part_number_marker: Option, } // --- Request/Response types --- #[derive(Debug, Serialize)] struct InitiateResponse { + key: String, upload_id: String, - #[serde(skip_serializing_if = "Option::is_none")] - key: Option, } #[derive(Debug, Serialize)] @@ -82,7 +84,7 @@ struct UploadPartResponse { #[derive(Debug, Serialize)] struct PartInfo { etag: String, - last_modified: u64, + last_modified: u128, size: u64, } @@ -123,35 +125,33 @@ struct CompleteErrorResponse { // --- Handlers --- -async fn initiate_post( +async fn initiate_put( service: AuthAwareService, state: State, - Xt(context): Xt, + Xt(id): Xt, headers: HeaderMap, ) -> ApiResult { - let id = ObjectId::optional(context, None); - let key = Some(id.key().to_string()); - initiate_inner(service, state, id, key, headers).await + initiate_inner(service, state, id, headers).await } -async fn initiate_put( +async fn initiate_post( service: AuthAwareService, state: State, - Xt(id): Xt, + Xt(context): Xt, headers: HeaderMap, ) -> ApiResult { - initiate_inner(service, state, id, None, headers).await + let id = ObjectId::optional(context, None); + initiate_inner(service, state, id, headers).await } async fn initiate_inner( service: AuthAwareService, State(state): State, id: ObjectId, - key: 
Option, headers: HeaderMap, ) -> ApiResult { let mut metadata = Metadata::from_headers(&headers, "").map_err(ServiceError::from)?; - // TODO: maybe do this on finalize? + // TODO: Should we do this on finalize instead? It will require one more metadata request. metadata.time_created = Some(SystemTime::now()); state @@ -160,9 +160,13 @@ async fn initiate_inner( .validate(&id.context().usecase, &metadata) .map_err(|e| ApiError::Client(e.to_string()))?; - let upload_id = service.initiate_multipart(id, metadata).await?; + let upload_id = service.initiate_multipart(id.clone(), metadata).await?; - Ok((StatusCode::OK, Json(InitiateResponse { upload_id, key })).into_response()) + let response = Json(InitiateResponse { + key: id.key().to_owned(), + upload_id, + }); + Ok((StatusCode::OK, response).into_response()) } async fn upload_part( @@ -195,7 +199,8 @@ async fn upload_part( ) .await?; - Ok((StatusCode::OK, Json(UploadPartResponse { etag })).into_response()) + let response = Json(UploadPartResponse { etag }); + Ok((StatusCode::OK, response).into_response()) } async fn list_parts( @@ -208,7 +213,7 @@ async fn list_parts( id, params.upload_id, params.max_parts, - params.part_number_marker, + params.next_part_number_marker, ) .await?; @@ -222,28 +227,28 @@ async fn list_parts( .last_modified .duration_since(SystemTime::UNIX_EPOCH) .unwrap_or_default() - .as_secs(), + .as_millis(), size: p.size, }; (p.part_number, info) }) .collect(); - Ok(Json(ListPartsResponse { + let response = Json(ListPartsResponse { parts, is_truncated: response.is_truncated, next_part_number_marker: response.next_part_number_marker, - }) - .into_response()) + }); + Ok((StatusCode::OK, response).into_response()) } async fn abort( service: AuthAwareService, Xt(id): Xt, Query(params): Query, -) -> ApiResult { +) -> ApiResult { service.abort_multipart(id, params.upload_id).await?; - Ok(StatusCode::NO_CONTENT) + Ok(StatusCode::NO_CONTENT.into_response()) } async fn complete( @@ -267,21 +272,20 @@ async fn 
complete( let upload_id = params.upload_id; - let body_stream = async_stream::stream! { - let mut keepalive = tokio::time::interval(Duration::from_secs(10)); - // Consume the first tick immediately (it fires at t=0). - keepalive.tick().await; - - let result_fut = service.complete_multipart(id, upload_id, parts); - tokio::pin!(result_fut); + // This operation can take a while at the service level, so we stream whitespace to the client + // until we have a response body, to keep the connection from being terminated. + let stream = async_stream::stream! { + let fut = service.complete_multipart(id, upload_id, parts); + tokio::pin!(fut); + let mut keepalive = tokio::time::interval(Duration::from_secs(1)); loop { tokio::select! { - result = &mut result_fut => { - let json = match result { + res = &mut fut => { + let serialized = match res { Ok(None) => serde_json::to_vec( &CompleteSuccessResponse { key }, - ).unwrap(), + ), Ok(Some(err)) => serde_json::to_vec( &CompleteErrorResponse { error: CompleteErrorDetail { @@ -289,29 +293,38 @@ async fn complete( message: err.message, }, }, - ).unwrap(), + ), + // TODO(lcian): Construct more precise error code and message, given that + // we have a structured `ApiError`. Err(e) => serde_json::to_vec( &CompleteErrorResponse { error: CompleteErrorDetail { - code: "internal".into(), + code: "internal error".into(), message: e.to_string(), }, }, - ).unwrap(), + ), }; - yield Ok::<_, Infallible>(Bytes::from(json)); + // Fallback to avoid `unwrap()`. This should never happen in practice. 
+ let serialized = serialized.unwrap_or_else(|_| { + br#"{"error":{"code":"internal error","message":"unexpected error, please report a bug"}}"#.to_vec() + }); + + yield Bytes::from(serialized); break; } _ = keepalive.tick() => { - yield Ok::<_, Infallible>(Bytes::from_static(b" ")); + yield Bytes::from_static(b" "); } } } }; - - Ok(Response::builder() - .status(StatusCode::OK) - .header("content-type", "application/json") - .body(Body::from_stream(body_stream)) - .unwrap()) + let stream = stream.map(|s| Ok::<_, Infallible>(s)); + + let mut headers = HeaderMap::new(); + headers.insert( + header::CONTENT_TYPE, + HeaderValue::from_static("application/json"), + ); + Ok((StatusCode::OK, headers, Body::from_stream(stream)).into_response()) } From f615f9a6e9388ad51481d487797f004bfecfe0e9 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 8 May 2026 15:03:22 +0200 Subject: [PATCH 10/22] improve, add tests --- objectstore-server/src/endpoints/common.rs | 5 + objectstore-server/src/endpoints/multipart.rs | 9 +- objectstore-server/tests/multipart.rs | 565 ++++++++++++++++++ objectstore-service/src/service.rs | 7 + 4 files changed, 580 insertions(+), 6 deletions(-) create mode 100644 objectstore-server/tests/multipart.rs diff --git a/objectstore-server/src/endpoints/common.rs b/objectstore-server/src/endpoints/common.rs index c3521a88..365f8e62 100644 --- a/objectstore-server/src/endpoints/common.rs +++ b/objectstore-server/src/endpoints/common.rs @@ -99,6 +99,11 @@ impl ApiError { objectstore_log::error!(!!self, "error handling request"); StatusCode::INTERNAL_SERVER_ERROR } + + ApiError::Internal(_) => { + objectstore_log::error!(!!self, "internal error"); + StatusCode::INTERNAL_SERVER_ERROR + } } } } diff --git a/objectstore-server/src/endpoints/multipart.rs b/objectstore-server/src/endpoints/multipart.rs index 968fffb6..b13a920f 100644 --- a/objectstore-server/src/endpoints/multipart.rs +++ 
b/objectstore-server/src/endpoints/multipart.rs @@ -84,7 +84,8 @@ struct UploadPartResponse { #[derive(Debug, Serialize)] struct PartInfo { etag: String, - last_modified: u128, + #[serde(with = "humantime_serde")] + last_modified: SystemTime, size: u64, } @@ -223,11 +224,7 @@ async fn list_parts( .map(|p| { let info = PartInfo { etag: p.etag, - last_modified: p - .last_modified - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap_or_default() - .as_millis(), + last_modified: p.last_modified, size: p.size, }; (p.part_number, info) diff --git a/objectstore-server/tests/multipart.rs b/objectstore-server/tests/multipart.rs new file mode 100644 index 00000000..f04340f3 --- /dev/null +++ b/objectstore-server/tests/multipart.rs @@ -0,0 +1,565 @@ +//! Integration tests for the multipart upload endpoints. + +use anyhow::Result; +use objectstore_server::config::{AuthZ, Config}; +use objectstore_test::server::TestServer; +use serde::Deserialize; + +#[derive(Debug, Deserialize)] +struct InitiateResponse { + key: String, + upload_id: String, +} + +#[derive(Debug, Deserialize)] +struct UploadPartResponse { + etag: String, +} + +#[derive(Debug, Deserialize)] +#[allow(dead_code)] +struct PartInfo { + etag: String, + last_modified: String, + size: u64, +} + +#[derive(Debug, Deserialize)] +struct ListPartsResponse { + parts: std::collections::BTreeMap, + is_truncated: bool, + next_part_number_marker: Option, +} + +#[derive(Debug, Deserialize)] +struct CompleteSuccessResponse { + key: String, +} + +async fn test_server() -> TestServer { + TestServer::with_config(Config { + auth: AuthZ { + enforce: false, + ..Default::default() + }, + ..Default::default() + }) + .await +} + +/// Sends a complete request and asserts success. Returns the key from the response. +/// +/// The complete endpoint uses a streaming response with keepalive whitespace, so we +/// trim the response body before parsing. 
+async fn complete_and_assert( + client: &reqwest::Client, + url: &str, + parts: &[(&str, &str)], +) -> Result { + let parts_json: Vec<_> = parts + .iter() + .map(|(pn, etag)| { + serde_json::json!({"part_number": pn.parse::().unwrap(), "etag": etag}) + }) + .collect(); + let body = serde_json::json!({ "parts": parts_json }); + + let response = client + .post(url) + .header("content-type", "application/json") + .body(body.to_string()) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + + let text = response.text().await?; + let trimmed = text.trim(); + assert!( + !trimmed.contains("\"error\""), + "complete returned error: {trimmed}" + ); + let parsed: CompleteSuccessResponse = serde_json::from_str(trimmed)?; + + Ok(parsed.key) +} + +// --- Initiate --- + +#[tokio::test] +async fn test_initiate_put_with_key() -> Result<()> { + let server = test_server().await; + let client = reqwest::Client::new(); + + let response = client + .put(server.url("/v1/objects:multipart/test/org=1/my-key")) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + + let body: InitiateResponse = response.json().await?; + assert_eq!(body.key, "my-key"); + assert!(!body.upload_id.is_empty()); + + Ok(()) +} + +#[tokio::test] +async fn test_initiate_post_generates_key() -> Result<()> { + let server = test_server().await; + let client = reqwest::Client::new(); + + let response = client + .post(server.url("/v1/objects:multipart/test/org=1/")) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + + let body: InitiateResponse = response.json().await?; + assert!(!body.key.is_empty()); + assert!(!body.upload_id.is_empty()); + + Ok(()) +} + +// --- Full flow: initiate → upload parts → list → complete → GET --- + +#[tokio::test] +async fn test_multipart_full_flow() -> Result<()> { + let server = test_server().await; + let client = reqwest::Client::new(); + + // 1. 
Initiate + let response = client + .put(server.url("/v1/objects:multipart/test/org=1/full-flow-key")) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let initiate: InitiateResponse = response.json().await?; + let upload_id = &initiate.upload_id; + assert_eq!(initiate.key, "full-flow-key"); + + // 2. Upload part 1 + let part1_data = b"hello "; + let response = client + .put(server.url(&format!( + "/v1/objects:multipart:parts/test/org=1/full-flow-key?upload_id={upload_id}&part_number=1" + ))) + .header("content-length", part1_data.len().to_string()) + .body(part1_data.to_vec()) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let part1: UploadPartResponse = response.json().await?; + assert!(!part1.etag.is_empty()); + + // 3. Upload part 2 + let part2_data = b"world!"; + let response = client + .put(server.url(&format!( + "/v1/objects:multipart:parts/test/org=1/full-flow-key?upload_id={upload_id}&part_number=2" + ))) + .header("content-length", part2_data.len().to_string()) + .body(part2_data.to_vec()) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let part2: UploadPartResponse = response.json().await?; + assert!(!part2.etag.is_empty()); + + // 4. List parts + let response = client + .get(server.url(&format!( + "/v1/objects:multipart:parts/test/org=1/full-flow-key?upload_id={upload_id}" + ))) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let list: ListPartsResponse = response.json().await?; + assert_eq!(list.parts.len(), 2); + assert!(list.parts.contains_key(&1)); + assert!(list.parts.contains_key(&2)); + assert_eq!(list.parts[&1].size, part1_data.len() as u64); + assert_eq!(list.parts[&2].size, part2_data.len() as u64); + assert!(!list.is_truncated); + + // 5. 
Complete + let key = complete_and_assert( + &client, + &server.url(&format!( + "/v1/objects:multipart:complete/test/org=1/full-flow-key?upload_id={upload_id}" + )), + &[("1", &part1.etag), ("2", &part2.etag)], + ) + .await?; + assert_eq!(key, "full-flow-key"); + + // 6. Verify object is accessible via regular GET + let response = client + .get(server.url("/v1/objects/test/org=1/full-flow-key")) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let payload = response.bytes().await?; + assert_eq!(&payload[..], b"hello world!"); + + Ok(()) +} + +// --- Initiate with POST and auto-generated key, then complete --- + +#[tokio::test] +async fn test_multipart_post_initiate_full_flow() -> Result<()> { + let server = test_server().await; + let client = reqwest::Client::new(); + + // 1. Initiate with POST (server-generated key) + let response = client + .post(server.url("/v1/objects:multipart/test/org=1/")) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let initiate: InitiateResponse = response.json().await?; + let upload_id = &initiate.upload_id; + let key = &initiate.key; + assert!(!key.is_empty()); + + // 2. Upload a single part + let data = b"auto-keyed content"; + let response = client + .put(server.url(&format!( + "/v1/objects:multipart:parts/test/org=1/{key}?upload_id={upload_id}&part_number=1" + ))) + .header("content-length", data.len().to_string()) + .body(data.to_vec()) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let part: UploadPartResponse = response.json().await?; + + // 3. Complete + complete_and_assert( + &client, + &server.url(&format!( + "/v1/objects:multipart:complete/test/org=1/{key}?upload_id={upload_id}" + )), + &[("1", &part.etag)], + ) + .await?; + + // 4. 
Verify + let response = client + .get(server.url(&format!("/v1/objects/test/org=1/{key}"))) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let payload = response.bytes().await?; + assert_eq!(&payload[..], b"auto-keyed content"); + + Ok(()) +} + +// --- Abort --- + +#[tokio::test] +async fn test_multipart_abort() -> Result<()> { + let server = test_server().await; + let client = reqwest::Client::new(); + + // 1. Initiate + let response = client + .put(server.url("/v1/objects:multipart/test/org=1/abort-key")) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let initiate: InitiateResponse = response.json().await?; + let upload_id = &initiate.upload_id; + + // 2. Upload a part + let data = b"will be aborted"; + let response = client + .put(server.url(&format!( + "/v1/objects:multipart:parts/test/org=1/abort-key?upload_id={upload_id}&part_number=1" + ))) + .header("content-length", data.len().to_string()) + .body(data.to_vec()) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + + // 3. Abort + let response = client + .delete(server.url(&format!( + "/v1/objects:multipart/test/org=1/abort-key?upload_id={upload_id}" + ))) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::NO_CONTENT); + + // 4. Object should not exist + let response = client + .get(server.url("/v1/objects/test/org=1/abort-key")) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::NOT_FOUND); + + Ok(()) +} + +// --- List parts with pagination --- + +#[tokio::test] +async fn test_list_parts_pagination() -> Result<()> { + let server = test_server().await; + let client = reqwest::Client::new(); + + // 1. 
Initiate + let response = client + .put(server.url("/v1/objects:multipart/test/org=1/paginated-key")) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let initiate: InitiateResponse = response.json().await?; + let upload_id = &initiate.upload_id; + + // 2. Upload 3 parts + for i in 1..=3 { + let data = format!("part-{i}"); + let response = client + .put(server.url(&format!( + "/v1/objects:multipart:parts/test/org=1/paginated-key?upload_id={upload_id}&part_number={i}" + ))) + .header("content-length", data.len().to_string()) + .body(data) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + } + + // 3. List with max_parts=2 (should be truncated) + let response = client + .get(server.url(&format!( + "/v1/objects:multipart:parts/test/org=1/paginated-key?upload_id={upload_id}&max_parts=2" + ))) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let page1: ListPartsResponse = response.json().await?; + assert_eq!(page1.parts.len(), 2); + assert!(page1.is_truncated); + assert!(page1.next_part_number_marker.is_some()); + + // 4. List next page + let marker = page1.next_part_number_marker.unwrap(); + let response = client + .get(server.url(&format!( + "/v1/objects:multipart:parts/test/org=1/paginated-key?upload_id={upload_id}&max_parts=2&next_part_number_marker={marker}" + ))) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let page2: ListPartsResponse = response.json().await?; + assert_eq!(page2.parts.len(), 1); + assert!(!page2.is_truncated); + + Ok(()) +} + +// --- Part overwrite --- + +#[tokio::test] +async fn test_upload_part_overwrite() -> Result<()> { + let server = test_server().await; + let client = reqwest::Client::new(); + + // 1. 
Initiate + let response = client + .put(server.url("/v1/objects:multipart/test/org=1/overwrite-key")) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let initiate: InitiateResponse = response.json().await?; + let upload_id = &initiate.upload_id; + + // 2. Upload part 1 with initial data + let response = client + .put(server.url(&format!( + "/v1/objects:multipart:parts/test/org=1/overwrite-key?upload_id={upload_id}&part_number=1" + ))) + .header("content-length", "5") + .body("first") + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let first_etag: UploadPartResponse = response.json().await?; + + // 3. Overwrite part 1 with new data + let response = client + .put(server.url(&format!( + "/v1/objects:multipart:parts/test/org=1/overwrite-key?upload_id={upload_id}&part_number=1" + ))) + .header("content-length", "6") + .body("second") + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let second_etag: UploadPartResponse = response.json().await?; + + // The etags should differ since the data changed + assert_ne!(first_etag.etag, second_etag.etag); + + // 4. List parts — should show only the latest version + let response = client + .get(server.url(&format!( + "/v1/objects:multipart:parts/test/org=1/overwrite-key?upload_id={upload_id}" + ))) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let list: ListPartsResponse = response.json().await?; + assert_eq!(list.parts.len(), 1); + assert_eq!(list.parts[&1].etag, second_etag.etag); + assert_eq!(list.parts[&1].size, 6); + + // 5. Complete with the overwritten part + complete_and_assert( + &client, + &server.url(&format!( + "/v1/objects:multipart:complete/test/org=1/overwrite-key?upload_id={upload_id}" + )), + &[("1", &second_etag.etag)], + ) + .await?; + + // 6. 
Verify the final object has the overwritten data + let response = client + .get(server.url("/v1/objects/test/org=1/overwrite-key")) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let payload = response.bytes().await?; + assert_eq!(&payload[..], b"second"); + + Ok(()) +} + +// --- Missing Content-Length on upload_part --- + +#[tokio::test] +async fn test_upload_part_missing_content_length() -> Result<()> { + let server = test_server().await; + let client = reqwest::Client::builder().no_proxy().build()?; + + // 1. Initiate + let response = client + .put(server.url("/v1/objects:multipart/test/org=1/no-cl-key")) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let initiate: InitiateResponse = response.json().await?; + let upload_id = &initiate.upload_id; + + // 2. Upload part without Content-Length — reqwest normally adds it automatically for + // fixed-size bodies, so we use a streaming body to avoid that. + let stream = + futures_util::stream::once(async { Ok::<_, std::io::Error>(bytes::Bytes::from("data")) }); + let body = reqwest::Body::wrap_stream(stream); + let response = client + .put(server.url(&format!( + "/v1/objects:multipart:parts/test/org=1/no-cl-key?upload_id={upload_id}&part_number=1" + ))) + .body(body) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::BAD_REQUEST); + + Ok(()) +} + +// --- Missing upload_id query param --- + +#[tokio::test] +async fn test_missing_upload_id_on_complete() -> Result<()> { + let server = test_server().await; + let client = reqwest::Client::new(); + + let complete_body = serde_json::json!({ + "parts": [{"part_number": 1, "etag": "fake"}] + }); + let response = client + .post(server.url("/v1/objects:multipart:complete/test/org=1/some-key")) + .header("content-type", "application/json") + .body(complete_body.to_string()) + .send() + .await?; + + assert_eq!(response.status(), reqwest::StatusCode::BAD_REQUEST); + + Ok(()) +} + 
+#[tokio::test] +async fn test_missing_upload_id_on_abort() -> Result<()> { + let server = test_server().await; + let client = reqwest::Client::new(); + + let response = client + .delete(server.url("/v1/objects:multipart/test/org=1/some-key")) + .send() + .await?; + + assert_eq!(response.status(), reqwest::StatusCode::BAD_REQUEST); + + Ok(()) +} + +// --- Structured key with slashes --- + +#[tokio::test] +async fn test_multipart_structured_key() -> Result<()> { + let server = test_server().await; + let client = reqwest::Client::new(); + + // 1. Initiate with a key containing slashes + let response = client + .put(server.url("/v1/objects:multipart/test/org=1/path/to/object")) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let initiate: InitiateResponse = response.json().await?; + let upload_id = &initiate.upload_id; + assert_eq!(initiate.key, "path/to/object"); + + // 2. Upload a part + let data = b"structured key data"; + let response = client + .put(server.url(&format!( + "/v1/objects:multipart:parts/test/org=1/path/to/object?upload_id={upload_id}&part_number=1" + ))) + .header("content-length", data.len().to_string()) + .body(data.to_vec()) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let part: UploadPartResponse = response.json().await?; + + // 3. Complete + complete_and_assert( + &client, + &server.url(&format!( + "/v1/objects:multipart:complete/test/org=1/path/to/object?upload_id={upload_id}" + )), + &[("1", &part.etag)], + ) + .await?; + + // 4. 
Verify via regular GET + let response = client + .get(server.url("/v1/objects/test/org=1/path/to/object")) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let payload = response.bytes().await?; + assert_eq!(&payload[..], b"structured key data"); + + Ok(()) +} diff --git a/objectstore-service/src/service.rs b/objectstore-service/src/service.rs index e964adcf..93c5b8c3 100644 --- a/objectstore-service/src/service.rs +++ b/objectstore-service/src/service.rs @@ -266,6 +266,13 @@ impl StorageService { } /// Uploads a single part. + /// + /// Note that this requires a `content_length`. + /// This grants us the broadest and most seamless compatibility when it comes to backends. + /// For example, MinIO rejects `UploadPart` requests without a `Content-Length` on plain PUT + /// requests. + /// This can be worked around by using AWS SigV4 chunked streaming requests, which we could use + /// if one day we'll have a usecase where the client doesn't know the part length upfront. 
pub async fn upload_part( &self, id: ObjectId, From 0e0b4da8a707400224aa8a64cd3da134417baefc Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 8 May 2026 15:11:51 +0200 Subject: [PATCH 11/22] add tests --- objectstore-server/tests/multipart.rs | 85 ++++++++++++++++++++++++--- 1 file changed, 76 insertions(+), 9 deletions(-) diff --git a/objectstore-server/tests/multipart.rs b/objectstore-server/tests/multipart.rs index f04340f3..8a7c48f6 100644 --- a/objectstore-server/tests/multipart.rs +++ b/objectstore-server/tests/multipart.rs @@ -36,6 +36,17 @@ struct CompleteSuccessResponse { key: String, } +#[derive(Debug, Deserialize)] +struct CompleteErrorDetail { + code: String, + message: String, +} + +#[derive(Debug, Deserialize)] +struct CompleteErrorResponse { + error: CompleteErrorDetail, +} + async fn test_server() -> TestServer { TestServer::with_config(Config { auth: AuthZ { @@ -47,11 +58,11 @@ async fn test_server() -> TestServer { .await } -/// Sends a complete request and asserts success. Returns the key from the response. +/// Sends a complete request and returns the trimmed response body. /// /// The complete endpoint uses a streaming response with keepalive whitespace, so we -/// trim the response body before parsing. -async fn complete_and_assert( +/// trim the response body before returning. +async fn send_complete( client: &reqwest::Client, url: &str, parts: &[(&str, &str)], @@ -73,13 +84,18 @@ async fn complete_and_assert( assert_eq!(response.status(), reqwest::StatusCode::OK); let text = response.text().await?; - let trimmed = text.trim(); - assert!( - !trimmed.contains("\"error\""), - "complete returned error: {trimmed}" - ); - let parsed: CompleteSuccessResponse = serde_json::from_str(trimmed)?; + Ok(text.trim().to_string()) +} +/// Sends a complete request and asserts success. Returns the key from the response.
+async fn complete_and_assert( + client: &reqwest::Client, + url: &str, + parts: &[(&str, &str)], +) -> Result { + let body = send_complete(client, url, parts).await?; + let parsed: CompleteSuccessResponse = serde_json::from_str(&body) + .map_err(|e| anyhow::anyhow!("expected success response, got {body:?}: {e}"))?; Ok(parsed.key) } @@ -512,6 +528,57 @@ async fn test_missing_upload_id_on_abort() -> Result<()> { Ok(()) } +// --- Complete with invalid etag --- + +#[tokio::test] +async fn test_complete_invalid_etag() -> Result<()> { + let server = test_server().await; + let client = reqwest::Client::new(); + + // 1. Initiate + let response = client + .put(server.url("/v1/objects:multipart/test/org=1/bad-etag-key")) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let initiate: InitiateResponse = response.json().await?; + let upload_id = &initiate.upload_id; + + // 2. Upload a part + let response = client + .put(server.url(&format!( + "/v1/objects:multipart:parts/test/org=1/bad-etag-key?upload_id={upload_id}&part_number=1" + ))) + .header("content-length", "4") + .body("data") + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + + // 3. Complete with a wrong etag + let body = send_complete( + &client, + &server.url(&format!( + "/v1/objects:multipart:complete/test/org=1/bad-etag-key?upload_id={upload_id}" + )), + &[("1", "wrong-etag")], + ) + .await?; + + let err: CompleteErrorResponse = serde_json::from_str(&body)?; + assert!(!err.error.code.is_empty()); + assert!(!err.error.message.is_empty()); + + // 4. 
Object should not exist + let response = client + .get(server.url("/v1/objects/test/org=1/bad-etag-key")) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::NOT_FOUND); + + Ok(()) +} + // --- Structured key with slashes --- #[tokio::test] From ddb051ef0cf275b46a52c40493da868267646467 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 8 May 2026 15:29:09 +0200 Subject: [PATCH 12/22] fmt and clippy --- objectstore-server/src/endpoints/multipart.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/objectstore-server/src/endpoints/multipart.rs b/objectstore-server/src/endpoints/multipart.rs index b13a920f..c0a0ab65 100644 --- a/objectstore-server/src/endpoints/multipart.rs +++ b/objectstore-server/src/endpoints/multipart.rs @@ -316,7 +316,7 @@ async fn complete( } } }; - let stream = stream.map(|s| Ok::<_, Infallible>(s)); + let stream = stream.map(Ok::<_, Infallible>); let mut headers = HeaderMap::new(); headers.insert( From 2f36536fb6b70fb78cca1168400da6d1ece35bba Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 8 May 2026 15:31:46 +0200 Subject: [PATCH 13/22] fix --- objectstore-server/src/endpoints/multipart.rs | 4 ++-- objectstore-server/tests/multipart.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/objectstore-server/src/endpoints/multipart.rs b/objectstore-server/src/endpoints/multipart.rs index c0a0ab65..726c7ba2 100644 --- a/objectstore-server/src/endpoints/multipart.rs +++ b/objectstore-server/src/endpoints/multipart.rs @@ -65,7 +65,7 @@ struct UploadIdQuery { struct ListPartsQuery { upload_id: String, max_parts: Option, - next_part_number_marker: Option, + part_number_marker: Option, } // --- Request/Response types --- @@ -214,7 +214,7 @@ async fn list_parts( id, params.upload_id, params.max_parts, - params.next_part_number_marker, + params.part_number_marker, ) .await?; diff --git 
a/objectstore-server/tests/multipart.rs b/objectstore-server/tests/multipart.rs index 8a7c48f6..93e8733c 100644 --- a/objectstore-server/tests/multipart.rs +++ b/objectstore-server/tests/multipart.rs @@ -368,7 +368,7 @@ async fn test_list_parts_pagination() -> Result<()> { let marker = page1.next_part_number_marker.unwrap(); let response = client .get(server.url(&format!( - "/v1/objects:multipart:parts/test/org=1/paginated-key?upload_id={upload_id}&max_parts=2&next_part_number_marker={marker}" + "/v1/objects:multipart:parts/test/org=1/paginated-key?upload_id={upload_id}&max_parts=2&part_number_marker={marker}" ))) .send() .await?; From d639e4db520fb3d4338cd2583c07ad0bce8bcc8e Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 8 May 2026 15:55:50 +0200 Subject: [PATCH 14/22] improve --- objectstore-server/src/endpoints/multipart.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/objectstore-server/src/endpoints/multipart.rs b/objectstore-server/src/endpoints/multipart.rs index 726c7ba2..e784bab8 100644 --- a/objectstore-server/src/endpoints/multipart.rs +++ b/objectstore-server/src/endpoints/multipart.rs @@ -276,6 +276,7 @@ async fn complete( tokio::pin!(fut); let mut keepalive = tokio::time::interval(Duration::from_secs(1)); + keepalive.tick().await; loop { tokio::select! 
{ res = &mut fut => { From 8a1890670dac5c8d1cbe86711adda5d3fbe81517 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 8 May 2026 16:03:19 +0200 Subject: [PATCH 15/22] improve --- objectstore-server/src/endpoints/multipart.rs | 13 ++++- objectstore-server/tests/multipart.rs | 54 +++++++++++++++++++ 2 files changed, 66 insertions(+), 1 deletion(-) diff --git a/objectstore-server/src/endpoints/multipart.rs b/objectstore-server/src/endpoints/multipart.rs index e784bab8..b806b8de 100644 --- a/objectstore-server/src/endpoints/multipart.rs +++ b/objectstore-server/src/endpoints/multipart.rs @@ -124,6 +124,13 @@ struct CompleteErrorResponse { error: CompleteErrorDetail, } +fn validate_part_number(part_number: u32) -> ApiResult<()> { + if part_number == 0 { + return Err(ApiError::Client("part_number must be >= 1".into())); + } + Ok(()) +} + // --- Handlers --- async fn initiate_put( @@ -152,7 +159,7 @@ async fn initiate_inner( headers: HeaderMap, ) -> ApiResult { let mut metadata = Metadata::from_headers(&headers, "").map_err(ServiceError::from)?; - // TODO: Should we do this on finalize instead? It will require one more metadata request. + // TODO: Do this in `complete` instead, when we have a Service API to mutate metadata. 
metadata.time_created = Some(SystemTime::now()); state @@ -183,6 +190,7 @@ async fn upload_part( .and_then(|v| v.to_str().ok()) .and_then(|v| v.parse::().ok()) .ok_or_else(|| ApiError::Client("Content-Length header is required".into()))?; + validate_part_number(params.part_number)?; let content_md5 = headers .get("content-md5") @@ -255,6 +263,9 @@ async fn complete( Json(body): Json, ) -> ApiResult { service.check_permission(Permission::ObjectWrite, id.context())?; + for part in &body.parts { + validate_part_number(part.part_number)?; + } let key = id.key().to_string(); diff --git a/objectstore-server/tests/multipart.rs b/objectstore-server/tests/multipart.rs index 93e8733c..782c6335 100644 --- a/objectstore-server/tests/multipart.rs +++ b/objectstore-server/tests/multipart.rs @@ -491,6 +491,33 @@ async fn test_upload_part_missing_content_length() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_upload_part_zero_part_number() -> Result<()> { + let server = test_server().await; + let client = reqwest::Client::new(); + + let response = client + .put(server.url("/v1/objects:multipart/test/org=1/zero-part-upload")) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let initiate: InitiateResponse = response.json().await?; + let upload_id = &initiate.upload_id; + + let response = client + .put(server.url(&format!( + "/v1/objects:multipart:parts/test/org=1/zero-part-upload?upload_id={upload_id}&part_number=0" + ))) + .header("content-length", "4") + .body("data") + .send() + .await?; + + assert_eq!(response.status(), reqwest::StatusCode::BAD_REQUEST); + + Ok(()) +} + // --- Missing upload_id query param --- #[tokio::test] @@ -513,6 +540,33 @@ async fn test_missing_upload_id_on_complete() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_complete_zero_part_number() -> Result<()> { + let server = test_server().await; + let client = reqwest::Client::new(); + + let response = client + 
.put(server.url("/v1/objects:multipart/test/org=1/zero-part-complete")) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let initiate: InitiateResponse = response.json().await?; + let upload_id = &initiate.upload_id; + + let response = client + .post(server.url(&format!( + "/v1/objects:multipart:complete/test/org=1/zero-part-complete?upload_id={upload_id}" + ))) + .header("content-type", "application/json") + .body(r#"{"parts":[{"part_number":0,"etag":"fake"}]}"#) + .send() + .await?; + + assert_eq!(response.status(), reqwest::StatusCode::BAD_REQUEST); + + Ok(()) +} + #[tokio::test] async fn test_missing_upload_id_on_abort() -> Result<()> { let server = test_server().await; From 7e218f2f149bceaa77e0df5c4d552d7a374967bb Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 27 May 2026 14:00:09 +0200 Subject: [PATCH 16/22] change list_parts to also require ObjectWrite, not ObjectRead --- objectstore-server/src/auth/service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/objectstore-server/src/auth/service.rs b/objectstore-server/src/auth/service.rs index a1a2c388..b0e5d3bc 100644 --- a/objectstore-server/src/auth/service.rs +++ b/objectstore-server/src/auth/service.rs @@ -165,7 +165,7 @@ impl AuthAwareService { max_parts: Option, part_number_marker: Option, ) -> ApiResult { - self.assert_authorized(Permission::ObjectRead, id.context())?; + self.assert_authorized(Permission::ObjectWrite, id.context())?; Ok(self .service .list_parts(id, upload_id, max_parts, part_number_marker) From cffe00d2239db63f73703f2907845d964fa9095b Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 27 May 2026 14:13:04 +0200 Subject: [PATCH 17/22] notimplemented raise error --- objectstore-service/src/error.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/objectstore-service/src/error.rs b/objectstore-service/src/error.rs index 1169fa98..27344abe 
100644 --- a/objectstore-service/src/error.rs +++ b/objectstore-service/src/error.rs @@ -146,7 +146,7 @@ impl Error { Self::Panic(_) => Level::ERROR, Self::Dropped => Level::ERROR, Self::UnexpectedTombstone => Level::ERROR, - Self::NotImplemented => Level::WARN, + Self::NotImplemented => Level::ERROR, Self::Generic { .. } => Level::ERROR, } } From b10424f70d9a0c7ce254c653903a8057cba34a95 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 27 May 2026 14:52:18 +0200 Subject: [PATCH 18/22] improve as_multipart_upload_backend using cast inside Arc --- objectstore-service/src/backend/common.rs | 14 +++---- objectstore-service/src/backend/gcs.rs | 5 ++- objectstore-service/src/backend/in_memory.rs | 4 +- objectstore-service/src/backend/local_fs.rs | 5 ++- objectstore-service/src/backend/testing.rs | 5 ++- objectstore-service/src/backend/tiered.rs | 4 +- objectstore-service/src/service.rs | 44 ++++---------------- 7 files changed, 28 insertions(+), 53 deletions(-) diff --git a/objectstore-service/src/backend/common.rs b/objectstore-service/src/backend/common.rs index 700699bb..8efad5da 100644 --- a/objectstore-service/src/backend/common.rs +++ b/objectstore-service/src/backend/common.rs @@ -1,12 +1,13 @@ //! Shared trait definition and types for all backends. use std::fmt; +use std::sync::Arc; use objectstore_types::metadata::{ExpirationPolicy, Metadata}; use bytes::Bytes; -use crate::error::Result; +use crate::error::{Error, Result}; use crate::id::ObjectId; use crate::multipart::{ AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse, @@ -63,13 +64,12 @@ pub trait Backend: fmt::Debug + Send + Sync + 'static { /// to wait for those tasks to complete. async fn join(&self) {} - /// Returns a reference to the [`MultipartUploadBackend`] implementation, - /// if this backend supports multipart uploads. + /// Casts this backend into an [`Arc`] if supported. /// - /// The default returns `None`. 
Backends that implement - /// [`MultipartUploadBackend`] should override this to return `Some(self)`. - fn as_multipart_upload_backend(&self) -> Option<&dyn MultipartUploadBackend> { - None + /// The default returns [`Error::NotImplemented`]. Backends that implement + /// [`MultipartUploadBackend`] should override this to return `Ok(self)`. + fn as_multipart_upload_backend(self: Arc) -> Result> { + Err(Error::NotImplemented) } } diff --git a/objectstore-service/src/backend/gcs.rs b/objectstore-service/src/backend/gcs.rs index c89967d6..1e69bd2b 100644 --- a/objectstore-service/src/backend/gcs.rs +++ b/objectstore-service/src/backend/gcs.rs @@ -3,6 +3,7 @@ use std::borrow::Cow; use std::collections::BTreeMap; use std::future::Future; +use std::sync::Arc; use std::time::{Duration, SystemTime}; use std::{fmt, io}; @@ -599,8 +600,8 @@ impl Backend for GcsBackend { "gcs" } - fn as_multipart_upload_backend(&self) -> Option<&dyn MultipartUploadBackend> { - Some(self) + fn as_multipart_upload_backend(self: Arc) -> Result> { + Ok(self) } #[tracing::instrument(level = "trace", fields(?id), skip_all)] diff --git a/objectstore-service/src/backend/in_memory.rs b/objectstore-service/src/backend/in_memory.rs index 07fda998..1ca92604 100644 --- a/objectstore-service/src/backend/in_memory.rs +++ b/objectstore-service/src/backend/in_memory.rs @@ -104,8 +104,8 @@ impl super::common::Backend for InMemoryBackend { self.name } - fn as_multipart_upload_backend(&self) -> Option<&dyn MultipartUploadBackend> { - Some(self) + fn as_multipart_upload_backend(self: Arc) -> Result> { + Ok(self) } async fn put_object( diff --git a/objectstore-service/src/backend/local_fs.rs b/objectstore-service/src/backend/local_fs.rs index 3a0d2102..18d0c1b5 100644 --- a/objectstore-service/src/backend/local_fs.rs +++ b/objectstore-service/src/backend/local_fs.rs @@ -3,6 +3,7 @@ use std::io::ErrorKind; use std::path::PathBuf; use std::pin::pin; +use std::sync::Arc; use std::time::SystemTime; use 
futures_util::StreamExt; @@ -71,8 +72,8 @@ impl Backend for LocalFsBackend { "local-fs" } - fn as_multipart_upload_backend(&self) -> Option<&dyn MultipartUploadBackend> { - Some(self) + fn as_multipart_upload_backend(self: Arc) -> Result> { + Ok(self) } #[tracing::instrument(level = "trace", fields(?id), skip_all)] diff --git a/objectstore-service/src/backend/testing.rs b/objectstore-service/src/backend/testing.rs index 55330663..ac709fec 100644 --- a/objectstore-service/src/backend/testing.rs +++ b/objectstore-service/src/backend/testing.rs @@ -35,6 +35,7 @@ //! ``` use std::fmt; +use std::sync::Arc; use bytes::Bytes; use objectstore_types::metadata::Metadata; @@ -269,8 +270,8 @@ impl Backend for TestBackend { self.hooks.name() } - fn as_multipart_upload_backend(&self) -> Option<&dyn MultipartUploadBackend> { - Some(self) + fn as_multipart_upload_backend(self: Arc) -> Result> { + Ok(self) } async fn put_object( diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index d3e8c8de..77138cb4 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -365,8 +365,8 @@ impl Backend for TieredStorage { "tiered" } - fn as_multipart_upload_backend(&self) -> Option<&dyn MultipartUploadBackend> { - Some(self) + fn as_multipart_upload_backend(self: Arc) -> Result> { + Ok(self) } async fn put_object( diff --git a/objectstore-service/src/service.rs b/objectstore-service/src/service.rs index 93c5b8c3..8505da44 100644 --- a/objectstore-service/src/service.rs +++ b/objectstore-service/src/service.rs @@ -240,27 +240,15 @@ impl StorageService { // --- Multipart upload operations --- - fn ensure_inner_multipart(&self) -> Result<()> { - self.inner - .as_multipart_upload_backend() - .ok_or(Error::NotImplemented)?; - Ok(()) - } - /// Initiates a new multipart upload. 
pub async fn initiate_multipart( &self, id: ObjectId, metadata: Metadata, ) -> Result { - self.ensure_inner_multipart()?; - let inner = Arc::clone(&self.inner); + let inner = self.inner.clone().as_multipart_upload_backend()?; self.spawn("initiate_multipart", async move { - inner - .as_multipart_upload_backend() - .unwrap() - .initiate_multipart(&id, &metadata) - .await + inner.initiate_multipart(&id, &metadata).await }) .await } @@ -282,12 +270,9 @@ impl StorageService { content_md5: Option, body: ClientStream, ) -> Result { - self.ensure_inner_multipart()?; - let inner = Arc::clone(&self.inner); + let inner = self.inner.clone().as_multipart_upload_backend()?; self.spawn("upload_part", async move { inner - .as_multipart_upload_backend() - .unwrap() .upload_part( &id, &upload_id, @@ -309,12 +294,9 @@ impl StorageService { max_parts: Option, part_number_marker: Option, ) -> Result { - self.ensure_inner_multipart()?; - let inner = Arc::clone(&self.inner); + let inner = self.inner.clone().as_multipart_upload_backend()?; self.spawn("list_parts", async move { inner - .as_multipart_upload_backend() - .unwrap() .list_parts(&id, &upload_id, max_parts, part_number_marker) .await }) @@ -327,14 +309,9 @@ impl StorageService { id: ObjectId, upload_id: UploadId, ) -> Result { - self.ensure_inner_multipart()?; - let inner = Arc::clone(&self.inner); + let inner = self.inner.clone().as_multipart_upload_backend()?; self.spawn("abort_multipart", async move { - inner - .as_multipart_upload_backend() - .unwrap() - .abort_multipart(&id, &upload_id) - .await + inner.abort_multipart(&id, &upload_id).await }) .await } @@ -346,14 +323,9 @@ impl StorageService { upload_id: UploadId, parts: Vec, ) -> Result { - self.ensure_inner_multipart()?; - let inner = Arc::clone(&self.inner); + let inner = self.inner.clone().as_multipart_upload_backend()?; self.spawn("complete_multipart", async move { - inner - .as_multipart_upload_backend() - .unwrap() - .complete_multipart(&id, &upload_id, parts) - 
.await + inner.complete_multipart(&id, &upload_id, parts).await }) .await } From 1cf3a78a479eb95f59aa59a67723cee082b69944 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 27 May 2026 14:56:09 +0200 Subject: [PATCH 19/22] remove redundant auth check --- objectstore-server/src/endpoints/multipart.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/objectstore-server/src/endpoints/multipart.rs b/objectstore-server/src/endpoints/multipart.rs index b806b8de..656be64e 100644 --- a/objectstore-server/src/endpoints/multipart.rs +++ b/objectstore-server/src/endpoints/multipart.rs @@ -262,7 +262,6 @@ async fn complete( Query(params): Query, Json(body): Json, ) -> ApiResult { - service.check_permission(Permission::ObjectWrite, id.context())?; for part in &body.parts { validate_part_number(part.part_number)?; } From a2496b30f3675eafb5b1a33f1af7750b118db1ed Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 27 May 2026 15:24:42 +0200 Subject: [PATCH 20/22] PartNumber = NonZeroU32 --- objectstore-server/src/endpoints/multipart.rs | 26 ++------ objectstore-server/tests/multipart.rs | 3 +- objectstore-service/src/backend/gcs.rs | 65 ++++++++++++------- objectstore-service/src/backend/in_memory.rs | 57 ++++++++++------ objectstore-service/src/backend/local_fs.rs | 57 ++++++++++------ objectstore-service/src/backend/tiered.rs | 48 +++++++------- objectstore-service/src/multipart.rs | 2 +- 7 files changed, 145 insertions(+), 113 deletions(-) diff --git a/objectstore-server/src/endpoints/multipart.rs b/objectstore-server/src/endpoints/multipart.rs index 656be64e..4c72c57e 100644 --- a/objectstore-server/src/endpoints/multipart.rs +++ b/objectstore-server/src/endpoints/multipart.rs @@ -14,12 +14,10 @@ use http::HeaderValue; use http::header; use objectstore_service::error::Error as ServiceError; use objectstore_service::id::{ObjectContext, ObjectId}; -use objectstore_service::multipart::CompletedPart; +use 
objectstore_service::multipart::{CompletedPart, PartNumber}; use objectstore_types::metadata::Metadata; use serde::{Deserialize, Serialize}; -use objectstore_types::auth::Permission; - use crate::auth::AuthAwareService; use crate::endpoints::common::{ApiError, ApiResult}; use crate::extractors::Xt; @@ -53,7 +51,7 @@ pub fn router() -> Router { #[derive(Debug, Deserialize)] struct UploadPartQuery { upload_id: String, - part_number: u32, + part_number: PartNumber, } #[derive(Debug, Deserialize)] @@ -65,7 +63,7 @@ struct UploadIdQuery { struct ListPartsQuery { upload_id: String, max_parts: Option, - part_number_marker: Option, + part_number_marker: Option, } // --- Request/Response types --- @@ -91,15 +89,15 @@ struct PartInfo { #[derive(Debug, Serialize)] struct ListPartsResponse { - parts: BTreeMap, + parts: BTreeMap, is_truncated: bool, #[serde(skip_serializing_if = "Option::is_none")] - next_part_number_marker: Option, + next_part_number_marker: Option, } #[derive(Debug, Deserialize)] struct CompletePartRequest { - part_number: u32, + part_number: PartNumber, etag: String, } @@ -124,13 +122,6 @@ struct CompleteErrorResponse { error: CompleteErrorDetail, } -fn validate_part_number(part_number: u32) -> ApiResult<()> { - if part_number == 0 { - return Err(ApiError::Client("part_number must be >= 1".into())); - } - Ok(()) -} - // --- Handlers --- async fn initiate_put( @@ -190,7 +181,6 @@ async fn upload_part( .and_then(|v| v.to_str().ok()) .and_then(|v| v.parse::().ok()) .ok_or_else(|| ApiError::Client("Content-Length header is required".into()))?; - validate_part_number(params.part_number)?; let content_md5 = headers .get("content-md5") @@ -262,10 +252,6 @@ async fn complete( Query(params): Query, Json(body): Json, ) -> ApiResult { - for part in &body.parts { - validate_part_number(part.part_number)?; - } - let key = id.key().to_string(); let parts: Vec = body diff --git a/objectstore-server/tests/multipart.rs b/objectstore-server/tests/multipart.rs index 
782c6335..eaa70b24 100644 --- a/objectstore-server/tests/multipart.rs +++ b/objectstore-server/tests/multipart.rs @@ -562,8 +562,7 @@ async fn test_complete_zero_part_number() -> Result<()> { .send() .await?; - assert_eq!(response.status(), reqwest::StatusCode::BAD_REQUEST); - + assert_eq!(response.status(), reqwest::StatusCode::UNPROCESSABLE_ENTITY); Ok(()) } diff --git a/objectstore-service/src/backend/gcs.rs b/objectstore-service/src/backend/gcs.rs index 1e69bd2b..15e0f6c4 100644 --- a/objectstore-service/src/backend/gcs.rs +++ b/objectstore-service/src/backend/gcs.rs @@ -740,7 +740,7 @@ impl From for InitiateMultipartResponse { struct XmlListPartsResponse { #[serde(default)] is_truncated: bool, - next_part_number_marker: Option, + next_part_number_marker: Option, #[serde(default, rename = "Part")] parts: Vec, } @@ -758,7 +758,7 @@ impl From for ListPartsResponse { #[derive(Debug, Deserialize)] #[serde(rename_all = "PascalCase")] struct XmlPart { - part_number: u32, + part_number: PartNumber, #[serde(rename = "ETag")] e_tag: String, #[serde(with = "humantime_serde")] @@ -1019,6 +1019,7 @@ impl MultipartUploadBackend for GcsBackend { #[cfg(test)] mod tests { use std::collections::BTreeMap; + use std::num::NonZeroU32; use anyhow::Result; use objectstore_types::scope::{Scope, Scopes}; @@ -1362,7 +1363,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), data.len() as u64, None, stream::single(data.to_vec()), @@ -1374,7 +1375,7 @@ mod tests { &id, &upload_id, vec![CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag, }], ) @@ -1416,7 +1417,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), part1.len() as u64, None, stream::single(part1.clone()), @@ -1426,7 +1427,7 @@ mod tests { .upload_part( &id, &upload_id, - 2, + NonZeroU32::new(2).unwrap(), part2.len() as u64, None, stream::single(part2.clone()), @@ -1436,7 +1437,7 @@ mod tests { .upload_part( &id, &upload_id, - 3, + 
NonZeroU32::new(3).unwrap(), part3.len() as u64, None, stream::single(part3.clone()), @@ -1449,15 +1450,15 @@ mod tests { &upload_id, vec![ CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag: etag1, }, CompletedPart { - part_number: 2, + part_number: NonZeroU32::new(2).unwrap(), etag: etag2, }, CompletedPart { - part_number: 3, + part_number: NonZeroU32::new(3).unwrap(), etag: etag3, }, ], @@ -1496,7 +1497,7 @@ mod tests { .upload_part( &id, &upload_id, - 2, + NonZeroU32::new(2).unwrap(), part2.len() as u64, None, stream::single(part2.clone()), @@ -1506,7 +1507,7 @@ mod tests { .upload_part( &id, &upload_id, - 3, + NonZeroU32::new(3).unwrap(), part3.len() as u64, None, stream::single(part3.clone()), @@ -1516,7 +1517,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), part1.len() as u64, None, stream::single(part1.clone()), @@ -1530,15 +1531,15 @@ mod tests { &upload_id, vec![ CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag: etag1, }, CompletedPart { - part_number: 2, + part_number: NonZeroU32::new(2).unwrap(), etag: etag2, }, CompletedPart { - part_number: 3, + part_number: NonZeroU32::new(3).unwrap(), etag: etag3, }, ], @@ -1567,26 +1568,40 @@ mod tests { let upload_id = backend.initiate_multipart(&id, &metadata).await?; let etag1 = backend - .upload_part(&id, &upload_id, 1, 3, None, stream::single(b"aaa".to_vec())) + .upload_part( + &id, + &upload_id, + NonZeroU32::new(1).unwrap(), + 3, + None, + stream::single(b"aaa".to_vec()), + ) .await?; let etag2 = backend - .upload_part(&id, &upload_id, 2, 3, None, stream::single(b"bbb".to_vec())) + .upload_part( + &id, + &upload_id, + NonZeroU32::new(2).unwrap(), + 3, + None, + stream::single(b"bbb".to_vec()), + ) .await?; // List all parts. 
let list = backend.list_parts(&id, &upload_id, None, None).await?; assert_eq!(list.parts.len(), 2); - assert_eq!(list.parts[0].part_number, 1); + assert_eq!(list.parts[0].part_number.get(), 1); assert_eq!(list.parts[0].etag, etag1); assert_eq!(list.parts[0].size, 3); - assert_eq!(list.parts[1].part_number, 2); + assert_eq!(list.parts[1].part_number.get(), 2); assert_eq!(list.parts[1].etag, etag2); assert_eq!(list.parts[1].size, 3); // List with max_parts=1 to test pagination. let page1 = backend.list_parts(&id, &upload_id, Some(1), None).await?; assert_eq!(page1.parts.len(), 1); - assert_eq!(page1.parts[0].part_number, 1); + assert_eq!(page1.parts[0].part_number.get(), 1); assert!(page1.is_truncated); assert!(page1.next_part_number_marker.is_some()); @@ -1594,7 +1609,7 @@ mod tests { .list_parts(&id, &upload_id, Some(1), page1.next_part_number_marker) .await?; assert_eq!(page2.parts.len(), 1); - assert_eq!(page2.parts[0].part_number, 2); + assert_eq!(page2.parts[0].part_number.get(), 2); // Clean up. 
backend.abort_multipart(&id, &upload_id).await?; @@ -1614,7 +1629,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), 5, None, stream::single(b"hello".to_vec()), @@ -1642,7 +1657,7 @@ mod tests { .upload_part( id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), payload.len() as u64, None, stream::single(payload), @@ -1653,7 +1668,7 @@ mod tests { id, &upload_id, vec![CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag, }], ) diff --git a/objectstore-service/src/backend/in_memory.rs b/objectstore-service/src/backend/in_memory.rs index 1ca92604..77a3374c 100644 --- a/objectstore-service/src/backend/in_memory.rs +++ b/objectstore-service/src/backend/in_memory.rs @@ -459,6 +459,7 @@ impl Entry { #[cfg(test)] mod tests { + use std::num::NonZeroU32; use std::time::Duration; use objectstore_types::metadata::ExpirationPolicy; @@ -495,7 +496,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), data.len() as u64, None, stream::single(data.to_vec()), @@ -508,7 +509,7 @@ mod tests { &id, &upload_id, vec![CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag, }], ) @@ -544,7 +545,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), part1.len() as u64, None, stream::single(part1.clone()), @@ -555,7 +556,7 @@ mod tests { .upload_part( &id, &upload_id, - 2, + NonZeroU32::new(2).unwrap(), part2.len() as u64, None, stream::single(part2.clone()), @@ -566,7 +567,7 @@ mod tests { .upload_part( &id, &upload_id, - 3, + NonZeroU32::new(3).unwrap(), part3.len() as u64, None, stream::single(part3.clone()), @@ -580,15 +581,15 @@ mod tests { &upload_id, vec![ CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag: etag1, }, CompletedPart { - part_number: 2, + part_number: NonZeroU32::new(2).unwrap(), etag: etag2, }, CompletedPart { - part_number: 3, + part_number: NonZeroU32::new(3).unwrap(), etag: etag3, }, ], @@ 
-611,11 +612,25 @@ mod tests { let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap(); let etag1 = backend - .upload_part(&id, &upload_id, 1, 3, None, stream::single(b"aaa".to_vec())) + .upload_part( + &id, + &upload_id, + NonZeroU32::new(1).unwrap(), + 3, + None, + stream::single(b"aaa".to_vec()), + ) .await .unwrap(); let etag2 = backend - .upload_part(&id, &upload_id, 2, 3, None, stream::single(b"bbb".to_vec())) + .upload_part( + &id, + &upload_id, + NonZeroU32::new(2).unwrap(), + 3, + None, + stream::single(b"bbb".to_vec()), + ) .await .unwrap(); @@ -624,10 +639,10 @@ mod tests { .await .unwrap(); assert_eq!(list.parts.len(), 2); - assert_eq!(list.parts[0].part_number, 1); + assert_eq!(list.parts[0].part_number.get(), 1); assert_eq!(list.parts[0].etag, etag1); assert_eq!(list.parts[0].size, 3); - assert_eq!(list.parts[1].part_number, 2); + assert_eq!(list.parts[1].part_number.get(), 2); assert_eq!(list.parts[1].etag, etag2); assert_eq!(list.parts[1].size, 3); @@ -637,7 +652,7 @@ mod tests { .await .unwrap(); assert_eq!(page1.parts.len(), 1); - assert_eq!(page1.parts[0].part_number, 1); + assert_eq!(page1.parts[0].part_number.get(), 1); assert!(page1.is_truncated); assert!(page1.next_part_number_marker.is_some()); @@ -646,7 +661,7 @@ mod tests { .await .unwrap(); assert_eq!(page2.parts.len(), 1); - assert_eq!(page2.parts[0].part_number, 2); + assert_eq!(page2.parts[0].part_number.get(), 2); backend.abort_multipart(&id, &upload_id).await.unwrap(); } @@ -663,7 +678,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), 5, None, stream::single(b"hello".to_vec()), @@ -689,7 +704,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), 5, None, stream::single(b"hello".to_vec()), @@ -702,7 +717,7 @@ mod tests { &id, &upload_id, vec![CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag: "wrong-etag".into(), }], ) @@ -717,7 +732,7 @@ mod tests { &id, &upload_id, 
vec![CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag, }], ) @@ -738,7 +753,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), 5, None, stream::single(b"hello".to_vec()), @@ -751,7 +766,7 @@ mod tests { &id, &upload_id, vec![CompletedPart { - part_number: 99, + part_number: NonZeroU32::new(99).unwrap(), etag: "whatever".into(), }], ) @@ -766,7 +781,7 @@ mod tests { &id, &upload_id, vec![CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag, }], ) diff --git a/objectstore-service/src/backend/local_fs.rs b/objectstore-service/src/backend/local_fs.rs index 18d0c1b5..433766d3 100644 --- a/objectstore-service/src/backend/local_fs.rs +++ b/objectstore-service/src/backend/local_fs.rs @@ -427,6 +427,7 @@ impl MultipartUploadBackend for LocalFsBackend { #[cfg(test)] mod tests { + use std::num::NonZeroU32; use std::time::{Duration, SystemTime}; use bytes::BytesMut; @@ -562,7 +563,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), data.len() as u64, None, stream::single(data.to_vec()), @@ -575,7 +576,7 @@ mod tests { &id, &upload_id, vec![crate::multipart::CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag, }], ) @@ -611,7 +612,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), part1.len() as u64, None, stream::single(part1.clone()), @@ -622,7 +623,7 @@ mod tests { .upload_part( &id, &upload_id, - 2, + NonZeroU32::new(2).unwrap(), part2.len() as u64, None, stream::single(part2.clone()), @@ -633,7 +634,7 @@ mod tests { .upload_part( &id, &upload_id, - 3, + NonZeroU32::new(3).unwrap(), part3.len() as u64, None, stream::single(part3.clone()), @@ -647,15 +648,15 @@ mod tests { &upload_id, vec![ crate::multipart::CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag: etag1, }, crate::multipart::CompletedPart { - part_number: 2, + part_number: 
NonZeroU32::new(2).unwrap(), etag: etag2, }, crate::multipart::CompletedPart { - part_number: 3, + part_number: NonZeroU32::new(3).unwrap(), etag: etag3, }, ], @@ -678,11 +679,25 @@ mod tests { let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap(); let etag1 = backend - .upload_part(&id, &upload_id, 1, 3, None, stream::single(b"aaa".to_vec())) + .upload_part( + &id, + &upload_id, + NonZeroU32::new(1).unwrap(), + 3, + None, + stream::single(b"aaa".to_vec()), + ) .await .unwrap(); let etag2 = backend - .upload_part(&id, &upload_id, 2, 3, None, stream::single(b"bbb".to_vec())) + .upload_part( + &id, + &upload_id, + NonZeroU32::new(2).unwrap(), + 3, + None, + stream::single(b"bbb".to_vec()), + ) .await .unwrap(); @@ -691,10 +706,10 @@ mod tests { .await .unwrap(); assert_eq!(list.parts.len(), 2); - assert_eq!(list.parts[0].part_number, 1); + assert_eq!(list.parts[0].part_number.get(), 1); assert_eq!(list.parts[0].etag, etag1); assert_eq!(list.parts[0].size, 3); - assert_eq!(list.parts[1].part_number, 2); + assert_eq!(list.parts[1].part_number.get(), 2); assert_eq!(list.parts[1].etag, etag2); assert_eq!(list.parts[1].size, 3); @@ -704,7 +719,7 @@ mod tests { .await .unwrap(); assert_eq!(page1.parts.len(), 1); - assert_eq!(page1.parts[0].part_number, 1); + assert_eq!(page1.parts[0].part_number.get(), 1); assert!(page1.is_truncated); assert!(page1.next_part_number_marker.is_some()); @@ -713,7 +728,7 @@ mod tests { .await .unwrap(); assert_eq!(page2.parts.len(), 1); - assert_eq!(page2.parts[0].part_number, 2); + assert_eq!(page2.parts[0].part_number.get(), 2); backend.abort_multipart(&id, &upload_id).await.unwrap(); } @@ -730,7 +745,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), 5, None, stream::single(b"hello".to_vec()), @@ -756,7 +771,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), 5, None, stream::single(b"hello".to_vec()), @@ -769,7 +784,7 @@ mod tests { &id, &upload_id, 
vec![crate::multipart::CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag: "wrong-etag".into(), }], ) @@ -784,7 +799,7 @@ mod tests { &id, &upload_id, vec![crate::multipart::CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag, }], ) @@ -805,7 +820,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), 5, None, stream::single(b"hello".to_vec()), @@ -818,7 +833,7 @@ mod tests { &id, &upload_id, vec![crate::multipart::CompletedPart { - part_number: 99, + part_number: NonZeroU32::new(99).unwrap(), etag: "whatever".into(), }], ) @@ -833,7 +848,7 @@ mod tests { &id, &upload_id, vec![crate::multipart::CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag, }], ) diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index 77138cb4..624ca955 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -794,6 +794,8 @@ impl MultipartUploadBackend for TieredStorage { #[cfg(test)] mod tests { + use std::num::NonZeroU32; + use futures::lock::Mutex; use objectstore_types::metadata::{ExpirationPolicy, Metadata}; use objectstore_types::scope::{Scope, Scopes}; @@ -1571,7 +1573,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), payload.len() as u64, None, stream::single(payload.clone()), @@ -1584,7 +1586,7 @@ mod tests { &id, &upload_id, vec![CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag, }], ) @@ -1628,7 +1630,7 @@ mod tests { .upload_part( &id, &upload_id, - 3, + NonZeroU32::new(3).unwrap(), part3.len() as u64, None, stream::single(part3.clone()), @@ -1639,7 +1641,7 @@ mod tests { .upload_part( &id, &upload_id, - 2, + NonZeroU32::new(2).unwrap(), part2.len() as u64, None, stream::single(part2.clone()), @@ -1650,7 +1652,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), 
part1.len() as u64, None, stream::single(part1.clone()), @@ -1664,15 +1666,15 @@ mod tests { &upload_id, vec![ CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag: etag1, }, CompletedPart { - part_number: 2, + part_number: NonZeroU32::new(2).unwrap(), etag: etag2, }, CompletedPart { - part_number: 3, + part_number: NonZeroU32::new(3).unwrap(), etag: etag3, }, ], @@ -1707,7 +1709,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), payload.len() as u64, None, stream::single(payload), @@ -1740,7 +1742,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), part1.len() as u64, None, stream::single(part1), @@ -1751,7 +1753,7 @@ mod tests { .upload_part( &id, &upload_id, - 2, + NonZeroU32::new(2).unwrap(), part2.len() as u64, None, stream::single(part2), @@ -1764,9 +1766,9 @@ mod tests { .await .unwrap(); assert_eq!(resp.parts.len(), 2); - assert_eq!(resp.parts[0].part_number, 1); + assert_eq!(resp.parts[0].part_number.get(), 1); assert_eq!(resp.parts[0].size, 100); - assert_eq!(resp.parts[1].part_number, 2); + assert_eq!(resp.parts[1].part_number.get(), 2); assert_eq!(resp.parts[1].size, 200); } @@ -1794,7 +1796,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), payload2.len() as u64, None, stream::single(payload2.clone()), @@ -1812,7 +1814,7 @@ mod tests { &id, &upload_id, vec![CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag, }], ) @@ -1910,7 +1912,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), payload.len() as u64, None, stream::single(payload), @@ -1923,7 +1925,7 @@ mod tests { &id, &upload_id, vec![CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag, }], ) @@ -2025,7 +2027,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), payload.len() as u64, None, stream::single(payload.clone()), @@ -2039,7 +2041,7 @@ mod 
tests { &id, &upload_id, vec![CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag: etag.clone(), }], ) @@ -2053,7 +2055,7 @@ mod tests { &id, &upload_id, vec![CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag, }], ) @@ -2161,7 +2163,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), payload.len() as u64, None, stream::single(payload.clone()), @@ -2176,7 +2178,7 @@ mod tests { &id, &upload_id, vec![CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag: etag.clone(), }], ) @@ -2190,7 +2192,7 @@ mod tests { &id, &upload_id, vec![CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag, }], ) diff --git a/objectstore-service/src/multipart.rs b/objectstore-service/src/multipart.rs index e1ce51da..c228f4d1 100644 --- a/objectstore-service/src/multipart.rs +++ b/objectstore-service/src/multipart.rs @@ -5,7 +5,7 @@ use std::time::SystemTime; /// Identifier for an in-progress multipart upload. pub type UploadId = String; /// 1-indexed position of a part within its multipart upload. -pub type PartNumber = u32; +pub type PartNumber = std::num::NonZeroU32; /// Opaque per-part identifier returned by the backend after a successful part upload. pub type ETag = String; From 53c937e245144316340e6e3b7d960107ca5cbcd3 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 29 May 2026 10:36:58 +0200 Subject: [PATCH 21/22] fix: prevent path traversal via upload_id in LocalFsBackend Validate that upload_id contains only normal path components before joining it into the multipart directory path. Rejects segments containing .., /, or other traversal patterns. 
--- objectstore-service/src/backend/local_fs.rs | 30 ++++++++++++++------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/objectstore-service/src/backend/local_fs.rs b/objectstore-service/src/backend/local_fs.rs index 433766d3..6c0d89b3 100644 --- a/objectstore-service/src/backend/local_fs.rs +++ b/objectstore-service/src/backend/local_fs.rs @@ -1,7 +1,7 @@ //! Local filesystem backend for development and testing. use std::io::ErrorKind; -use std::path::PathBuf; +use std::path::{Component, Path, PathBuf}; use std::pin::pin; use std::sync::Arc; use std::time::SystemTime; @@ -164,12 +164,22 @@ impl Backend for LocalFsBackend { } } +fn safe_join(base: &Path, segment: &str) -> Result { + for component in Path::new(segment).components() { + if !matches!(component, Component::Normal(_)) { + return Err(Error::generic(format!("invalid path segment: {segment}"))); + } + } + Ok(base.join(segment)) +} + impl LocalFsBackend { - fn multipart_dir(&self, id: &ObjectId, upload_id: &UploadId) -> PathBuf { - self.path + fn multipart_dir(&self, id: &ObjectId, upload_id: &UploadId) -> Result { + let base = self + .path .join("__multipart__") - .join(id.as_storage_path().to_string()) - .join(upload_id) + .join(id.as_storage_path().to_string()); + safe_join(&base, upload_id) } } @@ -181,7 +191,7 @@ impl MultipartUploadBackend for LocalFsBackend { metadata: &Metadata, ) -> Result { let upload_id = uuid::Uuid::now_v7().to_string(); - let dir = self.multipart_dir(id, &upload_id); + let dir = self.multipart_dir(id, &upload_id)?; tokio::fs::create_dir_all(&dir).await?; let meta_path = dir.join("metadata.json"); @@ -203,7 +213,7 @@ impl MultipartUploadBackend for LocalFsBackend { _content_md5: Option<&str>, body: ClientStream, ) -> Result { - let dir = self.multipart_dir(id, upload_id); + let dir = self.multipart_dir(id, upload_id)?; if !tokio::fs::try_exists(&dir).await? 
{ return Err(Error::generic("multipart upload not found")); } @@ -259,7 +269,7 @@ impl MultipartUploadBackend for LocalFsBackend { max_parts: Option, part_number_marker: Option, ) -> Result { - let dir = self.multipart_dir(id, upload_id); + let dir = self.multipart_dir(id, upload_id)?; if !tokio::fs::try_exists(&dir).await? { return Err(Error::generic("multipart upload not found")); } @@ -324,7 +334,7 @@ impl MultipartUploadBackend for LocalFsBackend { id: &ObjectId, upload_id: &UploadId, ) -> Result { - let dir = self.multipart_dir(id, upload_id); + let dir = self.multipart_dir(id, upload_id)?; if tokio::fs::try_exists(&dir).await? { tokio::fs::remove_dir_all(dir).await?; } @@ -337,7 +347,7 @@ impl MultipartUploadBackend for LocalFsBackend { upload_id: &UploadId, parts: Vec, ) -> Result { - let dir = self.multipart_dir(id, upload_id); + let dir = self.multipart_dir(id, upload_id)?; if !tokio::fs::try_exists(&dir).await? { return Err(Error::generic("multipart upload not found")); } From 198bcf9657faed55992d38b4bd46d0b8b45089b6 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 29 May 2026 10:57:42 +0200 Subject: [PATCH 22/22] fix: make UploadId a validated newtype to prevent path traversal Replace `type UploadId = String` with a newtype struct that validates on construction: non-empty and no path-traversal components (.., /, \). This moves the path-safety guarantee from LocalFsBackend's safe_join helper to the type itself, protecting all backends. Invalid upload_id values in query params are now rejected at deserialization time (400). 
--- objectstore-server/src/endpoints/multipart.rs | 10 ++-- objectstore-service/src/backend/gcs.rs | 10 ++-- objectstore-service/src/backend/in_memory.rs | 2 +- objectstore-service/src/backend/local_fs.rs | 32 ++++------- objectstore-service/src/backend/tiered.rs | 6 +- objectstore-service/src/multipart.rs | 57 ++++++++++++++++++- 6 files changed, 82 insertions(+), 35 deletions(-) diff --git a/objectstore-server/src/endpoints/multipart.rs b/objectstore-server/src/endpoints/multipart.rs index 4c72c57e..a9a5f17b 100644 --- a/objectstore-server/src/endpoints/multipart.rs +++ b/objectstore-server/src/endpoints/multipart.rs @@ -14,7 +14,7 @@ use http::HeaderValue; use http::header; use objectstore_service::error::Error as ServiceError; use objectstore_service::id::{ObjectContext, ObjectId}; -use objectstore_service::multipart::{CompletedPart, PartNumber}; +use objectstore_service::multipart::{CompletedPart, PartNumber, UploadId}; use objectstore_types::metadata::Metadata; use serde::{Deserialize, Serialize}; @@ -50,18 +50,18 @@ pub fn router() -> Router { #[derive(Debug, Deserialize)] struct UploadPartQuery { - upload_id: String, + upload_id: UploadId, part_number: PartNumber, } #[derive(Debug, Deserialize)] struct UploadIdQuery { - upload_id: String, + upload_id: UploadId, } #[derive(Debug, Deserialize)] struct ListPartsQuery { - upload_id: String, + upload_id: UploadId, max_parts: Option, part_number_marker: Option, } @@ -71,7 +71,7 @@ struct ListPartsQuery { #[derive(Debug, Serialize)] struct InitiateResponse { key: String, - upload_id: String, + upload_id: UploadId, } #[derive(Debug, Serialize)] diff --git a/objectstore-service/src/backend/gcs.rs b/objectstore-service/src/backend/gcs.rs index 15e0f6c4..80174148 100644 --- a/objectstore-service/src/backend/gcs.rs +++ b/objectstore-service/src/backend/gcs.rs @@ -729,9 +729,11 @@ struct XmlInitiateMultipartUploadResponse { upload_id: String, } -impl From for InitiateMultipartResponse { - fn from(r: 
XmlInitiateMultipartUploadResponse) -> Self { - r.upload_id +impl TryFrom for InitiateMultipartResponse { + type Error = crate::error::Error; + + fn try_from(r: XmlInitiateMultipartUploadResponse) -> crate::error::Result { + UploadId::new(r.upload_id) } } @@ -868,7 +870,7 @@ impl MultipartUploadBackend for GcsBackend { cause: Some(Box::new(e)), })?; - Ok(xml.into()) + Ok(xml.try_into()?) } #[tracing::instrument(level = "trace", fields(?id, upload_id, part_number), skip_all)] diff --git a/objectstore-service/src/backend/in_memory.rs b/objectstore-service/src/backend/in_memory.rs index 77a3374c..9fc1499a 100644 --- a/objectstore-service/src/backend/in_memory.rs +++ b/objectstore-service/src/backend/in_memory.rs @@ -228,7 +228,7 @@ impl MultipartUploadBackend for InMemoryBackend { id: &ObjectId, metadata: &Metadata, ) -> Result { - let upload_id = uuid::Uuid::now_v7().to_string(); + let upload_id = UploadId::new(uuid::Uuid::now_v7().to_string())?; let upload = MultipartUpload { metadata: metadata.clone(), parts: BTreeMap::new(), diff --git a/objectstore-service/src/backend/local_fs.rs b/objectstore-service/src/backend/local_fs.rs index 6c0d89b3..533b616e 100644 --- a/objectstore-service/src/backend/local_fs.rs +++ b/objectstore-service/src/backend/local_fs.rs @@ -1,7 +1,7 @@ //! Local filesystem backend for development and testing. 
use std::io::ErrorKind; -use std::path::{Component, Path, PathBuf}; +use std::path::PathBuf; use std::pin::pin; use std::sync::Arc; use std::time::SystemTime; @@ -164,22 +164,12 @@ impl Backend for LocalFsBackend { } } -fn safe_join(base: &Path, segment: &str) -> Result { - for component in Path::new(segment).components() { - if !matches!(component, Component::Normal(_)) { - return Err(Error::generic(format!("invalid path segment: {segment}"))); - } - } - Ok(base.join(segment)) -} - impl LocalFsBackend { - fn multipart_dir(&self, id: &ObjectId, upload_id: &UploadId) -> Result { - let base = self - .path + fn multipart_dir(&self, id: &ObjectId, upload_id: &UploadId) -> PathBuf { + self.path .join("__multipart__") - .join(id.as_storage_path().to_string()); - safe_join(&base, upload_id) + .join(id.as_storage_path().to_string()) + .join(upload_id.as_str()) } } @@ -190,8 +180,8 @@ impl MultipartUploadBackend for LocalFsBackend { id: &ObjectId, metadata: &Metadata, ) -> Result { - let upload_id = uuid::Uuid::now_v7().to_string(); - let dir = self.multipart_dir(id, &upload_id)?; + let upload_id = UploadId::new(uuid::Uuid::now_v7().to_string())?; + let dir = self.multipart_dir(id, &upload_id); tokio::fs::create_dir_all(&dir).await?; let meta_path = dir.join("metadata.json"); @@ -213,7 +203,7 @@ impl MultipartUploadBackend for LocalFsBackend { _content_md5: Option<&str>, body: ClientStream, ) -> Result { - let dir = self.multipart_dir(id, upload_id)?; + let dir = self.multipart_dir(id, upload_id); if !tokio::fs::try_exists(&dir).await? { return Err(Error::generic("multipart upload not found")); } @@ -269,7 +259,7 @@ impl MultipartUploadBackend for LocalFsBackend { max_parts: Option, part_number_marker: Option, ) -> Result { - let dir = self.multipart_dir(id, upload_id)?; + let dir = self.multipart_dir(id, upload_id); if !tokio::fs::try_exists(&dir).await? 
{ return Err(Error::generic("multipart upload not found")); } @@ -334,7 +324,7 @@ impl MultipartUploadBackend for LocalFsBackend { id: &ObjectId, upload_id: &UploadId, ) -> Result { - let dir = self.multipart_dir(id, upload_id)?; + let dir = self.multipart_dir(id, upload_id); if tokio::fs::try_exists(&dir).await? { tokio::fs::remove_dir_all(dir).await?; } @@ -347,7 +337,7 @@ impl MultipartUploadBackend for LocalFsBackend { upload_id: &UploadId, parts: Vec, ) -> Result { - let dir = self.multipart_dir(id, upload_id)?; + let dir = self.multipart_dir(id, upload_id); if !tokio::fs::try_exists(&dir).await? { return Err(Error::generic("multipart upload not found")); } diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index 624ca955..7d63dd54 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -571,7 +571,7 @@ where #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] struct TieredUploadId { revision: String, - upload_id: String, + upload_id: UploadId, } impl TryInto for TieredUploadId { @@ -580,7 +580,7 @@ impl TryInto for TieredUploadId { fn try_into(self) -> Result { let json = serde_json::to_vec(&self).map_err(|e| Error::serde("encoding multipart token", e))?; - Ok(base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(json)) + UploadId::new(base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(json)) } } @@ -1549,7 +1549,7 @@ mod tests { fn multipart_upload_id_roundtrip() { let id = TieredUploadId { revision: "my-key/01924a6f-7e28-7b9a-9c1d-abcdef123456".into(), - upload_id: "upstream-upload-id-abc".into(), + upload_id: UploadId::new("upstream-upload-id-abc".into()).unwrap(), }; let encoded: UploadId = id.clone().try_into().unwrap(); let decoded: TieredUploadId = (&encoded.clone()).try_into().unwrap(); diff --git a/objectstore-service/src/multipart.rs b/objectstore-service/src/multipart.rs index c228f4d1..1cbe465a 100644 --- 
a/objectstore-service/src/multipart.rs +++ b/objectstore-service/src/multipart.rs @@ -1,9 +1,64 @@ //! Shared types for Objectstore's multipart upload protocol. +use std::fmt; +use std::ops::Deref; +use std::path::{Component, Path}; use std::time::SystemTime; +use serde::{Deserialize, Deserializer, Serialize}; + +use crate::error::Error; + /// Identifier for an in-progress multipart upload. -pub type UploadId = String; +/// +/// Validated on construction: non-empty and free of path-traversal components +/// (`..`, leading `/`, etc.), so it is always safe to use as a single path segment. +#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize)] +#[serde(transparent)] +pub struct UploadId(String); + +impl UploadId { + /// Returns the upload ID as a string slice. + pub fn as_str(&self) -> &str { + &self.0 + } + + /// Creates a new `UploadId` after validating the input. + pub fn new(s: String) -> Result { + if s.is_empty() { + return Err(Error::generic("upload_id must not be empty")); + } + for component in Path::new(&s).components() { + if !matches!(component, Component::Normal(_)) { + return Err(Error::generic(format!("invalid upload_id: {s}"))); + } + } + Ok(Self(s)) + } +} + +impl Deref for UploadId { + type Target = str; + fn deref(&self) -> &str { + &self.0 + } +} + +impl fmt::Display for UploadId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} + +impl<'de> Deserialize<'de> for UploadId { + fn deserialize(deserializer: D) -> std::result::Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + Self::new(s).map_err(serde::de::Error::custom) + } +} /// 1-indexed position of a part within its multipart upload. pub type PartNumber = std::num::NonZeroU32; /// Opaque per-part identifier returned by the backend after a successful part upload.