From ef042a01b0d402f09f03745ae4b949ddff2376c4 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Mon, 4 May 2026 11:37:51 +0200 Subject: [PATCH 1/5] feat(multipart): implement MultipartUploadBackend for InMemory and LocalFs backends --- objectstore-service/src/backend/in_memory.rs | 465 ++++++++++++++++- objectstore-service/src/backend/local_fs.rs | 508 ++++++++++++++++++- 2 files changed, 967 insertions(+), 6 deletions(-) diff --git a/objectstore-service/src/backend/in_memory.rs b/objectstore-service/src/backend/in_memory.rs index 5536f3b4..2c949ed9 100644 --- a/objectstore-service/src/backend/in_memory.rs +++ b/objectstore-service/src/backend/in_memory.rs @@ -5,19 +5,24 @@ //! backend is [`Clone`] so tests can hold a handle for direct inspection while //! the service owns a boxed copy. -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::sync::{Arc, Mutex}; +use std::time::SystemTime; use bytes::{Bytes, BytesMut}; use futures_util::TryStreamExt; use objectstore_types::metadata::Metadata; use super::common::{ - DeleteResponse, GetResponse, HighVolumeBackend, PutResponse, TieredGet, TieredMetadata, - TieredWrite, Tombstone, + DeleteResponse, GetResponse, HighVolumeBackend, MultipartUploadBackend, PutResponse, TieredGet, + TieredMetadata, TieredWrite, Tombstone, }; use crate::error::{Error, Result}; use crate::id::ObjectId; +use crate::multipart::{ + AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse, + ListPartsResponse, Part, PartNumber, UploadId, UploadPartResponse, +}; use crate::stream::ClientStream; /// An entry in the in-memory store. @@ -29,6 +34,21 @@ enum StoreEntry { type Store = HashMap; +#[derive(Clone, Debug)] +struct MultipartUpload { + metadata: Metadata, + parts: BTreeMap, +} + +#[derive(Clone, Debug)] +struct UploadedPart { + etag: String, + data: Bytes, + uploaded_at: SystemTime, +} + +type MultipartStore = HashMap<(ObjectId, UploadId), MultipartUpload>; + /// In-memory [`Backend`](super::common::Backend) backed by a `HashMap`. /// /// Removes the need for filesystem tempdir management in unit tests. The @@ -38,6 +58,7 @@ type Store = HashMap; pub struct InMemoryBackend { name: &'static str, store: Arc>, + multipart_store: Arc>, } impl InMemoryBackend { @@ -46,6 +67,7 @@ impl InMemoryBackend { Self { name, store: Arc::new(Mutex::new(HashMap::new())), + multipart_store: Arc::new(Mutex::new(HashMap::new())), } } @@ -195,6 +217,164 @@ impl HighVolumeBackend for InMemoryBackend { } } +#[async_trait::async_trait] +impl MultipartUploadBackend for InMemoryBackend { + async fn initiate_multipart( + &self, + id: &ObjectId, + metadata: &Metadata, + ) -> Result { + let upload_id = uuid::Uuid::now_v7().to_string(); + let upload = MultipartUpload { + metadata: metadata.clone(), + parts: BTreeMap::new(), + }; + self.multipart_store + .lock() + .unwrap() + .insert((id.clone(), upload_id.clone()), upload); + Ok(upload_id) + } + + async fn upload_part( + &self, + id: &ObjectId, + upload_id: &UploadId, + part_number: PartNumber, + _content_length: u64, + _content_md5: Option<&str>, + body: ClientStream, + ) -> Result { + let data: BytesMut = body.try_collect().await?; + let data = data.freeze(); + let etag = format!("\"etag-{part_number}-{}\"", data.len()); + + let mut store = self.multipart_store.lock().unwrap(); + let upload = store + .get_mut(&(id.clone(), upload_id.clone())) + .ok_or_else(|| Error::generic("multipart upload not found"))?; + + upload.parts.insert( + part_number, + UploadedPart { + etag: etag.clone(), + data, + uploaded_at: SystemTime::now(), + }, + ); + + Ok(etag) + } + + async fn list_parts( + &self, + id: &ObjectId, + upload_id: &UploadId, + max_parts: Option, + part_number_marker: Option, + ) -> Result { + let store = self.multipart_store.lock().unwrap(); + let upload = store + .get(&(id.clone(), upload_id.clone())) + .ok_or_else(|| Error::generic("multipart upload not found"))?; + + let iter = upload + .parts + .iter() + .filter(|(pn, _)| part_number_marker.is_none_or(|marker| **pn > marker)); + + let max = max_parts.unwrap_or(u32::MAX) as usize; + let all: Vec<_> = iter.collect(); + let is_truncated = all.len() > max; + let page: Vec<_> = all.into_iter().take(max).collect(); + + let next_part_number_marker = if is_truncated { + page.last().map(|(pn, _)| **pn) + } else { + None + }; + + let parts = page + .into_iter() + .map(|(pn, part)| Part { + part_number: *pn, + etag: part.etag.clone(), + last_modified: part.uploaded_at, + size: part.data.len() as u64, + }) + .collect(); + + Ok(ListPartsResponse { + parts, + is_truncated, + next_part_number_marker, + }) + } + + async fn abort_multipart( + &self, + id: &ObjectId, + upload_id: &UploadId, + ) -> Result { + self.multipart_store + .lock() + .unwrap() + .remove(&(id.clone(), upload_id.clone())); + Ok(()) + } + + async fn complete_multipart( + &self, + id: &ObjectId, + upload_id: &UploadId, + parts: Vec, + ) -> Result { + let upload = self + .multipart_store + .lock() + .unwrap() + .remove(&(id.clone(), upload_id.clone())) + .ok_or_else(|| Error::generic("multipart upload not found"))?; + + for completed in &parts { + match upload.parts.get(&completed.part_number) { + None => { + return Ok(Some(crate::multipart::CompleteMultipartError { + code: "InvalidPart".into(), + message: format!("part number {} was not uploaded", completed.part_number), + })); + } + Some(stored) if stored.etag != completed.etag => { + return Ok(Some(crate::multipart::CompleteMultipartError { + code: "InvalidPart".into(), + message: format!( + "etag mismatch for part {}: expected {}, got {}", + completed.part_number, stored.etag, completed.etag + ), + })); + } + _ => {} + } + } + + let mut payload = BytesMut::new(); + for completed in &parts { + let stored = &upload.parts[&completed.part_number]; + payload.extend_from_slice(&stored.data); + } + + let mut metadata = upload.metadata; + metadata.size = Some(payload.len()); + + self.store + .lock() + .unwrap() + .insert(id.clone(), StoreEntry::Object(metadata, payload.freeze())); + + Ok(None) + } +} + /// Returns `true` if `entry` matches the expected tombstone redirect state. /// /// - `expected = None`: matches any non-tombstone (absent or inline object). @@ -257,3 +437,282 @@ impl Entry { } } } + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use objectstore_types::metadata::ExpirationPolicy; + use objectstore_types::scope::{Scope, Scopes}; + + use super::*; + use crate::backend::common::Backend; + use crate::id::ObjectContext; + use crate::stream; + + fn make_id() -> ObjectId { + ObjectId::random(ObjectContext { + usecase: "testing".into(), + scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]), + }) + } + + #[tokio::test] + async fn multipart_single_part() { + let backend = InMemoryBackend::new("test"); + let id = make_id(); + let metadata = Metadata { + content_type: "text/plain".into(), + expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(3600)), + origin: Some("203.0.113.42".into()), + custom: [("foo".into(), "bar".into())].into(), + ..Default::default() + }; + + let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap(); + + let data = b"hello, multipart world!"; + let etag = backend + .upload_part( + &id, + &upload_id, + 1, + data.len() as u64, + None, + stream::single(data.to_vec()), + ) + .await + .unwrap(); + + let result = backend + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag, + }], + ) + .await + .unwrap(); + assert!(result.is_none(), "expected no error on complete"); + + let (meta, body) = backend.get_object(&id).await.unwrap().unwrap(); + let payload = stream::read_to_vec(body).await.unwrap(); + assert_eq!(payload, data); + assert_eq!(meta.content_type, "text/plain".to_string()); + assert_eq!( + meta.expiration_policy, + ExpirationPolicy::TimeToIdle(Duration::from_secs(3600)) + ); + assert_eq!(meta.origin, Some("203.0.113.42".into())); + assert_eq!(meta.custom, [("foo".into(), "bar".into())].into()); + } + + #[tokio::test] + async fn multipart_multiple_parts() { + let backend = InMemoryBackend::new("test"); + let id = make_id(); + let metadata = Metadata::default(); + + let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap(); + + let part1 = b"aaaa".to_vec(); + let part2 = b"bbbb".to_vec(); + let part3 = b"cc".to_vec(); + + let etag1 = backend + .upload_part( + &id, + &upload_id, + 1, + part1.len() as u64, + None, + stream::single(part1.clone()), + ) + .await + .unwrap(); + let etag2 = backend + .upload_part( + &id, + &upload_id, + 2, + part2.len() as u64, + None, + stream::single(part2.clone()), + ) + .await + .unwrap(); + let etag3 = backend + .upload_part( + &id, + &upload_id, + 3, + part3.len() as u64, + None, + stream::single(part3.clone()), + ) + .await + .unwrap(); + + let result = backend + .complete_multipart( + &id, + &upload_id, + vec![ + CompletedPart { + part_number: 1, + etag: etag1, + }, + CompletedPart { + part_number: 2, + etag: etag2, + }, + CompletedPart { + part_number: 3, + etag: etag3, + }, + ], + ) + .await + .unwrap(); + assert!(result.is_none()); + + let (_, body) = backend.get_object(&id).await.unwrap().unwrap(); + let payload = stream::read_to_vec(body).await.unwrap(); + assert_eq!(payload, b"aaaabbbbcc"); + } + + #[tokio::test] + async fn multipart_list_parts() { + let backend = InMemoryBackend::new("test"); + let id = make_id(); + let metadata = Metadata::default(); + + let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap(); + + let etag1 = backend + .upload_part(&id, &upload_id, 1, 3, None, stream::single(b"aaa".to_vec())) + .await + .unwrap(); + let etag2 = backend + .upload_part(&id, &upload_id, 2, 3, None, stream::single(b"bbb".to_vec())) + .await + .unwrap(); + + let list = backend + .list_parts(&id, &upload_id, None, None) + .await + .unwrap(); + assert_eq!(list.parts.len(), 2); + assert_eq!(list.parts[0].part_number, 1); + assert_eq!(list.parts[0].etag, etag1); + assert_eq!(list.parts[0].size, 3); + assert_eq!(list.parts[1].part_number, 2); + assert_eq!(list.parts[1].etag, etag2); + assert_eq!(list.parts[1].size, 3); + + // Pagination + let page1 = backend + .list_parts(&id, &upload_id, Some(1), None) + .await + .unwrap(); + assert_eq!(page1.parts.len(), 1); + assert_eq!(page1.parts[0].part_number, 1); + assert!(page1.is_truncated); + assert!(page1.next_part_number_marker.is_some()); + + let page2 = backend + .list_parts(&id, &upload_id, Some(1), page1.next_part_number_marker) + .await + .unwrap(); + assert_eq!(page2.parts.len(), 1); + assert_eq!(page2.parts[0].part_number, 2); + + backend.abort_multipart(&id, &upload_id).await.unwrap(); + } + + #[tokio::test] + async fn multipart_abort() { + let backend = InMemoryBackend::new("test"); + let id = make_id(); + let metadata = Metadata::default(); + + let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap(); + + backend + .upload_part( + &id, + &upload_id, + 1, + 5, + None, + stream::single(b"hello".to_vec()), + ) + .await + .unwrap(); + + backend.abort_multipart(&id, &upload_id).await.unwrap(); + + let result = backend.get_object(&id).await.unwrap(); + assert!(result.is_none(), "object should not exist after abort"); + } + + #[tokio::test] + async fn multipart_invalid_etag() { + let backend = InMemoryBackend::new("test"); + let id = make_id(); + let metadata = Metadata::default(); + + let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap(); + + backend + .upload_part( + &id, + &upload_id, + 1, + 5, + None, + stream::single(b"hello".to_vec()), + ) + .await + .unwrap(); + + let result = backend + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag: "wrong-etag".into(), + }], + ) + .await + .unwrap(); + assert!(result.is_some(), "expected error for bad etag"); + assert_eq!(result.unwrap().code, "InvalidPart"); + } + + #[tokio::test] + async fn multipart_missing_part() { + let backend = InMemoryBackend::new("test"); + let id = make_id(); + let metadata = Metadata::default(); + + let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap(); + + let result = backend + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 99, + etag: "whatever".into(), + }], + ) + .await + .unwrap(); + assert!(result.is_some(), "expected error for missing part"); + assert_eq!(result.unwrap().code, "InvalidPart"); + } +} diff --git a/objectstore-service/src/backend/local_fs.rs b/objectstore-service/src/backend/local_fs.rs index 71a7d1a5..8cee91e3 100644 --- a/objectstore-service/src/backend/local_fs.rs +++ b/objectstore-service/src/backend/local_fs.rs @@ -3,16 +3,24 @@ use std::io::ErrorKind; use std::path::PathBuf; use std::pin::pin; +use std::time::SystemTime; -use futures_util::StreamExt; +use bytes::BytesMut; +use futures_util::{StreamExt, TryStreamExt}; use objectstore_types::metadata::Metadata; use tokio::fs::OpenOptions; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; +use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}; use tokio_util::io::{ReaderStream, StreamReader}; -use crate::backend::common::{Backend, DeleteResponse, GetResponse, PutResponse}; +use crate::backend::common::{ + Backend, DeleteResponse, GetResponse, MultipartUploadBackend, PutResponse, +}; use crate::error::{Error, Result}; use crate::id::ObjectId; +use crate::multipart::{ + AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse, + ListPartsResponse, Part, PartNumber, UploadId, UploadPartResponse, +}; use crate::stream::{self, ClientStream}; /// Configuration for [`LocalFsBackend`]. @@ -152,6 +160,226 @@ impl Backend for LocalFsBackend { } } +impl LocalFsBackend { + fn multipart_dir(&self, id: &ObjectId, upload_id: &UploadId) -> PathBuf { + self.path + .join("__multipart__") + .join(id.as_storage_path().to_string()) + .join(upload_id) + } +} + +#[async_trait::async_trait] +impl MultipartUploadBackend for LocalFsBackend { + async fn initiate_multipart( + &self, + id: &ObjectId, + metadata: &Metadata, + ) -> Result { + let upload_id = uuid::Uuid::now_v7().to_string(); + let dir = self.multipart_dir(id, &upload_id); + tokio::fs::create_dir_all(&dir).await?; + + let meta_path = dir.join("metadata.json"); + let metadata_json = serde_json::to_string(metadata).map_err(|cause| Error::Serde { + context: "failed to serialize multipart metadata".to_string(), + cause, + })?; + tokio::fs::write(meta_path, metadata_json).await?; + + Ok(upload_id) + } + + async fn upload_part( + &self, + id: &ObjectId, + upload_id: &UploadId, + part_number: PartNumber, + _content_length: u64, + _content_md5: Option<&str>, + body: ClientStream, + ) -> Result { + let dir = self.multipart_dir(id, upload_id); + if !tokio::fs::try_exists(&dir).await? { + return Err(Error::generic("multipart upload not found")); + } + + let data: BytesMut = body.try_collect().await?; + let etag = format!("\"etag-{part_number}-{}\"", data.len()); + + let header = serde_json::json!({ + "etag": etag, + "uploaded_at": SystemTime::now(), + "size": data.len(), + }); + let header_line = serde_json::to_string(&header).map_err(|cause| Error::Serde { + context: "failed to serialize part header".to_string(), + cause, + })?; + + let part_path = dir.join(format!("{part_number}.part")); + let file = OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .open(part_path) + .await?; + let mut writer = BufWriter::new(file); + writer.write_all(header_line.as_bytes()).await?; + writer.write_all(b"\n").await?; + writer.write_all(&data).await?; + writer.flush().await?; + + Ok(etag) + } + + async fn list_parts( + &self, + id: &ObjectId, + upload_id: &UploadId, + max_parts: Option, + part_number_marker: Option, + ) -> Result { + let dir = self.multipart_dir(id, upload_id); + if !tokio::fs::try_exists(&dir).await? { + return Err(Error::generic("multipart upload not found")); + } + + let mut entries = tokio::fs::read_dir(&dir).await?; + let mut parts = Vec::new(); + + while let Some(entry) = entries.next_entry().await? { + let name = entry.file_name(); + let name_str = name.to_string_lossy(); + let Some(pn_str) = name_str.strip_suffix(".part") else { + continue; + }; + let Ok(pn) = pn_str.parse::() else { + continue; + }; + + if part_number_marker.is_some_and(|marker| pn <= marker) { + continue; + } + + let file = tokio::fs::File::open(entry.path()).await?; + let mut reader = BufReader::new(file); + let mut header_line = String::new(); + reader.read_line(&mut header_line).await?; + let header: serde_json::Value = + serde_json::from_str(header_line.trim_end()).map_err(|cause| Error::Serde { + context: "failed to deserialize part header".to_string(), + cause, + })?; + + parts.push(Part { + part_number: pn, + etag: header["etag"].as_str().unwrap_or("").to_string(), + last_modified: serde_json::from_value(header["uploaded_at"].clone()) + .unwrap_or(SystemTime::UNIX_EPOCH), + size: header["size"].as_u64().unwrap_or(0), + }); + } + + parts.sort_by_key(|p| p.part_number); + + let max = max_parts.unwrap_or(u32::MAX) as usize; + let is_truncated = parts.len() > max; + parts.truncate(max); + + let next_part_number_marker = if is_truncated { + parts.last().map(|p| p.part_number) + } else { + None + }; + + Ok(ListPartsResponse { + parts, + is_truncated, + next_part_number_marker, + }) + } + + async fn abort_multipart( + &self, + id: &ObjectId, + upload_id: &UploadId, + ) -> Result { + let dir = self.multipart_dir(id, upload_id); + if tokio::fs::try_exists(&dir).await? { + tokio::fs::remove_dir_all(dir).await?; + } + Ok(()) + } + + async fn complete_multipart( + &self, + id: &ObjectId, + upload_id: &UploadId, + parts: Vec, + ) -> Result { + let dir = self.multipart_dir(id, upload_id); + if !tokio::fs::try_exists(&dir).await? { + return Err(Error::generic("multipart upload not found")); + } + + // Read metadata + let meta_path = dir.join("metadata.json"); + let meta_bytes = tokio::fs::read(&meta_path).await?; + let metadata: Metadata = + serde_json::from_slice(&meta_bytes).map_err(|cause| Error::Serde { + context: "failed to deserialize multipart metadata".to_string(), + cause, + })?; + + // Validate and assemble parts + let mut payload = Vec::new(); + for completed in &parts { + let part_path = dir.join(format!("{}.part", completed.part_number)); + if !tokio::fs::try_exists(&part_path).await? { + return Ok(Some(crate::multipart::CompleteMultipartError { + code: "InvalidPart".into(), + message: format!("part number {} was not uploaded", completed.part_number), + })); + } + + let file = tokio::fs::File::open(&part_path).await?; + let mut reader = BufReader::new(file); + let mut header_line = String::new(); + reader.read_line(&mut header_line).await?; + let header: serde_json::Value = + serde_json::from_str(header_line.trim_end()).map_err(|cause| Error::Serde { + context: "failed to deserialize part header".to_string(), + cause, + })?; + + let stored_etag = header["etag"].as_str().unwrap_or(""); + if stored_etag != completed.etag { + return Ok(Some(crate::multipart::CompleteMultipartError { + code: "InvalidPart".into(), + message: format!( + "etag mismatch for part {}: expected {}, got {}", + completed.part_number, stored_etag, completed.etag + ), + })); + } + + let mut part_data = Vec::new(); + reader.read_to_end(&mut part_data).await?; + payload.extend_from_slice(&part_data); + } + + // Write the final object using put_object + let stream = stream::single(payload); + self.put_object(id, &metadata, stream).await?; + + // Clean up multipart state + tokio::fs::remove_dir_all(dir).await?; + + Ok(None) + } +} + #[cfg(test)] mod tests { use std::time::{Duration, SystemTime}; @@ -254,4 +482,278 @@ mod tests { let result = backend.get_metadata(&id).await.unwrap(); assert!(result.is_none()); } + + fn make_id() -> ObjectId { + ObjectId::random(ObjectContext { + usecase: "testing".into(), + scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]), + }) + } + + fn make_backend() -> (tempfile::TempDir, LocalFsBackend) { + let tempdir = tempfile::tempdir().unwrap(); + let backend = LocalFsBackend::new(FileSystemConfig { + path: tempdir.path().to_path_buf(), + }); + (tempdir, backend) + } + + #[tokio::test] + async fn multipart_single_part() { + let (_tempdir, backend) = make_backend(); + let id = make_id(); + let metadata = Metadata { + content_type: "text/plain".into(), + expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(3600)), + origin: Some("203.0.113.42".into()), + custom: [("foo".into(), "bar".into())].into(), + ..Default::default() + }; + + let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap(); + + let data = b"hello, multipart world!"; + let etag = backend + .upload_part( + &id, + &upload_id, + 1, + data.len() as u64, + None, + stream::single(data.to_vec()), + ) + .await + .unwrap(); + + let result = backend + .complete_multipart( + &id, + &upload_id, + vec![crate::multipart::CompletedPart { + part_number: 1, + etag, + }], + ) + .await + .unwrap(); + assert!(result.is_none(), "expected no error on complete"); + + let (meta, body) = backend.get_object(&id).await.unwrap().unwrap(); + let payload: BytesMut = body.try_collect().await.unwrap(); + assert_eq!(payload.as_ref(), data); + assert_eq!(meta.content_type, "text/plain".to_string()); + assert_eq!( + meta.expiration_policy, + ExpirationPolicy::TimeToIdle(Duration::from_secs(3600)) + ); + assert_eq!(meta.origin, Some("203.0.113.42".into())); + assert_eq!(meta.custom, [("foo".into(), "bar".into())].into()); + } + + #[tokio::test] + async fn multipart_multiple_parts() { + let (_tempdir, backend) = make_backend(); + let id = make_id(); + let metadata = Metadata::default(); + + let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap(); + + let part1 = b"aaaa".to_vec(); + let part2 = b"bbbb".to_vec(); + let part3 = b"cc".to_vec(); + + let etag1 = backend + .upload_part( + &id, + &upload_id, + 1, + part1.len() as u64, + None, + stream::single(part1.clone()), + ) + .await + .unwrap(); + let etag2 = backend + .upload_part( + &id, + &upload_id, + 2, + part2.len() as u64, + None, + stream::single(part2.clone()), + ) + .await + .unwrap(); + let etag3 = backend + .upload_part( + &id, + &upload_id, + 3, + part3.len() as u64, + None, + stream::single(part3.clone()), + ) + .await + .unwrap(); + + let result = backend + .complete_multipart( + &id, + &upload_id, + vec![ + crate::multipart::CompletedPart { + part_number: 1, + etag: etag1, + }, + crate::multipart::CompletedPart { + part_number: 2, + etag: etag2, + }, + crate::multipart::CompletedPart { + part_number: 3, + etag: etag3, + }, + ], + ) + .await + .unwrap(); + assert!(result.is_none()); + + let (_, body) = backend.get_object(&id).await.unwrap().unwrap(); + let payload: BytesMut = body.try_collect().await.unwrap(); + assert_eq!(payload.as_ref(), b"aaaabbbbcc"); + } + + #[tokio::test] + async fn multipart_list_parts() { + let (_tempdir, backend) = make_backend(); + let id = make_id(); + let metadata = Metadata::default(); + + let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap(); + + let etag1 = backend + .upload_part(&id, &upload_id, 1, 3, None, stream::single(b"aaa".to_vec())) + .await + .unwrap(); + let etag2 = backend + .upload_part(&id, &upload_id, 2, 3, None, stream::single(b"bbb".to_vec())) + .await + .unwrap(); + + let list = backend + .list_parts(&id, &upload_id, None, None) + .await + .unwrap(); + assert_eq!(list.parts.len(), 2); + assert_eq!(list.parts[0].part_number, 1); + assert_eq!(list.parts[0].etag, etag1); + assert_eq!(list.parts[0].size, 3); + assert_eq!(list.parts[1].part_number, 2); + assert_eq!(list.parts[1].etag, etag2); + assert_eq!(list.parts[1].size, 3); + + // Pagination + let page1 = backend + .list_parts(&id, &upload_id, Some(1), None) + .await + .unwrap(); + assert_eq!(page1.parts.len(), 1); + assert_eq!(page1.parts[0].part_number, 1); + assert!(page1.is_truncated); + assert!(page1.next_part_number_marker.is_some()); + + let page2 = backend + .list_parts(&id, &upload_id, Some(1), page1.next_part_number_marker) + .await + .unwrap(); + assert_eq!(page2.parts.len(), 1); + assert_eq!(page2.parts[0].part_number, 2); + + backend.abort_multipart(&id, &upload_id).await.unwrap(); + } + + #[tokio::test] + async fn multipart_abort() { + let (_tempdir, backend) = make_backend(); + let id = make_id(); + let metadata = Metadata::default(); + + let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap(); + + backend + .upload_part( + &id, + &upload_id, + 1, + 5, + None, + stream::single(b"hello".to_vec()), + ) + .await + .unwrap(); + + backend.abort_multipart(&id, &upload_id).await.unwrap(); + + let result = backend.get_object(&id).await.unwrap(); + assert!(result.is_none(), "object should not exist after abort"); + } + + #[tokio::test] + async fn multipart_invalid_etag() { + let (_tempdir, backend) = make_backend(); + let id = make_id(); + let metadata = Metadata::default(); + + let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap(); + + backend + .upload_part( + &id, + &upload_id, + 1, + 5, + None, + stream::single(b"hello".to_vec()), + ) + .await + .unwrap(); + + let result = backend + .complete_multipart( + &id, + &upload_id, + vec![crate::multipart::CompletedPart { + part_number: 1, + etag: "wrong-etag".into(), + }], + ) + .await + .unwrap(); + assert!(result.is_some(), "expected error for bad etag"); + assert_eq!(result.unwrap().code, "InvalidPart"); + } + + #[tokio::test] + async fn multipart_missing_part() { + let (_tempdir, backend) = make_backend(); + let id = make_id(); + let metadata = Metadata::default(); + + let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap(); + + let result = backend + .complete_multipart( + &id, + &upload_id, + vec![crate::multipart::CompletedPart { + part_number: 99, + etag: "whatever".into(), + }], + ) + .await + .unwrap(); + assert!(result.is_some(), "expected error for missing part"); + assert_eq!(result.unwrap().code, "InvalidPart"); + } } From 803c658f7e45f8813a28f3983b85592ba552125b Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Mon, 4 May 2026 13:16:27 +0200 Subject: [PATCH 2/5] fix(multipart): preserve upload state on failed complete_multipart validation InMemoryBackend::complete_multipart was calling .remove() on the multipart store before validating parts. If validation failed, the upload was permanently destroyed and the client could not retry. Changed to .get() for validation, only removing after successful assembly. Added retry-after-failure test coverage for both InMemory and LocalFs backends. --- objectstore-service/src/backend/in_memory.rs | 120 +++++++++++++------ objectstore-service/src/backend/local_fs.rs | 42 ++++++- 2 files changed, 127 insertions(+), 35 deletions(-) diff --git a/objectstore-service/src/backend/in_memory.rs b/objectstore-service/src/backend/in_memory.rs index 2c949ed9..fec2b21a 100644 --- a/objectstore-service/src/backend/in_memory.rs +++ b/objectstore-service/src/backend/in_memory.rs @@ -329,47 +329,59 @@ impl MultipartUploadBackend for InMemoryBackend { upload_id: &UploadId, parts: Vec, ) -> Result { - let upload = self - .multipart_store - .lock() - .unwrap() - .remove(&(id.clone(), upload_id.clone())) - .ok_or_else(|| Error::generic("multipart upload not found"))?; - - for completed in &parts { - match upload.parts.get(&completed.part_number) { - None => { - return Ok(Some(crate::multipart::CompleteMultipartError { - code: "InvalidPart".into(), - message: format!("part number {} was not uploaded", completed.part_number), - })); + let key = (id.clone(), upload_id.clone()); + + // Validate and assemble while holding the multipart lock, but don't + // remove the upload yet — a failed validation must leave it intact so + // the client can retry. + let assembled = { + let store = self.multipart_store.lock().unwrap(); + let upload = store + .get(&key) + .ok_or_else(|| Error::generic("multipart upload not found"))?; + + for completed in &parts { + match upload.parts.get(&completed.part_number) { + None => { + return Ok(Some(crate::multipart::CompleteMultipartError { + code: "InvalidPart".into(), + message: format!( + "part number {} was not uploaded", + completed.part_number + ), + })); + } + Some(stored) if stored.etag != completed.etag => { + return Ok(Some(crate::multipart::CompleteMultipartError { + code: "InvalidPart".into(), + message: format!( + "etag mismatch for part {}: expected {}, got {}", + completed.part_number, stored.etag, completed.etag + ), + })); + } + _ => {} } - Some(stored) if stored.etag != completed.etag => { - return Ok(Some(crate::multipart::CompleteMultipartError { - code: "InvalidPart".into(), - message: format!( - "etag mismatch for part {}: expected {}, got {}", - completed.part_number, stored.etag, completed.etag - ), - })); - } - _ => {} } - } - let mut payload = BytesMut::new(); - for completed in &parts { - let stored = &upload.parts[&completed.part_number]; - payload.extend_from_slice(&stored.data); - } + let mut payload = BytesMut::new(); + for completed in &parts { + let stored = &upload.parts[&completed.part_number]; + payload.extend_from_slice(&stored.data); + } - let mut metadata = upload.metadata; - metadata.size = Some(payload.len()); + let mut metadata = upload.metadata.clone(); + metadata.size = Some(payload.len()); + + (metadata, payload.freeze()) + }; self.store .lock() .unwrap() - .insert(id.clone(), StoreEntry::Object(metadata, payload.freeze())); + .insert(id.clone(), StoreEntry::Object(assembled.0, assembled.1)); + + self.multipart_store.lock().unwrap().remove(&key); Ok(None) } @@ -666,7 +678,7 @@ mod tests { let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap(); - backend + let etag = backend .upload_part( &id, &upload_id, @@ -691,6 +703,20 @@ mod tests { .unwrap(); assert!(result.is_some(), "expected error for bad etag"); assert_eq!(result.unwrap().code, "InvalidPart"); + + // Upload must survive a failed complete so the client can retry. + let result = backend + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag, + }], + ) + .await + .unwrap(); + assert!(result.is_none(), "retry with correct etag should succeed"); } #[tokio::test] @@ -701,6 +727,18 @@ mod tests { let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap(); + let etag = backend + .upload_part( + &id, + &upload_id, + 1, + 5, + None, + stream::single(b"hello".to_vec()), + ) + .await + .unwrap(); + let result = backend .complete_multipart( &id, @@ -714,5 +752,19 @@ mod tests { .unwrap(); assert!(result.is_some(), "expected error for missing part"); assert_eq!(result.unwrap().code, "InvalidPart"); + + // Upload must survive a failed complete so the client can retry. + let result = backend + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag, + }], + ) + .await + .unwrap(); + assert!(result.is_none(), "retry with correct part should succeed"); } } diff --git a/objectstore-service/src/backend/local_fs.rs b/objectstore-service/src/backend/local_fs.rs index 8cee91e3..5e717475 100644 --- a/objectstore-service/src/backend/local_fs.rs +++ b/objectstore-service/src/backend/local_fs.rs @@ -707,7 +707,7 @@ mod tests { let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap(); - backend + let etag = backend .upload_part( &id, &upload_id, @@ -732,6 +732,20 @@ mod tests { .unwrap(); assert!(result.is_some(), "expected error for bad etag"); assert_eq!(result.unwrap().code, "InvalidPart"); + + // Upload must survive a failed complete so the client can retry. + let result = backend + .complete_multipart( + &id, + &upload_id, + vec![crate::multipart::CompletedPart { + part_number: 1, + etag, + }], + ) + .await + .unwrap(); + assert!(result.is_none(), "retry with correct etag should succeed"); } #[tokio::test] @@ -742,6 +756,18 @@ mod tests { let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap(); + let etag = backend + .upload_part( + &id, + &upload_id, + 1, + 5, + None, + stream::single(b"hello".to_vec()), + ) + .await + .unwrap(); + let result = backend .complete_multipart( &id, @@ -755,5 +781,19 @@ mod tests { .unwrap(); assert!(result.is_some(), "expected error for missing part"); assert_eq!(result.unwrap().code, "InvalidPart"); + + // Upload must survive a failed complete so the client can retry. + let result = backend + .complete_multipart( + &id, + &upload_id, + vec![crate::multipart::CompletedPart { + part_number: 1, + etag, + }], + ) + .await + .unwrap(); + assert!(result.is_none(), "retry with correct part should succeed"); } } From eb79b97b2aaeea892825d0cbad71172ebe8fff8a Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Tue, 12 May 2026 12:51:45 +0200 Subject: [PATCH 3/5] perf(multipart): stream part uploads and assembly in LocalFs backend upload_part and complete_multipart were buffering entire payloads in memory. Both now use StreamReader + BufWriter + tokio::io::copy to stream data to disk, matching the put_object pattern. --- objectstore-service/src/backend/local_fs.rs | 65 ++++++++++++++++----- 1 file changed, 49 insertions(+), 16 deletions(-) diff --git a/objectstore-service/src/backend/local_fs.rs b/objectstore-service/src/backend/local_fs.rs index 5e717475..5f6d33f5 100644 --- a/objectstore-service/src/backend/local_fs.rs +++ b/objectstore-service/src/backend/local_fs.rs @@ -5,11 +5,10 @@ use std::path::PathBuf; use std::pin::pin; use std::time::SystemTime; -use bytes::BytesMut; -use futures_util::{StreamExt, TryStreamExt}; +use futures_util::StreamExt; use objectstore_types::metadata::Metadata; use tokio::fs::OpenOptions; -use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; use tokio_util::io::{ReaderStream, StreamReader}; use crate::backend::common::{ @@ -195,7 +194,7 @@ impl MultipartUploadBackend for LocalFsBackend { id: &ObjectId, upload_id: &UploadId, part_number: PartNumber, - _content_length: u64, + content_length: u64, _content_md5: Option<&str>, body: ClientStream, ) -> Result { @@ -204,13 +203,12 @@ impl MultipartUploadBackend for LocalFsBackend { return Err(Error::generic("multipart upload not found")); } - let data: BytesMut = body.try_collect().await?; - let etag = format!("\"etag-{part_number}-{}\"", data.len()); + let etag = format!("\"etag-{part_number}-{content_length}\""); let header = serde_json::json!({ "etag": etag, "uploaded_at": SystemTime::now(), - "size": data.len(), + "size": content_length, }); let header_line = serde_json::to_string(&header).map_err(|cause| Error::Serde { context: "failed to serialize part header".to_string(), @@ -224,11 +222,23 @@ impl MultipartUploadBackend for LocalFsBackend { .truncate(true) .open(part_path) .await?; + + let mut reader = pin!(StreamReader::new(body)); let mut writer = BufWriter::new(file); writer.write_all(header_line.as_bytes()).await?; writer.write_all(b"\n").await?; - writer.write_all(&data).await?; + + tokio::io::copy(&mut reader, &mut writer) + .await + .map_err(|e| match stream::unpack_client_error(&e) { + Some(ce) => Error::Client(ce), + None => e.into(), + })?; + writer.flush().await?; + let file = writer.into_inner(); + file.sync_data().await?; + drop(file); Ok(etag) } @@ -332,8 +342,7 @@ impl MultipartUploadBackend for LocalFsBackend { cause, })?; - // Validate and assemble parts - let mut payload = Vec::new(); + // Validate all parts (headers only) before writing anything for completed in &parts { let part_path = dir.join(format!("{}.part", completed.part_number)); if !tokio::fs::try_exists(&part_path).await? { @@ -363,15 +372,39 @@ impl MultipartUploadBackend for LocalFsBackend { ), })); } + } - let mut part_data = Vec::new(); - reader.read_to_end(&mut part_data).await?; - payload.extend_from_slice(&part_data); + // Stream parts directly to the final object file + let path = self.path.join(id.as_storage_path().to_string()); + tokio::fs::create_dir_all(path.parent().unwrap()).await?; + let file = OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .open(path) + .await?; + let mut writer = BufWriter::new(file); + + let metadata_json = serde_json::to_string(&metadata).map_err(|cause| Error::Serde { + context: "failed to serialize metadata".to_string(), + cause, + })?; + writer.write_all(metadata_json.as_bytes()).await?; + writer.write_all(b"\n").await?; + + for completed in &parts { + let part_path = dir.join(format!("{}.part", completed.part_number)); + let file = tokio::fs::File::open(&part_path).await?; + let mut reader = BufReader::new(file); + let mut header_line = String::new(); + reader.read_line(&mut header_line).await?; + tokio::io::copy(&mut reader, &mut writer).await?; } - // Write the final object using put_object - let stream = stream::single(payload); - self.put_object(id, &metadata, stream).await?; + writer.flush().await?; + let file = writer.into_inner(); + file.sync_data().await?; + drop(file); // Clean up multipart state tokio::fs::remove_dir_all(dir).await?; From 66ebfd4e7097aec315bb10d83e6dfd21a90358b6 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Tue, 12 May 2026 13:09:56 +0200 Subject: [PATCH 4/5] chore(multipart): drop premature content_length validation Add a TODO to validate bytes copied against content_length once we have a proper BadRequest error variant at the service layer. --- objectstore-service/src/backend/local_fs.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/objectstore-service/src/backend/local_fs.rs b/objectstore-service/src/backend/local_fs.rs index 5f6d33f5..7604c024 100644 --- a/objectstore-service/src/backend/local_fs.rs +++ b/objectstore-service/src/backend/local_fs.rs @@ -228,13 +228,17 @@ impl MultipartUploadBackend for LocalFsBackend { writer.write_all(header_line.as_bytes()).await?; writer.write_all(b"\n").await?; - tokio::io::copy(&mut reader, &mut writer) + let _bytes_copied = tokio::io::copy(&mut reader, &mut writer) .await .map_err(|e| match stream::unpack_client_error(&e) { Some(ce) => Error::Client(ce), None => e.into(), })?; + // TODO: validate bytes_copied against content_length and return a BadRequest-style + // error. Needs a service-layer error variant that maps to HTTP 400 without abusing + // ClientError (which is meant for stream errors). + writer.flush().await?; let file = writer.into_inner(); file.sync_data().await?; From 78f0d7fd6bc0580258476acc45e80775f5b5bfd7 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Tue, 12 May 2026 13:13:17 +0200 Subject: [PATCH 5/5] chore(multipart): add TODO for InvalidPartOrder validation S3 and GCS reject CompleteMultipartUpload requests where parts are not in ascending order. Add TODOs to LocalFs and InMemory backends to implement this validation once we have a proper client error variant. --- objectstore-service/src/backend/in_memory.rs | 3 +++ objectstore-service/src/backend/local_fs.rs | 3 +++ 2 files changed, 6 insertions(+) diff --git a/objectstore-service/src/backend/in_memory.rs b/objectstore-service/src/backend/in_memory.rs index fec2b21a..253aafcd 100644 --- a/objectstore-service/src/backend/in_memory.rs +++ b/objectstore-service/src/backend/in_memory.rs @@ -331,6 +331,9 @@ impl MultipartUploadBackend for InMemoryBackend { ) -> Result { let key = (id.clone(), upload_id.clone()); + // TODO: validate that parts are in ascending part_number order and reject with + // InvalidPartOrder if not (matches S3/GCS behavior). Needs a proper client error variant. + // Validate and assemble while holding the multipart lock, but don't // remove the upload yet — a failed validation must leave it intact so // the client can retry. diff --git a/objectstore-service/src/backend/local_fs.rs b/objectstore-service/src/backend/local_fs.rs index 7604c024..c4518a18 100644 --- a/objectstore-service/src/backend/local_fs.rs +++ b/objectstore-service/src/backend/local_fs.rs @@ -346,6 +346,9 @@ impl MultipartUploadBackend for LocalFsBackend { cause, })?; + // TODO: validate that parts are in ascending part_number order and reject with + // InvalidPartOrder if not (matches S3/GCS behavior). Needs a proper client error variant. + // Validate all parts (headers only) before writing anything for completed in &parts { let part_path = dir.join(format!("{}.part", completed.part_number));