From 8713145dc0141cddb6c97c85984b8918faddfd20 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Mon, 4 May 2026 17:35:58 +0200 Subject: [PATCH 01/11] feat: Implement MultipartUploadBackend on TieredStorage --- Cargo.lock | 1 + objectstore-service/Cargo.toml | 1 + objectstore-service/src/backend/changelog.rs | 6 +- objectstore-service/src/backend/mod.rs | 17 +- objectstore-service/src/backend/testing.rs | 69 +- objectstore-service/src/backend/tiered.rs | 656 ++++++++++++++++++- 6 files changed, 740 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 58557d39..108984c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2566,6 +2566,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "base64", "bigtable_rs", "bytes", "chrono", diff --git a/objectstore-service/Cargo.toml b/objectstore-service/Cargo.toml index be44d790..e4434d42 100644 --- a/objectstore-service/Cargo.toml +++ b/objectstore-service/Cargo.toml @@ -12,6 +12,7 @@ publish = false [dependencies] anyhow = { workspace = true } async-trait = { workspace = true } +base64 = "0.22" bigtable_rs = { git = "https://github.com/getsentry/bigtable_rs.git", rev = "4cb75bc5e5f87204363973f6302107768e64972e" } chrono = "0.4" bytes = { workspace = true } diff --git a/objectstore-service/src/backend/changelog.rs b/objectstore-service/src/backend/changelog.rs index 0e05bb84..548be885 100644 --- a/objectstore-service/src/backend/changelog.rs +++ b/objectstore-service/src/backend/changelog.rs @@ -27,7 +27,7 @@ use std::time::Duration; use tokio_util::task::TaskTracker; use tokio_util::task::task_tracker::TaskTrackerToken; -use crate::backend::common::{Backend, HighVolumeBackend, TieredMetadata}; +use crate::backend::common::{HighVolumeBackend, MultipartUploadBackend, TieredMetadata}; use crate::error::Result; use crate::id::ObjectId; @@ -88,7 +88,7 @@ pub struct ChangeManager { /// The backend for small objects (≤ 1 MiB). pub(crate) high_volume: Box, /// The backend for large objects (> 1 MiB). - pub(crate) long_term: Box, + pub(crate) long_term: Box, /// Durable write-ahead log for multi-step changes. pub(crate) changelog: Box, /// Tracks outstanding background cleanup operations for graceful shutdown. @@ -99,7 +99,7 @@ impl ChangeManager { /// Creates a new `ChangeManager` with the given backends and changelog. pub fn new( high_volume: Box, - long_term: Box, + long_term: Box, changelog: Box, ) -> Arc { Arc::new(Self { diff --git a/objectstore-service/src/backend/mod.rs b/objectstore-service/src/backend/mod.rs index 401aad1b..944c5fa8 100644 --- a/objectstore-service/src/backend/mod.rs +++ b/objectstore-service/src/backend/mod.rs @@ -60,7 +60,7 @@ pub async fn from_config(config: StorageConfig) -> Result { let hv = hv_from_config(c.high_volume).await?; - let lt = from_leaf_config(*c.long_term).await?; + let lt = lt_from_leaf_config(*c.long_term).await?; let log = Box::new(changelog::NoopChangeLog); Box::new(tiered::TieredStorage::new(hv, lt, log)) } @@ -71,6 +71,21 @@ pub async fn from_config(config: StorageConfig) -> Result Result> { + Ok(match config { + StorageConfig::FileSystem(c) => Box::new(local_fs::LocalFsBackend::new(c)), + StorageConfig::Gcs(c) => Box::new(gcs::GcsBackend::new(c).await?), + _ => anyhow::bail!( + "long-term backend does not support multipart uploads; \ + only filesystem and gcs are supported" + ), + }) +} + async fn from_leaf_config(config: StorageConfig) -> Result> { Ok(match config { StorageConfig::FileSystem(c) => Box::new(local_fs::LocalFsBackend::new(c)), diff --git a/objectstore-service/src/backend/testing.rs b/objectstore-service/src/backend/testing.rs index edfc813d..3d1379c1 100644 --- a/objectstore-service/src/backend/testing.rs +++ b/objectstore-service/src/backend/testing.rs @@ -40,12 +40,16 @@ use bytes::Bytes; use objectstore_types::metadata::Metadata; use crate::backend::common::{ - Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, PutResponse, - TieredGet, TieredMetadata, TieredWrite, Tombstone, + Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, + MultipartUploadBackend, PutResponse, TieredGet, TieredMetadata, TieredWrite, Tombstone, }; use crate::backend::in_memory::InMemoryBackend; use crate::error::Result; use crate::id::ObjectId; +use crate::multipart::{ + AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse, + ListPartsResponse, PartNumber, UploadId, UploadPartResponse, +}; use crate::stream::ClientStream; /// Hooks for [`TestBackend`]. @@ -258,3 +262,64 @@ impl HighVolumeBackend for TestBackend { .await } } + +#[async_trait::async_trait] +impl MultipartUploadBackend for TestBackend { + async fn initiate_multipart( + &self, + id: &ObjectId, + metadata: &Metadata, + ) -> Result { + self.inner.initiate_multipart(id, metadata).await + } + + async fn upload_part( + &self, + id: &ObjectId, + upload_id: &UploadId, + part_number: PartNumber, + content_length: u64, + content_md5: Option<&str>, + body: ClientStream, + ) -> Result { + self.inner + .upload_part( + id, + upload_id, + part_number, + content_length, + content_md5, + body, + ) + .await + } + + async fn list_parts( + &self, + id: &ObjectId, + upload_id: &UploadId, + max_parts: Option, + part_number_marker: Option, + ) -> Result { + self.inner + .list_parts(id, upload_id, max_parts, part_number_marker) + .await + } + + async fn abort_multipart( + &self, + id: &ObjectId, + upload_id: &UploadId, + ) -> Result { + self.inner.abort_multipart(id, upload_id).await + } + + async fn complete_multipart( + &self, + id: &ObjectId, + upload_id: &UploadId, + parts: Vec, + ) -> Result { + self.inner.complete_multipart(id, upload_id, parts).await + } +} diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index ec626ca2..37093250 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -108,12 +108,16 @@ use serde::{Deserialize, Serialize}; use crate::backend::changelog::{Change, ChangeGuard, ChangeLog, ChangeManager, ChangePhase}; use crate::backend::common::{ - Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, PutResponse, - TieredGet, TieredMetadata, TieredWrite, Tombstone, + Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, + MultipartUploadBackend, PutResponse, TieredGet, TieredMetadata, TieredWrite, Tombstone, }; use crate::backend::{HighVolumeStorageConfig, StorageConfig}; -use crate::error::Result; +use crate::error::{Error, Result}; use crate::id::ObjectId; +use crate::multipart::{ + AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse, + ListPartsResponse, PartNumber, UploadId, UploadPartResponse, +}; use crate::stream::{ClientStream, SizedPeek}; /// The threshold up until which we will go to the "high volume" backend. @@ -219,7 +223,7 @@ impl TieredStorage { /// Creates a new `TieredStorage` with the given backends and change log. pub fn new( high_volume: Box, - long_term: Box, + long_term: Box, changelog: Box, ) -> Self { let inner = ChangeManager::new(high_volume, long_term, changelog); @@ -540,6 +544,191 @@ where ) } +/// Token encoding the multipart upload state for TieredStorage. +/// +/// Contains only the physical key suffix (no usecase/scopes) and the upstream +/// upload ID. The full physical [`ObjectId`] is reconstructed by combining the +/// key with the context from the request's `id` parameter. +#[derive(Serialize, Deserialize)] +struct MultipartToken { + /// Key portion of the physical revision in LT (e.g. `"myfile/{uuid}"`). + physical_key: String, + /// Upload ID returned by the LT backend's `initiate_multipart`. + upload_id: String, +} + +fn encode_multipart_token(token: &MultipartToken) -> Result { + use base64::Engine; + let json = + serde_json::to_vec(token).map_err(|e| Error::serde("encoding multipart token", e))?; + Ok(base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(json)) +} + +fn decode_multipart_token(upload_id: &UploadId) -> Result { + use base64::Engine; + let json = base64::engine::general_purpose::URL_SAFE_NO_PAD + .decode(upload_id.as_bytes()) + .map_err(|e| Error::generic(format!("invalid multipart upload ID encoding: {e}")))?; + serde_json::from_slice(&json).map_err(|e| Error::serde("decoding multipart token", e)) +} + +#[async_trait::async_trait] +impl MultipartUploadBackend for TieredStorage { + async fn initiate_multipart( + &self, + id: &ObjectId, + metadata: &Metadata, + ) -> Result { + let physical = new_long_term_revision(id); + + let upstream_upload_id = self + .inner + .long_term + .initiate_multipart(&physical, metadata) + .await?; + + encode_multipart_token(&MultipartToken { + physical_key: physical.key, + upload_id: upstream_upload_id, + }) + } + + async fn upload_part( + &self, + id: &ObjectId, + upload_id: &UploadId, + part_number: PartNumber, + content_length: u64, + content_md5: Option<&str>, + body: ClientStream, + ) -> Result { + let token = decode_multipart_token(upload_id)?; + let physical = ObjectId { + context: id.context.clone(), + key: token.physical_key, + }; + + self.inner + .long_term + .upload_part( + &physical, + &token.upload_id, + part_number, + content_length, + content_md5, + body, + ) + .await + } + + async fn list_parts( + &self, + id: &ObjectId, + upload_id: &UploadId, + max_parts: Option, + part_number_marker: Option, + ) -> Result { + let token = decode_multipart_token(upload_id)?; + let physical = ObjectId { + context: id.context.clone(), + key: token.physical_key, + }; + + self.inner + .long_term + .list_parts(&physical, &token.upload_id, max_parts, part_number_marker) + .await + } + + async fn abort_multipart( + &self, + id: &ObjectId, + upload_id: &UploadId, + ) -> Result { + let token = decode_multipart_token(upload_id)?; + let physical = ObjectId { + context: id.context.clone(), + key: token.physical_key, + }; + + self.inner + .long_term + .abort_multipart(&physical, &token.upload_id) + .await + } + + async fn complete_multipart( + &self, + id: &ObjectId, + upload_id: &UploadId, + parts: Vec, + ) -> Result { + let token = decode_multipart_token(upload_id)?; + let physical = ObjectId { + context: id.context.clone(), + key: token.physical_key, + }; + + // 1. Complete on the LT backend (assembles parts into a readable object). + // Done before recording the change: protocol errors (invalid part, etag + // mismatch) leave no assembled blob and must not create a changelog entry. + let error = self + .inner + .long_term + .complete_multipart(&physical, &token.upload_id, parts) + .await?; + + if error.is_some() { + return Ok(error); + } + + // 2. Retrieve metadata from the completed object for the tombstone. + // Done before recording the change: a transient failure here leaves an + // orphan blob (bounded by TTL) but no changelog entry and no guard that + // would actively delete the just-completed upload. + let metadata = self + .inner + .long_term + .get_metadata(&physical) + .await? + .ok_or_else(|| { + Error::generic("completed multipart object not found in long-term storage") + })?; + + // 3. Read current HV revision to establish the write precondition. + let current = match self.inner.high_volume.get_tiered_metadata(id).await? { + TieredMetadata::Tombstone(t) => Some(t.target), + _ => None, + }; + + // 4. Record change now that the LT blob exists and needs tracking. + let mut guard = self + .record_change(Change { + id: id.clone(), + new: Some(physical.clone()), + old: current.clone(), + }) + .await?; + guard.advance(ChangePhase::Written); + + // 5. CAS commit: write tombstone only if HV state matches what we saw. + let tombstone = Tombstone { + target: physical.clone(), + expiration_policy: metadata.expiration_policy, + }; + let written = self + .inner + .high_volume + .compare_and_write(id, current.as_ref(), TieredWrite::Tombstone(tombstone)) + .await?; + + // 6. Let the guard handle cleanup based on the CAS outcome. + guard.advance(ChangePhase::compare_and_write(written)); + + Ok(None) + } +} + #[cfg(test)] mod tests { use std::time::Duration; @@ -1286,4 +1475,463 @@ mod tests { "changelog entry not removed after cleanup" ); } + + // --- Multipart upload --- + + #[test] + fn multipart_token_roundtrip() { + let token = MultipartToken { + physical_key: "my-key/01924a6f-7e28-7b9a-9c1d-abcdef123456".into(), + upload_id: "upstream-upload-id-abc".into(), + }; + let encoded = encode_multipart_token(&token).unwrap(); + let decoded = decode_multipart_token(&encoded).unwrap(); + assert_eq!(decoded.physical_key, token.physical_key); + assert_eq!(decoded.upload_id, token.upload_id); + } + + #[test] + fn multipart_invalid_token_errors() { + let result = decode_multipart_token(&"not-valid-base64!!!".into()); + assert!(result.is_err()); + + // Valid base64 but not valid JSON. + use base64::Engine; + let bad_json = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(b"not json"); + let result = decode_multipart_token(&bad_json); + assert!(result.is_err()); + } + + #[tokio::test] + async fn multipart_single_part_roundtrip() { + let (storage, hv, lt, _) = make_tiered_storage(); + let id = make_id("mp-single"); + let metadata = Metadata { + content_type: "application/octet-stream".into(), + expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(3600)), + ..Default::default() + }; + let payload = vec![0xABu8; 2 * 1024 * 1024]; // 2 MiB + + let upload_id = storage.initiate_multipart(&id, &metadata).await.unwrap(); + + let etag = storage + .upload_part( + &id, + &upload_id, + 1, + payload.len() as u64, + None, + stream::single(payload.clone()), + ) + .await + .unwrap(); + + let error = storage + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag, + }], + ) + .await + .unwrap(); + assert!( + error.is_none(), + "complete_multipart returned error: {error:?}" + ); + + // get_object should follow the tombstone and return the payload. + let (got_meta, s) = storage.get_object(&id).await.unwrap().unwrap(); + let body = stream::read_to_vec(s).await.unwrap(); + assert_eq!(body, payload); + assert_eq!(got_meta.content_type, "application/octet-stream"); + + // HV should have a tombstone, LT should have the object at the physical key. + let tombstone = hv.get(&id).expect_tombstone(); + assert!( + tombstone.target.key().starts_with(id.key()), + "tombstone target should be a revision key" + ); + lt.get(&tombstone.target).expect_object(); + } + + #[tokio::test] + async fn multipart_multiple_parts() { + let (storage, _hv, _lt, _) = make_tiered_storage(); + let id = make_id("mp-multi"); + + let upload_id = storage + .initiate_multipart(&id, &Default::default()) + .await + .unwrap(); + + let part1 = vec![0xAAu8; 512 * 1024]; + let part2 = vec![0xBBu8; 512 * 1024]; + let part3 = vec![0xCCu8; 512 * 1024]; + + let etag1 = storage + .upload_part( + &id, + &upload_id, + 1, + part1.len() as u64, + None, + stream::single(part1.clone()), + ) + .await + .unwrap(); + let etag2 = storage + .upload_part( + &id, + &upload_id, + 2, + part2.len() as u64, + None, + stream::single(part2.clone()), + ) + .await + .unwrap(); + let etag3 = storage + .upload_part( + &id, + &upload_id, + 3, + part3.len() as u64, + None, + stream::single(part3.clone()), + ) + .await + .unwrap(); + + let error = storage + .complete_multipart( + &id, + &upload_id, + vec![ + CompletedPart { + part_number: 1, + etag: etag1, + }, + CompletedPart { + part_number: 2, + etag: etag2, + }, + CompletedPart { + part_number: 3, + etag: etag3, + }, + ], + ) + .await + .unwrap(); + assert!(error.is_none()); + + let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let body = stream::read_to_vec(s).await.unwrap(); + + let mut expected = Vec::new(); + expected.extend_from_slice(&part1); + expected.extend_from_slice(&part2); + expected.extend_from_slice(&part3); + assert_eq!(body, expected); + } + + #[tokio::test] + async fn multipart_abort() { + let (storage, hv, _lt, _) = make_tiered_storage(); + let id = make_id("mp-abort"); + + let upload_id = storage + .initiate_multipart(&id, &Default::default()) + .await + .unwrap(); + + // Upload a part then abort. + let payload = vec![0xABu8; 100]; + storage + .upload_part( + &id, + &upload_id, + 1, + payload.len() as u64, + None, + stream::single(payload), + ) + .await + .unwrap(); + + storage.abort_multipart(&id, &upload_id).await.unwrap(); + + // No tombstone should have been written. + hv.get(&id).expect_not_found(); + + // The object should not be reachable. + assert!(storage.get_object(&id).await.unwrap().is_none()); + } + + #[tokio::test] + async fn multipart_overwrites_existing_tombstone() { + let (storage, hv, lt, _) = make_tiered_storage(); + let id = make_id("mp-overwrite"); + + // First: put a large object via the normal path. + let payload1 = vec![0xAAu8; 2 * 1024 * 1024]; + storage + .put_object(&id, &Default::default(), stream::single(payload1)) + .await + .unwrap(); + let old_lt_id = hv.get(&id).expect_tombstone().target; + + // Second: overwrite via multipart. + let upload_id = storage + .initiate_multipart(&id, &Default::default()) + .await + .unwrap(); + + let payload2 = vec![0xBBu8; 2 * 1024 * 1024]; + let etag = storage + .upload_part( + &id, + &upload_id, + 1, + payload2.len() as u64, + None, + stream::single(payload2.clone()), + ) + .await + .unwrap(); + + let error = storage + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag, + }], + ) + .await + .unwrap(); + assert!(error.is_none()); + + // New tombstone should point to a different revision. + let new_lt_id = hv.get(&id).expect_tombstone().target; + assert_ne!(old_lt_id, new_lt_id); + + // Drain background cleanup. + storage.join().await; + + // Old LT blob should be cleaned up. + lt.get(&old_lt_id).expect_not_found(); + lt.get(&new_lt_id).expect_object(); + + // Read back the new data. + let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let body = stream::read_to_vec(s).await.unwrap(); + assert_eq!(body, payload2); + } + + #[tokio::test] + async fn multipart_list_parts() { + let (storage, _hv, _lt, _) = make_tiered_storage(); + let id = make_id("mp-list"); + + let upload_id = storage + .initiate_multipart(&id, &Default::default()) + .await + .unwrap(); + + let part1 = vec![0xAAu8; 100]; + let part2 = vec![0xBBu8; 200]; + storage + .upload_part( + &id, + &upload_id, + 1, + part1.len() as u64, + None, + stream::single(part1), + ) + .await + .unwrap(); + storage + .upload_part( + &id, + &upload_id, + 2, + part2.len() as u64, + None, + stream::single(part2), + ) + .await + .unwrap(); + + let resp = storage + .list_parts(&id, &upload_id, None, None) + .await + .unwrap(); + assert_eq!(resp.parts.len(), 2); + assert_eq!(resp.parts[0].part_number, 1); + assert_eq!(resp.parts[0].size, 100); + assert_eq!(resp.parts[1].part_number, 2); + assert_eq!(resp.parts[1].size, 200); + } + + // --- Multipart consistency --- + + /// Initiates a multipart upload, uploads a single part, and returns the + /// (upload_id, etag) pair ready for complete_multipart. + async fn initiate_and_upload( + storage: &TieredStorage, + id: &ObjectId, + payload: Vec, + ) -> (UploadId, String) { + let upload_id = storage + .initiate_multipart(id, &Default::default()) + .await + .unwrap(); + let etag = storage + .upload_part( + id, + &upload_id, + 1, + payload.len() as u64, + None, + stream::single(payload), + ) + .await + .unwrap(); + (upload_id, etag) + } + + #[tokio::test] + async fn multipart_complete_cas_conflict_cleans_up_new_blob() { + let hv = TestBackend::new(CasConflict); + let lt = InMemoryBackend::new("lt"); + let log = NoopChangeLog; + let storage = TieredStorage::new(Box::new(hv), Box::new(lt.clone()), Box::new(log)); + + let id = make_id("mp-cas-conflict"); + let (upload_id, etag) = initiate_and_upload(&storage, &id, vec![0xABu8; 100]).await; + + storage + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag, + }], + ) + .await + .unwrap(); + + storage.join().await; + + assert!( + lt.is_empty(), + "LT blob should be cleaned up after CAS conflict" + ); + } + + #[tokio::test] + async fn multipart_complete_no_orphan_when_cas_fails() { + let lt = InMemoryBackend::new("lt"); + let hv = TestBackend::new(FailCas(false)); + let log = NoopChangeLog; + let storage = TieredStorage::new(Box::new(hv), Box::new(lt.clone()), Box::new(log)); + + let id = make_id("mp-orphan-test"); + let (upload_id, etag) = initiate_and_upload(&storage, &id, vec![0xABu8; 100]).await; + + let result = storage + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag, + }], + ) + .await; + + assert!(result.is_err()); + + storage.join().await; + + assert!(lt.is_empty(), "long-term object not cleaned up"); + } + + #[tokio::test] + async fn multipart_complete_written_cleanup_after_lost_cas_response() { + let (storage, hv, lt, log) = make_tiered_storage(); + let id = make_id("mp-written"); + + // First: establish a tombstone via normal put. + let payload1 = vec![0xAAu8; 2 * 1024 * 1024]; + storage + .put_object(&id, &Default::default(), stream::single(payload1)) + .await + .unwrap(); + let tombstone1 = hv.get(&id).expect_tombstone().target; + + // Second: complete a multipart upload through a broken storage where + // CAS succeeds but returns an error (simulating lost response). + let broken_storage = TieredStorage::new( + Box::new(TestBackend::with_inner(hv.clone(), FailCas(true))), + Box::new(lt.clone()), + Box::new(log.clone()), + ); + let (upload_id, etag) = initiate_and_upload(&broken_storage, &id, vec![0xBBu8; 100]).await; + let result = broken_storage + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag, + }], + ) + .await; + assert!(result.is_err()); + + let tombstone2 = hv.get(&id).expect_tombstone().target; + assert_ne!(tombstone1, tombstone2); + + // Guard reads HV, sees the new tombstone won, cleans up the old blob. + broken_storage.join().await; + lt.get(&tombstone1).expect_not_found(); + lt.get(&tombstone2).expect_object(); + } + + #[tokio::test] + async fn multipart_complete_changelog_entry_removed() { + let (storage, _hv, _lt, log) = make_tiered_storage(); + let id = make_id("mp-changelog"); + + let (upload_id, etag) = initiate_and_upload(&storage, &id, vec![0xABu8; 100]).await; + + storage + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag, + }], + ) + .await + .unwrap(); + + storage.join().await; + + let entries = log.scan().await.unwrap(); + assert!( + entries.is_empty(), + "changelog entry not removed after successful multipart complete" + ); + } } From 4159e840e7838f603e9e34b8d18ac544b47f2ccf Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Mon, 4 May 2026 17:41:32 +0200 Subject: [PATCH 02/11] fix: resolve broken rustdoc link to MultipartUploadBackend --- objectstore-service/src/backend/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/objectstore-service/src/backend/mod.rs b/objectstore-service/src/backend/mod.rs index 944c5fa8..a559d29b 100644 --- a/objectstore-service/src/backend/mod.rs +++ b/objectstore-service/src/backend/mod.rs @@ -72,7 +72,7 @@ pub async fn from_config(config: StorageConfig) -> Result Result> { From e087daec0ec5e9f9e27d97e3f1c7a3e0f2749b9a Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Mon, 4 May 2026 18:25:09 +0200 Subject: [PATCH 03/11] better config --- objectstore-server/src/config.rs | 8 +++--- objectstore-service/src/backend/mod.rs | 32 +++++++++++++++-------- objectstore-service/src/backend/tiered.rs | 8 +++--- objectstore-test/src/server.rs | 10 +++++-- 4 files changed, 38 insertions(+), 20 deletions(-) diff --git a/objectstore-server/src/config.rs b/objectstore-server/src/config.rs index 297ffecd..c8b2eb13 100644 --- a/objectstore-server/src/config.rs +++ b/objectstore-server/src/config.rs @@ -46,7 +46,7 @@ use secrecy::{CloneableSecret, SecretBox, SerializableSecret, zeroize::Zeroize}; use serde::{Deserialize, Serialize}; pub use objectstore_log::{LevelFilter, LogFormat, LoggingConfig}; -pub use objectstore_service::backend::StorageConfig; +pub use objectstore_service::backend::{MultipartUploadStorageConfig, StorageConfig}; use crate::killswitches::Killswitches; use crate::rate_limits::RateLimits; @@ -625,7 +625,7 @@ impl Config { mod tests { use std::io::Write; - use objectstore_service::backend::HighVolumeStorageConfig; + use objectstore_service::backend::{HighVolumeStorageConfig, MultipartUploadStorageConfig}; use secrecy::ExposeSecret; use crate::killswitches::Killswitch; @@ -771,7 +771,7 @@ mod tests { }; let HighVolumeStorageConfig::BigTable(hv) = &c.high_volume; assert_eq!(hv.project_id, "my-project"); - let StorageConfig::Gcs(lt) = c.long_term.as_ref() else { + let MultipartUploadStorageConfig::Gcs(lt) = &c.long_term else { panic!("expected gcs long_term"); }; assert_eq!(lt.bucket, "my-objectstore-bucket"); @@ -800,7 +800,7 @@ mod tests { assert_eq!(hv.project_id, "my-project"); assert_eq!(hv.instance_name, "my-instance"); assert_eq!(hv.table_name, "my-table"); - let StorageConfig::FileSystem(lt) = c.long_term.as_ref() else { + let MultipartUploadStorageConfig::FileSystem(lt) = &c.long_term else { panic!("expected filesystem long_term"); }; assert_eq!(lt.path, Path::new("/data/lt")); diff --git a/objectstore-service/src/backend/mod.rs b/objectstore-service/src/backend/mod.rs index a559d29b..3647bdba 100644 --- a/objectstore-service/src/backend/mod.rs +++ b/objectstore-service/src/backend/mod.rs @@ -60,7 +60,7 @@ pub async fn from_config(config: StorageConfig) -> Result { let hv = hv_from_config(c.high_volume).await?; - let lt = lt_from_leaf_config(*c.long_term).await?; + let lt = lt_from_config(c.long_term).await?; let log = Box::new(changelog::NoopChangeLog); Box::new(tiered::TieredStorage::new(hv, lt, log)) } @@ -71,18 +71,28 @@ pub async fn from_config(config: StorageConfig) -> Result Result> { Ok(match config { - StorageConfig::FileSystem(c) => Box::new(local_fs::LocalFsBackend::new(c)), - StorageConfig::Gcs(c) => Box::new(gcs::GcsBackend::new(c).await?), - _ => anyhow::bail!( - "long-term backend does not support multipart uploads; \ - only filesystem and gcs are supported" - ), + MultipartUploadStorageConfig::FileSystem(c) => Box::new(local_fs::LocalFsBackend::new(c)), + MultipartUploadStorageConfig::Gcs(c) => Box::new(gcs::GcsBackend::new(c).await?), }) } diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index 37093250..872b1cd5 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -111,7 +111,7 @@ use crate::backend::common::{ Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, MultipartUploadBackend, PutResponse, TieredGet, TieredMetadata, TieredWrite, Tombstone, }; -use crate::backend::{HighVolumeStorageConfig, StorageConfig}; +use crate::backend::{HighVolumeStorageConfig, MultipartUploadStorageConfig}; use crate::error::{Error, Result}; use crate::id::ObjectId; use crate::multipart::{ @@ -137,7 +137,7 @@ fn new_long_term_revision(id: &ObjectId) -> ObjectId { /// Configuration for [`TieredStorage`]. /// /// Composes two backends into a tiered routing setup: `high_volume` for small -/// objects and `long_term` for large objects. Nesting [`StorageConfig::Tiered`] +/// objects and `long_term` for large objects. Nesting [`super::StorageConfig::Tiered`] /// inside another tiered config is not supported. /// /// # Example @@ -162,7 +162,9 @@ pub struct TieredStorageConfig { /// only BigTable). pub high_volume: HighVolumeStorageConfig, /// Backend for large, long-term objects. - pub long_term: Box, + /// + /// Must be a backend that implements [`MultipartUploadBackend`](super::common::MultipartUploadBackend). + pub long_term: MultipartUploadStorageConfig, } /// Two-tier storage backend that routes objects by size. diff --git a/objectstore-test/src/server.rs b/objectstore-test/src/server.rs index 9b705b80..6b0036fd 100644 --- a/objectstore-test/src/server.rs +++ b/objectstore-test/src/server.rs @@ -16,7 +16,9 @@ use std::net::{SocketAddr, TcpListener}; use std::path::PathBuf; use std::sync::LazyLock; -use objectstore_server::config::{AuthZVerificationKey, Config, StorageConfig}; +use objectstore_server::config::{ + AuthZVerificationKey, Config, MultipartUploadStorageConfig, StorageConfig, +}; use objectstore_server::state::Services; use objectstore_server::web::App; use objectstore_types::auth::Permission; @@ -134,7 +136,11 @@ fn replace_fs_paths(config: &mut StorageConfig, tempdirs: &mut Vec) { tempdirs.push(dir); } StorageConfig::Tiered(c) => { - replace_fs_paths(&mut c.long_term, tempdirs); + if let MultipartUploadStorageConfig::FileSystem(fs) = &mut c.long_term { + let dir = tempfile::tempdir().unwrap(); + fs.path = dir.path().into(); + tempdirs.push(dir); + } } StorageConfig::S3Compatible(_) | StorageConfig::Gcs(_) | StorageConfig::BigTable(_) => {} } From 3d1a808c0844f82e9b032797c2635a832d36aa71 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Mon, 4 May 2026 18:29:17 +0200 Subject: [PATCH 04/11] fix: guard multipart complete against orphaned blobs on metadata failure Move ChangeGuard creation before get_metadata so that if the metadata read fails after complete_multipart succeeds, the guard cleans up the assembled blob. Previously, Manual-expiration objects could be orphaned permanently. Also fix redundant rustdoc link. --- objectstore-service/src/backend/tiered.rs | 31 +++++++++++------------ 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index 872b1cd5..58020d53 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -163,7 +163,7 @@ pub struct TieredStorageConfig { pub high_volume: HighVolumeStorageConfig, /// Backend for large, long-term objects. /// - /// Must be a backend that implements [`MultipartUploadBackend`](super::common::MultipartUploadBackend). + /// Must be a backend that implements [`MultipartUploadBackend`]. pub long_term: MultipartUploadStorageConfig, } @@ -684,26 +684,15 @@ impl MultipartUploadBackend for TieredStorage { return Ok(error); } - // 2. Retrieve metadata from the completed object for the tombstone. - // Done before recording the change: a transient failure here leaves an - // orphan blob (bounded by TTL) but no changelog entry and no guard that - // would actively delete the just-completed upload. - let metadata = self - .inner - .long_term - .get_metadata(&physical) - .await? - .ok_or_else(|| { - Error::generic("completed multipart object not found in long-term storage") - })?; - - // 3. Read current HV revision to establish the write precondition. + // 2. Read current HV revision to establish the write precondition. let current = match self.inner.high_volume.get_tiered_metadata(id).await? { TieredMetadata::Tombstone(t) => Some(t.target), _ => None, }; - // 4. Record change now that the LT blob exists and needs tracking. + // 3. Record change now that the LT blob exists and needs tracking. + // Created before get_metadata so the guard cleans up the assembled + // blob if the metadata read fails. let mut guard = self .record_change(Change { id: id.clone(), @@ -713,6 +702,16 @@ impl MultipartUploadBackend for TieredStorage { .await?; guard.advance(ChangePhase::Written); + // 4. Retrieve metadata from the completed object for the tombstone. + let metadata = self + .inner + .long_term + .get_metadata(&physical) + .await? + .ok_or_else(|| { + Error::generic("completed multipart object not found in long-term storage") + })?; + // 5. CAS commit: write tombstone only if HV state matches what we saw. let tombstone = Tombstone { target: physical.clone(), From 7bbf0fe6bbf2ebe8d8a6bd17b8cf6d98eb4280a1 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 6 May 2026 16:27:20 +0200 Subject: [PATCH 05/11] rearrange --- objectstore-service/src/backend/mod.rs | 50 +++++++++++++------------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/objectstore-service/src/backend/mod.rs b/objectstore-service/src/backend/mod.rs index 3647bdba..b4a8fbe0 100644 --- a/objectstore-service/src/backend/mod.rs +++ b/objectstore-service/src/backend/mod.rs @@ -71,31 +71,6 @@ pub async fn from_config(config: StorageConfig) -> Result Result> { - Ok(match config { - MultipartUploadStorageConfig::FileSystem(c) => Box::new(local_fs::LocalFsBackend::new(c)), - MultipartUploadStorageConfig::Gcs(c) => Box::new(gcs::GcsBackend::new(c).await?), - }) -} - async fn from_leaf_config(config: StorageConfig) -> Result> { Ok(match config { StorageConfig::FileSystem(c) => Box::new(local_fs::LocalFsBackend::new(c)), @@ -129,3 +104,28 @@ async fn hv_from_config( HighVolumeStorageConfig::BigTable(c) => Box::new(bigtable::BigTableBackend::new(c).await?), }) } + +/// Configuration for the long-term backend in a [`tiered::TieredStorageConfig`]. +/// +/// Only backends that implement [`common::MultipartUploadBackend`] are valid here. +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(tag = "type", rename_all = "lowercase")] +pub enum MultipartUploadStorageConfig { + /// Local filesystem storage backend (type `"filesystem"`). + FileSystem(local_fs::FileSystemConfig), + + /// [Google Cloud Storage] backend (type `"gcs"`). + /// + /// [Google Cloud Storage]: https://cloud.google.com/storage + Gcs(gcs::GcsConfig), +} + +/// Constructs a type-erased [`common::MultipartUploadBackend`] from the given config. +async fn lt_from_config( + config: MultipartUploadStorageConfig, +) -> anyhow::Result> { + Ok(match config { + MultipartUploadStorageConfig::FileSystem(c) => Box::new(local_fs::LocalFsBackend::new(c)), + MultipartUploadStorageConfig::Gcs(c) => Box::new(gcs::GcsBackend::new(c).await?), + }) +} From ae731e1232916e24782fd89aae626da88b5d7614 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 6 May 2026 16:35:38 +0200 Subject: [PATCH 06/11] uniformize hooks implementation --- objectstore-service/src/backend/testing.rs | 87 ++++++++++++++++++++-- 1 file changed, 81 insertions(+), 6 deletions(-) diff --git a/objectstore-service/src/backend/testing.rs b/objectstore-service/src/backend/testing.rs index 3d1379c1..d41a066b 100644 --- a/objectstore-service/src/backend/testing.rs +++ b/objectstore-service/src/backend/testing.rs @@ -154,6 +154,76 @@ pub trait Hooks: fmt::Debug + Send + Sync + 'static { ) -> Result { inner.compare_and_write(id, current, write).await } + + // --- MultipartUploadBackend methods --- + + /// Intercepts [`MultipartUploadBackend::initiate_multipart`]. Default delegates to `inner`. + async fn initiate_multipart( + &self, + inner: &InMemoryBackend, + id: &ObjectId, + metadata: &Metadata, + ) -> Result { + inner.initiate_multipart(id, metadata).await + } + + /// Intercepts [`MultipartUploadBackend::upload_part`]. Default delegates to `inner`. + async fn upload_part( + &self, + inner: &InMemoryBackend, + id: &ObjectId, + upload_id: &UploadId, + part_number: PartNumber, + content_length: u64, + content_md5: Option<&str>, + body: ClientStream, + ) -> Result { + inner + .upload_part( + id, + upload_id, + part_number, + content_length, + content_md5, + body, + ) + .await + } + + /// Intercepts [`MultipartUploadBackend::list_parts`]. Default delegates to `inner`. + async fn list_parts( + &self, + inner: &InMemoryBackend, + id: &ObjectId, + upload_id: &UploadId, + max_parts: Option, + part_number_marker: Option, + ) -> Result { + inner + .list_parts(id, upload_id, max_parts, part_number_marker) + .await + } + + /// Intercepts [`MultipartUploadBackend::abort_multipart`]. Default delegates to `inner`. + async fn abort_multipart( + &self, + inner: &InMemoryBackend, + id: &ObjectId, + upload_id: &UploadId, + ) -> Result { + inner.abort_multipart(id, upload_id).await + } + + /// Intercepts [`MultipartUploadBackend::complete_multipart`]. Default delegates to `inner`. + async fn complete_multipart( + &self, + inner: &InMemoryBackend, + id: &ObjectId, + upload_id: &UploadId, + parts: Vec, + ) -> Result { + inner.complete_multipart(id, upload_id, parts).await + } } /// Generic test backend that implements both [`Backend`] and [`HighVolumeBackend`]. @@ -270,7 +340,9 @@ impl MultipartUploadBackend for TestBackend { id: &ObjectId, metadata: &Metadata, ) -> Result { - self.inner.initiate_multipart(id, metadata).await + self.hooks + .initiate_multipart(&self.inner, id, metadata) + .await } async fn upload_part( @@ -282,8 +354,9 @@ impl MultipartUploadBackend for TestBackend { content_md5: Option<&str>, body: ClientStream, ) -> Result { - self.inner + self.hooks .upload_part( + &self.inner, id, upload_id, part_number, @@ -301,8 +374,8 @@ impl MultipartUploadBackend for TestBackend { max_parts: Option, part_number_marker: Option, ) -> Result { - self.inner - .list_parts(id, upload_id, max_parts, part_number_marker) + self.hooks + .list_parts(&self.inner, id, upload_id, max_parts, part_number_marker) .await } @@ -311,7 +384,7 @@ impl MultipartUploadBackend for TestBackend { id: &ObjectId, upload_id: &UploadId, ) -> Result { - self.inner.abort_multipart(id, upload_id).await + self.hooks.abort_multipart(&self.inner, id, upload_id).await } async fn complete_multipart( @@ -320,6 +393,8 @@ impl MultipartUploadBackend for TestBackend { upload_id: &UploadId, parts: Vec, ) -> Result { - self.inner.complete_multipart(id, upload_id, parts).await + self.hooks + .complete_multipart(&self.inner, id, upload_id, parts) + .await } } From 0cffb7c86b2aaab9deaf2cbc4409623f2ec9004b Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Thu, 7 May 2026 10:41:50 +0200 Subject: [PATCH 07/11] improvements --- objectstore-service/src/backend/tiered.rs | 340 ++++++---------------- 1 file changed, 90 insertions(+), 250 deletions(-) diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index 58020d53..0398e668 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -101,6 +101,7 @@ use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Instant; +use base64::Engine as _; use bytes::Bytes; use futures_util::{Stream, StreamExt}; use objectstore_types::metadata::Metadata; @@ -546,32 +547,33 @@ where ) } -/// Token encoding the multipart upload state for TieredStorage. -/// -/// Contains only the physical key suffix (no usecase/scopes) and the upstream -/// upload ID. The full physical [`ObjectId`] is reconstructed by combining the -/// key with the context from the request's `id` parameter. -#[derive(Serialize, Deserialize)] -struct MultipartToken { - /// Key portion of the physical revision in LT (e.g. `"myfile/{uuid}"`). - physical_key: String, - /// Upload ID returned by the LT backend's `initiate_multipart`. +/// The multipart upload state for TieredStorage. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +struct TieredUploadId { + revision: String, upload_id: String, } -fn encode_multipart_token(token: &MultipartToken) -> Result { - use base64::Engine; - let json = - serde_json::to_vec(token).map_err(|e| Error::serde("encoding multipart token", e))?; - Ok(base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(json)) +impl TryInto for TieredUploadId { + type Error = Error; + + fn try_into(self) -> Result { + use base64::Engine; + let json = + serde_json::to_vec(&self).map_err(|e| Error::serde("encoding multipart token", e))?; + Ok(base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(json)) + } } -fn decode_multipart_token(upload_id: &UploadId) -> Result { - use base64::Engine; - let json = base64::engine::general_purpose::URL_SAFE_NO_PAD - .decode(upload_id.as_bytes()) - .map_err(|e| Error::generic(format!("invalid multipart upload ID encoding: {e}")))?; - serde_json::from_slice(&json).map_err(|e| Error::serde("decoding multipart token", e)) +impl TryFrom<&UploadId> for TieredUploadId { + type Error = Error; + + fn try_from(value: &UploadId) -> Result { + let json = base64::engine::general_purpose::URL_SAFE_NO_PAD + .decode(value.as_bytes()) + .map_err(|e| Error::generic(format!("invalid multipart upload ID: {e}")))?; + serde_json::from_slice(&json).map_err(|e| Error::serde("decoding multipart token", e)) + } } #[async_trait::async_trait] @@ -583,16 +585,17 @@ impl MultipartUploadBackend for TieredStorage { ) -> Result { let physical = new_long_term_revision(id); - let upstream_upload_id = self + let id = self .inner .long_term .initiate_multipart(&physical, metadata) .await?; - encode_multipart_token(&MultipartToken { - physical_key: physical.key, - upload_id: upstream_upload_id, - }) + let id = TieredUploadId { + revision: physical.key, + upload_id: id, + }; + id.try_into() } async fn upload_part( @@ -604,17 +607,18 @@ impl MultipartUploadBackend for TieredStorage { content_md5: Option<&str>, body: ClientStream, ) -> Result { - let token = decode_multipart_token(upload_id)?; + let tiered: TieredUploadId = upload_id.try_into()?; + let physical = ObjectId { context: id.context.clone(), - key: token.physical_key, + key: tiered.revision, }; self.inner .long_term .upload_part( &physical, - &token.upload_id, + &tiered.upload_id, part_number, content_length, content_md5, @@ -630,15 +634,16 @@ impl MultipartUploadBackend for TieredStorage { max_parts: Option, part_number_marker: Option, ) -> Result { - let token = decode_multipart_token(upload_id)?; + let tiered: TieredUploadId = upload_id.try_into()?; + let physical = ObjectId { context: id.context.clone(), - key: token.physical_key, + key: tiered.revision, }; self.inner .long_term - .list_parts(&physical, &token.upload_id, max_parts, part_number_marker) + .list_parts(&physical, &tiered.upload_id, max_parts, part_number_marker) .await } @@ -647,15 +652,16 @@ impl MultipartUploadBackend for TieredStorage { id: &ObjectId, upload_id: &UploadId, ) -> Result { - let token = decode_multipart_token(upload_id)?; + let tiered: TieredUploadId = upload_id.try_into()?; + let physical = ObjectId { context: id.context.clone(), - key: token.physical_key, + key: tiered.revision, }; self.inner .long_term - .abort_multipart(&physical, &token.upload_id) + .abort_multipart(&physical, &tiered.upload_id) .await } @@ -665,34 +671,19 @@ impl MultipartUploadBackend for TieredStorage { upload_id: &UploadId, parts: Vec, ) -> Result { - let token = decode_multipart_token(upload_id)?; - let physical = ObjectId { - context: id.context.clone(), - key: token.physical_key, - }; - - // 1. Complete on the LT backend (assembles parts into a readable object). - // Done before recording the change: protocol errors (invalid part, etag - // mismatch) leave no assembled blob and must not create a changelog entry. - let error = self - .inner - .long_term - .complete_multipart(&physical, &token.upload_id, parts) - .await?; + let tiered: TieredUploadId = upload_id.try_into()?; - if error.is_some() { - return Ok(error); - } - - // 2. Read current HV revision to establish the write precondition. + // 1. Read current HV revision to establish the write precondition let current = match self.inner.high_volume.get_tiered_metadata(id).await? { TieredMetadata::Tombstone(t) => Some(t.target), _ => None, }; - // 3. Record change now that the LT blob exists and needs tracking. - // Created before get_metadata so the guard cleans up the assembled - // blob if the metadata read fails. + // 2. Complete the upload, creating the object at the given revision key. + let physical = ObjectId { + context: id.context.clone(), + key: tiered.revision, + }; let mut guard = self .record_change(Change { id: id.clone(), @@ -700,9 +691,21 @@ impl MultipartUploadBackend for TieredStorage { old: current.clone(), }) .await?; + + let error = self + .inner + .long_term + .complete_multipart(&physical, &tiered.upload_id, parts) + .await?; + + if error.is_some() { + return Ok(error); + } + guard.advance(ChangePhase::Written); - // 4. Retrieve metadata from the completed object for the tombstone. + // 3. Retrieve the metadata of the object, which was determined at initiation time, to + // get the expiration policy. let metadata = self .inner .long_term @@ -712,7 +715,7 @@ impl MultipartUploadBackend for TieredStorage { Error::generic("completed multipart object not found in long-term storage") })?; - // 5. CAS commit: write tombstone only if HV state matches what we saw. + // 4. CAS commit: write tombstone only if HV state matches what we saw. let tombstone = Tombstone { target: physical.clone(), expiration_policy: metadata.expiration_policy, @@ -723,7 +726,7 @@ impl MultipartUploadBackend for TieredStorage { .compare_and_write(id, current.as_ref(), TieredWrite::Tombstone(tombstone)) .await?; - // 6. Let the guard handle cleanup based on the CAS outcome. + // Update guard and let it schedule cleanup in the background. guard.advance(ChangePhase::compare_and_write(written)); Ok(None) @@ -1480,27 +1483,14 @@ mod tests { // --- Multipart upload --- #[test] - fn multipart_token_roundtrip() { - let token = MultipartToken { - physical_key: "my-key/01924a6f-7e28-7b9a-9c1d-abcdef123456".into(), + fn multipart_upload_id_roundtrip() { + let id = TieredUploadId { + revision: "my-key/01924a6f-7e28-7b9a-9c1d-abcdef123456".into(), upload_id: "upstream-upload-id-abc".into(), }; - let encoded = encode_multipart_token(&token).unwrap(); - let decoded = decode_multipart_token(&encoded).unwrap(); - assert_eq!(decoded.physical_key, token.physical_key); - assert_eq!(decoded.upload_id, token.upload_id); - } - - #[test] - fn multipart_invalid_token_errors() { - let result = decode_multipart_token(&"not-valid-base64!!!".into()); - assert!(result.is_err()); - - // Valid base64 but not valid JSON. - use base64::Engine; - let bad_json = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(b"not json"); - let result = decode_multipart_token(&bad_json); - assert!(result.is_err()); + let encoded: UploadId = id.clone().try_into().unwrap(); + let decoded: TieredUploadId = (&encoded.clone()).try_into().unwrap(); + assert_eq!(decoded, id); } #[tokio::test] @@ -1560,9 +1550,9 @@ mod tests { } #[tokio::test] - async fn multipart_multiple_parts() { + async fn multipart_upload() { let (storage, _hv, _lt, _) = make_tiered_storage(); - let id = make_id("mp-multi"); + let id = make_id("multipart"); let upload_id = storage .initiate_multipart(&id, &Default::default()) @@ -1573,14 +1563,14 @@ mod tests { let part2 = vec![0xBBu8; 512 * 1024]; let part3 = vec![0xCCu8; 512 * 1024]; - let etag1 = storage + let etag3 = storage .upload_part( &id, &upload_id, - 1, - part1.len() as u64, + 3, + part3.len() as u64, None, - stream::single(part1.clone()), + stream::single(part3.clone()), ) .await .unwrap(); @@ -1595,14 +1585,14 @@ mod tests { ) .await .unwrap(); - let etag3 = storage + let etag1 = storage .upload_part( &id, &upload_id, - 3, - part3.len() as u64, + 1, + part1.len() as u64, None, - stream::single(part3.clone()), + stream::single(part1.clone()), ) .await .unwrap(); @@ -1678,7 +1668,7 @@ mod tests { let (storage, hv, lt, _) = make_tiered_storage(); let id = make_id("mp-overwrite"); - // First: put a large object via the normal path. + // Put a large object via the normal path. let payload1 = vec![0xAAu8; 2 * 1024 * 1024]; storage .put_object(&id, &Default::default(), stream::single(payload1)) @@ -1686,7 +1676,7 @@ mod tests { .unwrap(); let old_lt_id = hv.get(&id).expect_tombstone().target; - // Second: overwrite via multipart. + // Overwrite via multipart. let upload_id = storage .initiate_multipart(&id, &Default::default()) .await @@ -1705,6 +1695,11 @@ mod tests { .await .unwrap(); + // The multipart upload is not finalized, so the tombstone still points to the old + // revision. + let lt_id = hv.get(&id).expect_tombstone().target; + assert_eq!(old_lt_id, lt_id); + let error = storage .complete_multipart( &id, @@ -1718,18 +1713,18 @@ mod tests { .unwrap(); assert!(error.is_none()); - // New tombstone should point to a different revision. + // Now the upload has been finalized, so the new tombstone points to the new revision. let new_lt_id = hv.get(&id).expect_tombstone().target; assert_ne!(old_lt_id, new_lt_id); - // Drain background cleanup. + // Wait for background cleanup. storage.join().await; - // Old LT blob should be cleaned up. + // Old revision should be cleaned up. lt.get(&old_lt_id).expect_not_found(); lt.get(&new_lt_id).expect_object(); - // Read back the new data. + // Assert the contents of the new revision. let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); let body = stream::read_to_vec(s).await.unwrap(); assert_eq!(body, payload2); @@ -1780,159 +1775,4 @@ mod tests { assert_eq!(resp.parts[1].part_number, 2); assert_eq!(resp.parts[1].size, 200); } - - // --- Multipart consistency --- - - /// Initiates a multipart upload, uploads a single part, and returns the - /// (upload_id, etag) pair ready for complete_multipart. - async fn initiate_and_upload( - storage: &TieredStorage, - id: &ObjectId, - payload: Vec, - ) -> (UploadId, String) { - let upload_id = storage - .initiate_multipart(id, &Default::default()) - .await - .unwrap(); - let etag = storage - .upload_part( - id, - &upload_id, - 1, - payload.len() as u64, - None, - stream::single(payload), - ) - .await - .unwrap(); - (upload_id, etag) - } - - #[tokio::test] - async fn multipart_complete_cas_conflict_cleans_up_new_blob() { - let hv = TestBackend::new(CasConflict); - let lt = InMemoryBackend::new("lt"); - let log = NoopChangeLog; - let storage = TieredStorage::new(Box::new(hv), Box::new(lt.clone()), Box::new(log)); - - let id = make_id("mp-cas-conflict"); - let (upload_id, etag) = initiate_and_upload(&storage, &id, vec![0xABu8; 100]).await; - - storage - .complete_multipart( - &id, - &upload_id, - vec![CompletedPart { - part_number: 1, - etag, - }], - ) - .await - .unwrap(); - - storage.join().await; - - assert!( - lt.is_empty(), - "LT blob should be cleaned up after CAS conflict" - ); - } - - #[tokio::test] - async fn multipart_complete_no_orphan_when_cas_fails() { - let lt = InMemoryBackend::new("lt"); - let hv = TestBackend::new(FailCas(false)); - let log = NoopChangeLog; - let storage = TieredStorage::new(Box::new(hv), Box::new(lt.clone()), Box::new(log)); - - let id = make_id("mp-orphan-test"); - let (upload_id, etag) = initiate_and_upload(&storage, &id, vec![0xABu8; 100]).await; - - let result = storage - .complete_multipart( - &id, - &upload_id, - vec![CompletedPart { - part_number: 1, - etag, - }], - ) - .await; - - assert!(result.is_err()); - - storage.join().await; - - assert!(lt.is_empty(), "long-term object not cleaned up"); - } - - #[tokio::test] - async fn multipart_complete_written_cleanup_after_lost_cas_response() { - let (storage, hv, lt, log) = make_tiered_storage(); - let id = make_id("mp-written"); - - // First: establish a tombstone via normal put. - let payload1 = vec![0xAAu8; 2 * 1024 * 1024]; - storage - .put_object(&id, &Default::default(), stream::single(payload1)) - .await - .unwrap(); - let tombstone1 = hv.get(&id).expect_tombstone().target; - - // Second: complete a multipart upload through a broken storage where - // CAS succeeds but returns an error (simulating lost response). - let broken_storage = TieredStorage::new( - Box::new(TestBackend::with_inner(hv.clone(), FailCas(true))), - Box::new(lt.clone()), - Box::new(log.clone()), - ); - let (upload_id, etag) = initiate_and_upload(&broken_storage, &id, vec![0xBBu8; 100]).await; - let result = broken_storage - .complete_multipart( - &id, - &upload_id, - vec![CompletedPart { - part_number: 1, - etag, - }], - ) - .await; - assert!(result.is_err()); - - let tombstone2 = hv.get(&id).expect_tombstone().target; - assert_ne!(tombstone1, tombstone2); - - // Guard reads HV, sees the new tombstone won, cleans up the old blob. - broken_storage.join().await; - lt.get(&tombstone1).expect_not_found(); - lt.get(&tombstone2).expect_object(); - } - - #[tokio::test] - async fn multipart_complete_changelog_entry_removed() { - let (storage, _hv, _lt, log) = make_tiered_storage(); - let id = make_id("mp-changelog"); - - let (upload_id, etag) = initiate_and_upload(&storage, &id, vec![0xABu8; 100]).await; - - storage - .complete_multipart( - &id, - &upload_id, - vec![CompletedPart { - part_number: 1, - etag, - }], - ) - .await - .unwrap(); - - storage.join().await; - - let entries = log.scan().await.unwrap(); - assert!( - entries.is_empty(), - "changelog entry not removed after successful multipart complete" - ); - } } From d6eb979ded41bf86329f839e42154863fdbc8b42 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Thu, 7 May 2026 10:48:26 +0200 Subject: [PATCH 08/11] lints --- objectstore-service/src/backend/testing.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/objectstore-service/src/backend/testing.rs b/objectstore-service/src/backend/testing.rs index d41a066b..4b767e21 100644 --- a/objectstore-service/src/backend/testing.rs +++ b/objectstore-service/src/backend/testing.rs @@ -168,6 +168,7 @@ pub trait Hooks: fmt::Debug + Send + Sync + 'static { } /// Intercepts [`MultipartUploadBackend::upload_part`]. Default delegates to `inner`. + #[allow(clippy::too_many_arguments)] async fn upload_part( &self, inner: &InMemoryBackend, From c21fe73bebb4f596265ec830595cc9e98ca2f44f Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Tue, 12 May 2026 13:51:37 +0200 Subject: [PATCH 09/11] fix --- objectstore-service/src/backend/tiered.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index 0398e668..6f314233 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -558,7 +558,6 @@ impl TryInto for TieredUploadId { type Error = Error; fn try_into(self) -> Result { - use base64::Engine; let json = serde_json::to_vec(&self).map_err(|e| Error::serde("encoding multipart token", e))?; Ok(base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(json)) From 376e9d8f5c881bc8971054a0bf2bc610061a4c4c Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Tue, 26 May 2026 11:51:08 +0200 Subject: [PATCH 10/11] add a comment --- objectstore-service/src/backend/common.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/objectstore-service/src/backend/common.rs b/objectstore-service/src/backend/common.rs index 28df85cc..8c106965 100644 --- a/objectstore-service/src/backend/common.rs +++ b/objectstore-service/src/backend/common.rs @@ -103,6 +103,12 @@ pub trait MultipartUploadBackend: Backend + fmt::Debug + Send + Sync + 'static { /// Finalizes the upload identified by `(id, upload_id)` with the given /// ordered list of parts. + /// + /// Note that this returns `Result>`. + /// It's therefore possible to get `Ok(Some(err))`, meaning that at the server level this will + /// translate to HTTP `200 OK` with an error contained in the response body. + /// We need to do it this way to mirror backends that also behave like this (namely S3 and + /// GCS). async fn complete_multipart( &self, id: &ObjectId, From 59919c0d939b3c0a80eda4e06483cd561f243411 Mon Sep 17 00:00:00 2001 From: Lorenzo Cian <17258265+lcian@users.noreply.github.com> Date: Fri, 29 May 2026 10:01:09 +0200 Subject: [PATCH 11/11] feat(changelog): Add and use Assembling phase with deferred cleanup (#480) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ⚠️ Stacked on https://github.com/getsentry/objectstore/pull/458 This addresses the issue being discussed in https://github.com/getsentry/objectstore/pull/458#discussion_r3200217052/ by introducing a new `ChangePhase::Assembling` variant. The corresponding `Change` carries a `cleanup_after` timestamp, allowing the caller to defer cleanup. In practice, we use this in `TieredStorage::complete_multipart` with an arbitrary 24 hours deadline (should we change this?), during which the user can freely retry completing the upload. Note that this `ChangePhase` is special, as it behaves like `ChangePhase::complete` in-process: - It does nothing on `Drop`, as we assume that the deadline hasn't passed yet. - The corresponding `ChangeGuard` for it also doesn't spawn any task on `Drop`, due to the same assumption. As a consequence, this depends on a persistent implementation of the `ChangeLog` to actually clean anything up, or at least on a task that periodically scans the in-memory `ChangeLog` for `Change`s to clean up, both of which we don't have yet. In production, we will eventually always use a persistent `ChangeLog`, and other code paths exist that similarly require a persistent `ChangeLog` to actually ensure that no LT orphans are created, so I think it's fine for this change to also make that assumption. Part of FS-339. Will be squash-merged into https://github.com/getsentry/objectstore/pull/458 once approved. --- objectstore-service/src/backend/changelog.rs | 128 ++++- objectstore-service/src/backend/tiered.rs | 538 +++++++++++++++++-- 2 files changed, 613 insertions(+), 53 deletions(-) diff --git a/objectstore-service/src/backend/changelog.rs b/objectstore-service/src/backend/changelog.rs index 548be885..885613c6 100644 --- a/objectstore-service/src/backend/changelog.rs +++ b/objectstore-service/src/backend/changelog.rs @@ -22,7 +22,7 @@ use std::collections::HashMap; use std::fmt; use std::sync::{Arc, Mutex}; -use std::time::Duration; +use std::time::{Duration, SystemTime}; use tokio_util::task::TaskTracker; use tokio_util::task::task_tracker::TaskTrackerToken; @@ -76,6 +76,10 @@ pub struct Change { /// /// Needs cleanup on success (the CAS committed and the old blob is unreferenced). pub old: Option, + /// Earliest time at which this entry becomes eligible for cleanup. + /// + /// [`ChangeLog::scan`] filters out the entry, unless the deadline has passed. + pub cleanup_after: Option, } /// Manager for multi-step storage changes, including backends and durable log. @@ -134,6 +138,28 @@ impl ChangeManager { Ok(ChangeGuard { state: Some(state) }) } + /// Records the change to the log and returns a guard in the `Assembling` state. + /// + /// Behaves like [`Self::record`], except that the guard is created in the `Assembling` state. + /// Unlike other states, this guard does nothing on drop, leaving the burden of cleaning up to + /// the [`ChangeLog`]. + pub async fn record_assembling(self: Arc, change: Change) -> Result { + let token = self.tracker.token(); + + let id = ChangeId::new(); + self.changelog.record(&id, &change).await?; + + let state = ChangeState { + id, + change, + phase: ChangePhase::Assembling, + manager: self.clone(), + _token: token, + }; + + Ok(ChangeGuard { state: Some(state) }) + } + /// Scans the changelog for outstanding entries and runs cleanup for each. /// /// Spawn this into a background task at startup to recover from any orphaned objects after a @@ -226,15 +252,31 @@ impl ChangeLog for InMemoryChangeLog { } async fn scan(&self) -> Result> { + let now = SystemTime::now(); let entries = self.entries.lock().expect("lock poisoned"); let result = entries .iter() + .filter(|(_, change)| match change.cleanup_after { + None => true, + Some(deadline) => now >= deadline, + }) .map(|(id, change)| (id.clone(), change.clone())) .collect(); Ok(result) } } +#[cfg(test)] +impl InMemoryChangeLog { + /// Sets [`Change::cleanup_after`] to the past for all entries, forcing them to be returned by a subsequent [`ChangeLog::scan`]. + pub fn expire_all(&self) { + let mut entries = self.entries.lock().expect("lock poisoned"); + for change in entries.values_mut() { + change.cleanup_after = Some(SystemTime::UNIX_EPOCH); + } + } +} + /// [`ChangeLog`] implementation that discards all entries. /// /// Used as the default when no durable log is configured. Provides no @@ -265,6 +307,12 @@ pub enum ChangePhase { Recovered, /// The change is recorded in the log and LT upload has started. Recorded, + /// The LT blob originated from a multipart upload and is being assembled. + /// + /// Multipart upload completion can fail, and we want the client to be able to retry it + /// without the change cleanup process racing to delete the LT blob. + /// Therefore, cleanup of changes in this phase is deferred. + Assembling, /// LT upload has succeeded and the tombstone is being updated. Written, /// The tombstone update failed due to a conflict. @@ -310,7 +358,7 @@ impl ChangeState { ChangePhase::Written => self.read_tombstone().await, ChangePhase::Lost => self.change.old.clone(), ChangePhase::Updated => self.change.new.clone(), - ChangePhase::Completed => return, // unreachable + ChangePhase::Assembling | ChangePhase::Completed => return, // unreachable }; if current != self.change.old @@ -373,12 +421,21 @@ impl ChangeState { impl Drop for ChangeState { fn drop(&mut self) { - if self.phase != ChangePhase::Completed { - objectstore_log::error!( - change = ?self.change, - phase = ?self.phase, - "Operation dropped without completing cleanup" - ); + match self.phase { + ChangePhase::Completed => {} + ChangePhase::Assembling => { + objectstore_log::warn!( + change = ?self.change, + "Operation dropped in Assembling state, cleanup deferred to ChangeLog recovery" + ); + } + _ => { + objectstore_log::error!( + change = ?self.change, + phase = ?self.phase, + "Operation dropped without completing cleanup" + ); + } } } } @@ -405,6 +462,7 @@ impl ChangeGuard { impl Drop for ChangeGuard { fn drop(&mut self) { if let Some(state) = self.state.take() + && state.phase != ChangePhase::Assembling && state.phase != ChangePhase::Completed && let Ok(handle) = tokio::runtime::Handle::try_current() { @@ -440,6 +498,7 @@ mod tests { id: make_id("object-key"), new: Some(make_id("object-key/rev1")), old: None, + cleanup_after: None, }; log.record(&id, &change).await.unwrap(); @@ -457,6 +516,7 @@ mod tests { id: make_id("object-key"), new: None, old: Some(make_id("object-key/rev1")), + cleanup_after: None, }; log.record(&id, &change).await.unwrap(); @@ -495,6 +555,7 @@ mod tests { id: make_id("crash-test"), new: Some(make_id("crash-test/rev")), old: None, + cleanup_after: None, })) .unwrap() // Runtime drops here while `guard` is still alive outside it. @@ -508,4 +569,55 @@ mod tests { let entries = rt.block_on(log.scan()).unwrap(); assert_eq!(entries.len(), 1, "log entry must persist"); } + + #[tokio::test] + async fn scan_filters_by_cleanup_after() { + let log = InMemoryChangeLog::default(); + + let ready_id = ChangeId::new(); + log.record( + &ready_id, + &Change { + id: make_id("ready"), + new: Some(make_id("ready/rev")), + old: None, + cleanup_after: None, + }, + ) + .await + .unwrap(); + + let expired_id = ChangeId::new(); + log.record( + &expired_id, + &Change { + id: make_id("expired"), + new: Some(make_id("expired/rev")), + old: None, + cleanup_after: Some(SystemTime::now() - Duration::from_secs(1)), + }, + ) + .await + .unwrap(); + + let deferred_id = ChangeId::new(); + log.record( + &deferred_id, + &Change { + id: make_id("deferred"), + new: Some(make_id("deferred/rev")), + old: None, + cleanup_after: Some(SystemTime::now() + Duration::from_hours(24)), + }, + ) + .await + .unwrap(); + + let entries = log.scan().await.unwrap(); + assert_eq!(entries.len(), 2); + + let ids: Vec<_> = entries.iter().map(|(id, _)| id).collect(); + assert!(ids.contains(&&ready_id)); + assert!(ids.contains(&&expired_id)); + } } diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index 6f314233..4d329c3e 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -99,7 +99,7 @@ use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; -use std::time::Instant; +use std::time::{Duration, Instant, SystemTime}; use base64::Engine as _; use bytes::Bytes; @@ -123,6 +123,13 @@ use crate::stream::{ClientStream, SizedPeek}; /// The threshold up until which we will go to the "high volume" backend. const BACKEND_SIZE_THRESHOLD: usize = 1024 * 1024; // 1 MiB + +/// Amount of time for which a `Change` generated by a `complete_multipart` operation is kept in the `Assembling` +/// state before becoming eligible for cleanup by the `ChangeLog` recovery process. +/// This allows the client to retry the `complete_multipart` operation upon any failures for at least this long, +/// avoiding scenarios where the `ChangeLog` recovery would race to delete the assembled LT blob. +const MULTIPART_COMPLETE_CLEANUP_DELAY: Duration = Duration::from_hours(24); + /// Creates a new [`ObjectId`] with the same context but a unique revision key. /// /// The new key has the format `{original_key}/{uuid_v7}`, producing a distinct @@ -241,6 +248,12 @@ impl TieredStorage { self.inner.clone().record(change).await } + /// Records the change to the log in the `Assembling` phase, and returns a guard that does + /// nothing on drop unless advanced. + async fn record_assembling(&self, change: Change) -> Result { + self.inner.clone().record_assembling(change).await + } + /// Returns the name of the backend corresponding to the given routing choice. fn backend_type(&self, choice: &BackendChoice) -> &'static str { match choice { @@ -276,6 +289,7 @@ impl TieredStorage { id: id.clone(), new: None, old: Some(target.clone()), + cleanup_after: None, }) .await?; @@ -317,6 +331,7 @@ impl TieredStorage { id: id.clone(), new: Some(new.clone()), old: current.clone(), + cleanup_after: None, }) .await?; @@ -471,6 +486,7 @@ impl Backend for TieredStorage { id: id.clone(), new: None, old: Some(tombstone.target.clone()), + cleanup_after: None, }) .await?; guard.advance(ChangePhase::Written); @@ -672,47 +688,87 @@ impl MultipartUploadBackend for TieredStorage { ) -> Result { let tiered: TieredUploadId = upload_id.try_into()?; - // 1. Read current HV revision to establish the write precondition + let physical = ObjectId { + context: id.context.clone(), + key: tiered.revision, + }; + + // 1. Read current HV revision to establish the write precondition. let current = match self.inner.high_volume.get_tiered_metadata(id).await? { + // Optimization: a previous attempt already finalized this revision and tombstone -- report success. + TieredMetadata::Tombstone(t) if t.target == physical => return Ok(None), TieredMetadata::Tombstone(t) => Some(t.target), _ => None, }; - // 2. Complete the upload, creating the object at the given revision key. - let physical = ObjectId { - context: id.context.clone(), - key: tiered.revision, - }; + // Register a guard with cleanup deferred to now + `MULTIPART_COMPLETE_CLEANUP_DELAY`, + // so that the user has the chance to retry finalizing the upload in this timeframe. let mut guard = self - .record_change(Change { + .record_assembling(Change { id: id.clone(), new: Some(physical.clone()), old: current.clone(), + cleanup_after: Some(SystemTime::now() + MULTIPART_COMPLETE_CLEANUP_DELAY), }) .await?; - let error = self + // 2. Complete the upload, creating the object at the given revision key. + let maybe_complete_multipart_err = match self .inner .long_term .complete_multipart(&physical, &tiered.upload_id, parts) - .await?; - - if error.is_some() { - return Ok(error); - } - - guard.advance(ChangePhase::Written); + .await + { + // The request went through but we got an error in the response body. + // Transparently proxy the error to the user. + Ok(error) => { + if error.is_some() { + return Ok(error); + } + None + } + // We got status 4xx/5xx, or a network error. + // Either way, `complete_multipart` might have been completed successfully, + // either now or in a previous attempt (in that case, that's a 404 and we indeed end up + // here). + // We cannot know if that's the case yet, so we continue to the next steps. + Err(err) => Some(err), + }; // 3. Retrieve the metadata of the object, which was determined at initiation time, to // get the expiration policy. - let metadata = self - .inner - .long_term - .get_metadata(&physical) - .await? - .ok_or_else(|| { - Error::generic("completed multipart object not found in long-term storage") - })?; + // + // This also serves as an existence check to understand if the LT revision was actually + // created successfully in this or a previous attempt, in which case we just need to + // finalize the tombstone. + let metadata = self.inner.long_term.get_metadata(&physical).await; + + let metadata = match (metadata, maybe_complete_multipart_err) { + // The LT revision already exists, so we can continue to finalize the tombstone. + (Ok(Some(metadata)), _) => metadata, + // The LT revision doesn't exist, cannot proceed. + (Ok(None), Some(err)) => return Err(err), + // The `complete_multipart` succeeded, creating the object, but the `get_metadata` + // immediately after failed to find the object. This should never happen. + (Ok(None), None) => { + objectstore_log::error!( + id = ?id, + upload_id = ?upload_id, + physical = ?physical, + "complete_multipart call succeeded on long_term backend, but subsequent get_metadata found no object" + ); + return Err(Error::generic( + "completed multipart object not found in long-term storage", + )); + } + // Failed to `get_metadata`, cannot proceed. + (Err(get_metadata_err), maybe_complete_multipart_err) => { + // Prefer the `complete_multipart_err`, as it's likely more informative. + // TODO(FS-358): convert this properly. Right now `ApiErrorResponse` will turn this into a 500, + // but we would actually want to transparently surface the original status (and message?) instead. + return Err(maybe_complete_multipart_err.unwrap_or(get_metadata_err)); + } + }; // 4. CAS commit: write tombstone only if HV state matches what we saw. let tombstone = Tombstone { @@ -734,8 +790,7 @@ impl MultipartUploadBackend for TieredStorage { #[cfg(test)] mod tests { - use std::time::Duration; - + use futures::lock::Mutex; use objectstore_types::metadata::{ExpirationPolicy, Metadata}; use objectstore_types::scope::{Scope, Scopes}; @@ -745,6 +800,7 @@ mod tests { use crate::backend::testing::{Hooks, TestBackend}; use crate::error::Error; use crate::id::ObjectContext; + use crate::stream::{self, ClientStream}; fn make_context() -> ObjectContext { @@ -1364,6 +1420,7 @@ mod tests { id: make_id("object-key"), new: Some(make_id("cleanup-target")), old: None, + cleanup_after: None, }; // Build the guard inside a temporary runtime, then let the runtime drop @@ -1387,6 +1444,7 @@ mod tests { id: make_id("object-key"), new: None, old: None, + cleanup_after: None, }; let mut guard = storage.record_change(change).await.unwrap(); @@ -1662,6 +1720,52 @@ mod tests { assert!(storage.get_object(&id).await.unwrap().is_none()); } + #[tokio::test] + async fn multipart_list_parts() { + let (storage, _hv, _lt, _) = make_tiered_storage(); + let id = make_id("mp-list"); + + let upload_id = storage + .initiate_multipart(&id, &Default::default()) + .await + .unwrap(); + + let part1 = vec![0xAAu8; 100]; + let part2 = vec![0xBBu8; 200]; + storage + .upload_part( + &id, + &upload_id, + 1, + part1.len() as u64, + None, + stream::single(part1), + ) + .await + .unwrap(); + storage + .upload_part( + &id, + &upload_id, + 2, + part2.len() as u64, + None, + stream::single(part2), + ) + .await + .unwrap(); + + let resp = storage + .list_parts(&id, &upload_id, None, None) + .await + .unwrap(); + assert_eq!(resp.parts.len(), 2); + assert_eq!(resp.parts[0].part_number, 1); + assert_eq!(resp.parts[0].size, 100); + assert_eq!(resp.parts[1].part_number, 2); + assert_eq!(resp.parts[1].size, 200); + } + #[tokio::test] async fn multipart_overwrites_existing_tombstone() { let (storage, hv, lt, _) = make_tiered_storage(); @@ -1729,49 +1833,393 @@ mod tests { assert_eq!(body, payload2); } + // --- Multipart completion failure handling (consistency, retries, delayed cleanup) --- + + /// Assembles the blob via `complete_multipart`, but returns an error to simulate a network + /// failure on the response path. Also fails `get_metadata` so the tiered layer cannot + /// recover by detecting the already-assembled blob. + #[derive(Debug)] + struct CompleteMultipartButReturnError; + + #[async_trait::async_trait] + impl Hooks for CompleteMultipartButReturnError { + async fn complete_multipart( + &self, + inner: &InMemoryBackend, + id: &ObjectId, + upload_id: &UploadId, + parts: Vec, + ) -> Result { + inner + .complete_multipart(id, upload_id, parts) + .await + .unwrap(); + Err(Error::Io(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "simulated network error on complete_multipart", + ))) + } + + async fn get_metadata( + &self, + _inner: &InMemoryBackend, + _id: &ObjectId, + ) -> Result { + Err(Error::Io(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "simulated network error on get_metadata", + ))) + } + } + + /// `complete_multipart` on the inner LT backend assembles the blob successfully, but both + /// `complete_multipart` and `get_metadata` return errors, so the tiered layer cannot finalize. + /// After `MULTIPART_COMPLETE_CLEANUP_DELAY`, `ChangeLog` recovery deletes the orphaned blob. #[tokio::test] - async fn multipart_list_parts() { - let (storage, _hv, _lt, _) = make_tiered_storage(); - let id = make_id("mp-list"); + async fn cleans_up_orphan_after_failed_multipart_complete() { + let hv = InMemoryBackend::new("hv"); + let lt_inner = InMemoryBackend::new("lt"); + let log = InMemoryChangeLog::default(); + let storage = TieredStorage::new( + Box::new(hv.clone()), + Box::new(TestBackend::with_inner( + lt_inner.clone(), + CompleteMultipartButReturnError {}, + )), + Box::new(log.clone()), + ); + let id = make_id("mp-orphan"); let upload_id = storage .initiate_multipart(&id, &Default::default()) .await .unwrap(); - let part1 = vec![0xAAu8; 100]; - let part2 = vec![0xBBu8; 200]; - storage + let tiered_id: TieredUploadId = (&upload_id).try_into().unwrap(); + let physical = ObjectId { + context: id.context.clone(), + key: tiered_id.revision, + }; + + let payload = vec![0xABu8; 2 * 1024 * 1024]; + let etag = storage .upload_part( &id, &upload_id, 1, - part1.len() as u64, + payload.len() as u64, None, - stream::single(part1), + stream::single(payload), ) .await .unwrap(); - storage + + let result = storage + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag, + }], + ) + .await; + assert!(result.is_err()); + storage.join().await; + + // The LT blob is orphaned, and no cleanup has been performed (yet), due to the guard being + // dropped while in the `Assembling` state. + lt_inner.get(&physical).expect_object(); + hv.get(&id).expect_not_found(); + + // Simulate the passage of time and run recovery. + log.expire_all(); + let manager = ChangeManager::new( + Box::new(hv.clone()), + Box::new(lt_inner.clone()), + Box::new(log.clone()), + ); + manager.recover().await.unwrap(); + + // The orphaned LT blob has been cleaned up. + lt_inner.get(&physical).expect_not_found(); + // The change has been removed from the log. + let remaining = log.scan().await.unwrap(); + assert!(remaining.is_empty()); + } + + #[derive(Debug)] + struct FailOnFirstCompleteMultipartAttempt { + attempt: Mutex, + } + + impl FailOnFirstCompleteMultipartAttempt { + fn new() -> Self { + Self { + attempt: Mutex::new(0), + } + } + } + + #[async_trait::async_trait] + impl Hooks for FailOnFirstCompleteMultipartAttempt { + async fn complete_multipart( + &self, + inner: &InMemoryBackend, + id: &ObjectId, + upload_id: &UploadId, + parts: Vec, + ) -> Result { + let mut attempt = self.attempt.lock().await; + *attempt += 1; + if *attempt == 1 { + return Err(Error::Io(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "simulated network error", + ))); + } else { + return Ok(inner + .complete_multipart(id, upload_id, parts) + .await + .unwrap()); + } + } + } + + /// The first attempt to `complete_multipart` fails, which generates a `Change` entry. + /// The second call succeeds. + /// When it's time to clean up, nothing is deleted, as the `complete_multipart` eventually went + /// through before the cleanup deadline. + #[tokio::test] + async fn multipart_complete_succeeds_on_retry_and_leaves_state_consistent() { + let hv = InMemoryBackend::new("hv"); + let lt_inner = InMemoryBackend::new("lt"); + let log = InMemoryChangeLog::default(); + let storage = TieredStorage::new( + Box::new(hv.clone()), + Box::new(TestBackend::with_inner( + lt_inner.clone(), + FailOnFirstCompleteMultipartAttempt::new(), + )), + Box::new(log.clone()), + ); + + let id = make_id("mp-retry"); + let upload_id = storage + .initiate_multipart(&id, &Default::default()) + .await + .unwrap(); + + let tiered_id: TieredUploadId = (&upload_id).try_into().unwrap(); + let physical = ObjectId { + context: id.context.clone(), + key: tiered_id.revision, + }; + + let payload = vec![0xABu8; 2 * 1024 * 1024]; + let etag = storage .upload_part( &id, &upload_id, - 2, - part2.len() as u64, + 1, + payload.len() as u64, None, - stream::single(part2), + stream::single(payload.clone()), ) .await .unwrap(); - let resp = storage - .list_parts(&id, &upload_id, None, None) + // The first `complete_multipart` call fails. + let result = storage + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag: etag.clone(), + }], + ) + .await; + assert!(result.is_err()); + storage.join().await; + + // The second `complete_multipart` call succeeds. + let result = storage + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag, + }], + ) + .await; + assert!(result.is_ok()); + storage.join().await; + + // The object is there. + let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let body = stream::read_to_vec(s).await.unwrap(); + assert_eq!(body, payload); + + // Simulate the passage of time and run recovery. + log.expire_all(); + let manager = ChangeManager::new( + Box::new(hv.clone()), + Box::new(lt_inner.clone()), + Box::new(log.clone()), + ); + manager.recover().await.unwrap(); + + // The LT blob has not been cleaned up, as the write eventually went through. + lt_inner.get(&physical).expect_object(); + // The tombstone still points to the blob. + let tombstone = hv.get(&id).expect_tombstone(); + assert_eq!(tombstone.target, physical); + // The change has been removed from the log. + let remaining = log.scan().await.unwrap(); + assert!(remaining.is_empty()); + + // The object is still there after recovery. + let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let body = stream::read_to_vec(s).await.unwrap(); + assert_eq!(body, payload); + } + + #[derive(Debug)] + struct FailOnFirstGetMetadataAttempt { + attempt: Mutex, + } + + impl FailOnFirstGetMetadataAttempt { + fn new() -> Self { + Self { + attempt: Mutex::new(0), + } + } + } + + #[async_trait::async_trait] + impl Hooks for FailOnFirstGetMetadataAttempt { + async fn get_metadata( + &self, + inner: &InMemoryBackend, + id: &ObjectId, + ) -> Result { + let mut attempt = self.attempt.lock().await; + *attempt += 1; + if *attempt == 1 { + return Err(Error::Io(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "simulated network error", + ))); + } else { + inner.get_metadata(id).await + } + } + } + + /// The first attempt to `complete_multipart` succeeds on the LT backend, but the subsequent + /// `get_metadata` call fails with a network error, causing the overall `complete_multipart` + /// to fail. The second call retries and succeeds (the LT object already exists from the first + /// attempt). + /// When it's time to clean up, nothing is deleted, as the `complete_multipart` eventually went + /// through before the cleanup deadline. + #[tokio::test] + async fn multipart_complete_succeeds_on_retry_if_get_metadata_errs_and_leaves_state_consistent() + { + let hv = InMemoryBackend::new("hv"); + let lt_inner = InMemoryBackend::new("lt"); + let log = InMemoryChangeLog::default(); + let storage = TieredStorage::new( + Box::new(hv.clone()), + Box::new(TestBackend::with_inner( + lt_inner.clone(), + FailOnFirstGetMetadataAttempt::new(), + )), + Box::new(log.clone()), + ); + + let id = make_id("mp-retry-meta"); + let upload_id = storage + .initiate_multipart(&id, &Default::default()) .await .unwrap(); - assert_eq!(resp.parts.len(), 2); - assert_eq!(resp.parts[0].part_number, 1); - assert_eq!(resp.parts[0].size, 100); - assert_eq!(resp.parts[1].part_number, 2); - assert_eq!(resp.parts[1].size, 200); + + let tiered_id: TieredUploadId = (&upload_id).try_into().unwrap(); + let physical = ObjectId { + context: id.context.clone(), + key: tiered_id.revision, + }; + + let payload = vec![0xABu8; 2 * 1024 * 1024]; + let etag = storage + .upload_part( + &id, + &upload_id, + 1, + payload.len() as u64, + None, + stream::single(payload.clone()), + ) + .await + .unwrap(); + + // The first `complete_multipart` call fails (get_metadata network error), even though it + // internally creates the LT blob. + let result = storage + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag: etag.clone(), + }], + ) + .await; + assert!(result.is_err()); + storage.join().await; + + // The second `complete_multipart` call succeeds. + let result = storage + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag, + }], + ) + .await; + assert!(result.is_ok()); + storage.join().await; + + // The object is there. + let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let body = stream::read_to_vec(s).await.unwrap(); + assert_eq!(body, payload); + + // Simulate the passage of time and run recovery. + log.expire_all(); + let manager = ChangeManager::new( + Box::new(hv.clone()), + Box::new(lt_inner.clone()), + Box::new(log.clone()), + ); + manager.recover().await.unwrap(); + + // The LT blob has not been cleaned up, as the write eventually went through. + lt_inner.get(&physical).expect_object(); + // The tombstone still points to the blob. + let tombstone = hv.get(&id).expect_tombstone(); + assert_eq!(tombstone.target, physical); + // The change has been removed from the log. + let remaining = log.scan().await.unwrap(); + assert!(remaining.is_empty()); + + // The object is there after recovery. + let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let body = stream::read_to_vec(s).await.unwrap(); + assert_eq!(body, payload); } }