From 7dad5b11c67970a39f01be4143c04e88eaa125a1 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Tue, 26 May 2026 16:15:59 +0200 Subject: [PATCH 01/22] feat: add Assembling phase to ChangeGuard for multipart uploads MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The existing ChangeGuard assumes every LT write uses a unique key and can be safely cleaned up if the guard drops before CAS commits. For multipart uploads the physical key is fixed at initiation time, so a retry of complete_multipart reuses the same key. Immediate cleanup on guard drop would race with retries and delete successfully assembled blobs. Introduce ChangePhase::Assembling: when the guard drops in this phase, in-process cleanup is skipped and the changelog entry persists. A cleanup_after timestamp on Change tells scan() to filter out entries whose grace period hasn't elapsed. Once the deadline passes, a recovery scan picks up the entry and uses read_tombstone() to determine the correct cleanup action — preserving blobs that a retry committed and deleting genuine orphans. Refs: FS-339 --- objectstore-service/src/backend/changelog.rs | 250 ++++++++++++++++++- objectstore-service/src/backend/tiered.rs | 106 +++++++- 2 files changed, 346 insertions(+), 10 deletions(-) diff --git a/objectstore-service/src/backend/changelog.rs b/objectstore-service/src/backend/changelog.rs index 548be885..6c943060 100644 --- a/objectstore-service/src/backend/changelog.rs +++ b/objectstore-service/src/backend/changelog.rs @@ -22,7 +22,7 @@ use std::collections::HashMap; use std::fmt; use std::sync::{Arc, Mutex}; -use std::time::Duration; +use std::time::{Duration, SystemTime}; use tokio_util::task::TaskTracker; use tokio_util::task::task_tracker::TaskTrackerToken; @@ -36,6 +36,10 @@ const INITIAL_BACKOFF: Duration = Duration::from_millis(100); /// Maximum delay for exponential backoff retries in background cleanup tasks. const MAX_BACKOFF: Duration = Duration::from_secs(30); +/// Amount of time for which a change is kept in the Assembling state before becoming eligible for +/// cleanup. +const ASSEMBLING_CLEANUP_DELAY: Duration = Duration::from_hours(24); + /// Unique identifier for a change log entry. /// /// Generated per-operation as a UUIDv7. In durable storage, scoped to the @@ -76,6 +80,11 @@ pub struct Change { /// /// Needs cleanup on success (the CAS committed and the old blob is unreferenced). pub old: Option, + /// Earliest time at which this entry becomes eligible for cleanup. + /// + /// [`ChangeLog::scan`] automatically filters out entries whose deadline has not yet been + /// reached. + pub cleanup_after: Option, } /// Manager for multi-step storage changes, including backends and durable log. @@ -134,6 +143,31 @@ impl ChangeManager { Ok(ChangeGuard { state: Some(state) }) } + /// Records the change to the log and returns a guard. + /// + /// Behaves like [`Self::record`], except that the guard is created in the `Assembling` state and + /// cleanup is only attempted after `ASSEMBLING_CLEANUP_DELAY`. + pub async fn record_assembling(self: Arc, mut change: Change) -> Result { + change.cleanup_after = Some(SystemTime::now() + ASSEMBLING_CLEANUP_DELAY); + + let token = self.tracker.token(); + + let id = ChangeId::new(); + self.changelog.record(&id, &change).await?; + + let state = ChangeState { + id, + change, + phase: ChangePhase::Assembling { + deadline: SystemTime::now() + ASSEMBLING_CLEANUP_DELAY, + }, + manager: self.clone(), + _token: token, + }; + + Ok(ChangeGuard { state: Some(state) }) + } + /// Scans the changelog for outstanding entries and runs cleanup for each. /// /// Spawn this into a background task at startup to recover from any orphaned objects after a @@ -211,6 +245,18 @@ pub struct InMemoryChangeLog { entries: Arc>>, } +impl InMemoryChangeLog { + /// Returns the total number of entries, ignoring `cleanup_after` filtering. + pub fn len(&self) -> usize { + self.entries.lock().expect("lock poisoned").len() + } + + /// Returns `true` if the log contains no entries, ignoring `cleanup_after` filtering. + pub fn is_empty(&self) -> bool { + self.entries.lock().expect("lock poisoned").is_empty() + } +} + #[async_trait::async_trait] impl ChangeLog for InMemoryChangeLog { async fn record(&self, id: &ChangeId, change: &Change) -> Result<()> { @@ -226,9 +272,14 @@ impl ChangeLog for InMemoryChangeLog { } async fn scan(&self) -> Result> { + let now = SystemTime::now(); let entries = self.entries.lock().expect("lock poisoned"); let result = entries .iter() + .filter(|(_, change)| match change.cleanup_after { + None => true, + Some(deadline) => now >= deadline, + }) .map(|(id, change)| (id.clone(), change.clone())) .collect(); Ok(result) @@ -265,6 +316,15 @@ pub enum ChangePhase { Recovered, /// The change is recorded in the log and LT upload has started. Recorded, + /// The LT blob originated from a multipart upload and is being assembled. + /// + /// Multipart upload completion can fail, and we want the client to be able to retry it + /// without the change cleanup process racing to delete the LT blob. + /// Therefore, cleanup of changes in this phase is deferred. + Assembling { + /// Earliest time at which this entry becomes eligible for cleanup. + deadline: SystemTime, + }, /// LT upload has succeeded and the tombstone is being updated. Written, /// The tombstone update failed due to a conflict. @@ -306,6 +366,13 @@ impl ChangeState { // For `Recovered`, we must first check the state of the tombstone. ChangePhase::Recovered => self.read_tombstone().await, ChangePhase::Recorded => self.change.old.clone(), + ChangePhase::Assembling { .. } => { + objectstore_log::error!( + change = ?self.change, + "Assembling change should not reach in-process cleanup" + ); + return; + } // For `Written`, the CAS outcome is unknown — read HV to determine it. ChangePhase::Written => self.read_tombstone().await, ChangePhase::Lost => self.change.old.clone(), @@ -373,12 +440,21 @@ impl ChangeState { impl Drop for ChangeState { fn drop(&mut self) { - if self.phase != ChangePhase::Completed { - objectstore_log::error!( - change = ?self.change, - phase = ?self.phase, - "Operation dropped without completing cleanup" - ); + match self.phase { + ChangePhase::Completed => {} + ChangePhase::Assembling { .. } => { + objectstore_log::debug!( + change = ?self.change, + "Assembling change deferred to persistent changelog recovery" + ); + } + _ => { + objectstore_log::error!( + change = ?self.change, + phase = ?self.phase, + "Operation dropped without completing cleanup" + ); + } } } } @@ -406,12 +482,11 @@ impl Drop for ChangeGuard { fn drop(&mut self) { if let Some(state) = self.state.take() && state.phase != ChangePhase::Completed + && !matches!(state.phase, ChangePhase::Assembling { .. }) && let Ok(handle) = tokio::runtime::Handle::try_current() { handle.spawn(state.cleanup()); } - - // NB: Drop of `ChangeState` logs an error if cleanup is not scheduled. } } @@ -440,6 +515,7 @@ mod tests { id: make_id("object-key"), new: Some(make_id("object-key/rev1")), old: None, + cleanup_after: None, }; log.record(&id, &change).await.unwrap(); @@ -457,6 +533,7 @@ mod tests { id: make_id("object-key"), new: None, old: Some(make_id("object-key/rev1")), + cleanup_after: None, }; log.record(&id, &change).await.unwrap(); @@ -495,6 +572,7 @@ mod tests { id: make_id("crash-test"), new: Some(make_id("crash-test/rev")), old: None, + cleanup_after: None, })) .unwrap() // Runtime drops here while `guard` is still alive outside it. @@ -508,4 +586,158 @@ mod tests { let entries = rt.block_on(log.scan()).unwrap(); assert_eq!(entries.len(), 1, "log entry must persist"); } + + #[tokio::test] + async fn scan_filters_by_cleanup_after() { + let log = InMemoryChangeLog::default(); + + let ready_id = ChangeId::new(); + log.record( + &ready_id, + &Change { + id: make_id("ready"), + new: Some(make_id("ready/rev")), + old: None, + cleanup_after: None, + }, + ) + .await + .unwrap(); + + let expired_id = ChangeId::new(); + log.record( + &expired_id, + &Change { + id: make_id("expired"), + new: Some(make_id("expired/rev")), + old: None, + cleanup_after: Some(SystemTime::now() - Duration::from_secs(1)), + }, + ) + .await + .unwrap(); + + let deferred_id = ChangeId::new(); + log.record( + &deferred_id, + &Change { + id: make_id("deferred"), + new: Some(make_id("deferred/rev")), + old: None, + cleanup_after: Some(SystemTime::now() + Duration::from_hours(24)), + }, + ) + .await + .unwrap(); + + assert_eq!(log.len(), 3); + + let entries = log.scan().await.unwrap(); + assert_eq!(entries.len(), 2); + + let ids: Vec<_> = entries.iter().map(|(id, _)| id).collect(); + assert!(ids.contains(&&ready_id)); + assert!(ids.contains(&&expired_id)); + } + + /// After a multipart `complete` created an LT blob but the tombstone was never + /// committed, recovery (once the grace period expires) deletes the orphaned blob. + #[tokio::test] + async fn recovery_cleans_up_orphaned_assembling_blob() { + use crate::backend::common::Backend; + use crate::backend::in_memory::InMemoryBackend; + + let hv = InMemoryBackend::new("hv"); + let lt = InMemoryBackend::new("lt"); + let log = InMemoryChangeLog::default(); + + let logical = make_id("obj"); + let physical = make_id("obj/rev1"); + + // Simulate a blob that was assembled in LT but never got a tombstone. + lt.put_object( + &physical, + &Default::default(), + crate::stream::single("data"), + ) + .await + .unwrap(); + + // Insert a changelog entry with the grace period already expired. + let change_id = ChangeId::new(); + let change = Change { + id: logical, + new: Some(physical.clone()), + old: None, + cleanup_after: Some(SystemTime::now() - Duration::from_secs(1)), + }; + log.record(&change_id, &change).await.unwrap(); + + let manager = ChangeManager::new(Box::new(hv), Box::new(lt.clone()), Box::new(log.clone())); + manager.recover().await.unwrap(); + + lt.get(&physical).expect_not_found(); + + let entries = log.scan().await.unwrap(); + assert!( + entries.is_empty(), + "changelog entry not removed after recovery" + ); + } + + /// If a `complete_multipart` retry succeeded and committed the tombstone + /// before recovery runs, recovery must preserve the referenced blob. + #[tokio::test] + async fn recovery_preserves_assembling_blob_when_retry_committed() { + use crate::backend::common::{Backend, TieredWrite, Tombstone}; + use crate::backend::in_memory::InMemoryBackend; + use objectstore_types::metadata::ExpirationPolicy; + + let hv = InMemoryBackend::new("hv"); + let lt = InMemoryBackend::new("lt"); + let log = InMemoryChangeLog::default(); + + let logical = make_id("obj"); + let physical = make_id("obj/rev1"); + + // Blob exists in LT. + lt.put_object( + &physical, + &Default::default(), + crate::stream::single("data"), + ) + .await + .unwrap(); + + // A retry committed the tombstone pointing to this blob. + let tombstone = Tombstone { + target: physical.clone(), + expiration_policy: ExpirationPolicy::Manual, + }; + hv.compare_and_write(&logical, None, TieredWrite::Tombstone(tombstone)) + .await + .unwrap(); + + // Insert an expired changelog entry. + let change_id = ChangeId::new(); + let change = Change { + id: logical, + new: Some(physical.clone()), + old: None, + cleanup_after: Some(SystemTime::now() - Duration::from_secs(1)), + }; + log.record(&change_id, &change).await.unwrap(); + + let manager = ChangeManager::new(Box::new(hv), Box::new(lt.clone()), Box::new(log.clone())); + manager.recover().await.unwrap(); + + // Blob must still exist — the tombstone references it. + lt.get(&physical).expect_object(); + + let entries = log.scan().await.unwrap(); + assert!( + entries.is_empty(), + "changelog entry not removed after recovery" + ); + } } diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index 6f314233..e7c13c76 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -241,6 +241,11 @@ impl TieredStorage { self.inner.clone().record(change).await } + /// Records the change to the log in the `Assembling` phase, and returns a guard that cleans up on drop. + async fn record_assembling(&self, change: Change) -> Result { + self.inner.clone().record_assembling(change).await + } + /// Returns the name of the backend corresponding to the given routing choice. fn backend_type(&self, choice: &BackendChoice) -> &'static str { match choice { @@ -276,6 +281,7 @@ impl TieredStorage { id: id.clone(), new: None, old: Some(target.clone()), + cleanup_after: None, }) .await?; @@ -317,6 +323,7 @@ impl TieredStorage { id: id.clone(), new: Some(new.clone()), old: current.clone(), + cleanup_after: None, }) .await?; @@ -471,6 +478,7 @@ impl Backend for TieredStorage { id: id.clone(), new: None, old: Some(tombstone.target.clone()), + cleanup_after: None, }) .await?; guard.advance(ChangePhase::Written); @@ -684,10 +692,11 @@ impl MultipartUploadBackend for TieredStorage { key: tiered.revision, }; let mut guard = self - .record_change(Change { + .record_assembling(Change { id: id.clone(), new: Some(physical.clone()), old: current.clone(), + cleanup_after: None, }) .await?; @@ -1364,6 +1373,7 @@ mod tests { id: make_id("object-key"), new: Some(make_id("cleanup-target")), old: None, + cleanup_after: None, }; // Build the guard inside a temporary runtime, then let the runtime drop @@ -1387,6 +1397,7 @@ mod tests { id: make_id("object-key"), new: None, old: None, + cleanup_after: None, }; let mut guard = storage.record_change(change).await.unwrap(); @@ -1729,6 +1740,99 @@ mod tests { assert_eq!(body, payload2); } + // --- Multipart assembling guard --- + + /// Completes the multipart upload on the inner backend, then returns an error + /// to simulate a lost response. + #[derive(Debug)] + struct CompleteMultipartThenFail; + + #[async_trait::async_trait] + impl Hooks for CompleteMultipartThenFail { + async fn complete_multipart( + &self, + inner: &InMemoryBackend, + id: &ObjectId, + upload_id: &UploadId, + parts: Vec, + ) -> Result { + inner.complete_multipart(id, upload_id, parts).await?; + Err(Error::Io(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "simulated lost response", + ))) + } + } + + /// When `complete_multipart` succeeds on the inner backend but the response is + /// lost, the guard drops in `Assembling` phase. In-process cleanup must be + /// skipped (the blob must survive) and the changelog entry must persist for + /// a future recovery scan. + #[tokio::test] + async fn assembling_guard_skips_cleanup_and_preserves_log_entry() { + let lt_inner = InMemoryBackend::new("lt"); + let log = InMemoryChangeLog::default(); + let storage = TieredStorage::new( + Box::new(InMemoryBackend::new("hv")), + Box::new(TestBackend::with_inner( + lt_inner.clone(), + CompleteMultipartThenFail, + )), + Box::new(log.clone()), + ); + + let id = make_id("mp-assembling"); + let upload_id = storage + .initiate_multipart(&id, &Default::default()) + .await + .unwrap(); + + let payload = vec![0xABu8; 2 * 1024 * 1024]; + let etag = storage + .upload_part( + &id, + &upload_id, + 1, + payload.len() as u64, + None, + stream::single(payload), + ) + .await + .unwrap(); + + // complete_multipart fails (simulated lost response) — guard drops in Assembling. + let result = storage + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag, + }], + ) + .await; + assert!(result.is_err()); + + // Drain any background tasks. + storage.join().await; + + // The blob must NOT have been cleaned up. + assert!( + !lt_inner.is_empty(), + "LT blob should survive Assembling guard drop" + ); + + // The changelog entry must persist for future recovery. + // scan() filters by cleanup_after (24h in the future), so the entry is hidden. + let entries = log.scan().await.unwrap(); + assert!( + entries.is_empty(), + "entry should be filtered by scan() due to cleanup_after" + ); + // But the entry is still in the log. + assert_eq!(log.len(), 1, "changelog entry must persist"); + } + #[tokio::test] async fn multipart_list_parts() { let (storage, _hv, _lt, _) = make_tiered_storage(); From ec8413dfcd25dbb81f3b3771f01ca264d8e0285f Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Tue, 26 May 2026 16:57:41 +0200 Subject: [PATCH 02/22] improve --- objectstore-service/src/backend/changelog.rs | 24 ++++++++------------ objectstore-service/src/backend/tiered.rs | 13 +++++++---- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/objectstore-service/src/backend/changelog.rs b/objectstore-service/src/backend/changelog.rs index 6c943060..753cfc34 100644 --- a/objectstore-service/src/backend/changelog.rs +++ b/objectstore-service/src/backend/changelog.rs @@ -36,10 +36,6 @@ const INITIAL_BACKOFF: Duration = Duration::from_millis(100); /// Maximum delay for exponential backoff retries in background cleanup tasks. const MAX_BACKOFF: Duration = Duration::from_secs(30); -/// Amount of time for which a change is kept in the Assembling state before becoming eligible for -/// cleanup. -const ASSEMBLING_CLEANUP_DELAY: Duration = Duration::from_hours(24); - /// Unique identifier for a change log entry. /// /// Generated per-operation as a UUIDv7. In durable storage, scoped to the @@ -82,8 +78,7 @@ pub struct Change { pub old: Option, /// Earliest time at which this entry becomes eligible for cleanup. /// - /// [`ChangeLog::scan`] automatically filters out entries whose deadline has not yet been - /// reached. + /// [`ChangeLog::scan`] filters out the entry, unless the deadline has passed. pub cleanup_after: Option, } @@ -143,12 +138,15 @@ impl ChangeManager { Ok(ChangeGuard { state: Some(state) }) } - /// Records the change to the log and returns a guard. + /// Records the change to the log and returns a guard in the `Assembling` state. /// - /// Behaves like [`Self::record`], except that the guard is created in the `Assembling` state and - /// cleanup is only attempted after `ASSEMBLING_CLEANUP_DELAY`. - pub async fn record_assembling(self: Arc, mut change: Change) -> Result { - change.cleanup_after = Some(SystemTime::now() + ASSEMBLING_CLEANUP_DELAY); + /// Behaves like [`Self::record`], except that the guard is created in the + /// `Assembling` state with [`Change::cleanup_after`] as its deadline. + /// This has the affect that cleanup will only be performed after the deadline has passed. + pub async fn record_assembling(self: Arc, change: Change) -> Result { + let deadline = change + .cleanup_after + .expect("assembling Change must have a cleanup_after deadline"); let token = self.tracker.token(); @@ -158,9 +156,7 @@ impl ChangeManager { let state = ChangeState { id, change, - phase: ChangePhase::Assembling { - deadline: SystemTime::now() + ASSEMBLING_CLEANUP_DELAY, - }, + phase: ChangePhase::Assembling { deadline }, manager: self.clone(), _token: token, }; diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index e7c13c76..16c56d50 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -99,7 +99,7 @@ use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; -use std::time::Instant; +use std::time::{Duration, Instant, SystemTime}; use base64::Engine as _; use bytes::Bytes; @@ -123,6 +123,13 @@ use crate::stream::{ClientStream, SizedPeek}; /// The threshold up until which we will go to the "high volume" backend. const BACKEND_SIZE_THRESHOLD: usize = 1024 * 1024; // 1 MiB + +/// Amount of time for which a `Change` generated by a `complete_multipart` operation is kept in the `Assembling` +/// state before becoming eligible for cleanup by the `ChangeLog` recovery process. +/// This allows the client to retry the `complete_multipart` operation upon any failures for at least this long, +/// avoiding scenarios where the `ChangeLog` recovery would race to delete the assembled LT blob. +const MULTIPART_COMPLETE_CLEANUP_DELAY: Duration = Duration::from_hours(24); + /// Creates a new [`ObjectId`] with the same context but a unique revision key. /// /// The new key has the format `{original_key}/{uuid_v7}`, producing a distinct @@ -696,7 +703,7 @@ impl MultipartUploadBackend for TieredStorage { id: id.clone(), new: Some(physical.clone()), old: current.clone(), - cleanup_after: None, + cleanup_after: Some(SystemTime::now() + MULTIPART_COMPLETE_CLEANUP_DELAY), }) .await?; @@ -743,8 +750,6 @@ impl MultipartUploadBackend for TieredStorage { #[cfg(test)] mod tests { - use std::time::Duration; - use objectstore_types::metadata::{ExpirationPolicy, Metadata}; use objectstore_types::scope::{Scope, Scopes}; From 922af5e4a2333cec15a03f0b5c6f956e18bcdf57 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Tue, 26 May 2026 17:20:16 +0200 Subject: [PATCH 03/22] typo --- objectstore-service/src/backend/changelog.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/objectstore-service/src/backend/changelog.rs b/objectstore-service/src/backend/changelog.rs index 753cfc34..43c221f2 100644 --- a/objectstore-service/src/backend/changelog.rs +++ b/objectstore-service/src/backend/changelog.rs @@ -142,7 +142,7 @@ impl ChangeManager { /// /// Behaves like [`Self::record`], except that the guard is created in the /// `Assembling` state with [`Change::cleanup_after`] as its deadline. - /// This has the affect that cleanup will only be performed after the deadline has passed. + /// This has the effect that cleanup will only be performed after the deadline has passed. pub async fn record_assembling(self: Arc, change: Change) -> Result { let deadline = change .cleanup_after From 20f873561ef28ac5ff2002b7b4738dcb4b3235fc Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Tue, 26 May 2026 17:21:10 +0200 Subject: [PATCH 04/22] rename --- objectstore-service/src/backend/changelog.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/objectstore-service/src/backend/changelog.rs b/objectstore-service/src/backend/changelog.rs index 43c221f2..5e590201 100644 --- a/objectstore-service/src/backend/changelog.rs +++ b/objectstore-service/src/backend/changelog.rs @@ -144,7 +144,7 @@ impl ChangeManager { /// `Assembling` state with [`Change::cleanup_after`] as its deadline. /// This has the effect that cleanup will only be performed after the deadline has passed. pub async fn record_assembling(self: Arc, change: Change) -> Result { - let deadline = change + let cleanup_after = change .cleanup_after .expect("assembling Change must have a cleanup_after deadline"); @@ -156,7 +156,7 @@ impl ChangeManager { let state = ChangeState { id, change, - phase: ChangePhase::Assembling { deadline }, + phase: ChangePhase::Assembling { cleanup_after }, manager: self.clone(), _token: token, }; @@ -319,7 +319,7 @@ pub enum ChangePhase { /// Therefore, cleanup of changes in this phase is deferred. Assembling { /// Earliest time at which this entry becomes eligible for cleanup. - deadline: SystemTime, + cleanup_after: SystemTime, }, /// LT upload has succeeded and the tombstone is being updated. Written, From a762fd584e024b251f47eef9f28f3c7b32918139 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Tue, 26 May 2026 17:25:40 +0200 Subject: [PATCH 05/22] improve --- objectstore-service/src/backend/changelog.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/objectstore-service/src/backend/changelog.rs b/objectstore-service/src/backend/changelog.rs index 5e590201..3db65692 100644 --- a/objectstore-service/src/backend/changelog.rs +++ b/objectstore-service/src/backend/changelog.rs @@ -365,7 +365,7 @@ impl ChangeState { ChangePhase::Assembling { .. } => { objectstore_log::error!( change = ?self.change, - "Assembling change should not reach in-process cleanup" + "Assembling change reached in-process cleanup, ignoring (this should not happen)" ); return; } @@ -437,13 +437,7 @@ impl ChangeState { impl Drop for ChangeState { fn drop(&mut self) { match self.phase { - ChangePhase::Completed => {} - ChangePhase::Assembling { .. } => { - objectstore_log::debug!( - change = ?self.change, - "Assembling change deferred to persistent changelog recovery" - ); - } + ChangePhase::Completed | ChangePhase::Assembling { .. } => {} _ => { objectstore_log::error!( change = ?self.change, @@ -483,6 +477,8 @@ impl Drop for ChangeGuard { { handle.spawn(state.cleanup()); } + + // NB: Drop of `ChangeState` logs an error if cleanup is not scheduled. } } From 29dcacd7c9d3d80932fc14b2a98f731ab9386070 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Tue, 26 May 2026 17:28:12 +0200 Subject: [PATCH 06/22] improve --- objectstore-service/src/backend/changelog.rs | 103 ------------------- 1 file changed, 103 deletions(-) diff --git a/objectstore-service/src/backend/changelog.rs b/objectstore-service/src/backend/changelog.rs index 3db65692..16bbe3cc 100644 --- a/objectstore-service/src/backend/changelog.rs +++ b/objectstore-service/src/backend/changelog.rs @@ -622,8 +622,6 @@ mod tests { .await .unwrap(); - assert_eq!(log.len(), 3); - let entries = log.scan().await.unwrap(); assert_eq!(entries.len(), 2); @@ -631,105 +629,4 @@ mod tests { assert!(ids.contains(&&ready_id)); assert!(ids.contains(&&expired_id)); } - - /// After a multipart `complete` created an LT blob but the tombstone was never - /// committed, recovery (once the grace period expires) deletes the orphaned blob. - #[tokio::test] - async fn recovery_cleans_up_orphaned_assembling_blob() { - use crate::backend::common::Backend; - use crate::backend::in_memory::InMemoryBackend; - - let hv = InMemoryBackend::new("hv"); - let lt = InMemoryBackend::new("lt"); - let log = InMemoryChangeLog::default(); - - let logical = make_id("obj"); - let physical = make_id("obj/rev1"); - - // Simulate a blob that was assembled in LT but never got a tombstone. - lt.put_object( - &physical, - &Default::default(), - crate::stream::single("data"), - ) - .await - .unwrap(); - - // Insert a changelog entry with the grace period already expired. - let change_id = ChangeId::new(); - let change = Change { - id: logical, - new: Some(physical.clone()), - old: None, - cleanup_after: Some(SystemTime::now() - Duration::from_secs(1)), - }; - log.record(&change_id, &change).await.unwrap(); - - let manager = ChangeManager::new(Box::new(hv), Box::new(lt.clone()), Box::new(log.clone())); - manager.recover().await.unwrap(); - - lt.get(&physical).expect_not_found(); - - let entries = log.scan().await.unwrap(); - assert!( - entries.is_empty(), - "changelog entry not removed after recovery" - ); - } - - /// If a `complete_multipart` retry succeeded and committed the tombstone - /// before recovery runs, recovery must preserve the referenced blob. - #[tokio::test] - async fn recovery_preserves_assembling_blob_when_retry_committed() { - use crate::backend::common::{Backend, TieredWrite, Tombstone}; - use crate::backend::in_memory::InMemoryBackend; - use objectstore_types::metadata::ExpirationPolicy; - - let hv = InMemoryBackend::new("hv"); - let lt = InMemoryBackend::new("lt"); - let log = InMemoryChangeLog::default(); - - let logical = make_id("obj"); - let physical = make_id("obj/rev1"); - - // Blob exists in LT. - lt.put_object( - &physical, - &Default::default(), - crate::stream::single("data"), - ) - .await - .unwrap(); - - // A retry committed the tombstone pointing to this blob. - let tombstone = Tombstone { - target: physical.clone(), - expiration_policy: ExpirationPolicy::Manual, - }; - hv.compare_and_write(&logical, None, TieredWrite::Tombstone(tombstone)) - .await - .unwrap(); - - // Insert an expired changelog entry. - let change_id = ChangeId::new(); - let change = Change { - id: logical, - new: Some(physical.clone()), - old: None, - cleanup_after: Some(SystemTime::now() - Duration::from_secs(1)), - }; - log.record(&change_id, &change).await.unwrap(); - - let manager = ChangeManager::new(Box::new(hv), Box::new(lt.clone()), Box::new(log.clone())); - manager.recover().await.unwrap(); - - // Blob must still exist — the tombstone references it. - lt.get(&physical).expect_object(); - - let entries = log.scan().await.unwrap(); - assert!( - entries.is_empty(), - "changelog entry not removed after recovery" - ); - } } From 7426ccd64bcf69161b22e8d5cb463364b4c7a37b Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Tue, 26 May 2026 17:30:22 +0200 Subject: [PATCH 07/22] improve --- objectstore-service/src/backend/changelog.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/objectstore-service/src/backend/changelog.rs b/objectstore-service/src/backend/changelog.rs index 16bbe3cc..ba16721b 100644 --- a/objectstore-service/src/backend/changelog.rs +++ b/objectstore-service/src/backend/changelog.rs @@ -144,10 +144,7 @@ impl ChangeManager { /// `Assembling` state with [`Change::cleanup_after`] as its deadline. /// This has the effect that cleanup will only be performed after the deadline has passed. pub async fn record_assembling(self: Arc, change: Change) -> Result { - let cleanup_after = change - .cleanup_after - .expect("assembling Change must have a cleanup_after deadline"); - + let cleanup_after = change.cleanup_after.unwrap_or_else(|| SystemTime::now()); let token = self.tracker.token(); let id = ChangeId::new(); From a05c3e08c057909a049d1be948d6a3e113eb9a78 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Tue, 26 May 2026 17:38:57 +0200 Subject: [PATCH 08/22] improve --- objectstore-service/src/backend/changelog.rs | 18 +- objectstore-service/src/backend/tiered.rs | 172 ++++++++++++++++++- 2 files changed, 172 insertions(+), 18 deletions(-) diff --git a/objectstore-service/src/backend/changelog.rs b/objectstore-service/src/backend/changelog.rs index ba16721b..eccf55a2 100644 --- a/objectstore-service/src/backend/changelog.rs +++ b/objectstore-service/src/backend/changelog.rs @@ -144,7 +144,7 @@ impl ChangeManager { /// `Assembling` state with [`Change::cleanup_after`] as its deadline. /// This has the effect that cleanup will only be performed after the deadline has passed. pub async fn record_assembling(self: Arc, change: Change) -> Result { - let cleanup_after = change.cleanup_after.unwrap_or_else(|| SystemTime::now()); + let cleanup_after = change.cleanup_after.unwrap_or_else(SystemTime::now); let token = self.tracker.token(); let id = ChangeId::new(); @@ -238,15 +238,15 @@ pub struct InMemoryChangeLog { entries: Arc>>, } +#[cfg(test)] impl InMemoryChangeLog { - /// Returns the total number of entries, ignoring `cleanup_after` filtering. - pub fn len(&self) -> usize { - self.entries.lock().expect("lock poisoned").len() - } - - /// Returns `true` if the log contains no entries, ignoring `cleanup_after` filtering. - pub fn is_empty(&self) -> bool { - self.entries.lock().expect("lock poisoned").is_empty() + /// Sets `cleanup_after` to the past for all entries, making them immediately + /// eligible for [`ChangeLog::scan`]. + pub fn expire_all(&self) { + let mut entries = self.entries.lock().expect("lock poisoned"); + for change in entries.values_mut() { + change.cleanup_after = Some(SystemTime::UNIX_EPOCH); + } } } diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index 16c56d50..77148c5a 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -1761,7 +1761,10 @@ mod tests { upload_id: &UploadId, parts: Vec, ) -> Result { - inner.complete_multipart(id, upload_id, parts).await?; + inner + .complete_multipart(id, upload_id, parts) + .await + .unwrap(); Err(Error::Io(std::io::Error::new( std::io::ErrorKind::TimedOut, "simulated lost response", @@ -1774,7 +1777,7 @@ mod tests { /// skipped (the blob must survive) and the changelog entry must persist for /// a future recovery scan. #[tokio::test] - async fn assembling_guard_skips_cleanup_and_preserves_log_entry() { + async fn assembling_guard_defers_cleanup() { let lt_inner = InMemoryBackend::new("lt"); let log = InMemoryChangeLog::default(); let storage = TieredStorage::new( @@ -1826,16 +1829,167 @@ mod tests { !lt_inner.is_empty(), "LT blob should survive Assembling guard drop" ); + } - // The changelog entry must persist for future recovery. - // scan() filters by cleanup_after (24h in the future), so the entry is hidden. - let entries = log.scan().await.unwrap(); + /// End-to-end: `complete_multipart` assembles the blob but the response is + /// lost. After the grace period expires, changelog recovery deletes the + /// orphaned blob because no tombstone was ever committed. + #[tokio::test] + async fn assembling_recovery_cleans_up_orphaned_blob() { + let hv = InMemoryBackend::new("hv"); + let lt_inner = InMemoryBackend::new("lt"); + let log = InMemoryChangeLog::default(); + let storage = TieredStorage::new( + Box::new(hv.clone()), + Box::new(TestBackend::with_inner( + lt_inner.clone(), + CompleteMultipartThenFail, + )), + Box::new(log.clone()), + ); + + let id = make_id("mp-orphan-e2e"); + let upload_id = storage + .initiate_multipart(&id, &Default::default()) + .await + .unwrap(); + + let tiered_id: TieredUploadId = (&upload_id).try_into().unwrap(); + let physical = ObjectId { + context: id.context.clone(), + key: tiered_id.revision, + }; + + let payload = vec![0xABu8; 2 * 1024 * 1024]; + let etag = storage + .upload_part( + &id, + &upload_id, + 1, + payload.len() as u64, + None, + stream::single(payload), + ) + .await + .unwrap(); + + // complete_multipart assembles the blob but the response is lost. + let result = storage + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag, + }], + ) + .await; + assert!(result.is_err()); + storage.join().await; + + // Blob exists, no tombstone committed. + lt_inner.get(&physical).expect_object(); + hv.get(&id).expect_not_found(); + + // Simulate the grace period expiring and run recovery. + log.expire_all(); + let manager = ChangeManager::new( + Box::new(hv.clone()), + Box::new(lt_inner.clone()), + Box::new(log.clone()), + ); + manager.recover().await.unwrap(); + + // Orphaned blob must be cleaned up. + lt_inner.get(&physical).expect_not_found(); + let remaining = log.scan().await.unwrap(); assert!( - entries.is_empty(), - "entry should be filtered by scan() due to cleanup_after" + remaining.is_empty(), + "changelog entry not removed after recovery" + ); + } + + /// End-to-end: `complete_multipart` assembles the blob and the response is + /// lost, but a retry succeeds and commits the tombstone. After the grace + /// period expires, changelog recovery preserves the referenced blob. + #[tokio::test] + async fn assembling_recovery_preserves_blob_when_retry_committed() { + let hv = InMemoryBackend::new("hv"); + let lt_inner = InMemoryBackend::new("lt"); + let log = InMemoryChangeLog::default(); + let storage = TieredStorage::new( + Box::new(hv.clone()), + Box::new(TestBackend::with_inner( + lt_inner.clone(), + CompleteMultipartThenFail, + )), + Box::new(log.clone()), + ); + + let id = make_id("mp-retry-e2e"); + let upload_id = storage + .initiate_multipart(&id, &Default::default()) + .await + .unwrap(); + + let tiered_id: TieredUploadId = (&upload_id).try_into().unwrap(); + let physical = ObjectId { + context: id.context.clone(), + key: tiered_id.revision, + }; + + let payload = vec![0xABu8; 2 * 1024 * 1024]; + let etag = storage + .upload_part( + &id, + &upload_id, + 1, + payload.len() as u64, + None, + stream::single(payload), + ) + .await + .unwrap(); + + // complete_multipart assembles the blob but the response is lost. + let result = storage + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag, + }], + ) + .await; + assert!(result.is_err()); + storage.join().await; + + // Simulate a successful retry that committed the tombstone. + let tombstone = Tombstone { + target: physical.clone(), + expiration_policy: ExpirationPolicy::Manual, + }; + hv.compare_and_write(&id, None, TieredWrite::Tombstone(tombstone)) + .await + .unwrap(); + + // Simulate the grace period expiring and run recovery. + log.expire_all(); + let manager = ChangeManager::new( + Box::new(hv.clone()), + Box::new(lt_inner.clone()), + Box::new(log.clone()), + ); + manager.recover().await.unwrap(); + + // Blob must still exist — the tombstone references it. + lt_inner.get(&physical).expect_object(); + let remaining = log.scan().await.unwrap(); + assert!( + remaining.is_empty(), + "changelog entry not removed after recovery" ); - // But the entry is still in the log. - assert_eq!(log.len(), 1, "changelog entry must persist"); } #[tokio::test] From d5e0656a145f3f093da2e10e630869bb079eaf12 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Tue, 26 May 2026 17:45:21 +0200 Subject: [PATCH 09/22] improve --- objectstore-service/src/backend/changelog.rs | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/objectstore-service/src/backend/changelog.rs b/objectstore-service/src/backend/changelog.rs index eccf55a2..8ec46602 100644 --- a/objectstore-service/src/backend/changelog.rs +++ b/objectstore-service/src/backend/changelog.rs @@ -140,11 +140,9 @@ impl ChangeManager { /// Records the change to the log and returns a guard in the `Assembling` state. /// - /// Behaves like [`Self::record`], except that the guard is created in the - /// `Assembling` state with [`Change::cleanup_after`] as its deadline. - /// This has the effect that cleanup will only be performed after the deadline has passed. + /// Behaves like [`Self::record`], except that the guard is created in the `Assembling` state. + /// This has the effect of deferring cleanup, e.g. to let the user retry the assembly. pub async fn record_assembling(self: Arc, change: Change) -> Result { - let cleanup_after = change.cleanup_after.unwrap_or_else(SystemTime::now); let token = self.tracker.token(); let id = ChangeId::new(); @@ -153,7 +151,7 @@ impl ChangeManager { let state = ChangeState { id, change, - phase: ChangePhase::Assembling { cleanup_after }, + phase: ChangePhase::Assembling, manager: self.clone(), _token: token, }; @@ -314,10 +312,7 @@ pub enum ChangePhase { /// Multipart upload completion can fail, and we want the client to be able to retry it /// without the change cleanup process racing to delete the LT blob. /// Therefore, cleanup of changes in this phase is deferred. - Assembling { - /// Earliest time at which this entry becomes eligible for cleanup. - cleanup_after: SystemTime, - }, + Assembling, /// LT upload has succeeded and the tombstone is being updated. Written, /// The tombstone update failed due to a conflict. @@ -359,7 +354,7 @@ impl ChangeState { // For `Recovered`, we must first check the state of the tombstone. ChangePhase::Recovered => self.read_tombstone().await, ChangePhase::Recorded => self.change.old.clone(), - ChangePhase::Assembling { .. } => { + ChangePhase::Assembling => { objectstore_log::error!( change = ?self.change, "Assembling change reached in-process cleanup, ignoring (this should not happen)" @@ -434,7 +429,7 @@ impl ChangeState { impl Drop for ChangeState { fn drop(&mut self) { match self.phase { - ChangePhase::Completed | ChangePhase::Assembling { .. } => {} + ChangePhase::Completed | ChangePhase::Assembling => {} _ => { objectstore_log::error!( change = ?self.change, @@ -469,7 +464,7 @@ impl Drop for ChangeGuard { fn drop(&mut self) { if let Some(state) = self.state.take() && state.phase != ChangePhase::Completed - && !matches!(state.phase, ChangePhase::Assembling { .. }) + && !matches!(state.phase, ChangePhase::Assembling) && let Ok(handle) = tokio::runtime::Handle::try_current() { handle.spawn(state.cleanup()); From 627c81904e6393432da5683d0df133599fe0940a Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Tue, 26 May 2026 17:51:40 +0200 Subject: [PATCH 10/22] wip --- objectstore-service/src/backend/changelog.rs | 34 ++++++++------------ 1 file changed, 13 insertions(+), 21 deletions(-) diff --git a/objectstore-service/src/backend/changelog.rs b/objectstore-service/src/backend/changelog.rs index 8ec46602..d2254385 100644 --- a/objectstore-service/src/backend/changelog.rs +++ b/objectstore-service/src/backend/changelog.rs @@ -236,18 +236,6 @@ pub struct InMemoryChangeLog { entries: Arc>>, } -#[cfg(test)] -impl InMemoryChangeLog { - /// Sets `cleanup_after` to the past for all entries, making them immediately - /// eligible for [`ChangeLog::scan`]. - pub fn expire_all(&self) { - let mut entries = self.entries.lock().expect("lock poisoned"); - for change in entries.values_mut() { - change.cleanup_after = Some(SystemTime::UNIX_EPOCH); - } - } -} - #[async_trait::async_trait] impl ChangeLog for InMemoryChangeLog { async fn record(&self, id: &ChangeId, change: &Change) -> Result<()> { @@ -277,6 +265,17 @@ impl ChangeLog for InMemoryChangeLog { } } +#[cfg(test)] +impl InMemoryChangeLog { + /// Sets [`Change::cleanup_after`] to the past for all entries, forcing them to be returned by a subsequent [`ChangeLog::scan`]. + pub fn expire_all(&self) { + let mut entries = self.entries.lock().expect("lock poisoned"); + for change in entries.values_mut() { + change.cleanup_after = Some(SystemTime::UNIX_EPOCH); + } + } +} + /// [`ChangeLog`] implementation that discards all entries. /// /// Used as the default when no durable log is configured. Provides no @@ -354,18 +353,11 @@ impl ChangeState { // For `Recovered`, we must first check the state of the tombstone. ChangePhase::Recovered => self.read_tombstone().await, ChangePhase::Recorded => self.change.old.clone(), - ChangePhase::Assembling => { - objectstore_log::error!( - change = ?self.change, - "Assembling change reached in-process cleanup, ignoring (this should not happen)" - ); - return; - } // For `Written`, the CAS outcome is unknown — read HV to determine it. ChangePhase::Written => self.read_tombstone().await, ChangePhase::Lost => self.change.old.clone(), ChangePhase::Updated => self.change.new.clone(), - ChangePhase::Completed => return, // unreachable + ChangePhase::Assembling | ChangePhase::Completed => return, // unreachable }; if current != self.change.old @@ -429,7 +421,7 @@ impl ChangeState { impl Drop for ChangeState { fn drop(&mut self) { match self.phase { - ChangePhase::Completed | ChangePhase::Assembling => {} + ChangePhase::Assembling | ChangePhase::Completed => {} _ => { objectstore_log::error!( change = ?self.change, From 98c7d1818b5e70054a95fe2b3ad548a07d670527 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Tue, 26 May 2026 18:28:39 +0200 Subject: [PATCH 11/22] improve --- objectstore-service/src/backend/changelog.rs | 3 +- objectstore-service/src/backend/tiered.rs | 177 +++++++++---------- 2 files changed, 81 insertions(+), 99 deletions(-) diff --git a/objectstore-service/src/backend/changelog.rs b/objectstore-service/src/backend/changelog.rs index d2254385..3de7d40e 100644 --- a/objectstore-service/src/backend/changelog.rs +++ b/objectstore-service/src/backend/changelog.rs @@ -141,7 +141,8 @@ impl ChangeManager { /// Records the change to the log and returns a guard in the `Assembling` state. /// /// Behaves like [`Self::record`], except that the guard is created in the `Assembling` state. - /// This has the effect of deferring cleanup, e.g. to let the user retry the assembly. + /// Unlike other states, this guard does nothing on drop, deferring cleanup to + /// [`Change::cleanup_after`] via the [`ChangeLog`]. pub async fn record_assembling(self: Arc, change: Change) -> Result { let token = self.tracker.token(); diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index 77148c5a..e4d023a9 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -124,7 +124,7 @@ use crate::stream::{ClientStream, SizedPeek}; /// The threshold up until which we will go to the "high volume" backend. const BACKEND_SIZE_THRESHOLD: usize = 1024 * 1024; // 1 MiB -/// Amount of time for which a `Change` generated by a `complete_multipart` operation is kept in the `Assembling` +/// Amount of time for which a `Change` generated by a `complete_multipart` operation is kept in the `Assembling` /// state before becoming eligible for cleanup by the `ChangeLog` recovery process. /// This allows the client to retry the `complete_multipart` operation upon any failures for at least this long, /// avoiding scenarios where the `ChangeLog` recovery would race to delete the assembled LT blob. @@ -248,7 +248,8 @@ impl TieredStorage { self.inner.clone().record(change).await } - /// Records the change to the log in the `Assembling` phase, and returns a guard that cleans up on drop. + /// Records the change to the log in the `Assembling` phase, and returns a guard that does + /// nothing on drop unless advanced. async fn record_assembling(&self, change: Change) -> Result { self.inner.clone().record_assembling(change).await } @@ -750,6 +751,7 @@ impl MultipartUploadBackend for TieredStorage { #[cfg(test)] mod tests { + use futures::lock::Mutex; use objectstore_types::metadata::{ExpirationPolicy, Metadata}; use objectstore_types::scope::{Scope, Scopes}; @@ -1747,8 +1749,7 @@ mod tests { // --- Multipart assembling guard --- - /// Completes the multipart upload on the inner backend, then returns an error - /// to simulate a lost response. + /// Completes the multipart upload, then returns an error to simulate a lost response. #[derive(Debug)] struct CompleteMultipartThenFail; @@ -1772,70 +1773,10 @@ mod tests { } } - /// When `complete_multipart` succeeds on the inner backend but the response is - /// lost, the guard drops in `Assembling` phase. In-process cleanup must be - /// skipped (the blob must survive) and the changelog entry must persist for - /// a future recovery scan. + /// `self.lt.complete_multipart` assembles the blob successfully, but somehow returns an error. + /// After `MULTIPART_COMPLETE_CLEANUP_DELAY`, `ChangeLog` recovery deletes the orphaned blob. #[tokio::test] - async fn assembling_guard_defers_cleanup() { - let lt_inner = InMemoryBackend::new("lt"); - let log = InMemoryChangeLog::default(); - let storage = TieredStorage::new( - Box::new(InMemoryBackend::new("hv")), - Box::new(TestBackend::with_inner( - lt_inner.clone(), - CompleteMultipartThenFail, - )), - Box::new(log.clone()), - ); - - let id = make_id("mp-assembling"); - let upload_id = storage - .initiate_multipart(&id, &Default::default()) - .await - .unwrap(); - - let payload = vec![0xABu8; 2 * 1024 * 1024]; - let etag = storage - .upload_part( - &id, - &upload_id, - 1, - payload.len() as u64, - None, - stream::single(payload), - ) - .await - .unwrap(); - - // complete_multipart fails (simulated lost response) — guard drops in Assembling. - let result = storage - .complete_multipart( - &id, - &upload_id, - vec![CompletedPart { - part_number: 1, - etag, - }], - ) - .await; - assert!(result.is_err()); - - // Drain any background tasks. - storage.join().await; - - // The blob must NOT have been cleaned up. - assert!( - !lt_inner.is_empty(), - "LT blob should survive Assembling guard drop" - ); - } - - /// End-to-end: `complete_multipart` assembles the blob but the response is - /// lost. After the grace period expires, changelog recovery deletes the - /// orphaned blob because no tombstone was ever committed. - #[tokio::test] - async fn assembling_recovery_cleans_up_orphaned_blob() { + async fn cleans_up_orphan_after_failed_multipart_complete() { let hv = InMemoryBackend::new("hv"); let lt_inner = InMemoryBackend::new("lt"); let log = InMemoryChangeLog::default(); @@ -1848,7 +1789,7 @@ mod tests { Box::new(log.clone()), ); - let id = make_id("mp-orphan-e2e"); + let id = make_id("mp-orphan"); let upload_id = storage .initiate_multipart(&id, &Default::default()) .await @@ -1873,7 +1814,6 @@ mod tests { .await .unwrap(); - // complete_multipart assembles the blob but the response is lost. let result = storage .complete_multipart( &id, @@ -1887,11 +1827,12 @@ mod tests { assert!(result.is_err()); storage.join().await; - // Blob exists, no tombstone committed. + // The LT blob is orphaned, and no cleanup has been performed (yet), due to the guard being + // dropped while in the `Assembling` state. lt_inner.get(&physical).expect_object(); hv.get(&id).expect_not_found(); - // Simulate the grace period expiring and run recovery. + // Simulate the passage of time and run recovery. log.expire_all(); let manager = ChangeManager::new( Box::new(hv.clone()), @@ -1900,20 +1841,57 @@ mod tests { ); manager.recover().await.unwrap(); - // Orphaned blob must be cleaned up. + // The orphaned LT blob has been cleaned up. lt_inner.get(&physical).expect_not_found(); + // The change has been removed from the log. let remaining = log.scan().await.unwrap(); - assert!( - remaining.is_empty(), - "changelog entry not removed after recovery" - ); + assert!(remaining.is_empty()); + } + + #[derive(Debug)] + struct FailOnFirstCompleteMultipartAttempt { + attempt: Mutex, + } + + impl FailOnFirstCompleteMultipartAttempt { + fn new() -> Self { + Self { + attempt: Mutex::new(0), + } + } } - /// End-to-end: `complete_multipart` assembles the blob and the response is - /// lost, but a retry succeeds and commits the tombstone. After the grace - /// period expires, changelog recovery preserves the referenced blob. + #[async_trait::async_trait] + impl Hooks for FailOnFirstCompleteMultipartAttempt { + async fn complete_multipart( + &self, + inner: &InMemoryBackend, + id: &ObjectId, + upload_id: &UploadId, + parts: Vec, + ) -> Result { + let mut attempt = self.attempt.lock().await; + *attempt += 1; + if *attempt == 1 { + return Err(Error::Io(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "simulated lost response", + ))); + } else { + return Ok(inner + .complete_multipart(id, upload_id, parts) + .await + .unwrap()); + } + } + } + + /// The first call to `complete_multipart` fails, which generates a `Change` entry. + /// The second call succeeds. + /// When it's time to clean up, nothing is deleted, as the `complete_multipart` eventually went + /// through before the cleanup deadline. #[tokio::test] - async fn assembling_recovery_preserves_blob_when_retry_committed() { + async fn doesnt_clean_up_blob_after_successful_multipart_complete_on_second_try() { let hv = InMemoryBackend::new("hv"); let lt_inner = InMemoryBackend::new("lt"); let log = InMemoryChangeLog::default(); @@ -1921,12 +1899,12 @@ mod tests { Box::new(hv.clone()), Box::new(TestBackend::with_inner( lt_inner.clone(), - CompleteMultipartThenFail, + FailOnFirstCompleteMultipartAttempt::new(), )), Box::new(log.clone()), ); - let id = make_id("mp-retry-e2e"); + let id = make_id("mp-retry"); let upload_id = storage .initiate_multipart(&id, &Default::default()) .await @@ -1951,30 +1929,35 @@ mod tests { .await .unwrap(); - // complete_multipart assembles the blob but the response is lost. + // The first `complete_multipart` call fails. let result = storage .complete_multipart( &id, &upload_id, vec![CompletedPart { part_number: 1, - etag, + etag: etag.clone(), }], ) .await; assert!(result.is_err()); storage.join().await; - // Simulate a successful retry that committed the tombstone. - let tombstone = Tombstone { - target: physical.clone(), - expiration_policy: ExpirationPolicy::Manual, - }; - hv.compare_and_write(&id, None, TieredWrite::Tombstone(tombstone)) - .await - .unwrap(); + // The second `complete_multipart` call succeeds. + let result = storage + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag, + }], + ) + .await; + assert!(result.is_ok()); + storage.join().await; - // Simulate the grace period expiring and run recovery. + // Simulate the passage of time and run recovery. log.expire_all(); let manager = ChangeManager::new( Box::new(hv.clone()), @@ -1983,13 +1966,11 @@ mod tests { ); manager.recover().await.unwrap(); - // Blob must still exist — the tombstone references it. + // The LT blob must not have been cleaned up, as the write eventually went through. lt_inner.get(&physical).expect_object(); + // The change has been removed from the log. let remaining = log.scan().await.unwrap(); - assert!( - remaining.is_empty(), - "changelog entry not removed after recovery" - ); + assert!(remaining.is_empty()); } #[tokio::test] From b34f103b76c5489b6ce87f55ac55548777dc22be Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Tue, 26 May 2026 18:29:00 +0200 Subject: [PATCH 12/22] improve --- objectstore-service/src/backend/tiered.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index e4d023a9..401551c2 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -1773,7 +1773,7 @@ mod tests { } } - /// `self.lt.complete_multipart` assembles the blob successfully, but somehow returns an error. + /// `complete_multipart` on the inner LT backend assembles the blob successfully, but somehow returns an error. /// After `MULTIPART_COMPLETE_CLEANUP_DELAY`, `ChangeLog` recovery deletes the orphaned blob. #[tokio::test] async fn cleans_up_orphan_after_failed_multipart_complete() { From 9f8c11df0a88c1e6b0807a18f1a8a64a64800169 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Tue, 26 May 2026 18:38:09 +0200 Subject: [PATCH 13/22] improve --- objectstore-service/src/backend/changelog.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/objectstore-service/src/backend/changelog.rs b/objectstore-service/src/backend/changelog.rs index 3de7d40e..1d39ecf3 100644 --- a/objectstore-service/src/backend/changelog.rs +++ b/objectstore-service/src/backend/changelog.rs @@ -141,8 +141,8 @@ impl ChangeManager { /// Records the change to the log and returns a guard in the `Assembling` state. /// /// Behaves like [`Self::record`], except that the guard is created in the `Assembling` state. - /// Unlike other states, this guard does nothing on drop, deferring cleanup to - /// [`Change::cleanup_after`] via the [`ChangeLog`]. + /// Unlike other states, this guard does nothing on drop, leaving the burden of cleaning up to + /// the [`ChangeLog`]. pub async fn record_assembling(self: Arc, change: Change) -> Result { let token = self.tracker.token(); @@ -456,8 +456,8 @@ impl ChangeGuard { impl Drop for ChangeGuard { fn drop(&mut self) { if let Some(state) = self.state.take() + && state.phase != ChangePhase::Assembling && state.phase != ChangePhase::Completed - && !matches!(state.phase, ChangePhase::Assembling) && let Ok(handle) = tokio::runtime::Handle::try_current() { handle.spawn(state.cleanup()); From 6f626efafa4f2370cf0b227620eaaab71ef3c99f Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Tue, 26 May 2026 18:40:15 +0200 Subject: [PATCH 14/22] improve --- objectstore-service/src/backend/tiered.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index 401551c2..52924808 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -1966,8 +1966,11 @@ mod tests { ); manager.recover().await.unwrap(); - // The LT blob must not have been cleaned up, as the write eventually went through. + // The LT blob has not been cleaned up, as the write eventually went through. lt_inner.get(&physical).expect_object(); + // The tombstone still points to the blob. + let tombstone = hv.get(&id).expect_tombstone(); + assert_eq!(tombstone.target, physical); // The change has been removed from the log. let remaining = log.scan().await.unwrap(); assert!(remaining.is_empty()); From 02f9d179e3575f207b8a422a765504360526ed92 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Thu, 28 May 2026 18:17:38 +0200 Subject: [PATCH 15/22] wip --- objectstore-service/src/backend/changelog.rs | 8 +- objectstore-service/src/backend/tiered.rs | 194 +++++++++++-------- 2 files changed, 124 insertions(+), 78 deletions(-) diff --git a/objectstore-service/src/backend/changelog.rs b/objectstore-service/src/backend/changelog.rs index 1d39ecf3..1e82c365 100644 --- a/objectstore-service/src/backend/changelog.rs +++ b/objectstore-service/src/backend/changelog.rs @@ -422,7 +422,13 @@ impl ChangeState { impl Drop for ChangeState { fn drop(&mut self) { match self.phase { - ChangePhase::Assembling | ChangePhase::Completed => {} + ChangePhase::Completed => {} + ChangePhase::Assembling => { + objectstore_log::warn!( + change = ?self.change, + "Operation dropped in Assembling state, cleanup deferred to ChageLog recovery" + ); + } _ => { objectstore_log::error!( change = ?self.change, diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index 52924808..da42452b 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -688,17 +688,21 @@ impl MultipartUploadBackend for TieredStorage { ) -> Result { let tiered: TieredUploadId = upload_id.try_into()?; - // 1. Read current HV revision to establish the write precondition + let physical = ObjectId { + context: id.context.clone(), + key: tiered.revision, + }; + + // 1. Read current HV revision to establish the write precondition. let current = match self.inner.high_volume.get_tiered_metadata(id).await? { + // Optimization: a previous attempt already finalized this revision and tombstone -- report success. + TieredMetadata::Tombstone(t) if t.target == physical => return Ok(None), TieredMetadata::Tombstone(t) => Some(t.target), _ => None, }; - // 2. Complete the upload, creating the object at the given revision key. - let physical = ObjectId { - context: id.context.clone(), - key: tiered.revision, - }; + // Register a guard with cleanup deferred to now + `MULTIPART_COMPLETE_CLEANUP_DELAY`, + // so that the user has the chance to retry finalizing the upload in this timeframe. let mut guard = self .record_assembling(Change { id: id.clone(), @@ -708,28 +712,63 @@ impl MultipartUploadBackend for TieredStorage { }) .await?; - let error = self + // 2. Complete the upload, creating the object at the given revision key. + let maybe_complete_multipart_err = match self .inner .long_term .complete_multipart(&physical, &tiered.upload_id, parts) - .await?; - - if error.is_some() { - return Ok(error); - } - - guard.advance(ChangePhase::Written); + .await + { + // The request went through but we got an error in the response body. + // Transparently proxy the error to the user. + Ok(error) => { + if error.is_some() { + return Ok(error); + } + None + } + // We got status 4xx/5xx, or a network error. + // Either way, `complete_multipart` might have been completed successfully, + // either now or in a previous attempt (in that case, that's a 404 and we indeed end up + // here). + // We cannot know if that's the case yet, so we continue to the next steps. + Err(err) => Some(err), + }; // 3. Retrieve the metadata of the object, which was determined at initiation time, to // get the expiration policy. - let metadata = self - .inner - .long_term - .get_metadata(&physical) - .await? - .ok_or_else(|| { - Error::generic("completed multipart object not found in long-term storage") - })?; + // + // This also serves as an existence check to understand if the LT revision was actually + // created successfully in this or a previous attempt, in which case we just need to + // finalize the tombstone. + let metadata = self.inner.long_term.get_metadata(&physical).await; + + let metadata = match (metadata, maybe_complete_multipart_err) { + // The LT revision already exists, so we can continue to finalize the tombstone. + (Ok(Some(metadata)), _) => metadata, + // The LT revision doesn't exist, cannot proceed. + (Ok(None), Some(err)) => return Err(err), + // The `complete_multipart` succeeded, creating the object, but the `get_metadata` + // immediately after failed to find the object. This should never happen. + (Ok(None), None) => { + objectstore_log::error!( + id = ?id, + upload_id = ?upload_id, + physical = ?physical, + "complete_multipart call succeeded on long_term backend, but subsequent get_metadata found no object" + ); + return Err(Error::generic( + "completed multipart object not found in long-term storage", + )); + } + // Failed to `get_metadata`, cannot proceed. + (Err(get_metadata_err), maybe_complete_multipart_err) => { + // Prefer the `complete_multipart_err`, as it's likely more informative. + // TODO(FS-358): convert this properly. Right now `ApiErrorResponse` will turn this into a 500, + // but we would actually want to transparently surface the original status (and message?) instead. + return Err(maybe_complete_multipart_err.unwrap_or(get_metadata_err)); + } + }; // 4. CAS commit: write tombstone only if HV state matches what we saw. let tombstone = Tombstone { @@ -756,11 +795,12 @@ mod tests { use objectstore_types::scope::{Scope, Scopes}; use super::*; - use crate::backend::changelog::{InMemoryChangeLog, NoopChangeLog}; + use crate::backend::changelog::{ChangeId, InMemoryChangeLog, NoopChangeLog}; use crate::backend::in_memory::InMemoryBackend; use crate::backend::testing::{Hooks, TestBackend}; use crate::error::Error; use crate::id::ObjectContext; + use crate::multipart::CompleteMultipartError; use crate::stream::{self, ClientStream}; fn make_context() -> ObjectContext { @@ -1680,6 +1720,52 @@ mod tests { assert!(storage.get_object(&id).await.unwrap().is_none()); } + #[tokio::test] + async fn multipart_list_parts() { + let (storage, _hv, _lt, _) = make_tiered_storage(); + let id = make_id("mp-list"); + + let upload_id = storage + .initiate_multipart(&id, &Default::default()) + .await + .unwrap(); + + let part1 = vec![0xAAu8; 100]; + let part2 = vec![0xBBu8; 200]; + storage + .upload_part( + &id, + &upload_id, + 1, + part1.len() as u64, + None, + stream::single(part1), + ) + .await + .unwrap(); + storage + .upload_part( + &id, + &upload_id, + 2, + part2.len() as u64, + None, + stream::single(part2), + ) + .await + .unwrap(); + + let resp = storage + .list_parts(&id, &upload_id, None, None) + .await + .unwrap(); + assert_eq!(resp.parts.len(), 2); + assert_eq!(resp.parts[0].part_number, 1); + assert_eq!(resp.parts[0].size, 100); + assert_eq!(resp.parts[1].part_number, 2); + assert_eq!(resp.parts[1].size, 200); + } + #[tokio::test] async fn multipart_overwrites_existing_tombstone() { let (storage, hv, lt, _) = make_tiered_storage(); @@ -1747,14 +1833,14 @@ mod tests { assert_eq!(body, payload2); } - // --- Multipart assembling guard --- + // --- Multipart completion failure handling (consistency, retries, delayed cleanup) --- - /// Completes the multipart upload, then returns an error to simulate a lost response. + /// Completes the multipart upload, but returns an io error to simulate a network error. #[derive(Debug)] - struct CompleteMultipartThenFail; + struct CompleteMultipartButReturnError; #[async_trait::async_trait] - impl Hooks for CompleteMultipartThenFail { + impl Hooks for CompleteMultipartButReturnError { async fn complete_multipart( &self, inner: &InMemoryBackend, @@ -1768,7 +1854,7 @@ mod tests { .unwrap(); Err(Error::Io(std::io::Error::new( std::io::ErrorKind::TimedOut, - "simulated lost response", + "simulated network error", ))) } } @@ -1784,7 +1870,7 @@ mod tests { Box::new(hv.clone()), Box::new(TestBackend::with_inner( lt_inner.clone(), - CompleteMultipartThenFail, + CompleteMultipartButReturnError {}, )), Box::new(log.clone()), ); @@ -1875,7 +1961,7 @@ mod tests { if *attempt == 1 { return Err(Error::Io(std::io::Error::new( std::io::ErrorKind::TimedOut, - "simulated lost response", + "simulated network error", ))); } else { return Ok(inner @@ -1886,7 +1972,7 @@ mod tests { } } - /// The first call to `complete_multipart` fails, which generates a `Change` entry. + /// The first attempt to `complete_multipart` fails, which generates a `Change` entry. /// The second call succeeds. /// When it's time to clean up, nothing is deleted, as the `complete_multipart` eventually went /// through before the cleanup deadline. @@ -1975,50 +2061,4 @@ mod tests { let remaining = log.scan().await.unwrap(); assert!(remaining.is_empty()); } - - #[tokio::test] - async fn multipart_list_parts() { - let (storage, _hv, _lt, _) = make_tiered_storage(); - let id = make_id("mp-list"); - - let upload_id = storage - .initiate_multipart(&id, &Default::default()) - .await - .unwrap(); - - let part1 = vec![0xAAu8; 100]; - let part2 = vec![0xBBu8; 200]; - storage - .upload_part( - &id, - &upload_id, - 1, - part1.len() as u64, - None, - stream::single(part1), - ) - .await - .unwrap(); - storage - .upload_part( - &id, - &upload_id, - 2, - part2.len() as u64, - None, - stream::single(part2), - ) - .await - .unwrap(); - - let resp = storage - .list_parts(&id, &upload_id, None, None) - .await - .unwrap(); - assert_eq!(resp.parts.len(), 2); - assert_eq!(resp.parts[0].part_number, 1); - assert_eq!(resp.parts[0].size, 100); - assert_eq!(resp.parts[1].part_number, 2); - assert_eq!(resp.parts[1].size, 200); - } } From 2bf8358f40ca47db47d4c038aac8acddd25a19a8 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Thu, 28 May 2026 18:19:37 +0200 Subject: [PATCH 16/22] wip --- objectstore-service/src/backend/tiered.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index da42452b..cc9322f4 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -1977,7 +1977,7 @@ mod tests { /// When it's time to clean up, nothing is deleted, as the `complete_multipart` eventually went /// through before the cleanup deadline. #[tokio::test] - async fn doesnt_clean_up_blob_after_successful_multipart_complete_on_second_try() { + async fn multipart_complete_can_be_retried_and_leaves_state_consistent() { let hv = InMemoryBackend::new("hv"); let lt_inner = InMemoryBackend::new("lt"); let log = InMemoryChangeLog::default(); From 65f284c5d76584c5c88ba0607ad6f3fe70ac2638 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Thu, 28 May 2026 18:20:15 +0200 Subject: [PATCH 17/22] wip --- objectstore-service/src/backend/tiered.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index cc9322f4..98076b3b 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -1977,7 +1977,7 @@ mod tests { /// When it's time to clean up, nothing is deleted, as the `complete_multipart` eventually went /// through before the cleanup deadline. #[tokio::test] - async fn multipart_complete_can_be_retried_and_leaves_state_consistent() { + async fn multipart_complete_can_be_retried_if_backend_errs_and_leaves_state_consistent() { let hv = InMemoryBackend::new("hv"); let lt_inner = InMemoryBackend::new("lt"); let log = InMemoryChangeLog::default(); From 57d453696a855904e9866ae25bf703775633dcce Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Thu, 28 May 2026 18:38:11 +0200 Subject: [PATCH 18/22] wip --- objectstore-service/src/backend/tiered.rs | 129 +++++++++++++++++++++- 1 file changed, 128 insertions(+), 1 deletion(-) diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index 98076b3b..5ec2e83e 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -1977,7 +1977,7 @@ mod tests { /// When it's time to clean up, nothing is deleted, as the `complete_multipart` eventually went /// through before the cleanup deadline. #[tokio::test] - async fn multipart_complete_can_be_retried_if_backend_errs_and_leaves_state_consistent() { + async fn multipart_complete_succeeds_on_retry_and_leaves_state_consistent() { let hv = InMemoryBackend::new("hv"); let lt_inner = InMemoryBackend::new("lt"); let log = InMemoryChangeLog::default(); @@ -2061,4 +2061,131 @@ mod tests { let remaining = log.scan().await.unwrap(); assert!(remaining.is_empty()); } + + #[derive(Debug)] + struct FailOnFirstGetMetadataAttempt { + attempt: Mutex, + } + + impl FailOnFirstGetMetadataAttempt { + fn new() -> Self { + Self { + attempt: Mutex::new(0), + } + } + } + + #[async_trait::async_trait] + impl Hooks for FailOnFirstGetMetadataAttempt { + async fn get_metadata( + &self, + inner: &InMemoryBackend, + id: &ObjectId, + ) -> Result { + let mut attempt = self.attempt.lock().await; + *attempt += 1; + if *attempt == 1 { + return Err(Error::Io(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "simulated network error", + ))); + } else { + inner.get_metadata(id).await + } + } + } + + /// The first attempt to `complete_multipart` succeeds on the LT backend, but the subsequent + /// `get_metadata` call fails with a network error, causing the overall `complete_multipart` + /// to fail. The second call retries and succeeds (the LT object already exists from the first + /// attempt). + /// When it's time to clean up, nothing is deleted, as the `complete_multipart` eventually went + /// through before the cleanup deadline. + #[tokio::test] + async fn multipart_complete_succeeds_on_retry_if_get_metadata_errs_and_leaves_state_consistent() + { + let hv = InMemoryBackend::new("hv"); + let lt_inner = InMemoryBackend::new("lt"); + let log = InMemoryChangeLog::default(); + let storage = TieredStorage::new( + Box::new(hv.clone()), + Box::new(TestBackend::with_inner( + lt_inner.clone(), + FailOnFirstGetMetadataAttempt::new(), + )), + Box::new(log.clone()), + ); + + let id = make_id("mp-retry-meta"); + let upload_id = storage + .initiate_multipart(&id, &Default::default()) + .await + .unwrap(); + + let tiered_id: TieredUploadId = (&upload_id).try_into().unwrap(); + let physical = ObjectId { + context: id.context.clone(), + key: tiered_id.revision, + }; + + let payload = vec![0xABu8; 2 * 1024 * 1024]; + let etag = storage + .upload_part( + &id, + &upload_id, + 1, + payload.len() as u64, + None, + stream::single(payload), + ) + .await + .unwrap(); + + // The first `complete_multipart` call fails (get_metadata network error), even though it + // internally creates the LT blob. + let result = storage + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag: etag.clone(), + }], + ) + .await; + assert!(result.is_err()); + storage.join().await; + + // The second `complete_multipart` call succeeds. + let result = storage + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag, + }], + ) + .await; + assert!(result.is_ok()); + storage.join().await; + + // Simulate the passage of time and run recovery. + log.expire_all(); + let manager = ChangeManager::new( + Box::new(hv.clone()), + Box::new(lt_inner.clone()), + Box::new(log.clone()), + ); + manager.recover().await.unwrap(); + + // The LT blob has not been cleaned up, as the write eventually went through. + lt_inner.get(&physical).expect_object(); + // The tombstone still points to the blob. + let tombstone = hv.get(&id).expect_tombstone(); + assert_eq!(tombstone.target, physical); + // The change has been removed from the log. + let remaining = log.scan().await.unwrap(); + assert!(remaining.is_empty()); + } } From 3010525ffa22f7e8c23e2197275422d889eed417 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Thu, 28 May 2026 18:41:15 +0200 Subject: [PATCH 19/22] wip --- objectstore-service/src/backend/tiered.rs | 24 +++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index 5ec2e83e..59994c38 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -2010,7 +2010,7 @@ mod tests { 1, payload.len() as u64, None, - stream::single(payload), + stream::single(payload.clone()), ) .await .unwrap(); @@ -2043,6 +2043,11 @@ mod tests { assert!(result.is_ok()); storage.join().await; + // The object is there. + let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let body = stream::read_to_vec(s).await.unwrap(); + assert_eq!(body, payload); + // Simulate the passage of time and run recovery. log.expire_all(); let manager = ChangeManager::new( @@ -2060,6 +2065,11 @@ mod tests { // The change has been removed from the log. let remaining = log.scan().await.unwrap(); assert!(remaining.is_empty()); + + // The object is still there after recovery. + let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let body = stream::read_to_vec(s).await.unwrap(); + assert_eq!(body, payload); } #[derive(Debug)] @@ -2136,7 +2146,7 @@ mod tests { 1, payload.len() as u64, None, - stream::single(payload), + stream::single(payload.clone()), ) .await .unwrap(); @@ -2170,6 +2180,11 @@ mod tests { assert!(result.is_ok()); storage.join().await; + // The object is there. + let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let body = stream::read_to_vec(s).await.unwrap(); + assert_eq!(body, payload); + // Simulate the passage of time and run recovery. log.expire_all(); let manager = ChangeManager::new( @@ -2187,5 +2202,10 @@ mod tests { // The change has been removed from the log. let remaining = log.scan().await.unwrap(); assert!(remaining.is_empty()); + + // The object is there after recovery. + let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let body = stream::read_to_vec(s).await.unwrap(); + assert_eq!(body, payload); } } From f39125ea5e8a6d1836c9aab4a6328afe58c195ee Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Thu, 28 May 2026 18:42:16 +0200 Subject: [PATCH 20/22] clippy --- objectstore-service/src/backend/tiered.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index 59994c38..2dc55342 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -795,12 +795,12 @@ mod tests { use objectstore_types::scope::{Scope, Scopes}; use super::*; - use crate::backend::changelog::{ChangeId, InMemoryChangeLog, NoopChangeLog}; + use crate::backend::changelog::{InMemoryChangeLog, NoopChangeLog}; use crate::backend::in_memory::InMemoryBackend; use crate::backend::testing::{Hooks, TestBackend}; use crate::error::Error; use crate::id::ObjectContext; - use crate::multipart::CompleteMultipartError; + use crate::stream::{self, ClientStream}; fn make_context() -> ObjectContext { From 470e59374e26f0b092ce10790582452b9285eb77 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Thu, 28 May 2026 18:48:19 +0200 Subject: [PATCH 21/22] fix log message --- objectstore-service/src/backend/changelog.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/objectstore-service/src/backend/changelog.rs b/objectstore-service/src/backend/changelog.rs index 1e82c365..885613c6 100644 --- a/objectstore-service/src/backend/changelog.rs +++ b/objectstore-service/src/backend/changelog.rs @@ -426,7 +426,7 @@ impl Drop for ChangeState { ChangePhase::Assembling => { objectstore_log::warn!( change = ?self.change, - "Operation dropped in Assembling state, cleanup deferred to ChageLog recovery" + "Operation dropped in Assembling state, cleanup deferred to ChangeLog recovery" ); } _ => { From d32e4c2dbda9e74858d9757bcc23fdde0c14d4e4 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Thu, 28 May 2026 18:55:32 +0200 Subject: [PATCH 22/22] update test --- objectstore-service/src/backend/tiered.rs | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index 2dc55342..4d329c3e 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -1835,7 +1835,9 @@ mod tests { // --- Multipart completion failure handling (consistency, retries, delayed cleanup) --- - /// Completes the multipart upload, but returns an io error to simulate a network error. + /// Assembles the blob via `complete_multipart`, but returns an error to simulate a network + /// failure on the response path. Also fails `get_metadata` so the tiered layer cannot + /// recover by detecting the already-assembled blob. #[derive(Debug)] struct CompleteMultipartButReturnError; @@ -1854,12 +1856,24 @@ mod tests { .unwrap(); Err(Error::Io(std::io::Error::new( std::io::ErrorKind::TimedOut, - "simulated network error", + "simulated network error on complete_multipart", + ))) + } + + async fn get_metadata( + &self, + _inner: &InMemoryBackend, + _id: &ObjectId, + ) -> Result { + Err(Error::Io(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "simulated network error on get_metadata", ))) } } - /// `complete_multipart` on the inner LT backend assembles the blob successfully, but somehow returns an error. + /// `complete_multipart` on the inner LT backend assembles the blob successfully, but both + /// `complete_multipart` and `get_metadata` return errors, so the tiered layer cannot finalize. /// After `MULTIPART_COMPLETE_CLEANUP_DELAY`, `ChangeLog` recovery deletes the orphaned blob. #[tokio::test] async fn cleans_up_orphan_after_failed_multipart_complete() {