diff --git a/objectstore-server/docs/architecture.md b/objectstore-server/docs/architecture.md index b31f6d1f..08bb7496 100644 --- a/objectstore-server/docs/architecture.md +++ b/objectstore-server/docs/architecture.md @@ -18,6 +18,23 @@ All object operations live under the `/v1/` prefix: | `DELETE` | `/v1/objects/{usecase}/{scopes}/{key}` | Delete object | | `POST` | `/v1/objects:batch/{usecase}/{scopes}/` | Batch operations (multipart) | +### Multipart Upload Endpoints + +| Method | Path | Description | +|-----------|--------------------------------------------------------------|--------------------------------------| +| `POST` | `/v1/objects:multipart:initiate/{usecase}/{scopes}/` | Initiate upload (server-generated key) | +| `PUT` | `/v1/objects:multipart:initiate/{usecase}/{scopes}/{key}` | Initiate upload (user-provided key) | +| `PUT` | `/v1/objects:multipart:parts/{usecase}/{scopes}/{key}` | Upload a part (`uploadId`, `partNumber` query params) | +| `GET` | `/v1/objects:multipart:parts/{usecase}/{scopes}/{key}` | List uploaded parts (`uploadId` query param) | +| `POST` | `/v1/objects:multipart:complete/{usecase}/{scopes}/{key}` | Complete upload (`uploadId` query param) | +| `DELETE` | `/v1/objects:multipart/{usecase}/{scopes}/{key}` | Abort upload (`uploadId` query param) | + +The complete endpoint returns `200 OK` immediately, with a streaming body that +will contain the error (if any) as JSON. Whitespace is sent in the streaming body +to keep the connection open. +Clients must parse the body to determine the actual outcome, and not rely on the +status code. + Scopes are encoded in the URL path using Matrix URI syntax: `org=123;project=456`. An underscore (`_`) represents empty scopes. diff --git a/objectstore-server/src/auth/service.rs b/objectstore-server/src/auth/service.rs index 5d51fb9f..b0e5d3bc 100644 --- a/objectstore-server/src/auth/service.rs +++ b/objectstore-server/src/auth/service.rs @@ -1,4 +1,8 @@ use objectstore_service::id::{ObjectContext, ObjectId}; +use objectstore_service::multipart::{ + AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse, + ListPartsResponse, PartNumber, UploadId, UploadPartResponse, +}; use objectstore_service::service::{DeleteResponse, GetResponse, InsertResponse, MetadataResponse}; use objectstore_service::{ClientStream, StorageService}; use objectstore_types::auth::Permission; @@ -116,4 +120,79 @@ impl AuthAwareService { self.assert_authorized(Permission::ObjectDelete, id.context())?; Ok(self.service.delete_object(id).await?) } + + // --- Multipart upload operations --- + + /// Auth-aware wrapper around [`StorageService::initiate_multipart`]. + pub async fn initiate_multipart( + &self, + id: ObjectId, + metadata: Metadata, + ) -> ApiResult { + self.assert_authorized(Permission::ObjectWrite, id.context())?; + Ok(self.service.initiate_multipart(id, metadata).await?) + } + + /// Auth-aware wrapper around [`StorageService::upload_part`]. + pub async fn upload_part( + &self, + id: ObjectId, + upload_id: UploadId, + part_number: PartNumber, + content_length: u64, + content_md5: Option, + body: ClientStream, + ) -> ApiResult { + self.assert_authorized(Permission::ObjectWrite, id.context())?; + Ok(self + .service + .upload_part( + id, + upload_id, + part_number, + content_length, + content_md5, + body, + ) + .await?) + } + + /// Auth-aware wrapper around [`StorageService::list_parts`]. + pub async fn list_parts( + &self, + id: ObjectId, + upload_id: UploadId, + max_parts: Option, + part_number_marker: Option, + ) -> ApiResult { + self.assert_authorized(Permission::ObjectWrite, id.context())?; + Ok(self + .service + .list_parts(id, upload_id, max_parts, part_number_marker) + .await?) + } + + /// Auth-aware wrapper around [`StorageService::abort_multipart`]. + pub async fn abort_multipart( + &self, + id: ObjectId, + upload_id: UploadId, + ) -> ApiResult { + self.assert_authorized(Permission::ObjectWrite, id.context())?; + Ok(self.service.abort_multipart(id, upload_id).await?) + } + + /// Auth-aware wrapper around [`StorageService::complete_multipart`]. + pub async fn complete_multipart( + &self, + id: ObjectId, + upload_id: UploadId, + parts: Vec, + ) -> ApiResult { + self.assert_authorized(Permission::ObjectWrite, id.context())?; + Ok(self + .service + .complete_multipart(id, upload_id, parts) + .await?) + } } diff --git a/objectstore-server/src/endpoints/common.rs b/objectstore-server/src/endpoints/common.rs index a9111d89..365f8e62 100644 --- a/objectstore-server/src/endpoints/common.rs +++ b/objectstore-server/src/endpoints/common.rs @@ -30,6 +30,10 @@ pub enum ApiError { /// Errors encountered when parsing or executing a batch request. #[error("batch error: {0}")] Batch(#[from] BatchError), + + /// Internal server errors. + #[error("internal error: {0}")] + Internal(String), } /// Result type for API operations. @@ -90,10 +94,16 @@ impl ApiError { ApiError::Service(ServiceError::Client(_)) => StatusCode::BAD_REQUEST, ApiError::Service(ServiceError::Metadata(_)) => StatusCode::BAD_REQUEST, ApiError::Service(ServiceError::AtCapacity) => StatusCode::TOO_MANY_REQUESTS, + ApiError::Service(ServiceError::NotImplemented) => StatusCode::NOT_IMPLEMENTED, ApiError::Service(_) => { objectstore_log::error!(!!self, "error handling request"); StatusCode::INTERNAL_SERVER_ERROR } + + ApiError::Internal(_) => { + objectstore_log::error!(!!self, "internal error"); + StatusCode::INTERNAL_SERVER_ERROR + } } } } diff --git a/objectstore-server/src/endpoints/mod.rs b/objectstore-server/src/endpoints/mod.rs index 7ab06cdd..a29f5060 100644 --- a/objectstore-server/src/endpoints/mod.rs +++ b/objectstore-server/src/endpoints/mod.rs @@ -10,6 +10,7 @@ mod batch; pub mod common; pub mod health; mod keda; +mod multipart; mod objects; /// Returns `true` for internal endpoints that are exempt from metrics and concurrency limits. @@ -24,7 +25,8 @@ pub fn is_internal_route(route: &str) -> bool { pub fn routes() -> Router { let routes_v1 = Router::new() .merge(objects::router()) - .merge(batch::router()); + .merge(batch::router()) + .merge(multipart::router()); Router::new() .merge(health::router()) diff --git a/objectstore-server/src/endpoints/multipart.rs b/objectstore-server/src/endpoints/multipart.rs new file mode 100644 index 00000000..a9a5f17b --- /dev/null +++ b/objectstore-server/src/endpoints/multipart.rs @@ -0,0 +1,324 @@ +use std::collections::BTreeMap; +use std::convert::Infallible; +use std::time::{Duration, SystemTime}; + +use axum::body::Body; +use axum::extract::{Query, State}; +use axum::http::{HeaderMap, StatusCode}; +use axum::response::{IntoResponse, Response}; +use axum::routing; +use axum::{Json, Router}; +use bytes::Bytes; +use futures::StreamExt; +use http::HeaderValue; +use http::header; +use objectstore_service::error::Error as ServiceError; +use objectstore_service::id::{ObjectContext, ObjectId}; +use objectstore_service::multipart::{CompletedPart, PartNumber, UploadId}; +use objectstore_types::metadata::Metadata; +use serde::{Deserialize, Serialize}; + +use crate::auth::AuthAwareService; +use crate::endpoints::common::{ApiError, ApiResult}; +use crate::extractors::Xt; +use crate::extractors::body::MeteredBody; +use crate::state::ServiceState; + +pub fn router() -> Router { + let initiate_no_key = routing::post(initiate_post); + Router::new() + .route( + "/objects:multipart/{usecase}/{scopes}", + initiate_no_key.clone(), + ) + .route("/objects:multipart/{usecase}/{scopes}/", initiate_no_key) + .route( + "/objects:multipart/{usecase}/{scopes}/{*key}", + routing::put(initiate_put).delete(abort), + ) + .route( + "/objects:multipart:parts/{usecase}/{scopes}/{*key}", + routing::get(list_parts).put(upload_part), + ) + .route( + "/objects:multipart:complete/{usecase}/{scopes}/{*key}", + routing::post(complete), + ) +} + +// --- Query parameter types --- + +#[derive(Debug, Deserialize)] +struct UploadPartQuery { + upload_id: UploadId, + part_number: PartNumber, +} + +#[derive(Debug, Deserialize)] +struct UploadIdQuery { + upload_id: UploadId, +} + +#[derive(Debug, Deserialize)] +struct ListPartsQuery { + upload_id: UploadId, + max_parts: Option, + part_number_marker: Option, +} + +// --- Request/Response types --- + +#[derive(Debug, Serialize)] +struct InitiateResponse { + key: String, + upload_id: UploadId, +} + +#[derive(Debug, Serialize)] +struct UploadPartResponse { + etag: String, +} + +#[derive(Debug, Serialize)] +struct PartInfo { + etag: String, + #[serde(with = "humantime_serde")] + last_modified: SystemTime, + size: u64, +} + +#[derive(Debug, Serialize)] +struct ListPartsResponse { + parts: BTreeMap, + is_truncated: bool, + #[serde(skip_serializing_if = "Option::is_none")] + next_part_number_marker: Option, +} + +#[derive(Debug, Deserialize)] +struct CompletePartRequest { + part_number: PartNumber, + etag: String, +} + +#[derive(Debug, Deserialize)] +struct CompleteRequest { + parts: Vec, +} + +#[derive(Debug, Serialize)] +struct CompleteSuccessResponse { + key: String, +} + +#[derive(Debug, Serialize)] +struct CompleteErrorDetail { + code: String, + message: String, +} + +#[derive(Debug, Serialize)] +struct CompleteErrorResponse { + error: CompleteErrorDetail, +} + +// --- Handlers --- + +async fn initiate_put( + service: AuthAwareService, + state: State, + Xt(id): Xt, + headers: HeaderMap, +) -> ApiResult { + initiate_inner(service, state, id, headers).await +} + +async fn initiate_post( + service: AuthAwareService, + state: State, + Xt(context): Xt, + headers: HeaderMap, +) -> ApiResult { + let id = ObjectId::optional(context, None); + initiate_inner(service, state, id, headers).await +} + +async fn initiate_inner( + service: AuthAwareService, + State(state): State, + id: ObjectId, + headers: HeaderMap, +) -> ApiResult { + let mut metadata = Metadata::from_headers(&headers, "").map_err(ServiceError::from)?; + // TODO: Do this in `complete` instead, when we have a Service API to mutate metadata. + metadata.time_created = Some(SystemTime::now()); + + state + .config + .usecases + .validate(&id.context().usecase, &metadata) + .map_err(|e| ApiError::Client(e.to_string()))?; + + let upload_id = service.initiate_multipart(id.clone(), metadata).await?; + + let response = Json(InitiateResponse { + key: id.key().to_owned(), + upload_id, + }); + Ok((StatusCode::OK, response).into_response()) +} + +async fn upload_part( + service: AuthAwareService, + State(_state): State, + Xt(id): Xt, + Query(params): Query, + headers: HeaderMap, + MeteredBody(body): MeteredBody, +) -> ApiResult { + let content_length = headers + .get(axum::http::header::CONTENT_LENGTH) + .and_then(|v| v.to_str().ok()) + .and_then(|v| v.parse::().ok()) + .ok_or_else(|| ApiError::Client("Content-Length header is required".into()))?; + + let content_md5 = headers + .get("content-md5") + .and_then(|v| v.to_str().ok()) + .map(String::from); + + let etag = service + .upload_part( + id, + params.upload_id, + params.part_number, + content_length, + content_md5, + body, + ) + .await?; + + let response = Json(UploadPartResponse { etag }); + Ok((StatusCode::OK, response).into_response()) +} + +async fn list_parts( + service: AuthAwareService, + Xt(id): Xt, + Query(params): Query, +) -> ApiResult { + let response = service + .list_parts( + id, + params.upload_id, + params.max_parts, + params.part_number_marker, + ) + .await?; + + let parts = response + .parts + .into_iter() + .map(|p| { + let info = PartInfo { + etag: p.etag, + last_modified: p.last_modified, + size: p.size, + }; + (p.part_number, info) + }) + .collect(); + + let response = Json(ListPartsResponse { + parts, + is_truncated: response.is_truncated, + next_part_number_marker: response.next_part_number_marker, + }); + Ok((StatusCode::OK, response).into_response()) +} + +async fn abort( + service: AuthAwareService, + Xt(id): Xt, + Query(params): Query, +) -> ApiResult { + service.abort_multipart(id, params.upload_id).await?; + Ok(StatusCode::NO_CONTENT.into_response()) +} + +async fn complete( + service: AuthAwareService, + Xt(id): Xt, + Query(params): Query, + Json(body): Json, +) -> ApiResult { + let key = id.key().to_string(); + + let parts: Vec = body + .parts + .into_iter() + .map(|p| CompletedPart { + part_number: p.part_number, + etag: p.etag, + }) + .collect(); + + let upload_id = params.upload_id; + + // This operation can take a while at the service level, so we stream whitespace to the client + // until we have a response body, to keep the connection from being terminated. + let stream = async_stream::stream! { + let fut = service.complete_multipart(id, upload_id, parts); + tokio::pin!(fut); + + let mut keepalive = tokio::time::interval(Duration::from_secs(1)); + keepalive.tick().await; + loop { + tokio::select! { + res = &mut fut => { + let serialized = match res { + Ok(None) => serde_json::to_vec( + &CompleteSuccessResponse { key }, + ), + Ok(Some(err)) => serde_json::to_vec( + &CompleteErrorResponse { + error: CompleteErrorDetail { + code: err.code, + message: err.message, + }, + }, + ), + // TODO(lcian): Construct more precise error code and message, given that + // we have a structured `ApiError`. + Err(e) => serde_json::to_vec( + &CompleteErrorResponse { + error: CompleteErrorDetail { + code: "internal error".into(), + message: e.to_string(), + }, + }, + ), + }; + // Fallback to avoid `unwrap()`. This should never happen in practice. + let serialized = serialized.unwrap_or_else(|_| { + br#"{"error":{"code":"internal error","message":"unexpected error, please report a bug"}}"#.to_vec() + }); + + yield Bytes::from(serialized); + break; + } + _ = keepalive.tick() => { + yield Bytes::from_static(b" "); + } + } + } + }; + let stream = stream.map(Ok::<_, Infallible>); + + let mut headers = HeaderMap::new(); + headers.insert( + header::CONTENT_TYPE, + HeaderValue::from_static("application/json"), + ); + Ok((StatusCode::OK, headers, Body::from_stream(stream)).into_response()) +} diff --git a/objectstore-server/tests/multipart.rs b/objectstore-server/tests/multipart.rs new file mode 100644 index 00000000..eaa70b24 --- /dev/null +++ b/objectstore-server/tests/multipart.rs @@ -0,0 +1,685 @@ +//! Integration tests for the multipart upload endpoints. + +use anyhow::Result; +use objectstore_server::config::{AuthZ, Config}; +use objectstore_test::server::TestServer; +use serde::Deserialize; + +#[derive(Debug, Deserialize)] +struct InitiateResponse { + key: String, + upload_id: String, +} + +#[derive(Debug, Deserialize)] +struct UploadPartResponse { + etag: String, +} + +#[derive(Debug, Deserialize)] +#[allow(dead_code)] +struct PartInfo { + etag: String, + last_modified: String, + size: u64, +} + +#[derive(Debug, Deserialize)] +struct ListPartsResponse { + parts: std::collections::BTreeMap, + is_truncated: bool, + next_part_number_marker: Option, +} + +#[derive(Debug, Deserialize)] +struct CompleteSuccessResponse { + key: String, +} + +#[derive(Debug, Deserialize)] +struct CompleteErrorDetail { + code: String, + message: String, +} + +#[derive(Debug, Deserialize)] +struct CompleteErrorResponse { + error: CompleteErrorDetail, +} + +async fn test_server() -> TestServer { + TestServer::with_config(Config { + auth: AuthZ { + enforce: false, + ..Default::default() + }, + ..Default::default() + }) + .await +} + +/// Sends a complete request and returns the trimmed response body. +/// +/// The complete endpoint uses a streaming response with keepalive whitespace, so we +/// trim the response body before returning. +async fn send_complete( + client: &reqwest::Client, + url: &str, + parts: &[(&str, &str)], +) -> Result { + let parts_json: Vec<_> = parts + .iter() + .map(|(pn, etag)| { + serde_json::json!({"part_number": pn.parse::().unwrap(), "etag": etag}) + }) + .collect(); + let body = serde_json::json!({ "parts": parts_json }); + + let response = client + .post(url) + .header("content-type", "application/json") + .body(body.to_string()) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + + let text = response.text().await?; + Ok(text.trim().to_string()) +} + +/// Sends a complete request and asserts success. Returns the key from the response. +async fn complete_and_assert( + client: &reqwest::Client, + url: &str, + parts: &[(&str, &str)], +) -> Result { + let body = send_complete(client, url, parts).await?; + let parsed: CompleteSuccessResponse = serde_json::from_str(&body) + .map_err(|e| anyhow::anyhow!("expected success response, got {body:?}: {e}"))?; + Ok(parsed.key) +} + +// --- Initiate --- + +#[tokio::test] +async fn test_initiate_put_with_key() -> Result<()> { + let server = test_server().await; + let client = reqwest::Client::new(); + + let response = client + .put(server.url("/v1/objects:multipart/test/org=1/my-key")) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + + let body: InitiateResponse = response.json().await?; + assert_eq!(body.key, "my-key"); + assert!(!body.upload_id.is_empty()); + + Ok(()) +} + +#[tokio::test] +async fn test_initiate_post_generates_key() -> Result<()> { + let server = test_server().await; + let client = reqwest::Client::new(); + + let response = client + .post(server.url("/v1/objects:multipart/test/org=1/")) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + + let body: InitiateResponse = response.json().await?; + assert!(!body.key.is_empty()); + assert!(!body.upload_id.is_empty()); + + Ok(()) +} + +// --- Full flow: initiate → upload parts → list → complete → GET --- + +#[tokio::test] +async fn test_multipart_full_flow() -> Result<()> { + let server = test_server().await; + let client = reqwest::Client::new(); + + // 1. Initiate + let response = client + .put(server.url("/v1/objects:multipart/test/org=1/full-flow-key")) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let initiate: InitiateResponse = response.json().await?; + let upload_id = &initiate.upload_id; + assert_eq!(initiate.key, "full-flow-key"); + + // 2. Upload part 1 + let part1_data = b"hello "; + let response = client + .put(server.url(&format!( + "/v1/objects:multipart:parts/test/org=1/full-flow-key?upload_id={upload_id}&part_number=1" + ))) + .header("content-length", part1_data.len().to_string()) + .body(part1_data.to_vec()) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let part1: UploadPartResponse = response.json().await?; + assert!(!part1.etag.is_empty()); + + // 3. Upload part 2 + let part2_data = b"world!"; + let response = client + .put(server.url(&format!( + "/v1/objects:multipart:parts/test/org=1/full-flow-key?upload_id={upload_id}&part_number=2" + ))) + .header("content-length", part2_data.len().to_string()) + .body(part2_data.to_vec()) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let part2: UploadPartResponse = response.json().await?; + assert!(!part2.etag.is_empty()); + + // 4. List parts + let response = client + .get(server.url(&format!( + "/v1/objects:multipart:parts/test/org=1/full-flow-key?upload_id={upload_id}" + ))) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let list: ListPartsResponse = response.json().await?; + assert_eq!(list.parts.len(), 2); + assert!(list.parts.contains_key(&1)); + assert!(list.parts.contains_key(&2)); + assert_eq!(list.parts[&1].size, part1_data.len() as u64); + assert_eq!(list.parts[&2].size, part2_data.len() as u64); + assert!(!list.is_truncated); + + // 5. Complete + let key = complete_and_assert( + &client, + &server.url(&format!( + "/v1/objects:multipart:complete/test/org=1/full-flow-key?upload_id={upload_id}" + )), + &[("1", &part1.etag), ("2", &part2.etag)], + ) + .await?; + assert_eq!(key, "full-flow-key"); + + // 6. Verify object is accessible via regular GET + let response = client + .get(server.url("/v1/objects/test/org=1/full-flow-key")) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let payload = response.bytes().await?; + assert_eq!(&payload[..], b"hello world!"); + + Ok(()) +} + +// --- Initiate with POST and auto-generated key, then complete --- + +#[tokio::test] +async fn test_multipart_post_initiate_full_flow() -> Result<()> { + let server = test_server().await; + let client = reqwest::Client::new(); + + // 1. Initiate with POST (server-generated key) + let response = client + .post(server.url("/v1/objects:multipart/test/org=1/")) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let initiate: InitiateResponse = response.json().await?; + let upload_id = &initiate.upload_id; + let key = &initiate.key; + assert!(!key.is_empty()); + + // 2. Upload a single part + let data = b"auto-keyed content"; + let response = client + .put(server.url(&format!( + "/v1/objects:multipart:parts/test/org=1/{key}?upload_id={upload_id}&part_number=1" + ))) + .header("content-length", data.len().to_string()) + .body(data.to_vec()) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let part: UploadPartResponse = response.json().await?; + + // 3. Complete + complete_and_assert( + &client, + &server.url(&format!( + "/v1/objects:multipart:complete/test/org=1/{key}?upload_id={upload_id}" + )), + &[("1", &part.etag)], + ) + .await?; + + // 4. Verify + let response = client + .get(server.url(&format!("/v1/objects/test/org=1/{key}"))) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let payload = response.bytes().await?; + assert_eq!(&payload[..], b"auto-keyed content"); + + Ok(()) +} + +// --- Abort --- + +#[tokio::test] +async fn test_multipart_abort() -> Result<()> { + let server = test_server().await; + let client = reqwest::Client::new(); + + // 1. Initiate + let response = client + .put(server.url("/v1/objects:multipart/test/org=1/abort-key")) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let initiate: InitiateResponse = response.json().await?; + let upload_id = &initiate.upload_id; + + // 2. Upload a part + let data = b"will be aborted"; + let response = client + .put(server.url(&format!( + "/v1/objects:multipart:parts/test/org=1/abort-key?upload_id={upload_id}&part_number=1" + ))) + .header("content-length", data.len().to_string()) + .body(data.to_vec()) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + + // 3. Abort + let response = client + .delete(server.url(&format!( + "/v1/objects:multipart/test/org=1/abort-key?upload_id={upload_id}" + ))) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::NO_CONTENT); + + // 4. Object should not exist + let response = client + .get(server.url("/v1/objects/test/org=1/abort-key")) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::NOT_FOUND); + + Ok(()) +} + +// --- List parts with pagination --- + +#[tokio::test] +async fn test_list_parts_pagination() -> Result<()> { + let server = test_server().await; + let client = reqwest::Client::new(); + + // 1. Initiate + let response = client + .put(server.url("/v1/objects:multipart/test/org=1/paginated-key")) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let initiate: InitiateResponse = response.json().await?; + let upload_id = &initiate.upload_id; + + // 2. Upload 3 parts + for i in 1..=3 { + let data = format!("part-{i}"); + let response = client + .put(server.url(&format!( + "/v1/objects:multipart:parts/test/org=1/paginated-key?upload_id={upload_id}&part_number={i}" + ))) + .header("content-length", data.len().to_string()) + .body(data) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + } + + // 3. List with max_parts=2 (should be truncated) + let response = client + .get(server.url(&format!( + "/v1/objects:multipart:parts/test/org=1/paginated-key?upload_id={upload_id}&max_parts=2" + ))) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let page1: ListPartsResponse = response.json().await?; + assert_eq!(page1.parts.len(), 2); + assert!(page1.is_truncated); + assert!(page1.next_part_number_marker.is_some()); + + // 4. List next page + let marker = page1.next_part_number_marker.unwrap(); + let response = client + .get(server.url(&format!( + "/v1/objects:multipart:parts/test/org=1/paginated-key?upload_id={upload_id}&max_parts=2&part_number_marker={marker}" + ))) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let page2: ListPartsResponse = response.json().await?; + assert_eq!(page2.parts.len(), 1); + assert!(!page2.is_truncated); + + Ok(()) +} + +// --- Part overwrite --- + +#[tokio::test] +async fn test_upload_part_overwrite() -> Result<()> { + let server = test_server().await; + let client = reqwest::Client::new(); + + // 1. Initiate + let response = client + .put(server.url("/v1/objects:multipart/test/org=1/overwrite-key")) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let initiate: InitiateResponse = response.json().await?; + let upload_id = &initiate.upload_id; + + // 2. Upload part 1 with initial data + let response = client + .put(server.url(&format!( + "/v1/objects:multipart:parts/test/org=1/overwrite-key?upload_id={upload_id}&part_number=1" + ))) + .header("content-length", "5") + .body("first") + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let first_etag: UploadPartResponse = response.json().await?; + + // 3. Overwrite part 1 with new data + let response = client + .put(server.url(&format!( + "/v1/objects:multipart:parts/test/org=1/overwrite-key?upload_id={upload_id}&part_number=1" + ))) + .header("content-length", "6") + .body("second") + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let second_etag: UploadPartResponse = response.json().await?; + + // The etags should differ since the data changed + assert_ne!(first_etag.etag, second_etag.etag); + + // 4. List parts — should show only the latest version + let response = client + .get(server.url(&format!( + "/v1/objects:multipart:parts/test/org=1/overwrite-key?upload_id={upload_id}" + ))) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let list: ListPartsResponse = response.json().await?; + assert_eq!(list.parts.len(), 1); + assert_eq!(list.parts[&1].etag, second_etag.etag); + assert_eq!(list.parts[&1].size, 6); + + // 5. Complete with the overwritten part + complete_and_assert( + &client, + &server.url(&format!( + "/v1/objects:multipart:complete/test/org=1/overwrite-key?upload_id={upload_id}" + )), + &[("1", &second_etag.etag)], + ) + .await?; + + // 6. Verify the final object has the overwritten data + let response = client + .get(server.url("/v1/objects/test/org=1/overwrite-key")) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let payload = response.bytes().await?; + assert_eq!(&payload[..], b"second"); + + Ok(()) +} + +// --- Missing Content-Length on upload_part --- + +#[tokio::test] +async fn test_upload_part_missing_content_length() -> Result<()> { + let server = test_server().await; + let client = reqwest::Client::builder().no_proxy().build()?; + + // 1. Initiate + let response = client + .put(server.url("/v1/objects:multipart/test/org=1/no-cl-key")) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let initiate: InitiateResponse = response.json().await?; + let upload_id = &initiate.upload_id; + + // 2. Upload part without Content-Length — reqwest normally adds it automatically for + // fixed-size bodies, so we use a streaming body to avoid that. + let stream = + futures_util::stream::once(async { Ok::<_, std::io::Error>(bytes::Bytes::from("data")) }); + let body = reqwest::Body::wrap_stream(stream); + let response = client + .put(server.url(&format!( + "/v1/objects:multipart:parts/test/org=1/no-cl-key?upload_id={upload_id}&part_number=1" + ))) + .body(body) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::BAD_REQUEST); + + Ok(()) +} + +#[tokio::test] +async fn test_upload_part_zero_part_number() -> Result<()> { + let server = test_server().await; + let client = reqwest::Client::new(); + + let response = client + .put(server.url("/v1/objects:multipart/test/org=1/zero-part-upload")) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let initiate: InitiateResponse = response.json().await?; + let upload_id = &initiate.upload_id; + + let response = client + .put(server.url(&format!( + "/v1/objects:multipart:parts/test/org=1/zero-part-upload?upload_id={upload_id}&part_number=0" + ))) + .header("content-length", "4") + .body("data") + .send() + .await?; + + assert_eq!(response.status(), reqwest::StatusCode::BAD_REQUEST); + + Ok(()) +} + +// --- Missing upload_id query param --- + +#[tokio::test] +async fn test_missing_upload_id_on_complete() -> Result<()> { + let server = test_server().await; + let client = reqwest::Client::new(); + + let complete_body = serde_json::json!({ + "parts": [{"part_number": 1, "etag": "fake"}] + }); + let response = client + .post(server.url("/v1/objects:multipart:complete/test/org=1/some-key")) + .header("content-type", "application/json") + .body(complete_body.to_string()) + .send() + .await?; + + assert_eq!(response.status(), reqwest::StatusCode::BAD_REQUEST); + + Ok(()) +} + +#[tokio::test] +async fn test_complete_zero_part_number() -> Result<()> { + let server = test_server().await; + let client = reqwest::Client::new(); + + let response = client + .put(server.url("/v1/objects:multipart/test/org=1/zero-part-complete")) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let initiate: InitiateResponse = response.json().await?; + let upload_id = &initiate.upload_id; + + let response = client + .post(server.url(&format!( + "/v1/objects:multipart:complete/test/org=1/zero-part-complete?upload_id={upload_id}" + ))) + .header("content-type", "application/json") + .body(r#"{"parts":[{"part_number":0,"etag":"fake"}]}"#) + .send() + .await?; + + assert_eq!(response.status(), reqwest::StatusCode::UNPROCESSABLE_ENTITY); + Ok(()) +} + +#[tokio::test] +async fn test_missing_upload_id_on_abort() -> Result<()> { + let server = test_server().await; + let client = reqwest::Client::new(); + + let response = client + .delete(server.url("/v1/objects:multipart/test/org=1/some-key")) + .send() + .await?; + + assert_eq!(response.status(), reqwest::StatusCode::BAD_REQUEST); + + Ok(()) +} + +// --- Complete with invalid etag --- + +#[tokio::test] +async fn test_complete_invalid_etag() -> Result<()> { + let server = test_server().await; + let client = reqwest::Client::new(); + + // 1. Initiate + let response = client + .put(server.url("/v1/objects:multipart/test/org=1/bad-etag-key")) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let initiate: InitiateResponse = response.json().await?; + let upload_id = &initiate.upload_id; + + // 2. Upload a part + let response = client + .put(server.url(&format!( + "/v1/objects:multipart:parts/test/org=1/bad-etag-key?upload_id={upload_id}&part_number=1" + ))) + .header("content-length", "4") + .body("data") + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + + // 3. Complete with a wrong etag + let body = send_complete( + &client, + &server.url(&format!( + "/v1/objects:multipart:complete/test/org=1/bad-etag-key?upload_id={upload_id}" + )), + &[("1", "wrong-etag")], + ) + .await?; + + let err: CompleteErrorResponse = serde_json::from_str(&body)?; + assert!(!err.error.code.is_empty()); + assert!(!err.error.message.is_empty()); + + // 4. Object should not exist + let response = client + .get(server.url("/v1/objects/test/org=1/bad-etag-key")) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::NOT_FOUND); + + Ok(()) +} + +// --- Structured key with slashes --- + +#[tokio::test] +async fn test_multipart_structured_key() -> Result<()> { + let server = test_server().await; + let client = reqwest::Client::new(); + + // 1. Initiate with a key containing slashes + let response = client + .put(server.url("/v1/objects:multipart/test/org=1/path/to/object")) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let initiate: InitiateResponse = response.json().await?; + let upload_id = &initiate.upload_id; + assert_eq!(initiate.key, "path/to/object"); + + // 2. Upload a part + let data = b"structured key data"; + let response = client + .put(server.url(&format!( + "/v1/objects:multipart:parts/test/org=1/path/to/object?upload_id={upload_id}&part_number=1" + ))) + .header("content-length", data.len().to_string()) + .body(data.to_vec()) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let part: UploadPartResponse = response.json().await?; + + // 3. Complete + complete_and_assert( + &client, + &server.url(&format!( + "/v1/objects:multipart:complete/test/org=1/path/to/object?upload_id={upload_id}" + )), + &[("1", &part.etag)], + ) + .await?; + + // 4. Verify via regular GET + let response = client + .get(server.url("/v1/objects/test/org=1/path/to/object")) + .send() + .await?; + assert_eq!(response.status(), reqwest::StatusCode::OK); + let payload = response.bytes().await?; + assert_eq!(&payload[..], b"structured key data"); + + Ok(()) +} diff --git a/objectstore-service/docs/architecture.md b/objectstore-service/docs/architecture.md index 568420da..f9e1e5f4 100644 --- a/objectstore-service/docs/architecture.md +++ b/objectstore-service/docs/architecture.md @@ -166,6 +166,14 @@ with [`Error::AtCapacity`](error::Error::AtCapacity). The default limit is [`DEFAULT_CONCURRENCY_LIMIT`](service::DEFAULT_CONCURRENCY_LIMIT). Callers can override it via [`StorageService::with_concurrency_limit`]. +## Multipart Uploads + +When the configured backend supports it, [`StorageService`] exposes multipart +upload operations (initiate, upload part, list parts, complete, abort). These +delegate to the [`MultipartUploadBackend`](backend::common::MultipartUploadBackend) +trait, accessed via [`Backend::as_multipart_upload_backend`](backend::common::Backend::as_multipart_upload_backend). +Multipart operations share the same concurrency limiter as regular operations. + ## Streaming Concurrency The [`streaming`](streaming) module provides [`StreamExecutor`](streaming::StreamExecutor) diff --git a/objectstore-service/src/backend/common.rs b/objectstore-service/src/backend/common.rs index 6b7a676a..8efad5da 100644 --- a/objectstore-service/src/backend/common.rs +++ b/objectstore-service/src/backend/common.rs @@ -1,12 +1,13 @@ //! Shared trait definition and types for all backends. use std::fmt; +use std::sync::Arc; use objectstore_types::metadata::{ExpirationPolicy, Metadata}; use bytes::Bytes; -use crate::error::Result; +use crate::error::{Error, Result}; use crate::id::ObjectId; use crate::multipart::{ AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse, @@ -62,6 +63,14 @@ pub trait Backend: fmt::Debug + Send + Sync + 'static { /// (such as [`TieredStorage`](super::tiered::TieredStorage)) should override this /// to wait for those tasks to complete. async fn join(&self) {} + + /// Casts this backend into an [`Arc`] if supported. + /// + /// The default returns [`Error::NotImplemented`]. Backends that implement + /// [`MultipartUploadBackend`] should override this to return `Ok(self)`. + fn as_multipart_upload_backend(self: Arc) -> Result> { + Err(Error::NotImplemented) + } } /// Trait for backends that support our S3-style multipart upload protocol. diff --git a/objectstore-service/src/backend/gcs.rs b/objectstore-service/src/backend/gcs.rs index eddef040..80174148 100644 --- a/objectstore-service/src/backend/gcs.rs +++ b/objectstore-service/src/backend/gcs.rs @@ -3,6 +3,7 @@ use std::borrow::Cow; use std::collections::BTreeMap; use std::future::Future; +use std::sync::Arc; use std::time::{Duration, SystemTime}; use std::{fmt, io}; @@ -599,6 +600,10 @@ impl Backend for GcsBackend { "gcs" } + fn as_multipart_upload_backend(self: Arc) -> Result> { + Ok(self) + } + #[tracing::instrument(level = "trace", fields(?id), skip_all)] async fn put_object( &self, @@ -724,9 +729,11 @@ struct XmlInitiateMultipartUploadResponse { upload_id: String, } -impl From for InitiateMultipartResponse { - fn from(r: XmlInitiateMultipartUploadResponse) -> Self { - r.upload_id +impl TryFrom for InitiateMultipartResponse { + type Error = crate::error::Error; + + fn try_from(r: XmlInitiateMultipartUploadResponse) -> crate::error::Result { + UploadId::new(r.upload_id) } } @@ -735,7 +742,7 @@ impl From for InitiateMultipartResponse { struct XmlListPartsResponse { #[serde(default)] is_truncated: bool, - next_part_number_marker: Option, + next_part_number_marker: Option, #[serde(default, rename = "Part")] parts: Vec, } @@ -753,7 +760,7 @@ impl From for ListPartsResponse { #[derive(Debug, Deserialize)] #[serde(rename_all = "PascalCase")] struct XmlPart { - part_number: u32, + part_number: PartNumber, #[serde(rename = "ETag")] e_tag: String, #[serde(with = "humantime_serde")] @@ -863,7 +870,7 @@ impl MultipartUploadBackend for GcsBackend { cause: Some(Box::new(e)), })?; - Ok(xml.into()) + Ok(xml.try_into()?) } #[tracing::instrument(level = "trace", fields(?id, upload_id, part_number), skip_all)] @@ -1014,6 +1021,7 @@ impl MultipartUploadBackend for GcsBackend { #[cfg(test)] mod tests { use std::collections::BTreeMap; + use std::num::NonZeroU32; use anyhow::Result; use objectstore_types::scope::{Scope, Scopes}; @@ -1357,7 +1365,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), data.len() as u64, None, stream::single(data.to_vec()), @@ -1369,7 +1377,7 @@ mod tests { &id, &upload_id, vec![CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag, }], ) @@ -1411,7 +1419,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), part1.len() as u64, None, stream::single(part1.clone()), @@ -1421,7 +1429,7 @@ mod tests { .upload_part( &id, &upload_id, - 2, + NonZeroU32::new(2).unwrap(), part2.len() as u64, None, stream::single(part2.clone()), @@ -1431,7 +1439,7 @@ mod tests { .upload_part( &id, &upload_id, - 3, + NonZeroU32::new(3).unwrap(), part3.len() as u64, None, stream::single(part3.clone()), @@ -1444,15 +1452,15 @@ mod tests { &upload_id, vec![ CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag: etag1, }, CompletedPart { - part_number: 2, + part_number: NonZeroU32::new(2).unwrap(), etag: etag2, }, CompletedPart { - part_number: 3, + part_number: NonZeroU32::new(3).unwrap(), etag: etag3, }, ], @@ -1491,7 +1499,7 @@ mod tests { .upload_part( &id, &upload_id, - 2, + NonZeroU32::new(2).unwrap(), part2.len() as u64, None, stream::single(part2.clone()), @@ -1501,7 +1509,7 @@ mod tests { .upload_part( &id, &upload_id, - 3, + NonZeroU32::new(3).unwrap(), part3.len() as u64, None, stream::single(part3.clone()), @@ -1511,7 +1519,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), part1.len() as u64, None, stream::single(part1.clone()), @@ -1525,15 +1533,15 @@ mod tests { &upload_id, vec![ CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag: etag1, }, CompletedPart { - part_number: 2, + part_number: NonZeroU32::new(2).unwrap(), etag: etag2, }, CompletedPart { - part_number: 3, + part_number: NonZeroU32::new(3).unwrap(), etag: etag3, }, ], @@ -1562,26 +1570,40 @@ mod tests { let upload_id = backend.initiate_multipart(&id, &metadata).await?; let etag1 = backend - .upload_part(&id, &upload_id, 1, 3, None, stream::single(b"aaa".to_vec())) + .upload_part( + &id, + &upload_id, + NonZeroU32::new(1).unwrap(), + 3, + None, + stream::single(b"aaa".to_vec()), + ) .await?; let etag2 = backend - .upload_part(&id, &upload_id, 2, 3, None, stream::single(b"bbb".to_vec())) + .upload_part( + &id, + &upload_id, + NonZeroU32::new(2).unwrap(), + 3, + None, + stream::single(b"bbb".to_vec()), + ) .await?; // List all parts. let list = backend.list_parts(&id, &upload_id, None, None).await?; assert_eq!(list.parts.len(), 2); - assert_eq!(list.parts[0].part_number, 1); + assert_eq!(list.parts[0].part_number.get(), 1); assert_eq!(list.parts[0].etag, etag1); assert_eq!(list.parts[0].size, 3); - assert_eq!(list.parts[1].part_number, 2); + assert_eq!(list.parts[1].part_number.get(), 2); assert_eq!(list.parts[1].etag, etag2); assert_eq!(list.parts[1].size, 3); // List with max_parts=1 to test pagination. let page1 = backend.list_parts(&id, &upload_id, Some(1), None).await?; assert_eq!(page1.parts.len(), 1); - assert_eq!(page1.parts[0].part_number, 1); + assert_eq!(page1.parts[0].part_number.get(), 1); assert!(page1.is_truncated); assert!(page1.next_part_number_marker.is_some()); @@ -1589,7 +1611,7 @@ mod tests { .list_parts(&id, &upload_id, Some(1), page1.next_part_number_marker) .await?; assert_eq!(page2.parts.len(), 1); - assert_eq!(page2.parts[0].part_number, 2); + assert_eq!(page2.parts[0].part_number.get(), 2); // Clean up. backend.abort_multipart(&id, &upload_id).await?; @@ -1609,7 +1631,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), 5, None, stream::single(b"hello".to_vec()), @@ -1637,7 +1659,7 @@ mod tests { .upload_part( id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), payload.len() as u64, None, stream::single(payload), @@ -1648,7 +1670,7 @@ mod tests { id, &upload_id, vec![CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag, }], ) diff --git a/objectstore-service/src/backend/in_memory.rs b/objectstore-service/src/backend/in_memory.rs index 253aafcd..9fc1499a 100644 --- a/objectstore-service/src/backend/in_memory.rs +++ b/objectstore-service/src/backend/in_memory.rs @@ -104,6 +104,10 @@ impl super::common::Backend for InMemoryBackend { self.name } + fn as_multipart_upload_backend(self: Arc) -> Result> { + Ok(self) + } + async fn put_object( &self, id: &ObjectId, @@ -224,7 +228,7 @@ impl MultipartUploadBackend for InMemoryBackend { id: &ObjectId, metadata: &Metadata, ) -> Result { - let upload_id = uuid::Uuid::now_v7().to_string(); + let upload_id = UploadId::new(uuid::Uuid::now_v7().to_string())?; let upload = MultipartUpload { metadata: metadata.clone(), parts: BTreeMap::new(), @@ -455,6 +459,7 @@ impl Entry { #[cfg(test)] mod tests { + use std::num::NonZeroU32; use std::time::Duration; use objectstore_types::metadata::ExpirationPolicy; @@ -491,7 +496,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), data.len() as u64, None, stream::single(data.to_vec()), @@ -504,7 +509,7 @@ mod tests { &id, &upload_id, vec![CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag, }], ) @@ -540,7 +545,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), part1.len() as u64, None, stream::single(part1.clone()), @@ -551,7 +556,7 @@ mod tests { .upload_part( &id, &upload_id, - 2, + NonZeroU32::new(2).unwrap(), part2.len() as u64, None, stream::single(part2.clone()), @@ -562,7 +567,7 @@ mod tests { .upload_part( &id, &upload_id, - 3, + NonZeroU32::new(3).unwrap(), part3.len() as u64, None, stream::single(part3.clone()), @@ -576,15 +581,15 @@ mod tests { &upload_id, vec![ CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag: etag1, }, CompletedPart { - part_number: 2, + part_number: NonZeroU32::new(2).unwrap(), etag: etag2, }, CompletedPart { - part_number: 3, + part_number: NonZeroU32::new(3).unwrap(), etag: etag3, }, ], @@ -607,11 +612,25 @@ mod tests { let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap(); let etag1 = backend - .upload_part(&id, &upload_id, 1, 3, None, stream::single(b"aaa".to_vec())) + .upload_part( + &id, + &upload_id, + NonZeroU32::new(1).unwrap(), + 3, + None, + stream::single(b"aaa".to_vec()), + ) .await .unwrap(); let etag2 = backend - .upload_part(&id, &upload_id, 2, 3, None, stream::single(b"bbb".to_vec())) + .upload_part( + &id, + &upload_id, + NonZeroU32::new(2).unwrap(), + 3, + None, + stream::single(b"bbb".to_vec()), + ) .await .unwrap(); @@ -620,10 +639,10 @@ mod tests { .await .unwrap(); assert_eq!(list.parts.len(), 2); - assert_eq!(list.parts[0].part_number, 1); + assert_eq!(list.parts[0].part_number.get(), 1); assert_eq!(list.parts[0].etag, etag1); assert_eq!(list.parts[0].size, 3); - assert_eq!(list.parts[1].part_number, 2); + assert_eq!(list.parts[1].part_number.get(), 2); assert_eq!(list.parts[1].etag, etag2); assert_eq!(list.parts[1].size, 3); @@ -633,7 +652,7 @@ mod tests { .await .unwrap(); assert_eq!(page1.parts.len(), 1); - assert_eq!(page1.parts[0].part_number, 1); + assert_eq!(page1.parts[0].part_number.get(), 1); assert!(page1.is_truncated); assert!(page1.next_part_number_marker.is_some()); @@ -642,7 +661,7 @@ mod tests { .await .unwrap(); assert_eq!(page2.parts.len(), 1); - assert_eq!(page2.parts[0].part_number, 2); + assert_eq!(page2.parts[0].part_number.get(), 2); backend.abort_multipart(&id, &upload_id).await.unwrap(); } @@ -659,7 +678,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), 5, None, stream::single(b"hello".to_vec()), @@ -685,7 +704,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), 5, None, stream::single(b"hello".to_vec()), @@ -698,7 +717,7 @@ mod tests { &id, &upload_id, vec![CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag: "wrong-etag".into(), }], ) @@ -713,7 +732,7 @@ mod tests { &id, &upload_id, vec![CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag, }], ) @@ -734,7 +753,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), 5, None, stream::single(b"hello".to_vec()), @@ -747,7 +766,7 @@ mod tests { &id, &upload_id, vec![CompletedPart { - part_number: 99, + part_number: NonZeroU32::new(99).unwrap(), etag: "whatever".into(), }], ) @@ -762,7 +781,7 @@ mod tests { &id, &upload_id, vec![CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag, }], ) diff --git a/objectstore-service/src/backend/local_fs.rs b/objectstore-service/src/backend/local_fs.rs index c4518a18..533b616e 100644 --- a/objectstore-service/src/backend/local_fs.rs +++ b/objectstore-service/src/backend/local_fs.rs @@ -3,6 +3,7 @@ use std::io::ErrorKind; use std::path::PathBuf; use std::pin::pin; +use std::sync::Arc; use std::time::SystemTime; use futures_util::StreamExt; @@ -71,6 +72,10 @@ impl Backend for LocalFsBackend { "local-fs" } + fn as_multipart_upload_backend(self: Arc) -> Result> { + Ok(self) + } + #[tracing::instrument(level = "trace", fields(?id), skip_all)] async fn put_object( &self, @@ -164,7 +169,7 @@ impl LocalFsBackend { self.path .join("__multipart__") .join(id.as_storage_path().to_string()) - .join(upload_id) + .join(upload_id.as_str()) } } @@ -175,7 +180,7 @@ impl MultipartUploadBackend for LocalFsBackend { id: &ObjectId, metadata: &Metadata, ) -> Result { - let upload_id = uuid::Uuid::now_v7().to_string(); + let upload_id = UploadId::new(uuid::Uuid::now_v7().to_string())?; let dir = self.multipart_dir(id, &upload_id); tokio::fs::create_dir_all(&dir).await?; @@ -422,6 +427,7 @@ impl MultipartUploadBackend for LocalFsBackend { #[cfg(test)] mod tests { + use std::num::NonZeroU32; use std::time::{Duration, SystemTime}; use bytes::BytesMut; @@ -557,7 +563,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), data.len() as u64, None, stream::single(data.to_vec()), @@ -570,7 +576,7 @@ mod tests { &id, &upload_id, vec![crate::multipart::CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag, }], ) @@ -606,7 +612,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), part1.len() as u64, None, stream::single(part1.clone()), @@ -617,7 +623,7 @@ mod tests { .upload_part( &id, &upload_id, - 2, + NonZeroU32::new(2).unwrap(), part2.len() as u64, None, stream::single(part2.clone()), @@ -628,7 +634,7 @@ mod tests { .upload_part( &id, &upload_id, - 3, + NonZeroU32::new(3).unwrap(), part3.len() as u64, None, stream::single(part3.clone()), @@ -642,15 +648,15 @@ mod tests { &upload_id, vec![ crate::multipart::CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag: etag1, }, crate::multipart::CompletedPart { - part_number: 2, + part_number: NonZeroU32::new(2).unwrap(), etag: etag2, }, crate::multipart::CompletedPart { - part_number: 3, + part_number: NonZeroU32::new(3).unwrap(), etag: etag3, }, ], @@ -673,11 +679,25 @@ mod tests { let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap(); let etag1 = backend - .upload_part(&id, &upload_id, 1, 3, None, stream::single(b"aaa".to_vec())) + .upload_part( + &id, + &upload_id, + NonZeroU32::new(1).unwrap(), + 3, + None, + stream::single(b"aaa".to_vec()), + ) .await .unwrap(); let etag2 = backend - .upload_part(&id, &upload_id, 2, 3, None, stream::single(b"bbb".to_vec())) + .upload_part( + &id, + &upload_id, + NonZeroU32::new(2).unwrap(), + 3, + None, + stream::single(b"bbb".to_vec()), + ) .await .unwrap(); @@ -686,10 +706,10 @@ mod tests { .await .unwrap(); assert_eq!(list.parts.len(), 2); - assert_eq!(list.parts[0].part_number, 1); + assert_eq!(list.parts[0].part_number.get(), 1); assert_eq!(list.parts[0].etag, etag1); assert_eq!(list.parts[0].size, 3); - assert_eq!(list.parts[1].part_number, 2); + assert_eq!(list.parts[1].part_number.get(), 2); assert_eq!(list.parts[1].etag, etag2); assert_eq!(list.parts[1].size, 3); @@ -699,7 +719,7 @@ mod tests { .await .unwrap(); assert_eq!(page1.parts.len(), 1); - assert_eq!(page1.parts[0].part_number, 1); + assert_eq!(page1.parts[0].part_number.get(), 1); assert!(page1.is_truncated); assert!(page1.next_part_number_marker.is_some()); @@ -708,7 +728,7 @@ mod tests { .await .unwrap(); assert_eq!(page2.parts.len(), 1); - assert_eq!(page2.parts[0].part_number, 2); + assert_eq!(page2.parts[0].part_number.get(), 2); backend.abort_multipart(&id, &upload_id).await.unwrap(); } @@ -725,7 +745,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), 5, None, stream::single(b"hello".to_vec()), @@ -751,7 +771,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), 5, None, stream::single(b"hello".to_vec()), @@ -764,7 +784,7 @@ mod tests { &id, &upload_id, vec![crate::multipart::CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag: "wrong-etag".into(), }], ) @@ -779,7 +799,7 @@ mod tests { &id, &upload_id, vec![crate::multipart::CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag, }], ) @@ -800,7 +820,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), 5, None, stream::single(b"hello".to_vec()), @@ -813,7 +833,7 @@ mod tests { &id, &upload_id, vec![crate::multipart::CompletedPart { - part_number: 99, + part_number: NonZeroU32::new(99).unwrap(), etag: "whatever".into(), }], ) @@ -828,7 +848,7 @@ mod tests { &id, &upload_id, vec![crate::multipart::CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag, }], ) diff --git a/objectstore-service/src/backend/testing.rs b/objectstore-service/src/backend/testing.rs index 4b767e21..ac709fec 100644 --- a/objectstore-service/src/backend/testing.rs +++ b/objectstore-service/src/backend/testing.rs @@ -35,6 +35,7 @@ //! ``` use std::fmt; +use std::sync::Arc; use bytes::Bytes; use objectstore_types::metadata::Metadata; @@ -269,6 +270,10 @@ impl Backend for TestBackend { self.hooks.name() } + fn as_multipart_upload_backend(self: Arc) -> Result> { + Ok(self) + } + async fn put_object( &self, id: &ObjectId, diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index 4d329c3e..7d63dd54 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -365,6 +365,10 @@ impl Backend for TieredStorage { "tiered" } + fn as_multipart_upload_backend(self: Arc) -> Result> { + Ok(self) + } + async fn put_object( &self, id: &ObjectId, @@ -567,7 +571,7 @@ where #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] struct TieredUploadId { revision: String, - upload_id: String, + upload_id: UploadId, } impl TryInto for TieredUploadId { @@ -576,7 +580,7 @@ impl TryInto for TieredUploadId { fn try_into(self) -> Result { let json = serde_json::to_vec(&self).map_err(|e| Error::serde("encoding multipart token", e))?; - Ok(base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(json)) + UploadId::new(base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(json)) } } @@ -790,6 +794,8 @@ impl MultipartUploadBackend for TieredStorage { #[cfg(test)] mod tests { + use std::num::NonZeroU32; + use futures::lock::Mutex; use objectstore_types::metadata::{ExpirationPolicy, Metadata}; use objectstore_types::scope::{Scope, Scopes}; @@ -1543,7 +1549,7 @@ mod tests { fn multipart_upload_id_roundtrip() { let id = TieredUploadId { revision: "my-key/01924a6f-7e28-7b9a-9c1d-abcdef123456".into(), - upload_id: "upstream-upload-id-abc".into(), + upload_id: UploadId::new("upstream-upload-id-abc".into()).unwrap(), }; let encoded: UploadId = id.clone().try_into().unwrap(); let decoded: TieredUploadId = (&encoded.clone()).try_into().unwrap(); @@ -1567,7 +1573,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), payload.len() as u64, None, stream::single(payload.clone()), @@ -1580,7 +1586,7 @@ mod tests { &id, &upload_id, vec![CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag, }], ) @@ -1624,7 +1630,7 @@ mod tests { .upload_part( &id, &upload_id, - 3, + NonZeroU32::new(3).unwrap(), part3.len() as u64, None, stream::single(part3.clone()), @@ -1635,7 +1641,7 @@ mod tests { .upload_part( &id, &upload_id, - 2, + NonZeroU32::new(2).unwrap(), part2.len() as u64, None, stream::single(part2.clone()), @@ -1646,7 +1652,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), part1.len() as u64, None, stream::single(part1.clone()), @@ -1660,15 +1666,15 @@ mod tests { &upload_id, vec![ CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag: etag1, }, CompletedPart { - part_number: 2, + part_number: NonZeroU32::new(2).unwrap(), etag: etag2, }, CompletedPart { - part_number: 3, + part_number: NonZeroU32::new(3).unwrap(), etag: etag3, }, ], @@ -1703,7 +1709,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), payload.len() as u64, None, stream::single(payload), @@ -1736,7 +1742,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), part1.len() as u64, None, stream::single(part1), @@ -1747,7 +1753,7 @@ mod tests { .upload_part( &id, &upload_id, - 2, + NonZeroU32::new(2).unwrap(), part2.len() as u64, None, stream::single(part2), @@ -1760,9 +1766,9 @@ mod tests { .await .unwrap(); assert_eq!(resp.parts.len(), 2); - assert_eq!(resp.parts[0].part_number, 1); + assert_eq!(resp.parts[0].part_number.get(), 1); assert_eq!(resp.parts[0].size, 100); - assert_eq!(resp.parts[1].part_number, 2); + assert_eq!(resp.parts[1].part_number.get(), 2); assert_eq!(resp.parts[1].size, 200); } @@ -1790,7 +1796,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), payload2.len() as u64, None, stream::single(payload2.clone()), @@ -1808,7 +1814,7 @@ mod tests { &id, &upload_id, vec![CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag, }], ) @@ -1906,7 +1912,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), payload.len() as u64, None, stream::single(payload), @@ -1919,7 +1925,7 @@ mod tests { &id, &upload_id, vec![CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag, }], ) @@ -2021,7 +2027,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), payload.len() as u64, None, stream::single(payload.clone()), @@ -2035,7 +2041,7 @@ mod tests { &id, &upload_id, vec![CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag: etag.clone(), }], ) @@ -2049,7 +2055,7 @@ mod tests { &id, &upload_id, vec![CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag, }], ) @@ -2157,7 +2163,7 @@ mod tests { .upload_part( &id, &upload_id, - 1, + NonZeroU32::new(1).unwrap(), payload.len() as u64, None, stream::single(payload.clone()), @@ -2172,7 +2178,7 @@ mod tests { &id, &upload_id, vec![CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag: etag.clone(), }], ) @@ -2186,7 +2192,7 @@ mod tests { &id, &upload_id, vec![CompletedPart { - part_number: 1, + part_number: NonZeroU32::new(1).unwrap(), etag, }], ) diff --git a/objectstore-service/src/error.rs b/objectstore-service/src/error.rs index 186a22d9..27344abe 100644 --- a/objectstore-service/src/error.rs +++ b/objectstore-service/src/error.rs @@ -87,6 +87,10 @@ pub enum Error { #[source] cause: Option>, }, + + /// The functionality is not implemented by this instance of the service. + #[error("not implemented")] + NotImplemented, } impl Error { @@ -142,6 +146,7 @@ impl Error { Self::Panic(_) => Level::ERROR, Self::Dropped => Level::ERROR, Self::UnexpectedTombstone => Level::ERROR, + Self::NotImplemented => Level::ERROR, Self::Generic { .. } => Level::ERROR, } } diff --git a/objectstore-service/src/multipart.rs b/objectstore-service/src/multipart.rs index e1ce51da..1cbe465a 100644 --- a/objectstore-service/src/multipart.rs +++ b/objectstore-service/src/multipart.rs @@ -1,11 +1,66 @@ //! Shared types for Objectstore's multipart upload protocol. +use std::fmt; +use std::ops::Deref; +use std::path::{Component, Path}; use std::time::SystemTime; +use serde::{Deserialize, Deserializer, Serialize}; + +use crate::error::Error; + /// Identifier for an in-progress multipart upload. -pub type UploadId = String; +/// +/// Validated on construction: non-empty and free of path-traversal components +/// (`..`, leading `/`, etc.), so it is always safe to use as a single path segment. +#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize)] +#[serde(transparent)] +pub struct UploadId(String); + +impl UploadId { + /// Returns the upload ID as a string slice. + pub fn as_str(&self) -> &str { + &self.0 + } + + /// Creates a new `UploadId` after validating the input. + pub fn new(s: String) -> Result { + if s.is_empty() { + return Err(Error::generic("upload_id must not be empty")); + } + for component in Path::new(&s).components() { + if !matches!(component, Component::Normal(_)) { + return Err(Error::generic(format!("invalid upload_id: {s}"))); + } + } + Ok(Self(s)) + } +} + +impl Deref for UploadId { + type Target = str; + fn deref(&self) -> &str { + &self.0 + } +} + +impl fmt::Display for UploadId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} + +impl<'de> Deserialize<'de> for UploadId { + fn deserialize(deserializer: D) -> std::result::Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + Self::new(s).map_err(serde::de::Error::custom) + } +} /// 1-indexed position of a part within its multipart upload. -pub type PartNumber = u32; +pub type PartNumber = std::num::NonZeroU32; /// Opaque per-part identifier returned by the backend after a successful part upload. pub type ETag = String; diff --git a/objectstore-service/src/service.rs b/objectstore-service/src/service.rs index 9ec10fba..8505da44 100644 --- a/objectstore-service/src/service.rs +++ b/objectstore-service/src/service.rs @@ -14,6 +14,10 @@ use crate::backend::common::Backend; use crate::concurrency::ConcurrencyLimiter; use crate::error::{Error, Result}; use crate::id::{ObjectContext, ObjectId}; +use crate::multipart::{ + AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse, + ListPartsResponse, PartNumber, UploadId, UploadPartResponse, +}; use crate::stream::{ClientStream, PayloadStream}; use crate::streaming::StreamExecutor; @@ -233,6 +237,98 @@ impl StorageService { pub async fn join(&self) { self.inner.join().await; } + + // --- Multipart upload operations --- + + /// Initiates a new multipart upload. + pub async fn initiate_multipart( + &self, + id: ObjectId, + metadata: Metadata, + ) -> Result { + let inner = self.inner.clone().as_multipart_upload_backend()?; + self.spawn("initiate_multipart", async move { + inner.initiate_multipart(&id, &metadata).await + }) + .await + } + + /// Uploads a single part. + /// + /// Note that this requires a `content_length`. + /// This grants us the broadest and most seamless compatibility when it comes to backends. + /// For example, MinIO rejects `UploadPart` requests without a `Content-Length` on plain PUT + /// requests. + /// This can be worked around by using AWS SigV4 chunked streaming requests, which we could use + /// if one day we'll have a usecase where the client doesn't know the part length upfront. + pub async fn upload_part( + &self, + id: ObjectId, + upload_id: UploadId, + part_number: PartNumber, + content_length: u64, + content_md5: Option, + body: ClientStream, + ) -> Result { + let inner = self.inner.clone().as_multipart_upload_backend()?; + self.spawn("upload_part", async move { + inner + .upload_part( + &id, + &upload_id, + part_number, + content_length, + content_md5.as_deref(), + body, + ) + .await + }) + .await + } + + /// Lists the parts uploaded so far. + pub async fn list_parts( + &self, + id: ObjectId, + upload_id: UploadId, + max_parts: Option, + part_number_marker: Option, + ) -> Result { + let inner = self.inner.clone().as_multipart_upload_backend()?; + self.spawn("list_parts", async move { + inner + .list_parts(&id, &upload_id, max_parts, part_number_marker) + .await + }) + .await + } + + /// Aborts a multipart upload. + pub async fn abort_multipart( + &self, + id: ObjectId, + upload_id: UploadId, + ) -> Result { + let inner = self.inner.clone().as_multipart_upload_backend()?; + self.spawn("abort_multipart", async move { + inner.abort_multipart(&id, &upload_id).await + }) + .await + } + + /// Finalizes a multipart upload. + pub async fn complete_multipart( + &self, + id: ObjectId, + upload_id: UploadId, + parts: Vec, + ) -> Result { + let inner = self.inner.clone().as_multipart_upload_backend()?; + self.spawn("complete_multipart", async move { + inner.complete_multipart(&id, &upload_id, parts).await + }) + .await + } } #[cfg(test)]