diff --git a/Cargo.lock b/Cargo.lock index 58557d39..108984c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2566,6 +2566,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "base64", "bigtable_rs", "bytes", "chrono", diff --git a/objectstore-server/src/config.rs b/objectstore-server/src/config.rs index 297ffecd..c8b2eb13 100644 --- a/objectstore-server/src/config.rs +++ b/objectstore-server/src/config.rs @@ -46,7 +46,7 @@ use secrecy::{CloneableSecret, SecretBox, SerializableSecret, zeroize::Zeroize}; use serde::{Deserialize, Serialize}; pub use objectstore_log::{LevelFilter, LogFormat, LoggingConfig}; -pub use objectstore_service::backend::StorageConfig; +pub use objectstore_service::backend::{MultipartUploadStorageConfig, StorageConfig}; use crate::killswitches::Killswitches; use crate::rate_limits::RateLimits; @@ -625,7 +625,7 @@ impl Config { mod tests { use std::io::Write; - use objectstore_service::backend::HighVolumeStorageConfig; + use objectstore_service::backend::{HighVolumeStorageConfig, MultipartUploadStorageConfig}; use secrecy::ExposeSecret; use crate::killswitches::Killswitch; @@ -771,7 +771,7 @@ mod tests { }; let HighVolumeStorageConfig::BigTable(hv) = &c.high_volume; assert_eq!(hv.project_id, "my-project"); - let StorageConfig::Gcs(lt) = c.long_term.as_ref() else { + let MultipartUploadStorageConfig::Gcs(lt) = &c.long_term else { panic!("expected gcs long_term"); }; assert_eq!(lt.bucket, "my-objectstore-bucket"); @@ -800,7 +800,7 @@ mod tests { assert_eq!(hv.project_id, "my-project"); assert_eq!(hv.instance_name, "my-instance"); assert_eq!(hv.table_name, "my-table"); - let StorageConfig::FileSystem(lt) = c.long_term.as_ref() else { + let MultipartUploadStorageConfig::FileSystem(lt) = &c.long_term else { panic!("expected filesystem long_term"); }; assert_eq!(lt.path, Path::new("/data/lt")); diff --git a/objectstore-service/Cargo.toml b/objectstore-service/Cargo.toml index be44d790..e4434d42 100644 --- a/objectstore-service/Cargo.toml +++ b/objectstore-service/Cargo.toml @@ -12,6 +12,7 @@ publish = false [dependencies] anyhow = { workspace = true } async-trait = { workspace = true } +base64 = "0.22" bigtable_rs = { git = "https://github.com/getsentry/bigtable_rs.git", rev = "4cb75bc5e5f87204363973f6302107768e64972e" } chrono = "0.4" bytes = { workspace = true } diff --git a/objectstore-service/src/backend/changelog.rs b/objectstore-service/src/backend/changelog.rs index 0e05bb84..885613c6 100644 --- a/objectstore-service/src/backend/changelog.rs +++ b/objectstore-service/src/backend/changelog.rs @@ -22,12 +22,12 @@ use std::collections::HashMap; use std::fmt; use std::sync::{Arc, Mutex}; -use std::time::Duration; +use std::time::{Duration, SystemTime}; use tokio_util::task::TaskTracker; use tokio_util::task::task_tracker::TaskTrackerToken; -use crate::backend::common::{Backend, HighVolumeBackend, TieredMetadata}; +use crate::backend::common::{HighVolumeBackend, MultipartUploadBackend, TieredMetadata}; use crate::error::Result; use crate::id::ObjectId; @@ -76,6 +76,10 @@ pub struct Change { /// /// Needs cleanup on success (the CAS committed and the old blob is unreferenced). pub old: Option, + /// Earliest time at which this entry becomes eligible for cleanup. + /// + /// [`ChangeLog::scan`] filters out the entry, unless the deadline has passed. + pub cleanup_after: Option, } /// Manager for multi-step storage changes, including backends and durable log. @@ -88,7 +92,7 @@ pub struct ChangeManager { /// The backend for small objects (≤ 1 MiB). pub(crate) high_volume: Box, /// The backend for large objects (> 1 MiB). - pub(crate) long_term: Box, + pub(crate) long_term: Box, /// Durable write-ahead log for multi-step changes. pub(crate) changelog: Box, /// Tracks outstanding background cleanup operations for graceful shutdown. @@ -99,7 +103,7 @@ impl ChangeManager { /// Creates a new `ChangeManager` with the given backends and changelog. pub fn new( high_volume: Box, - long_term: Box, + long_term: Box, changelog: Box, ) -> Arc { Arc::new(Self { @@ -134,6 +138,28 @@ impl ChangeManager { Ok(ChangeGuard { state: Some(state) }) } + /// Records the change to the log and returns a guard in the `Assembling` state. + /// + /// Behaves like [`Self::record`], except that the guard is created in the `Assembling` state. + /// Unlike other states, this guard does nothing on drop, leaving the burden of cleaning up to + /// the [`ChangeLog`]. + pub async fn record_assembling(self: Arc, change: Change) -> Result { + let token = self.tracker.token(); + + let id = ChangeId::new(); + self.changelog.record(&id, &change).await?; + + let state = ChangeState { + id, + change, + phase: ChangePhase::Assembling, + manager: self.clone(), + _token: token, + }; + + Ok(ChangeGuard { state: Some(state) }) + } + /// Scans the changelog for outstanding entries and runs cleanup for each. /// /// Spawn this into a background task at startup to recover from any orphaned objects after a @@ -226,15 +252,31 @@ impl ChangeLog for InMemoryChangeLog { } async fn scan(&self) -> Result> { + let now = SystemTime::now(); let entries = self.entries.lock().expect("lock poisoned"); let result = entries .iter() + .filter(|(_, change)| match change.cleanup_after { + None => true, + Some(deadline) => now >= deadline, + }) .map(|(id, change)| (id.clone(), change.clone())) .collect(); Ok(result) } } +#[cfg(test)] +impl InMemoryChangeLog { + /// Sets [`Change::cleanup_after`] to the past for all entries, forcing them to be returned by a subsequent [`ChangeLog::scan`]. + pub fn expire_all(&self) { + let mut entries = self.entries.lock().expect("lock poisoned"); + for change in entries.values_mut() { + change.cleanup_after = Some(SystemTime::UNIX_EPOCH); + } + } +} + /// [`ChangeLog`] implementation that discards all entries. /// /// Used as the default when no durable log is configured. Provides no @@ -265,6 +307,12 @@ pub enum ChangePhase { Recovered, /// The change is recorded in the log and LT upload has started. Recorded, + /// The LT blob originated from a multipart upload and is being assembled. + /// + /// Multipart upload completion can fail, and we want the client to be able to retry it + /// without the change cleanup process racing to delete the LT blob. + /// Therefore, cleanup of changes in this phase is deferred. + Assembling, /// LT upload has succeeded and the tombstone is being updated. Written, /// The tombstone update failed due to a conflict. @@ -310,7 +358,7 @@ impl ChangeState { ChangePhase::Written => self.read_tombstone().await, ChangePhase::Lost => self.change.old.clone(), ChangePhase::Updated => self.change.new.clone(), - ChangePhase::Completed => return, // unreachable + ChangePhase::Assembling | ChangePhase::Completed => return, // unreachable }; if current != self.change.old @@ -373,12 +421,21 @@ impl ChangeState { impl Drop for ChangeState { fn drop(&mut self) { - if self.phase != ChangePhase::Completed { - objectstore_log::error!( - change = ?self.change, - phase = ?self.phase, - "Operation dropped without completing cleanup" - ); + match self.phase { + ChangePhase::Completed => {} + ChangePhase::Assembling => { + objectstore_log::warn!( + change = ?self.change, + "Operation dropped in Assembling state, cleanup deferred to ChangeLog recovery" + ); + } + _ => { + objectstore_log::error!( + change = ?self.change, + phase = ?self.phase, + "Operation dropped without completing cleanup" + ); + } } } } @@ -405,6 +462,7 @@ impl ChangeGuard { impl Drop for ChangeGuard { fn drop(&mut self) { if let Some(state) = self.state.take() + && state.phase != ChangePhase::Assembling && state.phase != ChangePhase::Completed && let Ok(handle) = tokio::runtime::Handle::try_current() { @@ -440,6 +498,7 @@ mod tests { id: make_id("object-key"), new: Some(make_id("object-key/rev1")), old: None, + cleanup_after: None, }; log.record(&id, &change).await.unwrap(); @@ -457,6 +516,7 @@ mod tests { id: make_id("object-key"), new: None, old: Some(make_id("object-key/rev1")), + cleanup_after: None, }; log.record(&id, &change).await.unwrap(); @@ -495,6 +555,7 @@ mod tests { id: make_id("crash-test"), new: Some(make_id("crash-test/rev")), old: None, + cleanup_after: None, })) .unwrap() // Runtime drops here while `guard` is still alive outside it. @@ -508,4 +569,55 @@ mod tests { let entries = rt.block_on(log.scan()).unwrap(); assert_eq!(entries.len(), 1, "log entry must persist"); } + + #[tokio::test] + async fn scan_filters_by_cleanup_after() { + let log = InMemoryChangeLog::default(); + + let ready_id = ChangeId::new(); + log.record( + &ready_id, + &Change { + id: make_id("ready"), + new: Some(make_id("ready/rev")), + old: None, + cleanup_after: None, + }, + ) + .await + .unwrap(); + + let expired_id = ChangeId::new(); + log.record( + &expired_id, + &Change { + id: make_id("expired"), + new: Some(make_id("expired/rev")), + old: None, + cleanup_after: Some(SystemTime::now() - Duration::from_secs(1)), + }, + ) + .await + .unwrap(); + + let deferred_id = ChangeId::new(); + log.record( + &deferred_id, + &Change { + id: make_id("deferred"), + new: Some(make_id("deferred/rev")), + old: None, + cleanup_after: Some(SystemTime::now() + Duration::from_hours(24)), + }, + ) + .await + .unwrap(); + + let entries = log.scan().await.unwrap(); + assert_eq!(entries.len(), 2); + + let ids: Vec<_> = entries.iter().map(|(id, _)| id).collect(); + assert!(ids.contains(&&ready_id)); + assert!(ids.contains(&&expired_id)); + } } diff --git a/objectstore-service/src/backend/common.rs b/objectstore-service/src/backend/common.rs index 28df85cc..8c106965 100644 --- a/objectstore-service/src/backend/common.rs +++ b/objectstore-service/src/backend/common.rs @@ -103,6 +103,12 @@ pub trait MultipartUploadBackend: Backend + fmt::Debug + Send + Sync + 'static { /// Finalizes the upload identified by `(id, upload_id)` with the given /// ordered list of parts. + /// + /// Note that this returns `Result>`. + /// It's therefore possible to get `Ok(Some(err))`, meaning that at the server level this will + /// translate to HTTP `200 OK` with an error contained in the response body. + /// We need to do it this way to mirror backends that also behave like this (namely S3 and + /// GCS). async fn complete_multipart( &self, id: &ObjectId, diff --git a/objectstore-service/src/backend/mod.rs b/objectstore-service/src/backend/mod.rs index 401aad1b..b4a8fbe0 100644 --- a/objectstore-service/src/backend/mod.rs +++ b/objectstore-service/src/backend/mod.rs @@ -60,7 +60,7 @@ pub async fn from_config(config: StorageConfig) -> Result { let hv = hv_from_config(c.high_volume).await?; - let lt = from_leaf_config(*c.long_term).await?; + let lt = lt_from_config(c.long_term).await?; let log = Box::new(changelog::NoopChangeLog); Box::new(tiered::TieredStorage::new(hv, lt, log)) } @@ -104,3 +104,28 @@ async fn hv_from_config( HighVolumeStorageConfig::BigTable(c) => Box::new(bigtable::BigTableBackend::new(c).await?), }) } + +/// Configuration for the long-term backend in a [`tiered::TieredStorageConfig`]. +/// +/// Only backends that implement [`common::MultipartUploadBackend`] are valid here. +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(tag = "type", rename_all = "lowercase")] +pub enum MultipartUploadStorageConfig { + /// Local filesystem storage backend (type `"filesystem"`). + FileSystem(local_fs::FileSystemConfig), + + /// [Google Cloud Storage] backend (type `"gcs"`). + /// + /// [Google Cloud Storage]: https://cloud.google.com/storage + Gcs(gcs::GcsConfig), +} + +/// Constructs a type-erased [`common::MultipartUploadBackend`] from the given config. +async fn lt_from_config( + config: MultipartUploadStorageConfig, +) -> anyhow::Result> { + Ok(match config { + MultipartUploadStorageConfig::FileSystem(c) => Box::new(local_fs::LocalFsBackend::new(c)), + MultipartUploadStorageConfig::Gcs(c) => Box::new(gcs::GcsBackend::new(c).await?), + }) +} diff --git a/objectstore-service/src/backend/testing.rs b/objectstore-service/src/backend/testing.rs index edfc813d..4b767e21 100644 --- a/objectstore-service/src/backend/testing.rs +++ b/objectstore-service/src/backend/testing.rs @@ -40,12 +40,16 @@ use bytes::Bytes; use objectstore_types::metadata::Metadata; use crate::backend::common::{ - Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, PutResponse, - TieredGet, TieredMetadata, TieredWrite, Tombstone, + Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, + MultipartUploadBackend, PutResponse, TieredGet, TieredMetadata, TieredWrite, Tombstone, }; use crate::backend::in_memory::InMemoryBackend; use crate::error::Result; use crate::id::ObjectId; +use crate::multipart::{ + AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse, + ListPartsResponse, PartNumber, UploadId, UploadPartResponse, +}; use crate::stream::ClientStream; /// Hooks for [`TestBackend`]. @@ -150,6 +154,77 @@ pub trait Hooks: fmt::Debug + Send + Sync + 'static { ) -> Result { inner.compare_and_write(id, current, write).await } + + // --- MultipartUploadBackend methods --- + + /// Intercepts [`MultipartUploadBackend::initiate_multipart`]. Default delegates to `inner`. + async fn initiate_multipart( + &self, + inner: &InMemoryBackend, + id: &ObjectId, + metadata: &Metadata, + ) -> Result { + inner.initiate_multipart(id, metadata).await + } + + /// Intercepts [`MultipartUploadBackend::upload_part`]. Default delegates to `inner`. + #[allow(clippy::too_many_arguments)] + async fn upload_part( + &self, + inner: &InMemoryBackend, + id: &ObjectId, + upload_id: &UploadId, + part_number: PartNumber, + content_length: u64, + content_md5: Option<&str>, + body: ClientStream, + ) -> Result { + inner + .upload_part( + id, + upload_id, + part_number, + content_length, + content_md5, + body, + ) + .await + } + + /// Intercepts [`MultipartUploadBackend::list_parts`]. Default delegates to `inner`. + async fn list_parts( + &self, + inner: &InMemoryBackend, + id: &ObjectId, + upload_id: &UploadId, + max_parts: Option, + part_number_marker: Option, + ) -> Result { + inner + .list_parts(id, upload_id, max_parts, part_number_marker) + .await + } + + /// Intercepts [`MultipartUploadBackend::abort_multipart`]. Default delegates to `inner`. + async fn abort_multipart( + &self, + inner: &InMemoryBackend, + id: &ObjectId, + upload_id: &UploadId, + ) -> Result { + inner.abort_multipart(id, upload_id).await + } + + /// Intercepts [`MultipartUploadBackend::complete_multipart`]. Default delegates to `inner`. + async fn complete_multipart( + &self, + inner: &InMemoryBackend, + id: &ObjectId, + upload_id: &UploadId, + parts: Vec, + ) -> Result { + inner.complete_multipart(id, upload_id, parts).await + } } /// Generic test backend that implements both [`Backend`] and [`HighVolumeBackend`]. @@ -258,3 +333,69 @@ impl HighVolumeBackend for TestBackend { .await } } + +#[async_trait::async_trait] +impl MultipartUploadBackend for TestBackend { + async fn initiate_multipart( + &self, + id: &ObjectId, + metadata: &Metadata, + ) -> Result { + self.hooks + .initiate_multipart(&self.inner, id, metadata) + .await + } + + async fn upload_part( + &self, + id: &ObjectId, + upload_id: &UploadId, + part_number: PartNumber, + content_length: u64, + content_md5: Option<&str>, + body: ClientStream, + ) -> Result { + self.hooks + .upload_part( + &self.inner, + id, + upload_id, + part_number, + content_length, + content_md5, + body, + ) + .await + } + + async fn list_parts( + &self, + id: &ObjectId, + upload_id: &UploadId, + max_parts: Option, + part_number_marker: Option, + ) -> Result { + self.hooks + .list_parts(&self.inner, id, upload_id, max_parts, part_number_marker) + .await + } + + async fn abort_multipart( + &self, + id: &ObjectId, + upload_id: &UploadId, + ) -> Result { + self.hooks.abort_multipart(&self.inner, id, upload_id).await + } + + async fn complete_multipart( + &self, + id: &ObjectId, + upload_id: &UploadId, + parts: Vec, + ) -> Result { + self.hooks + .complete_multipart(&self.inner, id, upload_id, parts) + .await + } +} diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index ec626ca2..4d329c3e 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -99,8 +99,9 @@ use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; -use std::time::Instant; +use std::time::{Duration, Instant, SystemTime}; +use base64::Engine as _; use bytes::Bytes; use futures_util::{Stream, StreamExt}; use objectstore_types::metadata::Metadata; @@ -108,16 +109,27 @@ use serde::{Deserialize, Serialize}; use crate::backend::changelog::{Change, ChangeGuard, ChangeLog, ChangeManager, ChangePhase}; use crate::backend::common::{ - Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, PutResponse, - TieredGet, TieredMetadata, TieredWrite, Tombstone, + Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, + MultipartUploadBackend, PutResponse, TieredGet, TieredMetadata, TieredWrite, Tombstone, }; -use crate::backend::{HighVolumeStorageConfig, StorageConfig}; -use crate::error::Result; +use crate::backend::{HighVolumeStorageConfig, MultipartUploadStorageConfig}; +use crate::error::{Error, Result}; use crate::id::ObjectId; +use crate::multipart::{ + AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse, + ListPartsResponse, PartNumber, UploadId, UploadPartResponse, +}; use crate::stream::{ClientStream, SizedPeek}; /// The threshold up until which we will go to the "high volume" backend. const BACKEND_SIZE_THRESHOLD: usize = 1024 * 1024; // 1 MiB + +/// Amount of time for which a `Change` generated by a `complete_multipart` operation is kept in the `Assembling` +/// state before becoming eligible for cleanup by the `ChangeLog` recovery process. +/// This allows the client to retry the `complete_multipart` operation upon any failures for at least this long, +/// avoiding scenarios where the `ChangeLog` recovery would race to delete the assembled LT blob. +const MULTIPART_COMPLETE_CLEANUP_DELAY: Duration = Duration::from_hours(24); + /// Creates a new [`ObjectId`] with the same context but a unique revision key. /// /// The new key has the format `{original_key}/{uuid_v7}`, producing a distinct @@ -133,7 +145,7 @@ fn new_long_term_revision(id: &ObjectId) -> ObjectId { /// Configuration for [`TieredStorage`]. /// /// Composes two backends into a tiered routing setup: `high_volume` for small -/// objects and `long_term` for large objects. Nesting [`StorageConfig::Tiered`] +/// objects and `long_term` for large objects. Nesting [`super::StorageConfig::Tiered`] /// inside another tiered config is not supported. /// /// # Example @@ -158,7 +170,9 @@ pub struct TieredStorageConfig { /// only BigTable). pub high_volume: HighVolumeStorageConfig, /// Backend for large, long-term objects. - pub long_term: Box, + /// + /// Must be a backend that implements [`MultipartUploadBackend`]. + pub long_term: MultipartUploadStorageConfig, } /// Two-tier storage backend that routes objects by size. @@ -219,7 +233,7 @@ impl TieredStorage { /// Creates a new `TieredStorage` with the given backends and change log. pub fn new( high_volume: Box, - long_term: Box, + long_term: Box, changelog: Box, ) -> Self { let inner = ChangeManager::new(high_volume, long_term, changelog); @@ -234,6 +248,12 @@ impl TieredStorage { self.inner.clone().record(change).await } + /// Records the change to the log in the `Assembling` phase, and returns a guard that does + /// nothing on drop unless advanced. + async fn record_assembling(&self, change: Change) -> Result { + self.inner.clone().record_assembling(change).await + } + /// Returns the name of the backend corresponding to the given routing choice. fn backend_type(&self, choice: &BackendChoice) -> &'static str { match choice { @@ -269,6 +289,7 @@ impl TieredStorage { id: id.clone(), new: None, old: Some(target.clone()), + cleanup_after: None, }) .await?; @@ -310,6 +331,7 @@ impl TieredStorage { id: id.clone(), new: Some(new.clone()), old: current.clone(), + cleanup_after: None, }) .await?; @@ -464,6 +486,7 @@ impl Backend for TieredStorage { id: id.clone(), new: None, old: Some(tombstone.target.clone()), + cleanup_after: None, }) .await?; guard.advance(ChangePhase::Written); @@ -540,10 +563,234 @@ where ) } +/// The multipart upload state for TieredStorage. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +struct TieredUploadId { + revision: String, + upload_id: String, +} + +impl TryInto for TieredUploadId { + type Error = Error; + + fn try_into(self) -> Result { + let json = + serde_json::to_vec(&self).map_err(|e| Error::serde("encoding multipart token", e))?; + Ok(base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(json)) + } +} + +impl TryFrom<&UploadId> for TieredUploadId { + type Error = Error; + + fn try_from(value: &UploadId) -> Result { + let json = base64::engine::general_purpose::URL_SAFE_NO_PAD + .decode(value.as_bytes()) + .map_err(|e| Error::generic(format!("invalid multipart upload ID: {e}")))?; + serde_json::from_slice(&json).map_err(|e| Error::serde("decoding multipart token", e)) + } +} + +#[async_trait::async_trait] +impl MultipartUploadBackend for TieredStorage { + async fn initiate_multipart( + &self, + id: &ObjectId, + metadata: &Metadata, + ) -> Result { + let physical = new_long_term_revision(id); + + let id = self + .inner + .long_term + .initiate_multipart(&physical, metadata) + .await?; + + let id = TieredUploadId { + revision: physical.key, + upload_id: id, + }; + id.try_into() + } + + async fn upload_part( + &self, + id: &ObjectId, + upload_id: &UploadId, + part_number: PartNumber, + content_length: u64, + content_md5: Option<&str>, + body: ClientStream, + ) -> Result { + let tiered: TieredUploadId = upload_id.try_into()?; + + let physical = ObjectId { + context: id.context.clone(), + key: tiered.revision, + }; + + self.inner + .long_term + .upload_part( + &physical, + &tiered.upload_id, + part_number, + content_length, + content_md5, + body, + ) + .await + } + + async fn list_parts( + &self, + id: &ObjectId, + upload_id: &UploadId, + max_parts: Option, + part_number_marker: Option, + ) -> Result { + let tiered: TieredUploadId = upload_id.try_into()?; + + let physical = ObjectId { + context: id.context.clone(), + key: tiered.revision, + }; + + self.inner + .long_term + .list_parts(&physical, &tiered.upload_id, max_parts, part_number_marker) + .await + } + + async fn abort_multipart( + &self, + id: &ObjectId, + upload_id: &UploadId, + ) -> Result { + let tiered: TieredUploadId = upload_id.try_into()?; + + let physical = ObjectId { + context: id.context.clone(), + key: tiered.revision, + }; + + self.inner + .long_term + .abort_multipart(&physical, &tiered.upload_id) + .await + } + + async fn complete_multipart( + &self, + id: &ObjectId, + upload_id: &UploadId, + parts: Vec, + ) -> Result { + let tiered: TieredUploadId = upload_id.try_into()?; + + let physical = ObjectId { + context: id.context.clone(), + key: tiered.revision, + }; + + // 1. Read current HV revision to establish the write precondition. + let current = match self.inner.high_volume.get_tiered_metadata(id).await? { + // Optimization: a previous attempt already finalized this revision and tombstone -- report success. + TieredMetadata::Tombstone(t) if t.target == physical => return Ok(None), + TieredMetadata::Tombstone(t) => Some(t.target), + _ => None, + }; + + // Register a guard with cleanup deferred to now + `MULTIPART_COMPLETE_CLEANUP_DELAY`, + // so that the user has the chance to retry finalizing the upload in this timeframe. + let mut guard = self + .record_assembling(Change { + id: id.clone(), + new: Some(physical.clone()), + old: current.clone(), + cleanup_after: Some(SystemTime::now() + MULTIPART_COMPLETE_CLEANUP_DELAY), + }) + .await?; + + // 2. Complete the upload, creating the object at the given revision key. + let maybe_complete_multipart_err = match self + .inner + .long_term + .complete_multipart(&physical, &tiered.upload_id, parts) + .await + { + // The request went through but we got an error in the response body. + // Transparently proxy the error to the user. + Ok(error) => { + if error.is_some() { + return Ok(error); + } + None + } + // We got status 4xx/5xx, or a network error. + // Either way, `complete_multipart` might have been completed successfully, + // either now or in a previous attempt (in that case, that's a 404 and we indeed end up + // here). + // We cannot know if that's the case yet, so we continue to the next steps. + Err(err) => Some(err), + }; + + // 3. Retrieve the metadata of the object, which was determined at initiation time, to + // get the expiration policy. + // + // This also serves as an existence check to understand if the LT revision was actually + // created successfully in this or a previous attempt, in which case we just need to + // finalize the tombstone. + let metadata = self.inner.long_term.get_metadata(&physical).await; + + let metadata = match (metadata, maybe_complete_multipart_err) { + // The LT revision already exists, so we can continue to finalize the tombstone. + (Ok(Some(metadata)), _) => metadata, + // The LT revision doesn't exist, cannot proceed. + (Ok(None), Some(err)) => return Err(err), + // The `complete_multipart` succeeded, creating the object, but the `get_metadata` + // immediately after failed to find the object. This should never happen. + (Ok(None), None) => { + objectstore_log::error!( + id = ?id, + upload_id = ?upload_id, + physical = ?physical, + "complete_multipart call succeeded on long_term backend, but subsequent get_metadata found no object" + ); + return Err(Error::generic( + "completed multipart object not found in long-term storage", + )); + } + // Failed to `get_metadata`, cannot proceed. + (Err(get_metadata_err), maybe_complete_multipart_err) => { + // Prefer the `complete_multipart_err`, as it's likely more informative. + // TODO(FS-358): convert this properly. Right now `ApiErrorResponse` will turn this into a 500, + // but we would actually want to transparently surface the original status (and message?) instead. + return Err(maybe_complete_multipart_err.unwrap_or(get_metadata_err)); + } + }; + + // 4. CAS commit: write tombstone only if HV state matches what we saw. + let tombstone = Tombstone { + target: physical.clone(), + expiration_policy: metadata.expiration_policy, + }; + let written = self + .inner + .high_volume + .compare_and_write(id, current.as_ref(), TieredWrite::Tombstone(tombstone)) + .await?; + + // Update guard and let it schedule cleanup in the background. + guard.advance(ChangePhase::compare_and_write(written)); + + Ok(None) + } +} + #[cfg(test)] mod tests { - use std::time::Duration; - + use futures::lock::Mutex; use objectstore_types::metadata::{ExpirationPolicy, Metadata}; use objectstore_types::scope::{Scope, Scopes}; @@ -553,6 +800,7 @@ mod tests { use crate::backend::testing::{Hooks, TestBackend}; use crate::error::Error; use crate::id::ObjectContext; + use crate::stream::{self, ClientStream}; fn make_context() -> ObjectContext { @@ -1172,6 +1420,7 @@ mod tests { id: make_id("object-key"), new: Some(make_id("cleanup-target")), old: None, + cleanup_after: None, }; // Build the guard inside a temporary runtime, then let the runtime drop @@ -1195,6 +1444,7 @@ mod tests { id: make_id("object-key"), new: None, old: None, + cleanup_after: None, }; let mut guard = storage.record_change(change).await.unwrap(); @@ -1286,4 +1536,690 @@ mod tests { "changelog entry not removed after cleanup" ); } + + // --- Multipart upload --- + + #[test] + fn multipart_upload_id_roundtrip() { + let id = TieredUploadId { + revision: "my-key/01924a6f-7e28-7b9a-9c1d-abcdef123456".into(), + upload_id: "upstream-upload-id-abc".into(), + }; + let encoded: UploadId = id.clone().try_into().unwrap(); + let decoded: TieredUploadId = (&encoded.clone()).try_into().unwrap(); + assert_eq!(decoded, id); + } + + #[tokio::test] + async fn multipart_single_part_roundtrip() { + let (storage, hv, lt, _) = make_tiered_storage(); + let id = make_id("mp-single"); + let metadata = Metadata { + content_type: "application/octet-stream".into(), + expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(3600)), + ..Default::default() + }; + let payload = vec![0xABu8; 2 * 1024 * 1024]; // 2 MiB + + let upload_id = storage.initiate_multipart(&id, &metadata).await.unwrap(); + + let etag = storage + .upload_part( + &id, + &upload_id, + 1, + payload.len() as u64, + None, + stream::single(payload.clone()), + ) + .await + .unwrap(); + + let error = storage + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag, + }], + ) + .await + .unwrap(); + assert!( + error.is_none(), + "complete_multipart returned error: {error:?}" + ); + + // get_object should follow the tombstone and return the payload. + let (got_meta, s) = storage.get_object(&id).await.unwrap().unwrap(); + let body = stream::read_to_vec(s).await.unwrap(); + assert_eq!(body, payload); + assert_eq!(got_meta.content_type, "application/octet-stream"); + + // HV should have a tombstone, LT should have the object at the physical key. + let tombstone = hv.get(&id).expect_tombstone(); + assert!( + tombstone.target.key().starts_with(id.key()), + "tombstone target should be a revision key" + ); + lt.get(&tombstone.target).expect_object(); + } + + #[tokio::test] + async fn multipart_upload() { + let (storage, _hv, _lt, _) = make_tiered_storage(); + let id = make_id("multipart"); + + let upload_id = storage + .initiate_multipart(&id, &Default::default()) + .await + .unwrap(); + + let part1 = vec![0xAAu8; 512 * 1024]; + let part2 = vec![0xBBu8; 512 * 1024]; + let part3 = vec![0xCCu8; 512 * 1024]; + + let etag3 = storage + .upload_part( + &id, + &upload_id, + 3, + part3.len() as u64, + None, + stream::single(part3.clone()), + ) + .await + .unwrap(); + let etag2 = storage + .upload_part( + &id, + &upload_id, + 2, + part2.len() as u64, + None, + stream::single(part2.clone()), + ) + .await + .unwrap(); + let etag1 = storage + .upload_part( + &id, + &upload_id, + 1, + part1.len() as u64, + None, + stream::single(part1.clone()), + ) + .await + .unwrap(); + + let error = storage + .complete_multipart( + &id, + &upload_id, + vec![ + CompletedPart { + part_number: 1, + etag: etag1, + }, + CompletedPart { + part_number: 2, + etag: etag2, + }, + CompletedPart { + part_number: 3, + etag: etag3, + }, + ], + ) + .await + .unwrap(); + assert!(error.is_none()); + + let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let body = stream::read_to_vec(s).await.unwrap(); + + let mut expected = Vec::new(); + expected.extend_from_slice(&part1); + expected.extend_from_slice(&part2); + expected.extend_from_slice(&part3); + assert_eq!(body, expected); + } + + #[tokio::test] + async fn multipart_abort() { + let (storage, hv, _lt, _) = make_tiered_storage(); + let id = make_id("mp-abort"); + + let upload_id = storage + .initiate_multipart(&id, &Default::default()) + .await + .unwrap(); + + // Upload a part then abort. + let payload = vec![0xABu8; 100]; + storage + .upload_part( + &id, + &upload_id, + 1, + payload.len() as u64, + None, + stream::single(payload), + ) + .await + .unwrap(); + + storage.abort_multipart(&id, &upload_id).await.unwrap(); + + // No tombstone should have been written. + hv.get(&id).expect_not_found(); + + // The object should not be reachable. + assert!(storage.get_object(&id).await.unwrap().is_none()); + } + + #[tokio::test] + async fn multipart_list_parts() { + let (storage, _hv, _lt, _) = make_tiered_storage(); + let id = make_id("mp-list"); + + let upload_id = storage + .initiate_multipart(&id, &Default::default()) + .await + .unwrap(); + + let part1 = vec![0xAAu8; 100]; + let part2 = vec![0xBBu8; 200]; + storage + .upload_part( + &id, + &upload_id, + 1, + part1.len() as u64, + None, + stream::single(part1), + ) + .await + .unwrap(); + storage + .upload_part( + &id, + &upload_id, + 2, + part2.len() as u64, + None, + stream::single(part2), + ) + .await + .unwrap(); + + let resp = storage + .list_parts(&id, &upload_id, None, None) + .await + .unwrap(); + assert_eq!(resp.parts.len(), 2); + assert_eq!(resp.parts[0].part_number, 1); + assert_eq!(resp.parts[0].size, 100); + assert_eq!(resp.parts[1].part_number, 2); + assert_eq!(resp.parts[1].size, 200); + } + + #[tokio::test] + async fn multipart_overwrites_existing_tombstone() { + let (storage, hv, lt, _) = make_tiered_storage(); + let id = make_id("mp-overwrite"); + + // Put a large object via the normal path. + let payload1 = vec![0xAAu8; 2 * 1024 * 1024]; + storage + .put_object(&id, &Default::default(), stream::single(payload1)) + .await + .unwrap(); + let old_lt_id = hv.get(&id).expect_tombstone().target; + + // Overwrite via multipart. + let upload_id = storage + .initiate_multipart(&id, &Default::default()) + .await + .unwrap(); + + let payload2 = vec![0xBBu8; 2 * 1024 * 1024]; + let etag = storage + .upload_part( + &id, + &upload_id, + 1, + payload2.len() as u64, + None, + stream::single(payload2.clone()), + ) + .await + .unwrap(); + + // The multipart upload is not finalized, so the tombstone still points to the old + // revision. + let lt_id = hv.get(&id).expect_tombstone().target; + assert_eq!(old_lt_id, lt_id); + + let error = storage + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag, + }], + ) + .await + .unwrap(); + assert!(error.is_none()); + + // Now the upload has been finalized, so the new tombstone points to the new revision. + let new_lt_id = hv.get(&id).expect_tombstone().target; + assert_ne!(old_lt_id, new_lt_id); + + // Wait for background cleanup. + storage.join().await; + + // Old revision should be cleaned up. + lt.get(&old_lt_id).expect_not_found(); + lt.get(&new_lt_id).expect_object(); + + // Assert the contents of the new revision. + let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let body = stream::read_to_vec(s).await.unwrap(); + assert_eq!(body, payload2); + } + + // --- Multipart completion failure handling (consistency, retries, delayed cleanup) --- + + /// Assembles the blob via `complete_multipart`, but returns an error to simulate a network + /// failure on the response path. Also fails `get_metadata` so the tiered layer cannot + /// recover by detecting the already-assembled blob. + #[derive(Debug)] + struct CompleteMultipartButReturnError; + + #[async_trait::async_trait] + impl Hooks for CompleteMultipartButReturnError { + async fn complete_multipart( + &self, + inner: &InMemoryBackend, + id: &ObjectId, + upload_id: &UploadId, + parts: Vec, + ) -> Result { + inner + .complete_multipart(id, upload_id, parts) + .await + .unwrap(); + Err(Error::Io(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "simulated network error on complete_multipart", + ))) + } + + async fn get_metadata( + &self, + _inner: &InMemoryBackend, + _id: &ObjectId, + ) -> Result { + Err(Error::Io(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "simulated network error on get_metadata", + ))) + } + } + + /// `complete_multipart` on the inner LT backend assembles the blob successfully, but both + /// `complete_multipart` and `get_metadata` return errors, so the tiered layer cannot finalize. + /// After `MULTIPART_COMPLETE_CLEANUP_DELAY`, `ChangeLog` recovery deletes the orphaned blob. + #[tokio::test] + async fn cleans_up_orphan_after_failed_multipart_complete() { + let hv = InMemoryBackend::new("hv"); + let lt_inner = InMemoryBackend::new("lt"); + let log = InMemoryChangeLog::default(); + let storage = TieredStorage::new( + Box::new(hv.clone()), + Box::new(TestBackend::with_inner( + lt_inner.clone(), + CompleteMultipartButReturnError {}, + )), + Box::new(log.clone()), + ); + + let id = make_id("mp-orphan"); + let upload_id = storage + .initiate_multipart(&id, &Default::default()) + .await + .unwrap(); + + let tiered_id: TieredUploadId = (&upload_id).try_into().unwrap(); + let physical = ObjectId { + context: id.context.clone(), + key: tiered_id.revision, + }; + + let payload = vec![0xABu8; 2 * 1024 * 1024]; + let etag = storage + .upload_part( + &id, + &upload_id, + 1, + payload.len() as u64, + None, + stream::single(payload), + ) + .await + .unwrap(); + + let result = storage + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag, + }], + ) + .await; + assert!(result.is_err()); + storage.join().await; + + // The LT blob is orphaned, and no cleanup has been performed (yet), due to the guard being + // dropped while in the `Assembling` state. + lt_inner.get(&physical).expect_object(); + hv.get(&id).expect_not_found(); + + // Simulate the passage of time and run recovery. + log.expire_all(); + let manager = ChangeManager::new( + Box::new(hv.clone()), + Box::new(lt_inner.clone()), + Box::new(log.clone()), + ); + manager.recover().await.unwrap(); + + // The orphaned LT blob has been cleaned up. + lt_inner.get(&physical).expect_not_found(); + // The change has been removed from the log. + let remaining = log.scan().await.unwrap(); + assert!(remaining.is_empty()); + } + + #[derive(Debug)] + struct FailOnFirstCompleteMultipartAttempt { + attempt: Mutex, + } + + impl FailOnFirstCompleteMultipartAttempt { + fn new() -> Self { + Self { + attempt: Mutex::new(0), + } + } + } + + #[async_trait::async_trait] + impl Hooks for FailOnFirstCompleteMultipartAttempt { + async fn complete_multipart( + &self, + inner: &InMemoryBackend, + id: &ObjectId, + upload_id: &UploadId, + parts: Vec, + ) -> Result { + let mut attempt = self.attempt.lock().await; + *attempt += 1; + if *attempt == 1 { + return Err(Error::Io(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "simulated network error", + ))); + } else { + return Ok(inner + .complete_multipart(id, upload_id, parts) + .await + .unwrap()); + } + } + } + + /// The first attempt to `complete_multipart` fails, which generates a `Change` entry. + /// The second call succeeds. + /// When it's time to clean up, nothing is deleted, as the `complete_multipart` eventually went + /// through before the cleanup deadline. + #[tokio::test] + async fn multipart_complete_succeeds_on_retry_and_leaves_state_consistent() { + let hv = InMemoryBackend::new("hv"); + let lt_inner = InMemoryBackend::new("lt"); + let log = InMemoryChangeLog::default(); + let storage = TieredStorage::new( + Box::new(hv.clone()), + Box::new(TestBackend::with_inner( + lt_inner.clone(), + FailOnFirstCompleteMultipartAttempt::new(), + )), + Box::new(log.clone()), + ); + + let id = make_id("mp-retry"); + let upload_id = storage + .initiate_multipart(&id, &Default::default()) + .await + .unwrap(); + + let tiered_id: TieredUploadId = (&upload_id).try_into().unwrap(); + let physical = ObjectId { + context: id.context.clone(), + key: tiered_id.revision, + }; + + let payload = vec![0xABu8; 2 * 1024 * 1024]; + let etag = storage + .upload_part( + &id, + &upload_id, + 1, + payload.len() as u64, + None, + stream::single(payload.clone()), + ) + .await + .unwrap(); + + // The first `complete_multipart` call fails. + let result = storage + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag: etag.clone(), + }], + ) + .await; + assert!(result.is_err()); + storage.join().await; + + // The second `complete_multipart` call succeeds. + let result = storage + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag, + }], + ) + .await; + assert!(result.is_ok()); + storage.join().await; + + // The object is there. + let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let body = stream::read_to_vec(s).await.unwrap(); + assert_eq!(body, payload); + + // Simulate the passage of time and run recovery. + log.expire_all(); + let manager = ChangeManager::new( + Box::new(hv.clone()), + Box::new(lt_inner.clone()), + Box::new(log.clone()), + ); + manager.recover().await.unwrap(); + + // The LT blob has not been cleaned up, as the write eventually went through. + lt_inner.get(&physical).expect_object(); + // The tombstone still points to the blob. + let tombstone = hv.get(&id).expect_tombstone(); + assert_eq!(tombstone.target, physical); + // The change has been removed from the log. + let remaining = log.scan().await.unwrap(); + assert!(remaining.is_empty()); + + // The object is still there after recovery. + let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let body = stream::read_to_vec(s).await.unwrap(); + assert_eq!(body, payload); + } + + #[derive(Debug)] + struct FailOnFirstGetMetadataAttempt { + attempt: Mutex, + } + + impl FailOnFirstGetMetadataAttempt { + fn new() -> Self { + Self { + attempt: Mutex::new(0), + } + } + } + + #[async_trait::async_trait] + impl Hooks for FailOnFirstGetMetadataAttempt { + async fn get_metadata( + &self, + inner: &InMemoryBackend, + id: &ObjectId, + ) -> Result { + let mut attempt = self.attempt.lock().await; + *attempt += 1; + if *attempt == 1 { + return Err(Error::Io(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "simulated network error", + ))); + } else { + inner.get_metadata(id).await + } + } + } + + /// The first attempt to `complete_multipart` succeeds on the LT backend, but the subsequent + /// `get_metadata` call fails with a network error, causing the overall `complete_multipart` + /// to fail. The second call retries and succeeds (the LT object already exists from the first + /// attempt). + /// When it's time to clean up, nothing is deleted, as the `complete_multipart` eventually went + /// through before the cleanup deadline. + #[tokio::test] + async fn multipart_complete_succeeds_on_retry_if_get_metadata_errs_and_leaves_state_consistent() + { + let hv = InMemoryBackend::new("hv"); + let lt_inner = InMemoryBackend::new("lt"); + let log = InMemoryChangeLog::default(); + let storage = TieredStorage::new( + Box::new(hv.clone()), + Box::new(TestBackend::with_inner( + lt_inner.clone(), + FailOnFirstGetMetadataAttempt::new(), + )), + Box::new(log.clone()), + ); + + let id = make_id("mp-retry-meta"); + let upload_id = storage + .initiate_multipart(&id, &Default::default()) + .await + .unwrap(); + + let tiered_id: TieredUploadId = (&upload_id).try_into().unwrap(); + let physical = ObjectId { + context: id.context.clone(), + key: tiered_id.revision, + }; + + let payload = vec![0xABu8; 2 * 1024 * 1024]; + let etag = storage + .upload_part( + &id, + &upload_id, + 1, + payload.len() as u64, + None, + stream::single(payload.clone()), + ) + .await + .unwrap(); + + // The first `complete_multipart` call fails (get_metadata network error), even though it + // internally creates the LT blob. + let result = storage + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag: etag.clone(), + }], + ) + .await; + assert!(result.is_err()); + storage.join().await; + + // The second `complete_multipart` call succeeds. + let result = storage + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag, + }], + ) + .await; + assert!(result.is_ok()); + storage.join().await; + + // The object is there. + let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let body = stream::read_to_vec(s).await.unwrap(); + assert_eq!(body, payload); + + // Simulate the passage of time and run recovery. + log.expire_all(); + let manager = ChangeManager::new( + Box::new(hv.clone()), + Box::new(lt_inner.clone()), + Box::new(log.clone()), + ); + manager.recover().await.unwrap(); + + // The LT blob has not been cleaned up, as the write eventually went through. + lt_inner.get(&physical).expect_object(); + // The tombstone still points to the blob. + let tombstone = hv.get(&id).expect_tombstone(); + assert_eq!(tombstone.target, physical); + // The change has been removed from the log. + let remaining = log.scan().await.unwrap(); + assert!(remaining.is_empty()); + + // The object is there after recovery. + let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let body = stream::read_to_vec(s).await.unwrap(); + assert_eq!(body, payload); + } } diff --git a/objectstore-test/src/server.rs b/objectstore-test/src/server.rs index 9b705b80..6b0036fd 100644 --- a/objectstore-test/src/server.rs +++ b/objectstore-test/src/server.rs @@ -16,7 +16,9 @@ use std::net::{SocketAddr, TcpListener}; use std::path::PathBuf; use std::sync::LazyLock; -use objectstore_server::config::{AuthZVerificationKey, Config, StorageConfig}; +use objectstore_server::config::{ + AuthZVerificationKey, Config, MultipartUploadStorageConfig, StorageConfig, +}; use objectstore_server::state::Services; use objectstore_server::web::App; use objectstore_types::auth::Permission; @@ -134,7 +136,11 @@ fn replace_fs_paths(config: &mut StorageConfig, tempdirs: &mut Vec) { tempdirs.push(dir); } StorageConfig::Tiered(c) => { - replace_fs_paths(&mut c.long_term, tempdirs); + if let MultipartUploadStorageConfig::FileSystem(fs) = &mut c.long_term { + let dir = tempfile::tempdir().unwrap(); + fs.path = dir.path().into(); + tempdirs.push(dir); + } } StorageConfig::S3Compatible(_) | StorageConfig::Gcs(_) | StorageConfig::BigTable(_) => {} }