diff --git a/objectstore-service/src/backend/in_memory.rs b/objectstore-service/src/backend/in_memory.rs index 5536f3b4..253aafcd 100644 --- a/objectstore-service/src/backend/in_memory.rs +++ b/objectstore-service/src/backend/in_memory.rs @@ -5,19 +5,24 @@ //! backend is [`Clone`] so tests can hold a handle for direct inspection while //! the service owns a boxed copy. -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::sync::{Arc, Mutex}; +use std::time::SystemTime; use bytes::{Bytes, BytesMut}; use futures_util::TryStreamExt; use objectstore_types::metadata::Metadata; use super::common::{ - DeleteResponse, GetResponse, HighVolumeBackend, PutResponse, TieredGet, TieredMetadata, - TieredWrite, Tombstone, + DeleteResponse, GetResponse, HighVolumeBackend, MultipartUploadBackend, PutResponse, TieredGet, + TieredMetadata, TieredWrite, Tombstone, }; use crate::error::{Error, Result}; use crate::id::ObjectId; +use crate::multipart::{ + AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse, + ListPartsResponse, Part, PartNumber, UploadId, UploadPartResponse, +}; use crate::stream::ClientStream; /// An entry in the in-memory store. @@ -29,6 +34,21 @@ enum StoreEntry { type Store = HashMap; +#[derive(Clone, Debug)] +struct MultipartUpload { + metadata: Metadata, + parts: BTreeMap, +} + +#[derive(Clone, Debug)] +struct UploadedPart { + etag: String, + data: Bytes, + uploaded_at: SystemTime, +} + +type MultipartStore = HashMap<(ObjectId, UploadId), MultipartUpload>; + /// In-memory [`Backend`](super::common::Backend) backed by a `HashMap`. /// /// Removes the need for filesystem tempdir management in unit tests. The @@ -38,6 +58,7 @@ type Store = HashMap; pub struct InMemoryBackend { name: &'static str, store: Arc>, + multipart_store: Arc>, } impl InMemoryBackend { @@ -46,6 +67,7 @@ impl InMemoryBackend { Self { name, store: Arc::new(Mutex::new(HashMap::new())), + multipart_store: Arc::new(Mutex::new(HashMap::new())), } } @@ -195,6 +217,179 @@ impl HighVolumeBackend for InMemoryBackend { } } +#[async_trait::async_trait] +impl MultipartUploadBackend for InMemoryBackend { + async fn initiate_multipart( + &self, + id: &ObjectId, + metadata: &Metadata, + ) -> Result { + let upload_id = uuid::Uuid::now_v7().to_string(); + let upload = MultipartUpload { + metadata: metadata.clone(), + parts: BTreeMap::new(), + }; + self.multipart_store + .lock() + .unwrap() + .insert((id.clone(), upload_id.clone()), upload); + Ok(upload_id) + } + + async fn upload_part( + &self, + id: &ObjectId, + upload_id: &UploadId, + part_number: PartNumber, + _content_length: u64, + _content_md5: Option<&str>, + body: ClientStream, + ) -> Result { + let data: BytesMut = body.try_collect().await?; + let data = data.freeze(); + let etag = format!("\"etag-{part_number}-{}\"", data.len()); + + let mut store = self.multipart_store.lock().unwrap(); + let upload = store + .get_mut(&(id.clone(), upload_id.clone())) + .ok_or_else(|| Error::generic("multipart upload not found"))?; + + upload.parts.insert( + part_number, + UploadedPart { + etag: etag.clone(), + data, + uploaded_at: SystemTime::now(), + }, + ); + + Ok(etag) + } + + async fn list_parts( + &self, + id: &ObjectId, + upload_id: &UploadId, + max_parts: Option, + part_number_marker: Option, + ) -> Result { + let store = self.multipart_store.lock().unwrap(); + let upload = store + .get(&(id.clone(), upload_id.clone())) + .ok_or_else(|| Error::generic("multipart upload not found"))?; + + let iter = upload + .parts + .iter() + .filter(|(pn, _)| part_number_marker.is_none_or(|marker| **pn > marker)); + + let max = max_parts.unwrap_or(u32::MAX) as usize; + let all: Vec<_> = iter.collect(); + let is_truncated = all.len() > max; + let page: Vec<_> = all.into_iter().take(max).collect(); + + let next_part_number_marker = if is_truncated { + page.last().map(|(pn, _)| **pn) + } else { + None + }; + + let parts = page + .into_iter() + .map(|(pn, part)| Part { + part_number: *pn, + etag: part.etag.clone(), + last_modified: part.uploaded_at, + size: part.data.len() as u64, + }) + .collect(); + + Ok(ListPartsResponse { + parts, + is_truncated, + next_part_number_marker, + }) + } + + async fn abort_multipart( + &self, + id: &ObjectId, + upload_id: &UploadId, + ) -> Result { + self.multipart_store + .lock() + .unwrap() + .remove(&(id.clone(), upload_id.clone())); + Ok(()) + } + + async fn complete_multipart( + &self, + id: &ObjectId, + upload_id: &UploadId, + parts: Vec, + ) -> Result { + let key = (id.clone(), upload_id.clone()); + + // TODO: validate that parts are in ascending part_number order and reject with + // InvalidPartOrder if not (matches S3/GCS behavior). Needs a proper client error variant. + + // Validate and assemble while holding the multipart lock, but don't + // remove the upload yet — a failed validation must leave it intact so + // the client can retry. + let assembled = { + let store = self.multipart_store.lock().unwrap(); + let upload = store + .get(&key) + .ok_or_else(|| Error::generic("multipart upload not found"))?; + + for completed in &parts { + match upload.parts.get(&completed.part_number) { + None => { + return Ok(Some(crate::multipart::CompleteMultipartError { + code: "InvalidPart".into(), + message: format!( + "part number {} was not uploaded", + completed.part_number + ), + })); + } + Some(stored) if stored.etag != completed.etag => { + return Ok(Some(crate::multipart::CompleteMultipartError { + code: "InvalidPart".into(), + message: format!( + "etag mismatch for part {}: expected {}, got {}", + completed.part_number, stored.etag, completed.etag + ), + })); + } + _ => {} + } + } + + let mut payload = BytesMut::new(); + for completed in &parts { + let stored = &upload.parts[&completed.part_number]; + payload.extend_from_slice(&stored.data); + } + + let mut metadata = upload.metadata.clone(); + metadata.size = Some(payload.len()); + + (metadata, payload.freeze()) + }; + + self.store + .lock() + .unwrap() + .insert(id.clone(), StoreEntry::Object(assembled.0, assembled.1)); + + self.multipart_store.lock().unwrap().remove(&key); + + Ok(None) + } +} + /// Returns `true` if `entry` matches the expected tombstone redirect state. /// /// - `expected = None`: matches any non-tombstone (absent or inline object). @@ -257,3 +452,322 @@ impl Entry { } } } + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use objectstore_types::metadata::ExpirationPolicy; + use objectstore_types::scope::{Scope, Scopes}; + + use super::*; + use crate::backend::common::Backend; + use crate::id::ObjectContext; + use crate::stream; + + fn make_id() -> ObjectId { + ObjectId::random(ObjectContext { + usecase: "testing".into(), + scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]), + }) + } + + #[tokio::test] + async fn multipart_single_part() { + let backend = InMemoryBackend::new("test"); + let id = make_id(); + let metadata = Metadata { + content_type: "text/plain".into(), + expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(3600)), + origin: Some("203.0.113.42".into()), + custom: [("foo".into(), "bar".into())].into(), + ..Default::default() + }; + + let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap(); + + let data = b"hello, multipart world!"; + let etag = backend + .upload_part( + &id, + &upload_id, + 1, + data.len() as u64, + None, + stream::single(data.to_vec()), + ) + .await + .unwrap(); + + let result = backend + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag, + }], + ) + .await + .unwrap(); + assert!(result.is_none(), "expected no error on complete"); + + let (meta, body) = backend.get_object(&id).await.unwrap().unwrap(); + let payload = stream::read_to_vec(body).await.unwrap(); + assert_eq!(payload, data); + assert_eq!(meta.content_type, "text/plain".to_string()); + assert_eq!( + meta.expiration_policy, + ExpirationPolicy::TimeToIdle(Duration::from_secs(3600)) + ); + assert_eq!(meta.origin, Some("203.0.113.42".into())); + assert_eq!(meta.custom, [("foo".into(), "bar".into())].into()); + } + + #[tokio::test] + async fn multipart_multiple_parts() { + let backend = InMemoryBackend::new("test"); + let id = make_id(); + let metadata = Metadata::default(); + + let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap(); + + let part1 = b"aaaa".to_vec(); + let part2 = b"bbbb".to_vec(); + let part3 = b"cc".to_vec(); + + let etag1 = backend + .upload_part( + &id, + &upload_id, + 1, + part1.len() as u64, + None, + stream::single(part1.clone()), + ) + .await + .unwrap(); + let etag2 = backend + .upload_part( + &id, + &upload_id, + 2, + part2.len() as u64, + None, + stream::single(part2.clone()), + ) + .await + .unwrap(); + let etag3 = backend + .upload_part( + &id, + &upload_id, + 3, + part3.len() as u64, + None, + stream::single(part3.clone()), + ) + .await + .unwrap(); + + let result = backend + .complete_multipart( + &id, + &upload_id, + vec![ + CompletedPart { + part_number: 1, + etag: etag1, + }, + CompletedPart { + part_number: 2, + etag: etag2, + }, + CompletedPart { + part_number: 3, + etag: etag3, + }, + ], + ) + .await + .unwrap(); + assert!(result.is_none()); + + let (_, body) = backend.get_object(&id).await.unwrap().unwrap(); + let payload = stream::read_to_vec(body).await.unwrap(); + assert_eq!(payload, b"aaaabbbbcc"); + } + + #[tokio::test] + async fn multipart_list_parts() { + let backend = InMemoryBackend::new("test"); + let id = make_id(); + let metadata = Metadata::default(); + + let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap(); + + let etag1 = backend + .upload_part(&id, &upload_id, 1, 3, None, stream::single(b"aaa".to_vec())) + .await + .unwrap(); + let etag2 = backend + .upload_part(&id, &upload_id, 2, 3, None, stream::single(b"bbb".to_vec())) + .await + .unwrap(); + + let list = backend + .list_parts(&id, &upload_id, None, None) + .await + .unwrap(); + assert_eq!(list.parts.len(), 2); + assert_eq!(list.parts[0].part_number, 1); + assert_eq!(list.parts[0].etag, etag1); + assert_eq!(list.parts[0].size, 3); + assert_eq!(list.parts[1].part_number, 2); + assert_eq!(list.parts[1].etag, etag2); + assert_eq!(list.parts[1].size, 3); + + // Pagination + let page1 = backend + .list_parts(&id, &upload_id, Some(1), None) + .await + .unwrap(); + assert_eq!(page1.parts.len(), 1); + assert_eq!(page1.parts[0].part_number, 1); + assert!(page1.is_truncated); + assert!(page1.next_part_number_marker.is_some()); + + let page2 = backend + .list_parts(&id, &upload_id, Some(1), page1.next_part_number_marker) + .await + .unwrap(); + assert_eq!(page2.parts.len(), 1); + assert_eq!(page2.parts[0].part_number, 2); + + backend.abort_multipart(&id, &upload_id).await.unwrap(); + } + + #[tokio::test] + async fn multipart_abort() { + let backend = InMemoryBackend::new("test"); + let id = make_id(); + let metadata = Metadata::default(); + + let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap(); + + backend + .upload_part( + &id, + &upload_id, + 1, + 5, + None, + stream::single(b"hello".to_vec()), + ) + .await + .unwrap(); + + backend.abort_multipart(&id, &upload_id).await.unwrap(); + + let result = backend.get_object(&id).await.unwrap(); + assert!(result.is_none(), "object should not exist after abort"); + } + + #[tokio::test] + async fn multipart_invalid_etag() { + let backend = InMemoryBackend::new("test"); + let id = make_id(); + let metadata = Metadata::default(); + + let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap(); + + let etag = backend + .upload_part( + &id, + &upload_id, + 1, + 5, + None, + stream::single(b"hello".to_vec()), + ) + .await + .unwrap(); + + let result = backend + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag: "wrong-etag".into(), + }], + ) + .await + .unwrap(); + assert!(result.is_some(), "expected error for bad etag"); + assert_eq!(result.unwrap().code, "InvalidPart"); + + // Upload must survive a failed complete so the client can retry. + let result = backend + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag, + }], + ) + .await + .unwrap(); + assert!(result.is_none(), "retry with correct etag should succeed"); + } + + #[tokio::test] + async fn multipart_missing_part() { + let backend = InMemoryBackend::new("test"); + let id = make_id(); + let metadata = Metadata::default(); + + let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap(); + + let etag = backend + .upload_part( + &id, + &upload_id, + 1, + 5, + None, + stream::single(b"hello".to_vec()), + ) + .await + .unwrap(); + + let result = backend + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 99, + etag: "whatever".into(), + }], + ) + .await + .unwrap(); + assert!(result.is_some(), "expected error for missing part"); + assert_eq!(result.unwrap().code, "InvalidPart"); + + // Upload must survive a failed complete so the client can retry. + let result = backend + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag, + }], + ) + .await + .unwrap(); + assert!(result.is_none(), "retry with correct part should succeed"); + } +} diff --git a/objectstore-service/src/backend/local_fs.rs b/objectstore-service/src/backend/local_fs.rs index 71a7d1a5..c4518a18 100644 --- a/objectstore-service/src/backend/local_fs.rs +++ b/objectstore-service/src/backend/local_fs.rs @@ -3,6 +3,7 @@ use std::io::ErrorKind; use std::path::PathBuf; use std::pin::pin; +use std::time::SystemTime; use futures_util::StreamExt; use objectstore_types::metadata::Metadata; @@ -10,9 +11,15 @@ use tokio::fs::OpenOptions; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; use tokio_util::io::{ReaderStream, StreamReader}; -use crate::backend::common::{Backend, DeleteResponse, GetResponse, PutResponse}; +use crate::backend::common::{ + Backend, DeleteResponse, GetResponse, MultipartUploadBackend, PutResponse, +}; use crate::error::{Error, Result}; use crate::id::ObjectId; +use crate::multipart::{ + AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse, + ListPartsResponse, Part, PartNumber, UploadId, UploadPartResponse, +}; use crate::stream::{self, ClientStream}; /// Configuration for [`LocalFsBackend`]. @@ -152,6 +159,267 @@ impl Backend for LocalFsBackend { } } +impl LocalFsBackend { + fn multipart_dir(&self, id: &ObjectId, upload_id: &UploadId) -> PathBuf { + self.path + .join("__multipart__") + .join(id.as_storage_path().to_string()) + .join(upload_id) + } +} + +#[async_trait::async_trait] +impl MultipartUploadBackend for LocalFsBackend { + async fn initiate_multipart( + &self, + id: &ObjectId, + metadata: &Metadata, + ) -> Result { + let upload_id = uuid::Uuid::now_v7().to_string(); + let dir = self.multipart_dir(id, &upload_id); + tokio::fs::create_dir_all(&dir).await?; + + let meta_path = dir.join("metadata.json"); + let metadata_json = serde_json::to_string(metadata).map_err(|cause| Error::Serde { + context: "failed to serialize multipart metadata".to_string(), + cause, + })?; + tokio::fs::write(meta_path, metadata_json).await?; + + Ok(upload_id) + } + + async fn upload_part( + &self, + id: &ObjectId, + upload_id: &UploadId, + part_number: PartNumber, + content_length: u64, + _content_md5: Option<&str>, + body: ClientStream, + ) -> Result { + let dir = self.multipart_dir(id, upload_id); + if !tokio::fs::try_exists(&dir).await? { + return Err(Error::generic("multipart upload not found")); + } + + let etag = format!("\"etag-{part_number}-{content_length}\""); + + let header = serde_json::json!({ + "etag": etag, + "uploaded_at": SystemTime::now(), + "size": content_length, + }); + let header_line = serde_json::to_string(&header).map_err(|cause| Error::Serde { + context: "failed to serialize part header".to_string(), + cause, + })?; + + let part_path = dir.join(format!("{part_number}.part")); + let file = OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .open(part_path) + .await?; + + let mut reader = pin!(StreamReader::new(body)); + let mut writer = BufWriter::new(file); + writer.write_all(header_line.as_bytes()).await?; + writer.write_all(b"\n").await?; + + let _bytes_copied = tokio::io::copy(&mut reader, &mut writer) + .await + .map_err(|e| match stream::unpack_client_error(&e) { + Some(ce) => Error::Client(ce), + None => e.into(), + })?; + + // TODO: validate bytes_copied against content_length and return a BadRequest-style + // error. Needs a service-layer error variant that maps to HTTP 400 without abusing + // ClientError (which is meant for stream errors). + + writer.flush().await?; + let file = writer.into_inner(); + file.sync_data().await?; + drop(file); + + Ok(etag) + } + + async fn list_parts( + &self, + id: &ObjectId, + upload_id: &UploadId, + max_parts: Option, + part_number_marker: Option, + ) -> Result { + let dir = self.multipart_dir(id, upload_id); + if !tokio::fs::try_exists(&dir).await? { + return Err(Error::generic("multipart upload not found")); + } + + let mut entries = tokio::fs::read_dir(&dir).await?; + let mut parts = Vec::new(); + + while let Some(entry) = entries.next_entry().await? { + let name = entry.file_name(); + let name_str = name.to_string_lossy(); + let Some(pn_str) = name_str.strip_suffix(".part") else { + continue; + }; + let Ok(pn) = pn_str.parse::() else { + continue; + }; + + if part_number_marker.is_some_and(|marker| pn <= marker) { + continue; + } + + let file = tokio::fs::File::open(entry.path()).await?; + let mut reader = BufReader::new(file); + let mut header_line = String::new(); + reader.read_line(&mut header_line).await?; + let header: serde_json::Value = + serde_json::from_str(header_line.trim_end()).map_err(|cause| Error::Serde { + context: "failed to deserialize part header".to_string(), + cause, + })?; + + parts.push(Part { + part_number: pn, + etag: header["etag"].as_str().unwrap_or("").to_string(), + last_modified: serde_json::from_value(header["uploaded_at"].clone()) + .unwrap_or(SystemTime::UNIX_EPOCH), + size: header["size"].as_u64().unwrap_or(0), + }); + } + + parts.sort_by_key(|p| p.part_number); + + let max = max_parts.unwrap_or(u32::MAX) as usize; + let is_truncated = parts.len() > max; + parts.truncate(max); + + let next_part_number_marker = if is_truncated { + parts.last().map(|p| p.part_number) + } else { + None + }; + + Ok(ListPartsResponse { + parts, + is_truncated, + next_part_number_marker, + }) + } + + async fn abort_multipart( + &self, + id: &ObjectId, + upload_id: &UploadId, + ) -> Result { + let dir = self.multipart_dir(id, upload_id); + if tokio::fs::try_exists(&dir).await? { + tokio::fs::remove_dir_all(dir).await?; + } + Ok(()) + } + + async fn complete_multipart( + &self, + id: &ObjectId, + upload_id: &UploadId, + parts: Vec, + ) -> Result { + let dir = self.multipart_dir(id, upload_id); + if !tokio::fs::try_exists(&dir).await? { + return Err(Error::generic("multipart upload not found")); + } + + // Read metadata + let meta_path = dir.join("metadata.json"); + let meta_bytes = tokio::fs::read(&meta_path).await?; + let metadata: Metadata = + serde_json::from_slice(&meta_bytes).map_err(|cause| Error::Serde { + context: "failed to deserialize multipart metadata".to_string(), + cause, + })?; + + // TODO: validate that parts are in ascending part_number order and reject with + // InvalidPartOrder if not (matches S3/GCS behavior). Needs a proper client error variant. + + // Validate all parts (headers only) before writing anything + for completed in &parts { + let part_path = dir.join(format!("{}.part", completed.part_number)); + if !tokio::fs::try_exists(&part_path).await? { + return Ok(Some(crate::multipart::CompleteMultipartError { + code: "InvalidPart".into(), + message: format!("part number {} was not uploaded", completed.part_number), + })); + } + + let file = tokio::fs::File::open(&part_path).await?; + let mut reader = BufReader::new(file); + let mut header_line = String::new(); + reader.read_line(&mut header_line).await?; + let header: serde_json::Value = + serde_json::from_str(header_line.trim_end()).map_err(|cause| Error::Serde { + context: "failed to deserialize part header".to_string(), + cause, + })?; + + let stored_etag = header["etag"].as_str().unwrap_or(""); + if stored_etag != completed.etag { + return Ok(Some(crate::multipart::CompleteMultipartError { + code: "InvalidPart".into(), + message: format!( + "etag mismatch for part {}: expected {}, got {}", + completed.part_number, stored_etag, completed.etag + ), + })); + } + } + + // Stream parts directly to the final object file + let path = self.path.join(id.as_storage_path().to_string()); + tokio::fs::create_dir_all(path.parent().unwrap()).await?; + let file = OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .open(path) + .await?; + let mut writer = BufWriter::new(file); + + let metadata_json = serde_json::to_string(&metadata).map_err(|cause| Error::Serde { + context: "failed to serialize metadata".to_string(), + cause, + })?; + writer.write_all(metadata_json.as_bytes()).await?; + writer.write_all(b"\n").await?; + + for completed in &parts { + let part_path = dir.join(format!("{}.part", completed.part_number)); + let file = tokio::fs::File::open(&part_path).await?; + let mut reader = BufReader::new(file); + let mut header_line = String::new(); + reader.read_line(&mut header_line).await?; + tokio::io::copy(&mut reader, &mut writer).await?; + } + + writer.flush().await?; + let file = writer.into_inner(); + file.sync_data().await?; + drop(file); + + // Clean up multipart state + tokio::fs::remove_dir_all(dir).await?; + + Ok(None) + } +} + #[cfg(test)] mod tests { use std::time::{Duration, SystemTime}; @@ -254,4 +522,318 @@ mod tests { let result = backend.get_metadata(&id).await.unwrap(); assert!(result.is_none()); } + + fn make_id() -> ObjectId { + ObjectId::random(ObjectContext { + usecase: "testing".into(), + scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]), + }) + } + + fn make_backend() -> (tempfile::TempDir, LocalFsBackend) { + let tempdir = tempfile::tempdir().unwrap(); + let backend = LocalFsBackend::new(FileSystemConfig { + path: tempdir.path().to_path_buf(), + }); + (tempdir, backend) + } + + #[tokio::test] + async fn multipart_single_part() { + let (_tempdir, backend) = make_backend(); + let id = make_id(); + let metadata = Metadata { + content_type: "text/plain".into(), + expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(3600)), + origin: Some("203.0.113.42".into()), + custom: [("foo".into(), "bar".into())].into(), + ..Default::default() + }; + + let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap(); + + let data = b"hello, multipart world!"; + let etag = backend + .upload_part( + &id, + &upload_id, + 1, + data.len() as u64, + None, + stream::single(data.to_vec()), + ) + .await + .unwrap(); + + let result = backend + .complete_multipart( + &id, + &upload_id, + vec![crate::multipart::CompletedPart { + part_number: 1, + etag, + }], + ) + .await + .unwrap(); + assert!(result.is_none(), "expected no error on complete"); + + let (meta, body) = backend.get_object(&id).await.unwrap().unwrap(); + let payload: BytesMut = body.try_collect().await.unwrap(); + assert_eq!(payload.as_ref(), data); + assert_eq!(meta.content_type, "text/plain".to_string()); + assert_eq!( + meta.expiration_policy, + ExpirationPolicy::TimeToIdle(Duration::from_secs(3600)) + ); + assert_eq!(meta.origin, Some("203.0.113.42".into())); + assert_eq!(meta.custom, [("foo".into(), "bar".into())].into()); + } + + #[tokio::test] + async fn multipart_multiple_parts() { + let (_tempdir, backend) = make_backend(); + let id = make_id(); + let metadata = Metadata::default(); + + let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap(); + + let part1 = b"aaaa".to_vec(); + let part2 = b"bbbb".to_vec(); + let part3 = b"cc".to_vec(); + + let etag1 = backend + .upload_part( + &id, + &upload_id, + 1, + part1.len() as u64, + None, + stream::single(part1.clone()), + ) + .await + .unwrap(); + let etag2 = backend + .upload_part( + &id, + &upload_id, + 2, + part2.len() as u64, + None, + stream::single(part2.clone()), + ) + .await + .unwrap(); + let etag3 = backend + .upload_part( + &id, + &upload_id, + 3, + part3.len() as u64, + None, + stream::single(part3.clone()), + ) + .await + .unwrap(); + + let result = backend + .complete_multipart( + &id, + &upload_id, + vec![ + crate::multipart::CompletedPart { + part_number: 1, + etag: etag1, + }, + crate::multipart::CompletedPart { + part_number: 2, + etag: etag2, + }, + crate::multipart::CompletedPart { + part_number: 3, + etag: etag3, + }, + ], + ) + .await + .unwrap(); + assert!(result.is_none()); + + let (_, body) = backend.get_object(&id).await.unwrap().unwrap(); + let payload: BytesMut = body.try_collect().await.unwrap(); + assert_eq!(payload.as_ref(), b"aaaabbbbcc"); + } + + #[tokio::test] + async fn multipart_list_parts() { + let (_tempdir, backend) = make_backend(); + let id = make_id(); + let metadata = Metadata::default(); + + let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap(); + + let etag1 = backend + .upload_part(&id, &upload_id, 1, 3, None, stream::single(b"aaa".to_vec())) + .await + .unwrap(); + let etag2 = backend + .upload_part(&id, &upload_id, 2, 3, None, stream::single(b"bbb".to_vec())) + .await + .unwrap(); + + let list = backend + .list_parts(&id, &upload_id, None, None) + .await + .unwrap(); + assert_eq!(list.parts.len(), 2); + assert_eq!(list.parts[0].part_number, 1); + assert_eq!(list.parts[0].etag, etag1); + assert_eq!(list.parts[0].size, 3); + assert_eq!(list.parts[1].part_number, 2); + assert_eq!(list.parts[1].etag, etag2); + assert_eq!(list.parts[1].size, 3); + + // Pagination + let page1 = backend + .list_parts(&id, &upload_id, Some(1), None) + .await + .unwrap(); + assert_eq!(page1.parts.len(), 1); + assert_eq!(page1.parts[0].part_number, 1); + assert!(page1.is_truncated); + assert!(page1.next_part_number_marker.is_some()); + + let page2 = backend + .list_parts(&id, &upload_id, Some(1), page1.next_part_number_marker) + .await + .unwrap(); + assert_eq!(page2.parts.len(), 1); + assert_eq!(page2.parts[0].part_number, 2); + + backend.abort_multipart(&id, &upload_id).await.unwrap(); + } + + #[tokio::test] + async fn multipart_abort() { + let (_tempdir, backend) = make_backend(); + let id = make_id(); + let metadata = Metadata::default(); + + let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap(); + + backend + .upload_part( + &id, + &upload_id, + 1, + 5, + None, + stream::single(b"hello".to_vec()), + ) + .await + .unwrap(); + + backend.abort_multipart(&id, &upload_id).await.unwrap(); + + let result = backend.get_object(&id).await.unwrap(); + assert!(result.is_none(), "object should not exist after abort"); + } + + #[tokio::test] + async fn multipart_invalid_etag() { + let (_tempdir, backend) = make_backend(); + let id = make_id(); + let metadata = Metadata::default(); + + let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap(); + + let etag = backend + .upload_part( + &id, + &upload_id, + 1, + 5, + None, + stream::single(b"hello".to_vec()), + ) + .await + .unwrap(); + + let result = backend + .complete_multipart( + &id, + &upload_id, + vec![crate::multipart::CompletedPart { + part_number: 1, + etag: "wrong-etag".into(), + }], + ) + .await + .unwrap(); + assert!(result.is_some(), "expected error for bad etag"); + assert_eq!(result.unwrap().code, "InvalidPart"); + + // Upload must survive a failed complete so the client can retry. + let result = backend + .complete_multipart( + &id, + &upload_id, + vec![crate::multipart::CompletedPart { + part_number: 1, + etag, + }], + ) + .await + .unwrap(); + assert!(result.is_none(), "retry with correct etag should succeed"); + } + + #[tokio::test] + async fn multipart_missing_part() { + let (_tempdir, backend) = make_backend(); + let id = make_id(); + let metadata = Metadata::default(); + + let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap(); + + let etag = backend + .upload_part( + &id, + &upload_id, + 1, + 5, + None, + stream::single(b"hello".to_vec()), + ) + .await + .unwrap(); + + let result = backend + .complete_multipart( + &id, + &upload_id, + vec![crate::multipart::CompletedPart { + part_number: 99, + etag: "whatever".into(), + }], + ) + .await + .unwrap(); + assert!(result.is_some(), "expected error for missing part"); + assert_eq!(result.unwrap().code, "InvalidPart"); + + // Upload must survive a failed complete so the client can retry. + let result = backend + .complete_multipart( + &id, + &upload_id, + vec![crate::multipart::CompletedPart { + part_number: 1, + etag, + }], + ) + .await + .unwrap(); + assert!(result.is_none(), "retry with correct part should succeed"); + } }