diff --git a/objectstore-service/src/backend/changelog.rs b/objectstore-service/src/backend/changelog.rs index 548be885..885613c6 100644 --- a/objectstore-service/src/backend/changelog.rs +++ b/objectstore-service/src/backend/changelog.rs @@ -22,7 +22,7 @@ use std::collections::HashMap; use std::fmt; use std::sync::{Arc, Mutex}; -use std::time::Duration; +use std::time::{Duration, SystemTime}; use tokio_util::task::TaskTracker; use tokio_util::task::task_tracker::TaskTrackerToken; @@ -76,6 +76,10 @@ pub struct Change { /// /// Needs cleanup on success (the CAS committed and the old blob is unreferenced). pub old: Option, + /// Earliest time at which this entry becomes eligible for cleanup. + /// + /// [`ChangeLog::scan`] filters out the entry, unless the deadline has passed. + pub cleanup_after: Option, } /// Manager for multi-step storage changes, including backends and durable log. @@ -134,6 +138,28 @@ impl ChangeManager { Ok(ChangeGuard { state: Some(state) }) } + /// Records the change to the log and returns a guard in the `Assembling` state. + /// + /// Behaves like [`Self::record`], except that the guard is created in the `Assembling` state. + /// Unlike other states, this guard does nothing on drop, leaving the burden of cleaning up to + /// the [`ChangeLog`]. + pub async fn record_assembling(self: Arc, change: Change) -> Result { + let token = self.tracker.token(); + + let id = ChangeId::new(); + self.changelog.record(&id, &change).await?; + + let state = ChangeState { + id, + change, + phase: ChangePhase::Assembling, + manager: self.clone(), + _token: token, + }; + + Ok(ChangeGuard { state: Some(state) }) + } + /// Scans the changelog for outstanding entries and runs cleanup for each. /// /// Spawn this into a background task at startup to recover from any orphaned objects after a @@ -226,15 +252,31 @@ impl ChangeLog for InMemoryChangeLog { } async fn scan(&self) -> Result> { + let now = SystemTime::now(); let entries = self.entries.lock().expect("lock poisoned"); let result = entries .iter() + .filter(|(_, change)| match change.cleanup_after { + None => true, + Some(deadline) => now >= deadline, + }) .map(|(id, change)| (id.clone(), change.clone())) .collect(); Ok(result) } } +#[cfg(test)] +impl InMemoryChangeLog { + /// Sets [`Change::cleanup_after`] to the past for all entries, forcing them to be returned by a subsequent [`ChangeLog::scan`]. + pub fn expire_all(&self) { + let mut entries = self.entries.lock().expect("lock poisoned"); + for change in entries.values_mut() { + change.cleanup_after = Some(SystemTime::UNIX_EPOCH); + } + } +} + /// [`ChangeLog`] implementation that discards all entries. /// /// Used as the default when no durable log is configured. Provides no @@ -265,6 +307,12 @@ pub enum ChangePhase { Recovered, /// The change is recorded in the log and LT upload has started. Recorded, + /// The LT blob originated from a multipart upload and is being assembled. + /// + /// Multipart upload completion can fail, and we want the client to be able to retry it + /// without the change cleanup process racing to delete the LT blob. + /// Therefore, cleanup of changes in this phase is deferred. + Assembling, /// LT upload has succeeded and the tombstone is being updated. Written, /// The tombstone update failed due to a conflict. @@ -310,7 +358,7 @@ impl ChangeState { ChangePhase::Written => self.read_tombstone().await, ChangePhase::Lost => self.change.old.clone(), ChangePhase::Updated => self.change.new.clone(), - ChangePhase::Completed => return, // unreachable + ChangePhase::Assembling | ChangePhase::Completed => return, // unreachable }; if current != self.change.old @@ -373,12 +421,21 @@ impl ChangeState { impl Drop for ChangeState { fn drop(&mut self) { - if self.phase != ChangePhase::Completed { - objectstore_log::error!( - change = ?self.change, - phase = ?self.phase, - "Operation dropped without completing cleanup" - ); + match self.phase { + ChangePhase::Completed => {} + ChangePhase::Assembling => { + objectstore_log::warn!( + change = ?self.change, + "Operation dropped in Assembling state, cleanup deferred to ChangeLog recovery" + ); + } + _ => { + objectstore_log::error!( + change = ?self.change, + phase = ?self.phase, + "Operation dropped without completing cleanup" + ); + } } } } @@ -405,6 +462,7 @@ impl ChangeGuard { impl Drop for ChangeGuard { fn drop(&mut self) { if let Some(state) = self.state.take() + && state.phase != ChangePhase::Assembling && state.phase != ChangePhase::Completed && let Ok(handle) = tokio::runtime::Handle::try_current() { @@ -440,6 +498,7 @@ mod tests { id: make_id("object-key"), new: Some(make_id("object-key/rev1")), old: None, + cleanup_after: None, }; log.record(&id, &change).await.unwrap(); @@ -457,6 +516,7 @@ mod tests { id: make_id("object-key"), new: None, old: Some(make_id("object-key/rev1")), + cleanup_after: None, }; log.record(&id, &change).await.unwrap(); @@ -495,6 +555,7 @@ mod tests { id: make_id("crash-test"), new: Some(make_id("crash-test/rev")), old: None, + cleanup_after: None, })) .unwrap() // Runtime drops here while `guard` is still alive outside it. @@ -508,4 +569,55 @@ mod tests { let entries = rt.block_on(log.scan()).unwrap(); assert_eq!(entries.len(), 1, "log entry must persist"); } + + #[tokio::test] + async fn scan_filters_by_cleanup_after() { + let log = InMemoryChangeLog::default(); + + let ready_id = ChangeId::new(); + log.record( + &ready_id, + &Change { + id: make_id("ready"), + new: Some(make_id("ready/rev")), + old: None, + cleanup_after: None, + }, + ) + .await + .unwrap(); + + let expired_id = ChangeId::new(); + log.record( + &expired_id, + &Change { + id: make_id("expired"), + new: Some(make_id("expired/rev")), + old: None, + cleanup_after: Some(SystemTime::now() - Duration::from_secs(1)), + }, + ) + .await + .unwrap(); + + let deferred_id = ChangeId::new(); + log.record( + &deferred_id, + &Change { + id: make_id("deferred"), + new: Some(make_id("deferred/rev")), + old: None, + cleanup_after: Some(SystemTime::now() + Duration::from_hours(24)), + }, + ) + .await + .unwrap(); + + let entries = log.scan().await.unwrap(); + assert_eq!(entries.len(), 2); + + let ids: Vec<_> = entries.iter().map(|(id, _)| id).collect(); + assert!(ids.contains(&&ready_id)); + assert!(ids.contains(&&expired_id)); + } } diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index 6f314233..4d329c3e 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -99,7 +99,7 @@ use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; -use std::time::Instant; +use std::time::{Duration, Instant, SystemTime}; use base64::Engine as _; use bytes::Bytes; @@ -123,6 +123,13 @@ use crate::stream::{ClientStream, SizedPeek}; /// The threshold up until which we will go to the "high volume" backend. const BACKEND_SIZE_THRESHOLD: usize = 1024 * 1024; // 1 MiB + +/// Amount of time for which a `Change` generated by a `complete_multipart` operation is kept in the `Assembling` +/// state before becoming eligible for cleanup by the `ChangeLog` recovery process. +/// This allows the client to retry the `complete_multipart` operation upon any failures for at least this long, +/// avoiding scenarios where the `ChangeLog` recovery would race to delete the assembled LT blob. +const MULTIPART_COMPLETE_CLEANUP_DELAY: Duration = Duration::from_hours(24); + /// Creates a new [`ObjectId`] with the same context but a unique revision key. /// /// The new key has the format `{original_key}/{uuid_v7}`, producing a distinct @@ -241,6 +248,12 @@ impl TieredStorage { self.inner.clone().record(change).await } + /// Records the change to the log in the `Assembling` phase, and returns a guard that does + /// nothing on drop unless advanced. + async fn record_assembling(&self, change: Change) -> Result { + self.inner.clone().record_assembling(change).await + } + /// Returns the name of the backend corresponding to the given routing choice. fn backend_type(&self, choice: &BackendChoice) -> &'static str { match choice { @@ -276,6 +289,7 @@ impl TieredStorage { id: id.clone(), new: None, old: Some(target.clone()), + cleanup_after: None, }) .await?; @@ -317,6 +331,7 @@ impl TieredStorage { id: id.clone(), new: Some(new.clone()), old: current.clone(), + cleanup_after: None, }) .await?; @@ -471,6 +486,7 @@ impl Backend for TieredStorage { id: id.clone(), new: None, old: Some(tombstone.target.clone()), + cleanup_after: None, }) .await?; guard.advance(ChangePhase::Written); @@ -672,47 +688,87 @@ impl MultipartUploadBackend for TieredStorage { ) -> Result { let tiered: TieredUploadId = upload_id.try_into()?; - // 1. Read current HV revision to establish the write precondition + let physical = ObjectId { + context: id.context.clone(), + key: tiered.revision, + }; + + // 1. Read current HV revision to establish the write precondition. let current = match self.inner.high_volume.get_tiered_metadata(id).await? { + // Optimization: a previous attempt already finalized this revision and tombstone -- report success. + TieredMetadata::Tombstone(t) if t.target == physical => return Ok(None), TieredMetadata::Tombstone(t) => Some(t.target), _ => None, }; - // 2. Complete the upload, creating the object at the given revision key. - let physical = ObjectId { - context: id.context.clone(), - key: tiered.revision, - }; + // Register a guard with cleanup deferred to now + `MULTIPART_COMPLETE_CLEANUP_DELAY`, + // so that the user has the chance to retry finalizing the upload in this timeframe. let mut guard = self - .record_change(Change { + .record_assembling(Change { id: id.clone(), new: Some(physical.clone()), old: current.clone(), + cleanup_after: Some(SystemTime::now() + MULTIPART_COMPLETE_CLEANUP_DELAY), }) .await?; - let error = self + // 2. Complete the upload, creating the object at the given revision key. + let maybe_complete_multipart_err = match self .inner .long_term .complete_multipart(&physical, &tiered.upload_id, parts) - .await?; - - if error.is_some() { - return Ok(error); - } - - guard.advance(ChangePhase::Written); + .await + { + // The request went through but we got an error in the response body. + // Transparently proxy the error to the user. + Ok(error) => { + if error.is_some() { + return Ok(error); + } + None + } + // We got status 4xx/5xx, or a network error. + // Either way, `complete_multipart` might have been completed successfully, + // either now or in a previous attempt (in that case, that's a 404 and we indeed end up + // here). + // We cannot know if that's the case yet, so we continue to the next steps. + Err(err) => Some(err), + }; // 3. Retrieve the metadata of the object, which was determined at initiation time, to // get the expiration policy. - let metadata = self - .inner - .long_term - .get_metadata(&physical) - .await? - .ok_or_else(|| { - Error::generic("completed multipart object not found in long-term storage") - })?; + // + // This also serves as an existence check to understand if the LT revision was actually + // created successfully in this or a previous attempt, in which case we just need to + // finalize the tombstone. + let metadata = self.inner.long_term.get_metadata(&physical).await; + + let metadata = match (metadata, maybe_complete_multipart_err) { + // The LT revision already exists, so we can continue to finalize the tombstone. + (Ok(Some(metadata)), _) => metadata, + // The LT revision doesn't exist, cannot proceed. + (Ok(None), Some(err)) => return Err(err), + // The `complete_multipart` succeeded, creating the object, but the `get_metadata` + // immediately after failed to find the object. This should never happen. + (Ok(None), None) => { + objectstore_log::error!( + id = ?id, + upload_id = ?upload_id, + physical = ?physical, + "complete_multipart call succeeded on long_term backend, but subsequent get_metadata found no object" + ); + return Err(Error::generic( + "completed multipart object not found in long-term storage", + )); + } + // Failed to `get_metadata`, cannot proceed. + (Err(get_metadata_err), maybe_complete_multipart_err) => { + // Prefer the `complete_multipart_err`, as it's likely more informative. + // TODO(FS-358): convert this properly. Right now `ApiErrorResponse` will turn this into a 500, + // but we would actually want to transparently surface the original status (and message?) instead. + return Err(maybe_complete_multipart_err.unwrap_or(get_metadata_err)); + } + }; // 4. CAS commit: write tombstone only if HV state matches what we saw. let tombstone = Tombstone { @@ -734,8 +790,7 @@ impl MultipartUploadBackend for TieredStorage { #[cfg(test)] mod tests { - use std::time::Duration; - + use futures::lock::Mutex; use objectstore_types::metadata::{ExpirationPolicy, Metadata}; use objectstore_types::scope::{Scope, Scopes}; @@ -745,6 +800,7 @@ mod tests { use crate::backend::testing::{Hooks, TestBackend}; use crate::error::Error; use crate::id::ObjectContext; + use crate::stream::{self, ClientStream}; fn make_context() -> ObjectContext { @@ -1364,6 +1420,7 @@ mod tests { id: make_id("object-key"), new: Some(make_id("cleanup-target")), old: None, + cleanup_after: None, }; // Build the guard inside a temporary runtime, then let the runtime drop @@ -1387,6 +1444,7 @@ mod tests { id: make_id("object-key"), new: None, old: None, + cleanup_after: None, }; let mut guard = storage.record_change(change).await.unwrap(); @@ -1662,6 +1720,52 @@ mod tests { assert!(storage.get_object(&id).await.unwrap().is_none()); } + #[tokio::test] + async fn multipart_list_parts() { + let (storage, _hv, _lt, _) = make_tiered_storage(); + let id = make_id("mp-list"); + + let upload_id = storage + .initiate_multipart(&id, &Default::default()) + .await + .unwrap(); + + let part1 = vec![0xAAu8; 100]; + let part2 = vec![0xBBu8; 200]; + storage + .upload_part( + &id, + &upload_id, + 1, + part1.len() as u64, + None, + stream::single(part1), + ) + .await + .unwrap(); + storage + .upload_part( + &id, + &upload_id, + 2, + part2.len() as u64, + None, + stream::single(part2), + ) + .await + .unwrap(); + + let resp = storage + .list_parts(&id, &upload_id, None, None) + .await + .unwrap(); + assert_eq!(resp.parts.len(), 2); + assert_eq!(resp.parts[0].part_number, 1); + assert_eq!(resp.parts[0].size, 100); + assert_eq!(resp.parts[1].part_number, 2); + assert_eq!(resp.parts[1].size, 200); + } + #[tokio::test] async fn multipart_overwrites_existing_tombstone() { let (storage, hv, lt, _) = make_tiered_storage(); @@ -1729,49 +1833,393 @@ mod tests { assert_eq!(body, payload2); } + // --- Multipart completion failure handling (consistency, retries, delayed cleanup) --- + + /// Assembles the blob via `complete_multipart`, but returns an error to simulate a network + /// failure on the response path. Also fails `get_metadata` so the tiered layer cannot + /// recover by detecting the already-assembled blob. + #[derive(Debug)] + struct CompleteMultipartButReturnError; + + #[async_trait::async_trait] + impl Hooks for CompleteMultipartButReturnError { + async fn complete_multipart( + &self, + inner: &InMemoryBackend, + id: &ObjectId, + upload_id: &UploadId, + parts: Vec, + ) -> Result { + inner + .complete_multipart(id, upload_id, parts) + .await + .unwrap(); + Err(Error::Io(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "simulated network error on complete_multipart", + ))) + } + + async fn get_metadata( + &self, + _inner: &InMemoryBackend, + _id: &ObjectId, + ) -> Result { + Err(Error::Io(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "simulated network error on get_metadata", + ))) + } + } + + /// `complete_multipart` on the inner LT backend assembles the blob successfully, but both + /// `complete_multipart` and `get_metadata` return errors, so the tiered layer cannot finalize. + /// After `MULTIPART_COMPLETE_CLEANUP_DELAY`, `ChangeLog` recovery deletes the orphaned blob. #[tokio::test] - async fn multipart_list_parts() { - let (storage, _hv, _lt, _) = make_tiered_storage(); - let id = make_id("mp-list"); + async fn cleans_up_orphan_after_failed_multipart_complete() { + let hv = InMemoryBackend::new("hv"); + let lt_inner = InMemoryBackend::new("lt"); + let log = InMemoryChangeLog::default(); + let storage = TieredStorage::new( + Box::new(hv.clone()), + Box::new(TestBackend::with_inner( + lt_inner.clone(), + CompleteMultipartButReturnError {}, + )), + Box::new(log.clone()), + ); + let id = make_id("mp-orphan"); let upload_id = storage .initiate_multipart(&id, &Default::default()) .await .unwrap(); - let part1 = vec![0xAAu8; 100]; - let part2 = vec![0xBBu8; 200]; - storage + let tiered_id: TieredUploadId = (&upload_id).try_into().unwrap(); + let physical = ObjectId { + context: id.context.clone(), + key: tiered_id.revision, + }; + + let payload = vec![0xABu8; 2 * 1024 * 1024]; + let etag = storage .upload_part( &id, &upload_id, 1, - part1.len() as u64, + payload.len() as u64, None, - stream::single(part1), + stream::single(payload), ) .await .unwrap(); - storage + + let result = storage + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag, + }], + ) + .await; + assert!(result.is_err()); + storage.join().await; + + // The LT blob is orphaned, and no cleanup has been performed (yet), due to the guard being + // dropped while in the `Assembling` state. + lt_inner.get(&physical).expect_object(); + hv.get(&id).expect_not_found(); + + // Simulate the passage of time and run recovery. + log.expire_all(); + let manager = ChangeManager::new( + Box::new(hv.clone()), + Box::new(lt_inner.clone()), + Box::new(log.clone()), + ); + manager.recover().await.unwrap(); + + // The orphaned LT blob has been cleaned up. + lt_inner.get(&physical).expect_not_found(); + // The change has been removed from the log. + let remaining = log.scan().await.unwrap(); + assert!(remaining.is_empty()); + } + + #[derive(Debug)] + struct FailOnFirstCompleteMultipartAttempt { + attempt: Mutex, + } + + impl FailOnFirstCompleteMultipartAttempt { + fn new() -> Self { + Self { + attempt: Mutex::new(0), + } + } + } + + #[async_trait::async_trait] + impl Hooks for FailOnFirstCompleteMultipartAttempt { + async fn complete_multipart( + &self, + inner: &InMemoryBackend, + id: &ObjectId, + upload_id: &UploadId, + parts: Vec, + ) -> Result { + let mut attempt = self.attempt.lock().await; + *attempt += 1; + if *attempt == 1 { + return Err(Error::Io(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "simulated network error", + ))); + } else { + return Ok(inner + .complete_multipart(id, upload_id, parts) + .await + .unwrap()); + } + } + } + + /// The first attempt to `complete_multipart` fails, which generates a `Change` entry. + /// The second call succeeds. + /// When it's time to clean up, nothing is deleted, as the `complete_multipart` eventually went + /// through before the cleanup deadline. + #[tokio::test] + async fn multipart_complete_succeeds_on_retry_and_leaves_state_consistent() { + let hv = InMemoryBackend::new("hv"); + let lt_inner = InMemoryBackend::new("lt"); + let log = InMemoryChangeLog::default(); + let storage = TieredStorage::new( + Box::new(hv.clone()), + Box::new(TestBackend::with_inner( + lt_inner.clone(), + FailOnFirstCompleteMultipartAttempt::new(), + )), + Box::new(log.clone()), + ); + + let id = make_id("mp-retry"); + let upload_id = storage + .initiate_multipart(&id, &Default::default()) + .await + .unwrap(); + + let tiered_id: TieredUploadId = (&upload_id).try_into().unwrap(); + let physical = ObjectId { + context: id.context.clone(), + key: tiered_id.revision, + }; + + let payload = vec![0xABu8; 2 * 1024 * 1024]; + let etag = storage .upload_part( &id, &upload_id, - 2, - part2.len() as u64, + 1, + payload.len() as u64, None, - stream::single(part2), + stream::single(payload.clone()), ) .await .unwrap(); - let resp = storage - .list_parts(&id, &upload_id, None, None) + // The first `complete_multipart` call fails. + let result = storage + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag: etag.clone(), + }], + ) + .await; + assert!(result.is_err()); + storage.join().await; + + // The second `complete_multipart` call succeeds. + let result = storage + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag, + }], + ) + .await; + assert!(result.is_ok()); + storage.join().await; + + // The object is there. + let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let body = stream::read_to_vec(s).await.unwrap(); + assert_eq!(body, payload); + + // Simulate the passage of time and run recovery. + log.expire_all(); + let manager = ChangeManager::new( + Box::new(hv.clone()), + Box::new(lt_inner.clone()), + Box::new(log.clone()), + ); + manager.recover().await.unwrap(); + + // The LT blob has not been cleaned up, as the write eventually went through. + lt_inner.get(&physical).expect_object(); + // The tombstone still points to the blob. + let tombstone = hv.get(&id).expect_tombstone(); + assert_eq!(tombstone.target, physical); + // The change has been removed from the log. + let remaining = log.scan().await.unwrap(); + assert!(remaining.is_empty()); + + // The object is still there after recovery. + let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let body = stream::read_to_vec(s).await.unwrap(); + assert_eq!(body, payload); + } + + #[derive(Debug)] + struct FailOnFirstGetMetadataAttempt { + attempt: Mutex, + } + + impl FailOnFirstGetMetadataAttempt { + fn new() -> Self { + Self { + attempt: Mutex::new(0), + } + } + } + + #[async_trait::async_trait] + impl Hooks for FailOnFirstGetMetadataAttempt { + async fn get_metadata( + &self, + inner: &InMemoryBackend, + id: &ObjectId, + ) -> Result { + let mut attempt = self.attempt.lock().await; + *attempt += 1; + if *attempt == 1 { + return Err(Error::Io(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "simulated network error", + ))); + } else { + inner.get_metadata(id).await + } + } + } + + /// The first attempt to `complete_multipart` succeeds on the LT backend, but the subsequent + /// `get_metadata` call fails with a network error, causing the overall `complete_multipart` + /// to fail. The second call retries and succeeds (the LT object already exists from the first + /// attempt). + /// When it's time to clean up, nothing is deleted, as the `complete_multipart` eventually went + /// through before the cleanup deadline. + #[tokio::test] + async fn multipart_complete_succeeds_on_retry_if_get_metadata_errs_and_leaves_state_consistent() + { + let hv = InMemoryBackend::new("hv"); + let lt_inner = InMemoryBackend::new("lt"); + let log = InMemoryChangeLog::default(); + let storage = TieredStorage::new( + Box::new(hv.clone()), + Box::new(TestBackend::with_inner( + lt_inner.clone(), + FailOnFirstGetMetadataAttempt::new(), + )), + Box::new(log.clone()), + ); + + let id = make_id("mp-retry-meta"); + let upload_id = storage + .initiate_multipart(&id, &Default::default()) .await .unwrap(); - assert_eq!(resp.parts.len(), 2); - assert_eq!(resp.parts[0].part_number, 1); - assert_eq!(resp.parts[0].size, 100); - assert_eq!(resp.parts[1].part_number, 2); - assert_eq!(resp.parts[1].size, 200); + + let tiered_id: TieredUploadId = (&upload_id).try_into().unwrap(); + let physical = ObjectId { + context: id.context.clone(), + key: tiered_id.revision, + }; + + let payload = vec![0xABu8; 2 * 1024 * 1024]; + let etag = storage + .upload_part( + &id, + &upload_id, + 1, + payload.len() as u64, + None, + stream::single(payload.clone()), + ) + .await + .unwrap(); + + // The first `complete_multipart` call fails (get_metadata network error), even though it + // internally creates the LT blob. + let result = storage + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag: etag.clone(), + }], + ) + .await; + assert!(result.is_err()); + storage.join().await; + + // The second `complete_multipart` call succeeds. + let result = storage + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag, + }], + ) + .await; + assert!(result.is_ok()); + storage.join().await; + + // The object is there. + let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let body = stream::read_to_vec(s).await.unwrap(); + assert_eq!(body, payload); + + // Simulate the passage of time and run recovery. + log.expire_all(); + let manager = ChangeManager::new( + Box::new(hv.clone()), + Box::new(lt_inner.clone()), + Box::new(log.clone()), + ); + manager.recover().await.unwrap(); + + // The LT blob has not been cleaned up, as the write eventually went through. + lt_inner.get(&physical).expect_object(); + // The tombstone still points to the blob. + let tombstone = hv.get(&id).expect_tombstone(); + assert_eq!(tombstone.target, physical); + // The change has been removed from the log. + let remaining = log.scan().await.unwrap(); + assert!(remaining.is_empty()); + + // The object is there after recovery. + let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let body = stream::read_to_vec(s).await.unwrap(); + assert_eq!(body, payload); } }