From 15530ba0939ba15cff1c4df918cac8db47dd0216 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 3 Jun 2026 17:10:10 +0200 Subject: [PATCH 1/7] feat: objectstore_service::Error refactoring --- objectstore-server/src/endpoints/common.rs | 43 +-- objectstore-service/src/backend/bigtable.rs | 76 ++--- objectstore-service/src/backend/common.rs | 2 +- objectstore-service/src/backend/gcs.rs | 156 +++++----- objectstore-service/src/backend/in_memory.rs | 11 +- objectstore-service/src/backend/local_fs.rs | 73 ++--- .../src/backend/s3_compatible.rs | 70 ++--- objectstore-service/src/backend/tiered.rs | 63 ++-- objectstore-service/src/concurrency.rs | 13 +- objectstore-service/src/error.rs | 273 +++++++++--------- objectstore-service/src/service.rs | 18 +- objectstore-service/src/streaming.rs | 4 +- 12 files changed, 393 insertions(+), 409 deletions(-) diff --git a/objectstore-server/src/endpoints/common.rs b/objectstore-server/src/endpoints/common.rs index ab6a8240..c43d873f 100644 --- a/objectstore-server/src/endpoints/common.rs +++ b/objectstore-server/src/endpoints/common.rs @@ -5,7 +5,7 @@ use std::error::Error; use axum::Json; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; -use objectstore_service::error::Error as ServiceError; +use objectstore_service::error::{Error as ServiceError, ErrorKind}; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -24,7 +24,7 @@ pub enum ApiError { Auth(#[from] AuthError), /// Service errors, indicating that something went wrong when receiving or executing a request. - #[error("service error: {0}")] + #[error(transparent)] Service(#[from] ServiceError), /// Errors encountered when parsing or executing a batch request. @@ -51,18 +51,15 @@ pub struct ApiErrorResponse { } impl ApiErrorResponse { - /// Creates an error response from an error, extracting the full cause chain. + /// Creates an error response from an error, using only its [`Display`] message. + /// + /// The source chain is intentionally excluded from the HTTP response — it is + /// recorded in our internal logs via tracing instead. pub fn from_error(error: &E) -> Self { - let detail = Some(error.to_string()); - - let mut causes = Vec::new(); - let mut source = error.source(); - while let Some(s) = source { - causes.push(s.to_string()); - source = s.source(); + Self { + detail: Some(error.to_string()), + causes: Vec::new(), } - - Self { detail, causes } } } @@ -91,15 +88,19 @@ impl ApiError { StatusCode::INTERNAL_SERVER_ERROR } - ApiError::Service(ServiceError::Client(_)) => StatusCode::BAD_REQUEST, - ApiError::Service(ServiceError::Metadata(_)) => StatusCode::BAD_REQUEST, - ApiError::Service(ServiceError::InvalidUploadId(_)) => StatusCode::BAD_REQUEST, - ApiError::Service(ServiceError::AtCapacity) => StatusCode::TOO_MANY_REQUESTS, - ApiError::Service(ServiceError::NotImplemented) => StatusCode::NOT_IMPLEMENTED, - ApiError::Service(_) => { - objectstore_log::error!(!!self, "error handling request"); - StatusCode::INTERNAL_SERVER_ERROR - } + ApiError::Service(e) => match e.kind() { + ErrorKind::ClientStream | ErrorKind::BadRequest => StatusCode::BAD_REQUEST, + ErrorKind::TooManyRequests => StatusCode::TOO_MANY_REQUESTS, + ErrorKind::NotImplemented => StatusCode::NOT_IMPLEMENTED, + ErrorKind::Transient => { + objectstore_log::warn!(!!self, "transient error handling request"); + StatusCode::SERVICE_UNAVAILABLE + } + ErrorKind::Internal => { + objectstore_log::error!(!!self, "internal error handling request"); + StatusCode::INTERNAL_SERVER_ERROR + } + }, ApiError::Internal(_) => { objectstore_log::error!(!!self, "internal error"); diff --git a/objectstore-service/src/backend/bigtable.rs b/objectstore-service/src/backend/bigtable.rs index 56b609fb..b3f7d177 100644 --- a/objectstore-service/src/backend/bigtable.rs +++ b/objectstore-service/src/backend/bigtable.rs @@ -458,7 +458,7 @@ fn object_mutations( }; let metadata_bytes = serde_json::to_vec(metadata) - .map_err(|cause| Error::serde("failed to serialize metadata", cause))?; + .map_err(|e| Error::internal("failed to serialize metadata", e))?; Ok([ // NB: We explicitly delete the row to clear metadata on overwrite. @@ -520,7 +520,7 @@ fn tombstone_mutations(tombstone: &Tombstone, now: SystemTime) -> Result<[v2::Mu column_qualifier: COLUMN_TOMBSTONE_META.to_owned(), timestamp_micros, value: serde_json::to_vec(&tombstone_meta) - .map_err(|cause| Error::serde("failed to serialize tombstone", cause))?, + .map_err(|e| Error::internal("failed to serialize tombstone", e))?, })), ]) } @@ -593,8 +593,8 @@ impl RowData { } COLUMN_TOMBSTONE_META => { tombstone_meta_opt = - Some(serde_json::from_slice(&cell.value).map_err(|cause| { - Error::serde("failed to deserialize tombstone meta", cause) + Some(serde_json::from_slice(&cell.value).map_err(|e| { + Error::internal("failed to deserialize tombstone meta", e) })?); } COLUMN_METADATA => { @@ -609,8 +609,8 @@ impl RowData { }); } else { metadata_opt = - Some(serde_json::from_slice(&cell.value).map_err(|cause| { - Error::serde("failed to deserialize metadata", cause) + Some(serde_json::from_slice(&cell.value).map_err(|e| { + Error::internal("failed to deserialize metadata", e) })?); } } @@ -675,9 +675,9 @@ fn parse_redirect_target(redirect_path: &[u8], tombstone_id: &ObjectId) -> Resul Ok(tombstone_id.clone()) } else { let redirect_str = std::str::from_utf8(redirect_path) - .map_err(|_| Error::generic("invalid UTF-8 in redirect path"))?; + .map_err(|_| Error::internal_msg("invalid UTF-8 in redirect path"))?; ObjectId::from_storage_path(redirect_str) - .ok_or_else(|| Error::generic("corrupt redirect path")) + .ok_or_else(|| Error::internal_msg("corrupt redirect path")) } } @@ -912,7 +912,7 @@ impl Backend for BigTableBackend { async fn get_object(&self, id: &ObjectId) -> Result { match self.get_tiered_object(id).await? { TieredGet::Object(metadata, payload) => Ok(Some((metadata, payload))), - TieredGet::Tombstone(_) => Err(Error::UnexpectedTombstone), + TieredGet::Tombstone(_) => Err(Error::internal_msg("unexpected tombstone")), TieredGet::NotFound => Ok(None), } } @@ -921,7 +921,7 @@ impl Backend for BigTableBackend { async fn get_metadata(&self, id: &ObjectId) -> Result { match self.get_tiered_metadata(id).await? { TieredMetadata::Object(metadata) => Ok(Some(metadata)), - TieredMetadata::Tombstone(_) => Err(Error::UnexpectedTombstone), + TieredMetadata::Tombstone(_) => Err(Error::internal_msg("unexpected tombstone")), TieredMetadata::NotFound => Ok(None), } } @@ -984,7 +984,9 @@ impl HighVolumeBackend for BigTableBackend { } } - Err(Error::generic("BigTable: race loop in put_non_tombstone")) + Err(Error::internal_msg( + "BigTable: race loop in put_non_tombstone", + )) } #[tracing::instrument(level = "trace", fields(?id), skip_all)] @@ -1079,7 +1081,7 @@ impl HighVolumeBackend for BigTableBackend { } } - Err(Error::generic( + Err(Error::internal_msg( "BigTable: race loop in delete_non_tombstone", )) } @@ -1120,27 +1122,30 @@ impl HighVolumeBackend for BigTableBackend { /// required by BigTable, the resulting timestamp has millisecond precision, with the last digits at /// 0. fn ttl_to_micros(ttl: Duration, from: SystemTime) -> Result { - let deadline = from.checked_add(ttl).ok_or_else(|| Error::Generic { - context: format!( + let deadline = from.checked_add(ttl).ok_or_else(|| { + Error::internal_msg(format!( "TTL duration overflow: {} plus {}s cannot be represented as SystemTime", humantime::format_rfc3339_seconds(from), ttl.as_secs() - ), - cause: None, + )) })?; let millis = deadline .duration_since(SystemTime::UNIX_EPOCH) - .map_err(|e| Error::Generic { - context: format!( - "unable to get duration since UNIX_EPOCH for SystemTime {}", - humantime::format_rfc3339_seconds(deadline) - ), - cause: Some(Box::new(e)), + .map_err(|e| { + Error::internal( + format!( + "unable to get duration since UNIX_EPOCH for SystemTime {}", + humantime::format_rfc3339_seconds(deadline) + ), + e, + ) })? .as_millis(); - (millis * 1000).try_into().map_err(|e| Error::Generic { - context: format!("failed to convert {}ms to i64 microseconds", millis), - cause: Some(Box::new(e)), + (millis * 1000).try_into().map_err(|e| { + Error::internal( + format!("failed to convert {}ms to i64 microseconds", millis), + e, + ) }) } @@ -1163,10 +1168,7 @@ where Ok(res) => return Ok(res), Err(e) if retry_count >= REQUEST_RETRY_COUNT || !is_retryable(&e) => { objectstore_metrics::count!("bigtable.failures", action = context); - return Err(Error::Generic { - context: format!("Bigtable: `{context}` failed"), - cause: Some(Box::new(e)), - }); + return Err(Error::internal(format!("Bigtable: `{context}` failed"), e)); } Err(e) => { retry_count += 1; @@ -1742,14 +1744,14 @@ mod tests { } // Legacy reads must error rather than leak tombstone data. - assert!(matches!( - backend.get_object(&hv_id).await, - Err(Error::UnexpectedTombstone) - )); - assert!(matches!( - backend.get_metadata(&hv_id).await, - Err(Error::UnexpectedTombstone) - )); + let Err(err) = backend.get_object(&hv_id).await else { + panic!("expected error"); + }; + assert_eq!(err.description.as_deref(), Some("unexpected tombstone")); + let Err(err) = backend.get_metadata(&hv_id).await else { + panic!("expected error"); + }; + assert_eq!(err.description.as_deref(), Some("unexpected tombstone")); // Idempotent retry: retry with the same target succeeds let second = backend diff --git a/objectstore-service/src/backend/common.rs b/objectstore-service/src/backend/common.rs index 8efad5da..071525b0 100644 --- a/objectstore-service/src/backend/common.rs +++ b/objectstore-service/src/backend/common.rs @@ -69,7 +69,7 @@ pub trait Backend: fmt::Debug + Send + Sync + 'static { /// The default returns [`Error::NotImplemented`]. Backends that implement /// [`MultipartUploadBackend`] should override this to return `Ok(self)`. fn as_multipart_upload_backend(self: Arc) -> Result> { - Err(Error::NotImplemented) + Err(Error::not_implemented()) } } diff --git a/objectstore-service/src/backend/gcs.rs b/objectstore-service/src/backend/gcs.rs index 43cc45c1..128125ad 100644 --- a/objectstore-service/src/backend/gcs.rs +++ b/objectstore-service/src/backend/gcs.rs @@ -184,21 +184,28 @@ impl GcsObject { .metadata .remove(&GcsMetaKey::Expiration) .map(|s| s.parse()) - .transpose()? + .transpose() + .map_err(|e| Error::internal("GCS: failed to parse expiration", e))? .unwrap_or_default(); let origin = self.metadata.remove(&GcsMetaKey::Origin); let content_type = self.content_type; - let compression = self.content_encoding.map(|s| s.parse()).transpose()?; + let compression = self + .content_encoding + .map(|s| s.parse()) + .transpose() + .map_err(|e| { + Error::internal( + "GCS: failed to parse content-encoding from object metadata", + e, + ) + })?; let size = self .size .map(|size| size.parse()) .transpose() - .map_err(|e| Error::Generic { - context: "GCS: failed to parse size from object metadata".to_string(), - cause: Some(Box::new(e)), - })?; + .map_err(|e| Error::internal("GCS: failed to parse size from object metadata", e))?; let time_created = self.time_created; // At this point, all built-in metadata should have been removed from self.metadata. @@ -207,13 +214,10 @@ impl GcsObject { if let GcsMetaKey::Custom(custom_key) = key { custom.insert(custom_key, value); } else { - return Err(Error::Generic { - context: format!( - "GCS: unexpected built-in metadata key in object metadata: {}", - key - ), - cause: None, - }); + return Err(Error::internal_msg(format!( + "GCS: unexpected built-in metadata key in object metadata: {}", + key + ))); } } @@ -302,10 +306,10 @@ fn metadata_to_gcs_headers(metadata: &Metadata) -> Result { let formatted = humantime::format_rfc3339_seconds(custom_time); headers.insert( HeaderName::from_static("x-goog-custom-time"), - formatted.to_string().parse().map_err(|e| Error::Generic { - context: "GCS: invalid custom-time header value".into(), - cause: Some(Box::new(e)), - })?, + formatted + .to_string() + .parse() + .map_err(|e| Error::internal("GCS: invalid custom-time header value", e))?, ); } @@ -315,10 +319,7 @@ fn metadata_to_gcs_headers(metadata: &Metadata) -> Result { compression .to_string() .parse() - .map_err(|e| Error::Generic { - context: "GCS: invalid content-encoding header value".into(), - cause: Some(Box::new(e)), - })?, + .map_err(|e| Error::internal("GCS: invalid content-encoding header value", e))?, ); } @@ -348,13 +349,10 @@ fn insert_gcs_meta_header( ) -> Result<()> { let header_name = format!("x-goog-meta-{key}"); headers.insert( - HeaderName::try_from(&header_name).map_err(|e| Error::Generic { - context: format!("GCS: invalid header name: {header_name}"), - cause: Some(Box::new(e)), - })?, - value.parse().map_err(|e| Error::Generic { - context: format!("GCS: invalid header value for {header_name}"), - cause: Some(Box::new(e)), + HeaderName::try_from(&header_name) + .map_err(|e| Error::internal(format!("GCS: invalid header name: {header_name}"), e))?, + value.parse().map_err(|e| { + Error::internal(format!("GCS: invalid header value for {header_name}"), e) })?, ); Ok(()) @@ -362,7 +360,11 @@ fn insert_gcs_meta_header( /// Returns `true` if the error is a transient reqwest failure worth retrying. fn is_retryable(error: &Error) -> bool { - let Error::Reqwest { cause, .. } = error else { + let Some(cause) = error + .source + .as_ref() + .and_then(|s| s.downcast_ref::()) + else { return false; }; if cause.is_timeout() || cause.is_connect() || cause.is_request() { @@ -418,12 +420,11 @@ impl GcsBackend { let path = id.as_storage_path().to_string(); url.path_segments_mut() - .map_err(|()| Error::Generic { - context: format!( + .map_err(|()| { + Error::internal_msg(format!( "GCS: invalid endpoint URL, {} cannot be a base", self.endpoint - ), - cause: None, + )) })? .extend(&["storage", "v1", "b", &self.bucket, "o", &path]); @@ -435,12 +436,11 @@ impl GcsBackend { let mut url = self.endpoint.clone(); url.path_segments_mut() - .map_err(|()| Error::Generic { - context: format!( + .map_err(|()| { + Error::internal_msg(format!( "GCS: invalid endpoint URL, {} cannot be a base", self.endpoint - ), - cause: None, + )) })? .extend(&["upload", "storage", "v1", "b", &self.bucket, "o"]); @@ -460,12 +460,11 @@ impl GcsBackend { fn xml_object_url(&self, id: &ObjectId) -> Result { let mut url = self.endpoint.clone(); { - let mut segments = url.path_segments_mut().map_err(|()| Error::Generic { - context: format!( + let mut segments = url.path_segments_mut().map_err(|()| { + Error::internal_msg(format!( "GCS: invalid endpoint URL, {} cannot be a base", self.endpoint - ), - cause: None, + )) })?; segments.push(&self.bucket); for part in id.as_storage_path().to_string().split('/') { @@ -479,7 +478,10 @@ impl GcsBackend { async fn request(&self, method: Method, url: impl IntoUrl) -> Result { let mut builder = self.client.request(method, url); if let Some(provider) = &self.token_provider { - let token = provider.token(TOKEN_SCOPES).await?; + let token = provider + .token(TOKEN_SCOPES) + .await + .map_err(|e| Error::internal("GCS: authentication failed", e))?; builder = builder.bearer_auth(token.as_str()); } Ok(builder) @@ -517,7 +519,7 @@ impl GcsBackend { .await? .send() .await - .map_err(|e| Error::reqwest("GCS: get metadata request", e))?; + .map_err(|e| Error::internal("GCS: get metadata request", e))?; if resp.status() == StatusCode::NOT_FOUND { return Ok(None); @@ -525,10 +527,10 @@ impl GcsBackend { let metadata: GcsObject = resp .error_for_status() - .map_err(|e| Error::reqwest("GCS: get metadata status", e))? + .map_err(|e| Error::internal("GCS: get metadata status", e))? .json() .await - .map_err(|e| Error::reqwest("GCS: get metadata parse", e))?; + .map_err(|e| Error::internal("GCS: get metadata parse", e))?; Ok(Some(metadata)) }) @@ -578,7 +580,7 @@ impl GcsBackend { .send() .await .and_then(|r| r.error_for_status()) - .map_err(|e| Error::reqwest("GCS: update custom time", e))?; + .map_err(|e| Error::internal("GCS: update custom time", e))?; Ok(()) }) .await @@ -616,10 +618,8 @@ impl Backend for GcsBackend { // NB: Ensure the order of these fields and that a content-type is attached to them. Both // are required by the GCS API. - let metadata_json = serde_json::to_string(&gcs_metadata).map_err(|cause| Error::Serde { - context: "failed to serialize metadata for GCS upload".to_string(), - cause, - })?; + let metadata_json = serde_json::to_string(&gcs_metadata) + .map_err(|e| Error::internal("GCS: failed to serialize metadata", e))?; let multipart = multipart::Form::new() .part( @@ -632,9 +632,11 @@ impl Backend for GcsBackend { "media", multipart::Part::stream(Body::wrap_stream(stream)) .mime_str(&metadata.content_type) - .map_err(|e| Error::Generic { - context: format!("invalid mime type: {}", &metadata.content_type), - cause: Some(Box::new(e)), + .map_err(|e| { + Error::internal( + format!("GCS: invalid mime type: {}", &metadata.content_type), + e, + ) })?, ); @@ -651,8 +653,8 @@ impl Backend for GcsBackend { .await .and_then(|r| r.error_for_status()) .map_err(|e| match stream::unpack_client_error(&e) { - Some(ce) => Error::Client(ce), - _ => Error::reqwest("GCS: upload object", e), + Some(ce) => Error::client_stream(ce), + _ => Error::internal("error uploading upload object", e), })?; Ok(()) @@ -677,7 +679,7 @@ impl Backend for GcsBackend { .send() .await .and_then(|r| r.error_for_status()) - .map_err(|e| Error::reqwest("GCS: get payload", e)) + .map_err(|e| Error::internal("GCS: get payload", e)) }) .await?; @@ -707,7 +709,7 @@ impl Backend for GcsBackend { .await? .send() .await - .map_err(|e| Error::reqwest("GCS: delete object", e))?; + .map_err(|e| Error::internal("GCS: delete object", e))?; // Do not error for objects that do not exist if resp.status() == StatusCode::NOT_FOUND { @@ -715,7 +717,7 @@ impl Backend for GcsBackend { } resp.error_for_status() - .map_err(|e| Error::reqwest("GCS: delete object", e))?; + .map_err(|e| Error::internal("GCS: delete object", e))?; Ok(()) }) @@ -733,7 +735,8 @@ impl TryFrom for InitiateMultipartResponse { type Error = crate::error::Error; fn try_from(r: XmlInitiateMultipartUploadResponse) -> crate::error::Result { - Ok(UploadId::new(r.upload_id)?) + UploadId::new(r.upload_id) + .map_err(|e| Error::internal("GCS: invalid upload ID in response", e)) } } @@ -857,18 +860,15 @@ impl MultipartUploadBackend for GcsBackend { .send() .await .and_then(|r| r.error_for_status()) - .map_err(|e| Error::reqwest("GCS: initiate multipart upload", e))?; + .map_err(|e| Error::internal("GCS: initiate multipart upload", e))?; let body = resp .bytes() .await - .map_err(|e| Error::reqwest("GCS: read initiate multipart body", e))?; + .map_err(|e| Error::internal("GCS: read initiate multipart body", e))?; let xml: XmlInitiateMultipartUploadResponse = quick_xml::de::from_reader(body.as_ref()) - .map_err(|e| Error::Generic { - context: "GCS: failed to parse initiate multipart response".to_owned(), - cause: Some(Box::new(e)), - })?; + .map_err(|e| Error::internal("GCS: failed to parse initiate multipart response", e))?; Ok(xml.try_into()?) } @@ -903,14 +903,14 @@ impl MultipartUploadBackend for GcsBackend { .send() .await .and_then(|r| r.error_for_status()) - .map_err(|e| Error::reqwest("GCS: upload part", e))?; + .map_err(|e| Error::internal("GCS: upload part", e))?; let etag = resp .headers() .get(header::ETAG) .and_then(|v| v.to_str().ok()) .map(|s| s.to_owned()) - .ok_or_else(|| Error::generic("GCS: upload part response missing ETag header"))?; + .ok_or_else(|| Error::internal_msg("GCS: upload part response missing ETag header"))?; Ok(etag) } @@ -942,18 +942,15 @@ impl MultipartUploadBackend for GcsBackend { .send() .await .and_then(|r| r.error_for_status()) - .map_err(|e| Error::reqwest("GCS: list parts", e))?; + .map_err(|e| Error::internal("GCS: list parts", e))?; let body = resp .bytes() .await - .map_err(|e| Error::reqwest("GCS: read list parts body", e))?; + .map_err(|e| Error::internal("GCS: read list parts body", e))?; - let xml: XmlListPartsResponse = - quick_xml::de::from_reader(body.as_ref()).map_err(|e| Error::Generic { - context: "GCS: failed to parse list parts response".to_owned(), - cause: Some(Box::new(e)), - })?; + let xml: XmlListPartsResponse = quick_xml::de::from_reader(body.as_ref()) + .map_err(|e| Error::internal("GCS: failed to parse list parts response", e))?; Ok(xml.into()) } @@ -973,7 +970,7 @@ impl MultipartUploadBackend for GcsBackend { .send() .await .and_then(|r| r.error_for_status()) - .map_err(|e| Error::reqwest("GCS: abort multipart upload", e))?; + .map_err(|e| Error::internal("GCS: abort multipart upload", e))?; Ok(()) } @@ -990,9 +987,8 @@ impl MultipartUploadBackend for GcsBackend { url.query_pairs_mut().append_pair("uploadId", upload_id); let body = XmlCompleteMultipartUpload::from(parts); - let xml = quick_xml::se::to_string(&body).map_err(|e| Error::Generic { - context: "GCS: failed to serialize complete multipart request".into(), - cause: Some(Box::new(e)), + let xml = quick_xml::se::to_string(&body).map_err(|e| { + Error::internal("GCS: failed to serialize complete multipart request", e) })?; let resp = self @@ -1003,12 +999,12 @@ impl MultipartUploadBackend for GcsBackend { .send() .await .and_then(|r| r.error_for_status()) - .map_err(|e| Error::reqwest("GCS: complete multipart upload", e))?; + .map_err(|e| Error::internal("GCS: complete multipart upload", e))?; let body = resp .bytes() .await - .map_err(|e| Error::reqwest("GCS: read complete multipart body", e))?; + .map_err(|e| Error::internal("GCS: read complete multipart body", e))?; let error = quick_xml::de::from_reader::<_, XmlError>(body.as_ref()) .ok() diff --git a/objectstore-service/src/backend/in_memory.rs b/objectstore-service/src/backend/in_memory.rs index 9fc1499a..eb94d813 100644 --- a/objectstore-service/src/backend/in_memory.rs +++ b/objectstore-service/src/backend/in_memory.rs @@ -126,7 +126,7 @@ impl super::common::Backend for InMemoryBackend { let entry = self.store.lock().unwrap().get(id).cloned(); match entry { None => Ok(None), - Some(StoreEntry::Tombstone(_)) => Err(Error::UnexpectedTombstone), + Some(StoreEntry::Tombstone(_)) => Err(Error::internal_msg("unexpected tombstone")), Some(StoreEntry::Object(mut metadata, bytes)) => { metadata.size = Some(bytes.len()); let stream = crate::stream::single(bytes); @@ -228,7 +228,8 @@ impl MultipartUploadBackend for InMemoryBackend { id: &ObjectId, metadata: &Metadata, ) -> Result { - let upload_id = UploadId::new(uuid::Uuid::now_v7().to_string())?; + let upload_id = UploadId::new(uuid::Uuid::now_v7().to_string()) + .map_err(|e| Error::bad_request("invalid upload ID", e))?; let upload = MultipartUpload { metadata: metadata.clone(), parts: BTreeMap::new(), @@ -256,7 +257,7 @@ impl MultipartUploadBackend for InMemoryBackend { let mut store = self.multipart_store.lock().unwrap(); let upload = store .get_mut(&(id.clone(), upload_id.clone())) - .ok_or_else(|| Error::generic("multipart upload not found"))?; + .ok_or_else(|| Error::bad_request_msg("multipart upload not found"))?; upload.parts.insert( part_number, @@ -280,7 +281,7 @@ impl MultipartUploadBackend for InMemoryBackend { let store = self.multipart_store.lock().unwrap(); let upload = store .get(&(id.clone(), upload_id.clone())) - .ok_or_else(|| Error::generic("multipart upload not found"))?; + .ok_or_else(|| Error::bad_request_msg("multipart upload not found"))?; let iter = upload .parts @@ -345,7 +346,7 @@ impl MultipartUploadBackend for InMemoryBackend { let store = self.multipart_store.lock().unwrap(); let upload = store .get(&key) - .ok_or_else(|| Error::generic("multipart upload not found"))?; + .ok_or_else(|| Error::bad_request_msg("multipart upload not found"))?; for completed in &parts { match upload.parts.get(&completed.part_number) { diff --git a/objectstore-service/src/backend/local_fs.rs b/objectstore-service/src/backend/local_fs.rs index 533b616e..fa5c85f1 100644 --- a/objectstore-service/src/backend/local_fs.rs +++ b/objectstore-service/src/backend/local_fs.rs @@ -96,18 +96,15 @@ impl Backend for LocalFsBackend { let mut reader = pin!(StreamReader::new(stream)); let mut writer = BufWriter::new(file); - let metadata_json = serde_json::to_string(metadata).map_err(|cause| Error::Serde { - context: "failed to serialize metadata".to_string(), - cause, - })?; + let metadata_json = serde_json::to_string(metadata)?; writer.write_all(metadata_json.as_bytes()).await?; writer.write_all(b"\n").await?; tokio::io::copy(&mut reader, &mut writer) .await .map_err(|e| match stream::unpack_client_error(&e) { - Some(ce) => Error::Client(ce), - None => e.into(), + Some(ce) => Error::client_stream(ce), + None => Error::from(e), })?; writer.flush().await?; @@ -136,14 +133,10 @@ impl Backend for LocalFsBackend { let mut metadata_line = String::new(); reader.read_line(&mut metadata_line).await?; let file_len = reader.get_ref().metadata().await?.len(); - let mut metadata: Metadata = - serde_json::from_str(metadata_line.trim_end()).map_err(|cause| Error::Serde { - context: "failed to deserialize metadata".to_string(), - cause, - })?; + let mut metadata: Metadata = serde_json::from_str(metadata_line.trim_end())?; let payload_size = file_len .checked_sub(metadata_line.len() as u64) - .ok_or_else(|| Error::generic("local-fs file corrupted: shorter than header"))?; + .ok_or_else(|| Error::internal_msg("local-fs file corrupted: shorter than header"))?; metadata.size = Some(payload_size as usize); let stream = ReaderStream::new(reader); @@ -180,15 +173,13 @@ impl MultipartUploadBackend for LocalFsBackend { id: &ObjectId, metadata: &Metadata, ) -> Result { - let upload_id = UploadId::new(uuid::Uuid::now_v7().to_string())?; + let upload_id = UploadId::new(uuid::Uuid::now_v7().to_string()) + .map_err(|e| Error::bad_request("invalid upload ID", e))?; let dir = self.multipart_dir(id, &upload_id); tokio::fs::create_dir_all(&dir).await?; let meta_path = dir.join("metadata.json"); - let metadata_json = serde_json::to_string(metadata).map_err(|cause| Error::Serde { - context: "failed to serialize multipart metadata".to_string(), - cause, - })?; + let metadata_json = serde_json::to_string(metadata)?; tokio::fs::write(meta_path, metadata_json).await?; Ok(upload_id) @@ -205,7 +196,7 @@ impl MultipartUploadBackend for LocalFsBackend { ) -> Result { let dir = self.multipart_dir(id, upload_id); if !tokio::fs::try_exists(&dir).await? { - return Err(Error::generic("multipart upload not found")); + return Err(Error::bad_request_msg("multipart upload not found")); } let etag = format!("\"etag-{part_number}-{content_length}\""); @@ -215,10 +206,7 @@ impl MultipartUploadBackend for LocalFsBackend { "uploaded_at": SystemTime::now(), "size": content_length, }); - let header_line = serde_json::to_string(&header).map_err(|cause| Error::Serde { - context: "failed to serialize part header".to_string(), - cause, - })?; + let header_line = serde_json::to_string(&header)?; let part_path = dir.join(format!("{part_number}.part")); let file = OpenOptions::new() @@ -233,16 +221,18 @@ impl MultipartUploadBackend for LocalFsBackend { writer.write_all(header_line.as_bytes()).await?; writer.write_all(b"\n").await?; - let _bytes_copied = tokio::io::copy(&mut reader, &mut writer) + let bytes_copied = tokio::io::copy(&mut reader, &mut writer) .await .map_err(|e| match stream::unpack_client_error(&e) { - Some(ce) => Error::Client(ce), - None => e.into(), + Some(ce) => Error::client_stream(ce), + None => Error::from(e), })?; - // TODO: validate bytes_copied against content_length and return a BadRequest-style - // error. Needs a service-layer error variant that maps to HTTP 400 without abusing - // ClientError (which is meant for stream errors). + if bytes_copied != content_length { + return Err(Error::bad_request_msg(format!( + "content-length mismatch: expected {content_length} bytes, got {bytes_copied}" + ))); + } writer.flush().await?; let file = writer.into_inner(); @@ -261,7 +251,7 @@ impl MultipartUploadBackend for LocalFsBackend { ) -> Result { let dir = self.multipart_dir(id, upload_id); if !tokio::fs::try_exists(&dir).await? { - return Err(Error::generic("multipart upload not found")); + return Err(Error::bad_request_msg("multipart upload not found")); } let mut entries = tokio::fs::read_dir(&dir).await?; @@ -285,11 +275,7 @@ impl MultipartUploadBackend for LocalFsBackend { let mut reader = BufReader::new(file); let mut header_line = String::new(); reader.read_line(&mut header_line).await?; - let header: serde_json::Value = - serde_json::from_str(header_line.trim_end()).map_err(|cause| Error::Serde { - context: "failed to deserialize part header".to_string(), - cause, - })?; + let header: serde_json::Value = serde_json::from_str(header_line.trim_end())?; parts.push(Part { part_number: pn, @@ -339,17 +325,13 @@ impl MultipartUploadBackend for LocalFsBackend { ) -> Result { let dir = self.multipart_dir(id, upload_id); if !tokio::fs::try_exists(&dir).await? { - return Err(Error::generic("multipart upload not found")); + return Err(Error::bad_request_msg("multipart upload not found")); } // Read metadata let meta_path = dir.join("metadata.json"); let meta_bytes = tokio::fs::read(&meta_path).await?; - let metadata: Metadata = - serde_json::from_slice(&meta_bytes).map_err(|cause| Error::Serde { - context: "failed to deserialize multipart metadata".to_string(), - cause, - })?; + let metadata: Metadata = serde_json::from_slice(&meta_bytes)?; // TODO: validate that parts are in ascending part_number order and reject with // InvalidPartOrder if not (matches S3/GCS behavior). Needs a proper client error variant. @@ -368,11 +350,7 @@ impl MultipartUploadBackend for LocalFsBackend { let mut reader = BufReader::new(file); let mut header_line = String::new(); reader.read_line(&mut header_line).await?; - let header: serde_json::Value = - serde_json::from_str(header_line.trim_end()).map_err(|cause| Error::Serde { - context: "failed to deserialize part header".to_string(), - cause, - })?; + let header: serde_json::Value = serde_json::from_str(header_line.trim_end())?; let stored_etag = header["etag"].as_str().unwrap_or(""); if stored_etag != completed.etag { @@ -397,10 +375,7 @@ impl MultipartUploadBackend for LocalFsBackend { .await?; let mut writer = BufWriter::new(file); - let metadata_json = serde_json::to_string(&metadata).map_err(|cause| Error::Serde { - context: "failed to serialize metadata".to_string(), - cause, - })?; + let metadata_json = serde_json::to_string(&metadata)?; writer.write_all(metadata_json.as_bytes()).await?; writer.write_all(b"\n").await?; diff --git a/objectstore-service/src/backend/s3_compatible.rs b/objectstore-service/src/backend/s3_compatible.rs index 51069406..5a37ef51 100644 --- a/objectstore-service/src/backend/s3_compatible.rs +++ b/objectstore-service/src/backend/s3_compatible.rs @@ -120,16 +120,21 @@ impl S3CompatibleBackend { } /// Wraps [`Metadata::to_headers`] with GCS-specific concerns (tombstone + custom-time). -fn metadata_to_gcs_headers( - metadata: &Metadata, - prefix: &str, -) -> Result { - let mut headers = metadata.to_headers(prefix)?; +fn metadata_to_gcs_headers(metadata: &Metadata, prefix: &str) -> Result { + let mut headers = metadata + .to_headers(prefix) + .map_err(|e| Error::internal("S3: failed to serialize metadata headers", e))?; // GCS custom-time for lifecycle expiration if let Some(expires_in) = metadata.expiration_policy.expires_in() { let expires_at = humantime::format_rfc3339_seconds(std::time::SystemTime::now() + expires_in); - headers.append(GCS_CUSTOM_TIME, expires_at.to_string().parse()?); + headers.append( + GCS_CUSTOM_TIME, + expires_at + .to_string() + .parse() + .map_err(|e| Error::internal("S3: invalid custom-time header value", e))?, + ); } Ok(headers) } @@ -146,9 +151,11 @@ where provider .get_token() .await - .map_err(|err| Error::Generic { - context: "S3: failed to get authentication token".to_owned(), - cause: Some(err.into()), + .map_err(|err| { + Error::internal( + "S3: failed to get authentication token", + std::io::Error::other(err), + ) })? .as_str(), ); @@ -171,10 +178,7 @@ where .await? .send() .await - .map_err(|cause| Error::Reqwest { - context: "S3: failed to send request".to_string(), - cause, - })?; + .map_err(|e| Error::internal("S3: failed to send request", e))?; if response.status() == StatusCode::NOT_FOUND { objectstore_log::debug!("Object not found"); @@ -183,13 +187,11 @@ where let response = response .error_for_status() - .map_err(|cause| Error::Reqwest { - context: "S3: failed to get object".to_string(), - cause, - })?; + .map_err(|e| Error::internal("S3: failed to get object", e))?; let headers = response.headers(); - let mut metadata = Metadata::from_headers(headers, GCS_CUSTOM_PREFIX)?; + let mut metadata = Metadata::from_headers(headers, GCS_CUSTOM_PREFIX) + .map_err(|e| Error::internal("S3: failed to parse metadata from headers", e))?; metadata.size = response.content_length().map(|len| len as usize); // TODO: Schedule into background persistently so this doesn't get lost on restarts @@ -225,14 +227,13 @@ where .headers(metadata_to_gcs_headers(metadata, GCS_CUSTOM_PREFIX)?) .send() .await - .map_err(|cause| Error::Reqwest { - context: "S3: failed to send TTI update request".to_string(), - cause, - })? + .map_err(|e| Error::internal("S3: failed to send TTI update request", e))? .error_for_status() - .map_err(|cause| Error::Reqwest { - context: "S3: failed to update expiration time for object with TTI".to_string(), - cause, + .map_err(|e| { + Error::internal( + "S3: failed to update expiration time for object with TTI", + e, + ) })?; Ok(()) @@ -282,12 +283,9 @@ impl Backend for S3CompatibleBackend { .send() .await .and_then(|response| response.error_for_status()) - .map_err(|cause| match stream::unpack_client_error(&cause) { - Some(ce) => Error::Client(ce), - _ => Error::Reqwest { - context: "S3: failed to put object".to_string(), - cause, - }, + .map_err(|e| match stream::unpack_client_error(&e) { + Some(ce) => Error::client_stream(ce), + _ => Error::internal("S3: failed to put object", e), })?; Ok(()) @@ -320,20 +318,14 @@ impl Backend for S3CompatibleBackend { .await? .send() .await - .map_err(|cause| Error::Reqwest { - context: "S3: failed to send delete request".to_string(), - cause, - })?; + .map_err(|e| Error::internal("S3: failed to send delete request", e))?; // Do not error for objects that do not exist. if response.status() != StatusCode::NOT_FOUND { objectstore_log::debug!("Object not found"); response .error_for_status() - .map_err(|cause| Error::Reqwest { - context: "S3: failed to delete object".to_string(), - cause, - })?; + .map_err(|e| Error::internal("S3: failed to delete object", e))?; } Ok(()) diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index 1385e01d..8dd16800 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -578,11 +578,10 @@ impl TryInto for TieredUploadId { type Error = Error; fn try_into(self) -> Result { - let json = - serde_json::to_vec(&self).map_err(|e| Error::serde("encoding multipart token", e))?; - Ok(UploadId::new( - base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(json), - )?) + let json = serde_json::to_vec(&self) + .map_err(|e| Error::bad_request("encoding multipart token", e))?; + UploadId::new(base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(json)) + .map_err(|e| Error::bad_request("invalid upload ID", e)) } } @@ -592,8 +591,8 @@ impl TryFrom<&UploadId> for TieredUploadId { fn try_from(value: &UploadId) -> Result { let json = base64::engine::general_purpose::URL_SAFE_NO_PAD .decode(value.as_bytes()) - .map_err(|e| Error::generic(format!("invalid multipart upload ID: {e}")))?; - serde_json::from_slice(&json).map_err(|e| Error::serde("decoding multipart token", e)) + .map_err(|e| Error::bad_request_msg(format!("invalid multipart upload ID: {e}")))?; + serde_json::from_slice(&json).map_err(|e| Error::bad_request("decoding multipart token", e)) } } @@ -763,7 +762,7 @@ impl MultipartUploadBackend for TieredStorage { physical = ?physical, "complete_multipart call succeeded on long_term backend, but subsequent get_metadata found no object" ); - return Err(Error::generic( + return Err(Error::internal_msg( "completed multipart object not found in long-term storage", )); } @@ -1078,10 +1077,13 @@ mod tests { _inner: &InMemoryBackend, _id: &ObjectId, ) -> Result { - Err(Error::Io(std::io::Error::new( - std::io::ErrorKind::ConnectionRefused, + Err(Error::internal( "simulated long-term delete failure", - ))) + std::io::Error::new( + std::io::ErrorKind::ConnectionRefused, + "simulated long-term delete failure", + ), + )) } } @@ -1215,10 +1217,13 @@ mod tests { // simulate a network error _after_ commit went through inner.compare_and_write(id, current, write).await?; } - Err(Error::Io(std::io::Error::new( - std::io::ErrorKind::TimedOut, + Err(Error::internal( "simulated compare_and_write failure", - ))) + std::io::Error::new( + std::io::ErrorKind::TimedOut, + "simulated compare_and_write failure", + ), + )) } } @@ -1862,10 +1867,13 @@ mod tests { .complete_multipart(id, upload_id, parts) .await .unwrap(); - Err(Error::Io(std::io::Error::new( - std::io::ErrorKind::TimedOut, + Err(Error::internal( "simulated network error on complete_multipart", - ))) + std::io::Error::new( + std::io::ErrorKind::TimedOut, + "simulated network error on complete_multipart", + ), + )) } async fn get_metadata( @@ -1873,10 +1881,13 @@ mod tests { _inner: &InMemoryBackend, _id: &ObjectId, ) -> Result { - Err(Error::Io(std::io::Error::new( - std::io::ErrorKind::TimedOut, + Err(Error::internal( "simulated network error on get_metadata", - ))) + std::io::Error::new( + std::io::ErrorKind::TimedOut, + "simulated network error on get_metadata", + ), + )) } } @@ -1981,10 +1992,10 @@ mod tests { let mut attempt = self.attempt.lock().await; *attempt += 1; if *attempt == 1 { - return Err(Error::Io(std::io::Error::new( - std::io::ErrorKind::TimedOut, + return Err(Error::internal( "simulated network error", - ))); + std::io::Error::new(std::io::ErrorKind::TimedOut, "simulated network error"), + )); } else { return Ok(inner .complete_multipart(id, upload_id, parts) @@ -2117,10 +2128,10 @@ mod tests { let mut attempt = self.attempt.lock().await; *attempt += 1; if *attempt == 1 { - return Err(Error::Io(std::io::Error::new( - std::io::ErrorKind::TimedOut, + return Err(Error::internal( "simulated network error", - ))); + std::io::Error::new(std::io::ErrorKind::TimedOut, "simulated network error"), + )); } else { inner.get_metadata(id).await } diff --git a/objectstore-service/src/concurrency.rs b/objectstore-service/src/concurrency.rs index bd24b8a4..e72051ab 100644 --- a/objectstore-service/src/concurrency.rs +++ b/objectstore-service/src/concurrency.rs @@ -47,13 +47,13 @@ impl ConcurrencyLimiter { /// Returns a [`ConcurrencyPermit`] that releases all `count` permits and /// notifies waiters on drop, just like single-permit acquisition. /// - /// Returns [`Error::AtCapacity`] when fewer than `count` permits are available. + /// Returns [`Error::too_many_requests()`] when fewer than `count` permits are available. pub fn try_acquire_many(&self, count: usize) -> Result { let permit = self .semaphore .clone() .try_acquire_many_owned(count as u32) - .map_err(|_| Error::AtCapacity)?; + .map_err(|_| Error::too_many_requests())?; Ok(ConcurrencyPermit { permit: Some(permit), released: Arc::clone(&self.released), @@ -64,7 +64,7 @@ impl ConcurrencyLimiter { /// /// Convenience shorthand for `try_acquire_many(1)`. /// - /// Returns [`Error::AtCapacity`] when all permits are held. + /// Returns [`Error::too_many_requests()`] when all permits are held. pub fn try_acquire(&self) -> Result { self.try_acquire_many(1) } @@ -177,7 +177,7 @@ where if let Err(ref e) = result { let error = e as &dyn std::error::Error; - objectstore_log::event_dyn!(e.level(), error, operation, "Task failed"); + objectstore_log::error!(error, operation, "Task failed"); } objectstore_metrics::record!( @@ -193,7 +193,7 @@ where } .bind_hub(new_hub), ); - rx.await.map_err(|_| Error::Dropped)? + rx.await.map_err(|_| Error::internal_msg("task dropped"))? } #[cfg(test)] @@ -201,7 +201,6 @@ mod tests { use std::sync::atomic::{AtomicUsize, Ordering}; use super::*; - use crate::error::Error; #[test] fn available_permits_tracks_held() { @@ -251,7 +250,7 @@ mod tests { let _permit = limiter.try_acquire().unwrap(); let result = limiter.try_acquire(); - assert!(matches!(result, Err(Error::AtCapacity))); + assert!(result.is_err()); } #[test] diff --git a/objectstore-service/src/error.rs b/objectstore-service/src/error.rs index 46e38e28..3d4d55c8 100644 --- a/objectstore-service/src/error.rs +++ b/objectstore-service/src/error.rs @@ -3,158 +3,171 @@ //! [`Error`] covers I/O, serialization, HTTP, metadata, authentication, //! and backend-specific failures. [`Result`] is the corresponding alias. -use std::any::Any; - -use objectstore_log::Level; -use thiserror::Error as ThisError; +use std::{any::Any, borrow::Cow, fmt::Display}; use crate::stream::ClientError; /// Error type for service operations. -#[derive(Debug, ThisError)] -pub enum Error { - /// IO errors related to payload streaming or file operations. - #[error("i/o error: {0}")] - Io(#[from] std::io::Error), +#[derive(Debug)] +pub struct Error { + pub(crate) kind: ErrorKind, + pub(crate) description: Option>, + pub(crate) source: Option>, +} - /// Error originating from a client-supplied input stream. - /// - /// Indicates the client is at fault (e.g. dropped connection mid-upload) and should - /// map to a 4xx response rather than a 5xx. - #[error("error reading client stream: {0}")] - Client(#[from] ClientError), - - /// Errors related to de/serialization. - #[error("serde error: {context}")] - Serde { - /// Context describing what was being serialized/deserialized. - context: String, - /// The underlying serde error. - #[source] - cause: serde_json::Error, - }, - - /// All errors stemming from the reqwest client, used in multiple backends to send requests to - /// e.g. GCP APIs. - /// These can be network errors encountered when sending the requests, but can also indicate - /// errors returned by the API itself. - #[error("reqwest error: {context}")] - Reqwest { - /// Context describing the request that failed. - context: String, - /// The underlying reqwest error. - #[source] - cause: reqwest::Error, - }, - - /// Errors related to de/serialization and parsing of object metadata. - #[error("metadata error: {0}")] - Metadata(#[from] objectstore_types::metadata::Error), - - /// Errors encountered when attempting to authenticate with GCP. - #[error("GCP authentication error: {0}")] - GcpAuth(#[from] gcp_auth::Error), - - /// A spawned service task panicked. - #[error("service task failed: {0}")] - Panic(String), - - /// A spawned service task was dropped before it could deliver its result. - /// - /// This is an unexpected condition that can occur when the runtime drops the task for unknown - /// reasons. - #[error("task dropped")] - Dropped, - - /// A redirect tombstone was encountered at a place where it is not supported. - /// - /// This indicates a caller bug — tombstone-aware reads must go through the - /// [`HighVolumeBackend`](crate::backend::common::HighVolumeBackend) methods. - #[error("unexpected tombstone")] - UnexpectedTombstone, - - /// The service has reached its concurrency limit and cannot accept more operations. - #[error("concurrency limit reached")] - AtCapacity, - - /// Any other error stemming from one of the storage backends, which might be specific to that - /// backend or to a certain operation. - #[error("storage backend error: {context}")] - Generic { - /// Context describing the operation that failed. - context: String, - /// The underlying error, if available. - #[source] - cause: Option>, - }, - - /// The functionality is not implemented by this instance of the service. - #[error("not implemented")] - NotImplemented, +impl Error { + /// Returns the kind of this error. + pub fn kind(&self) -> &ErrorKind { + &self.kind + } +} - /// Invalid upload ID (e.g. path traversal attempt). - #[error(transparent)] - InvalidUploadId(#[from] objectstore_types::multipart::InvalidUploadId), +impl core::error::Error for Error { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + self.source + .as_deref() + .map(|e| e as &(dyn std::error::Error + 'static)) + } } -impl Error { - /// Creates an [`Error::Panic`] from a panic payload, extracting the message. - pub fn panic(payload: Box) -> Self { - let msg = if let Some(s) = payload.downcast_ref::<&str>() { - (*s).to_owned() - } else if let Some(s) = payload.downcast_ref::() { - s.clone() - } else { - "unknown panic".to_owned() +impl Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let kind_label = match &self.kind { + ErrorKind::ClientStream => "client stream error", + ErrorKind::Transient => "transient error", + ErrorKind::BadRequest => "bad request", + ErrorKind::NotImplemented => "not implemented", + ErrorKind::TooManyRequests => "too many requests", + ErrorKind::Internal => "internal error", }; - Self::Panic(msg) + write!(f, "{kind_label}")?; + if let Some(ref description) = self.description { + write!(f, ": {description}")?; + } + Ok(()) } +} - /// Creates an [`Error::Reqwest`] from a reqwest error with context. - pub fn reqwest(context: impl Into, cause: reqwest::Error) -> Self { - Self::Reqwest { - context: context.into(), - cause, +impl Error { + pub(crate) fn internal( + description: impl Into>, + source: impl std::error::Error + Send + Sync + 'static, + ) -> Self { + Self { + kind: ErrorKind::Internal, + description: Some(description.into()), + source: Some(Box::new(source)), } } - /// Creates an [`Error::Serde`] from a serde error with context. - pub fn serde(context: impl Into, cause: serde_json::Error) -> Self { - Self::Serde { - context: context.into(), - cause, + pub(crate) fn internal_msg(description: impl Into>) -> Self { + Self { + kind: ErrorKind::Internal, + description: Some(description.into()), + source: None, } } - /// Creates an [`Error::Generic`] with a context string and no cause. - pub fn generic(context: impl Into) -> Self { - Self::Generic { - context: context.into(), - cause: None, + pub(crate) fn bad_request( + description: impl Into>, + source: impl std::error::Error + Send + Sync + 'static, + ) -> Self { + Self { + kind: ErrorKind::BadRequest, + description: Some(description.into()), + source: Some(Box::new(source)), } } - /// Returns the appropriate log level for this error. - pub fn level(&self) -> Level { - match self { - // Malformed client input at DEBUG level - Self::Client(_) => Level::DEBUG, - Self::Metadata(_) => Level::DEBUG, - // Like rate limits, we treat capacity errors as warnings - Self::AtCapacity => Level::WARN, - // All other errors are service or backend failures - Self::Io(_) => Level::ERROR, - Self::Serde { .. } => Level::ERROR, - Self::Reqwest { .. } => Level::ERROR, - Self::GcpAuth(_) => Level::ERROR, - Self::Panic(_) => Level::ERROR, - Self::Dropped => Level::ERROR, - Self::UnexpectedTombstone => Level::ERROR, - Self::NotImplemented => Level::ERROR, - Self::InvalidUploadId(_) => Level::DEBUG, - Self::Generic { .. } => Level::ERROR, + pub(crate) fn bad_request_msg(description: impl Into>) -> Self { + Self { + kind: ErrorKind::BadRequest, + description: Some(description.into()), + source: None, } } + + pub(crate) fn client_stream(source: ClientError) -> Self { + Self { + kind: ErrorKind::ClientStream, + description: None, + source: Some(Box::new(source)), + } + } + + pub(crate) fn too_many_requests() -> Self { + Self { + kind: ErrorKind::TooManyRequests, + description: None, + source: None, + } + } + + pub(crate) fn not_implemented() -> Self { + Self { + kind: ErrorKind::NotImplemented, + description: None, + source: None, + } + } + + pub(crate) fn panic(payload: Box) -> Self { + let msg = if let Some(s) = payload.downcast_ref::<&str>() { + (*s).to_owned() + } else if let Some(s) = payload.downcast_ref::() { + s.clone() + } else { + "unknown panic".to_owned() + }; + Self::internal_msg(msg) + } +} + +impl From for Error { + fn from(source: ClientError) -> Self { + Self::client_stream(source) + } +} + +macro_rules! impl_from_internal { + ($($ty:ty),+ $(,)?) => { + $( + impl From<$ty> for Error { + fn from(source: $ty) -> Self { + Self { + kind: ErrorKind::Internal, + description: None, + source: Some(Box::new(source)), + } + } + } + )+ + }; +} + +impl_from_internal!( + std::io::Error, + serde_json::Error, + reqwest::Error, + gcp_auth::Error, + objectstore_types::metadata::Error, +); + +/// Classification of a service error. +#[derive(Debug)] +pub enum ErrorKind { + /// Error originating from a client-supplied input stream. + ClientStream, + /// Transient failure that may succeed on retry. + Transient, + /// Malformed or invalid client request. + BadRequest, + /// Functionality not implemented by this backend. + NotImplemented, + /// Service is at capacity. + TooManyRequests, + /// Internal service or backend failure. + Internal, } /// Result type for service operations. diff --git a/objectstore-service/src/service.rs b/objectstore-service/src/service.rs index 8505da44..358f5934 100644 --- a/objectstore-service/src/service.rs +++ b/objectstore-service/src/service.rs @@ -119,7 +119,7 @@ impl StorageService { let window = (available as f64 * 0.10).ceil() as usize; let acquire_result = match window { - 0 => Err(Error::AtCapacity), + 0 => Err(Error::too_many_requests()), _ => self.concurrency.try_acquire_many(window), }; let reservation = acquire_result.inspect_err(|_| { @@ -349,7 +349,6 @@ mod tests { use crate::backend::in_memory::InMemoryBackend; use crate::backend::testing::{Hooks, TestBackend}; use crate::backend::tiered::TieredStorage; - use crate::error::Error; use crate::stream::{self, ClientStream}; fn make_context() -> ObjectContext { @@ -544,9 +543,10 @@ mod tests { let id = ObjectId::new(make_context(), "panic-test".into()); let result = service.get_object(id).await; - let Err(Error::Panic(msg)) = result else { + let Err(err) = result else { panic!("expected Panic error"); }; + let msg = err.description.as_deref().unwrap_or(""); assert!(msg.contains("intentional panic in get_object"), "{msg}"); } @@ -684,10 +684,7 @@ mod tests { ) .await; - assert!( - matches!(result, Err(Error::AtCapacity)), - "expected AtCapacity, got {result:?}" - ); + assert!(result.is_err(), "expected AtCapacity error"); // Unblock the first operation. hv.hooks.resume.notify_one(); @@ -739,13 +736,10 @@ mod tests { // First operation panics — the permit must still be released. let id = ObjectId::new(make_context(), "panic-permit".into()); let result = service.get_object(id.clone()).await; - assert!(matches!(result, Err(Error::Panic(_)))); + assert!(result.is_err()); // Second operation should succeed in acquiring the permit (not AtCapacity). let result = service.get_object(id).await; - assert!( - !matches!(result, Err(Error::AtCapacity)), - "permit was not released after panic" - ); + assert!(result.is_ok(), "permit was not released after panic"); } } diff --git a/objectstore-service/src/streaming.rs b/objectstore-service/src/streaming.rs index 6bd4ed15..0d78117a 100644 --- a/objectstore-service/src/streaming.rs +++ b/objectstore-service/src/streaming.rs @@ -363,7 +363,7 @@ mod tests { #[test] fn at_capacity_when_no_permits() { let service = make_service_with_limit(0); - assert!(matches!(service.stream(), Err(Error::AtCapacity))); + assert!(service.stream().is_err()); } #[test] @@ -614,7 +614,7 @@ mod tests { // Permit is held — stream() must fail immediately with AtCapacity. assert!( - matches!(service.stream(), Err(Error::AtCapacity)), + service.stream().is_err(), "expected AtCapacity when all permits are held" ); From 962a190c970f7243550f5bf98a7e6dfe62316778 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 3 Jun 2026 17:17:44 +0200 Subject: [PATCH 2/7] improve --- objectstore-server/src/endpoints/common.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/objectstore-server/src/endpoints/common.rs b/objectstore-server/src/endpoints/common.rs index c43d873f..4cff77b3 100644 --- a/objectstore-server/src/endpoints/common.rs +++ b/objectstore-server/src/endpoints/common.rs @@ -51,15 +51,18 @@ pub struct ApiErrorResponse { } impl ApiErrorResponse { - /// Creates an error response from an error, using only its [`Display`] message. - /// - /// The source chain is intentionally excluded from the HTTP response — it is - /// recorded in our internal logs via tracing instead. + /// Creates an error response from an error, extracting the full cause chain. pub fn from_error(error: &E) -> Self { - Self { - detail: Some(error.to_string()), - causes: Vec::new(), + let detail = Some(error.to_string()); + + let mut causes = Vec::new(); + let mut source = error.source(); + while let Some(s) = source { + causes.push(s.to_string()); + source = s.source(); } + + Self { detail, causes } } } From f93e9dc2c461d2f258d870e0a1481124ea4e4bf5 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 5 Jun 2026 11:27:36 +0200 Subject: [PATCH 3/7] improve --- objectstore-service/src/backend/common.rs | 2 +- objectstore-service/src/concurrency.rs | 61 +++++++++++++++++++---- objectstore-service/src/error.rs | 28 +++-------- objectstore-service/src/service.rs | 51 ++++++++++++------- objectstore-service/src/streaming.rs | 15 +++--- 5 files changed, 101 insertions(+), 56 deletions(-) diff --git a/objectstore-service/src/backend/common.rs b/objectstore-service/src/backend/common.rs index 071525b0..8d3f2e1d 100644 --- a/objectstore-service/src/backend/common.rs +++ b/objectstore-service/src/backend/common.rs @@ -66,7 +66,7 @@ pub trait Backend: fmt::Debug + Send + Sync + 'static { /// Casts this backend into an [`Arc`] if supported. /// - /// The default returns [`Error::NotImplemented`]. Backends that implement + /// The default returns [`ErrorKind::NotImplemented`](crate::error::ErrorKind::NotImplemented). Backends that implement /// [`MultipartUploadBackend`] should override this to return `Ok(self)`. fn as_multipart_upload_backend(self: Arc) -> Result> { Err(Error::not_implemented()) diff --git a/objectstore-service/src/concurrency.rs b/objectstore-service/src/concurrency.rs index e72051ab..8262431c 100644 --- a/objectstore-service/src/concurrency.rs +++ b/objectstore-service/src/concurrency.rs @@ -16,7 +16,49 @@ use futures_util::FutureExt; use sentry::{Hub, SentryFutureExt, TransactionContext}; use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore}; -use crate::error::{Error, Result}; +use crate::error::{Error, ErrorKind, Result}; + +/// Errors produced by the concurrency and task-spawning infrastructure. +#[derive(Debug, PartialEq, Eq, thiserror::Error)] +pub enum ConcurrencyError { + /// The service has reached its concurrency limit. + #[error("concurrency limit reached")] + AtCapacity, + /// A spawned service task panicked. + #[error("service task panicked: {0}")] + Panic(String), + /// A spawned service task was dropped before delivering its result. + #[error("task dropped")] + Dropped, +} + +impl ConcurrencyError { + /// Creates a [`ConcurrencyError::Panic`] from a panic payload, extracting the message. + pub fn panic(payload: Box) -> Self { + let msg = if let Some(s) = payload.downcast_ref::<&str>() { + (*s).to_owned() + } else if let Some(s) = payload.downcast_ref::() { + s.clone() + } else { + "unknown panic".to_owned() + }; + Self::Panic(msg) + } +} + +impl From for Error { + fn from(source: ConcurrencyError) -> Self { + let kind = match &source { + ConcurrencyError::AtCapacity => ErrorKind::TooManyRequests, + ConcurrencyError::Panic(_) | ConcurrencyError::Dropped => ErrorKind::Internal, + }; + Self { + kind, + description: None, + source: Some(Box::new(source)), + } + } +} /// Interval for the periodic metrics emitter. const EMITTER_INTERVAL: Duration = Duration::from_secs(1); @@ -47,13 +89,13 @@ impl ConcurrencyLimiter { /// Returns a [`ConcurrencyPermit`] that releases all `count` permits and /// notifies waiters on drop, just like single-permit acquisition. /// - /// Returns [`Error::too_many_requests()`] when fewer than `count` permits are available. - pub fn try_acquire_many(&self, count: usize) -> Result { + /// Returns [`ConcurrencyError::AtCapacity`] when fewer than `count` permits are available. + pub fn try_acquire_many(&self, count: usize) -> Result { let permit = self .semaphore .clone() .try_acquire_many_owned(count as u32) - .map_err(|_| Error::too_many_requests())?; + .map_err(|_| ConcurrencyError::AtCapacity)?; Ok(ConcurrencyPermit { permit: Some(permit), released: Arc::clone(&self.released), @@ -64,8 +106,8 @@ impl ConcurrencyLimiter { /// /// Convenience shorthand for `try_acquire_many(1)`. /// - /// Returns [`Error::too_many_requests()`] when all permits are held. - pub fn try_acquire(&self) -> Result { + /// Returns [`ConcurrencyError::AtCapacity`] when all permits are held. + pub fn try_acquire(&self) -> Result { self.try_acquire_many(1) } @@ -173,7 +215,7 @@ where let result = std::panic::AssertUnwindSafe(f) .catch_unwind() .await - .unwrap_or_else(|payload| Err(Error::panic(payload))); + .unwrap_or_else(|payload| Err(ConcurrencyError::panic(payload).into())); if let Err(ref e) = result { let error = e as &dyn std::error::Error; @@ -193,7 +235,8 @@ where } .bind_hub(new_hub), ); - rx.await.map_err(|_| Error::internal_msg("task dropped"))? + rx.await + .map_err(|_| Error::from(ConcurrencyError::Dropped))? } #[cfg(test)] @@ -250,7 +293,7 @@ mod tests { let _permit = limiter.try_acquire().unwrap(); let result = limiter.try_acquire(); - assert!(result.is_err()); + assert!(matches!(result, Err(ConcurrencyError::AtCapacity))); } #[test] diff --git a/objectstore-service/src/error.rs b/objectstore-service/src/error.rs index 3d4d55c8..388f28af 100644 --- a/objectstore-service/src/error.rs +++ b/objectstore-service/src/error.rs @@ -3,7 +3,7 @@ //! [`Error`] covers I/O, serialization, HTTP, metadata, authentication, //! and backend-specific failures. [`Result`] is the corresponding alias. -use std::{any::Any, borrow::Cow, fmt::Display}; +use std::{borrow::Cow, fmt::Display}; use crate::stream::ClientError; @@ -20,6 +20,11 @@ impl Error { pub fn kind(&self) -> &ErrorKind { &self.kind } + + /// Attempts to downcast the source error to a concrete type. + pub fn downcast_ref(&self) -> Option<&T> { + self.source.as_ref()?.downcast_ref::() + } } impl core::error::Error for Error { @@ -95,14 +100,6 @@ impl Error { } } - pub(crate) fn too_many_requests() -> Self { - Self { - kind: ErrorKind::TooManyRequests, - description: None, - source: None, - } - } - pub(crate) fn not_implemented() -> Self { Self { kind: ErrorKind::NotImplemented, @@ -110,17 +107,6 @@ impl Error { source: None, } } - - pub(crate) fn panic(payload: Box) -> Self { - let msg = if let Some(s) = payload.downcast_ref::<&str>() { - (*s).to_owned() - } else if let Some(s) = payload.downcast_ref::() { - s.clone() - } else { - "unknown panic".to_owned() - }; - Self::internal_msg(msg) - } } impl From for Error { @@ -154,7 +140,7 @@ impl_from_internal!( ); /// Classification of a service error. -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq)] pub enum ErrorKind { /// Error originating from a client-supplied input stream. ClientStream, diff --git a/objectstore-service/src/service.rs b/objectstore-service/src/service.rs index 358f5934..be42123f 100644 --- a/objectstore-service/src/service.rs +++ b/objectstore-service/src/service.rs @@ -11,7 +11,7 @@ use std::sync::Arc; use objectstore_types::metadata::Metadata; use crate::backend::common::Backend; -use crate::concurrency::ConcurrencyLimiter; +use crate::concurrency::{ConcurrencyError, ConcurrencyLimiter}; use crate::error::{Error, Result}; use crate::id::{ObjectContext, ObjectId}; use crate::multipart::{ @@ -60,7 +60,6 @@ pub const DEFAULT_CONCURRENCY_LIMIT: usize = 500; /// blocked. Call [`join`](StorageService::join) during shutdown to wait for /// outstanding cleanup. Operations are also isolated from panics in backend /// code — a failure in one operation does not bring down other in-flight work. -/// See [`Error::Panic`]. /// /// # Concurrency Limit /// @@ -68,7 +67,7 @@ pub const DEFAULT_CONCURRENCY_LIMIT: usize = 500; /// configured via [`with_concurrency_limit`](StorageService::with_concurrency_limit); /// without an explicit value the default is [`DEFAULT_CONCURRENCY_LIMIT`]. /// Operations that exceed the limit are rejected immediately with -/// [`Error::AtCapacity`]. +/// [`ConcurrencyError::AtCapacity`]. #[derive(Clone, Debug)] pub struct StorageService { inner: Arc, @@ -87,7 +86,7 @@ impl StorageService { /// Sets the maximum number of concurrent backend operations. /// /// Must be called before [`start`](Self::start). Operations beyond this - /// limit are rejected with [`Error::AtCapacity`]. + /// limit are rejected with [`ConcurrencyError::AtCapacity`]. pub fn with_concurrency_limit(mut self, max: usize) -> Self { self.concurrency = ConcurrencyLimiter::new(max); self @@ -113,14 +112,17 @@ impl StorageService { /// Operations are executed concurrently up to a window derived from the /// service's current capacity. The permits for that window are reserved /// upfront — if the service is at capacity, this returns - /// [`Error::AtCapacity`] immediately before any operations are read. + /// [`ConcurrencyError::AtCapacity`] immediately before any operations are read. pub fn stream(&self) -> Result { let available = self.tasks_available(); let window = (available as f64 * 0.10).ceil() as usize; - let acquire_result = match window { - 0 => Err(Error::too_many_requests()), - _ => self.concurrency.try_acquire_many(window), + let acquire_result: Result<_, Error> = match window { + 0 => Err(ConcurrencyError::AtCapacity.into()), + _ => self + .concurrency + .try_acquire_many(window) + .map_err(Into::into), }; let reservation = acquire_result.inspect_err(|_| { objectstore_metrics::count!("service.concurrency.rejected"); @@ -153,10 +155,9 @@ impl StorageService { /// Spawns a future in a separate task and awaits its result. /// - /// Returns [`Error::AtCapacity`] if the concurrency limit is reached, - /// [`Error::Panic`] if the spawned task panics (the panic message - /// is captured for diagnostics), or [`Error::Dropped`] if the task is - /// dropped before sending its result. + /// Returns [`ConcurrencyError::AtCapacity`] if the concurrency limit is + /// reached, or an [`ErrorKind::Internal`] error if the spawned task panics + /// or is dropped before sending its result. /// /// Emits `service.task.start` (counter) after acquiring a permit and /// `service.task.duration` (distribution) when the task completes, tagged @@ -349,6 +350,7 @@ mod tests { use crate::backend::in_memory::InMemoryBackend; use crate::backend::testing::{Hooks, TestBackend}; use crate::backend::tiered::TieredStorage; + use crate::error::ErrorKind; use crate::stream::{self, ClientStream}; fn make_context() -> ObjectContext { @@ -546,7 +548,9 @@ mod tests { let Err(err) = result else { panic!("expected Panic error"); }; - let msg = err.description.as_deref().unwrap_or(""); + let Some(ConcurrencyError::Panic(msg)) = err.downcast_ref::() else { + panic!("expected ConcurrencyError::Panic, got {err:?}"); + }; assert!(msg.contains("intentional panic in get_object"), "{msg}"); } @@ -684,7 +688,8 @@ mod tests { ) .await; - assert!(result.is_err(), "expected AtCapacity error"); + let err = result.unwrap_err(); + assert_eq!(err.kind(), &ErrorKind::TooManyRequests); // Unblock the first operation. hv.hooks.resume.notify_one(); @@ -736,10 +741,22 @@ mod tests { // First operation panics — the permit must still be released. let id = ObjectId::new(make_context(), "panic-permit".into()); let result = service.get_object(id.clone()).await; - assert!(result.is_err()); + assert!( + result + .as_ref() + .is_err_and(|e| e.downcast_ref::().is_some()), + "expected ConcurrencyError::Panic" + ); - // Second operation should succeed in acquiring the permit (not AtCapacity). + // Second operation should succeed in acquiring the permit (not TooManyRequests). + // It will still fail (PanicOnGet panics every time), but it should NOT be + // a TooManyRequests error — that would mean the permit leaked. let result = service.get_object(id).await; - assert!(result.is_ok(), "permit was not released after panic"); + assert!( + result.as_ref().is_err_and( + |e| e.downcast_ref::() != Some(&ConcurrencyError::AtCapacity) + ), + "permit was not released after panic" + ); } } diff --git a/objectstore-service/src/streaming.rs b/objectstore-service/src/streaming.rs index 0d78117a..cf765441 100644 --- a/objectstore-service/src/streaming.rs +++ b/objectstore-service/src/streaming.rs @@ -16,7 +16,7 @@ //! //! This means: //! - If the service is at capacity, [`StorageService::stream`](crate::service::StorageService::stream) fails immediately with -//! [`Error::AtCapacity`] before any operations are read. +//! [`ConcurrencyError::AtCapacity`](crate::concurrency::ConcurrencyError::AtCapacity) before any operations are read. //! - During execution, operations call the storage backend directly without acquiring //! additional per-operation permits. //! @@ -28,7 +28,7 @@ //! `window × max_operation_size`. Results are yielded in completion order. //! //! Each operation is wrapped in a [`tokio::spawn`] for panic isolation: a panic in -//! one operation surfaces as [`Error::Panic`] for that item and does not affect the +//! one operation surfaces as an [`ErrorKind::Internal`](crate::error::ErrorKind::Internal) error for that item and does not affect the //! others. //! //! ## Future Scope @@ -331,7 +331,7 @@ mod tests { use crate::backend::common::PutResponse; use crate::backend::in_memory::InMemoryBackend; use crate::backend::testing::{Hooks, TestBackend}; - use crate::error::Error; + use crate::error::{Error, ErrorKind}; use crate::service::StorageService; use crate::stream::{self, ClientStream}; @@ -363,7 +363,8 @@ mod tests { #[test] fn at_capacity_when_no_permits() { let service = make_service_with_limit(0); - assert!(service.stream().is_err()); + let err = service.stream().unwrap_err(); + assert_eq!(err.kind(), &ErrorKind::TooManyRequests); } #[test] @@ -613,10 +614,8 @@ mod tests { paused_rx.recv().await.unwrap(); // Permit is held — stream() must fail immediately with AtCapacity. - assert!( - service.stream().is_err(), - "expected AtCapacity when all permits are held" - ); + let err = service.stream().unwrap_err(); + assert_eq!(err.kind(), &ErrorKind::TooManyRequests); resume.notify_waiters(); } From 2550ef10589fb1e512ad4ad0c270482c077686a7 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 5 Jun 2026 11:50:02 +0200 Subject: [PATCH 4/7] improve --- objectstore-service/docs/architecture.md | 2 +- objectstore-service/src/lib.rs | 1 + objectstore-service/src/service.rs | 4 ++-- objectstore-service/src/stream.rs | 4 ++-- objectstore-service/src/streaming.rs | 2 +- 5 files changed, 7 insertions(+), 6 deletions(-) diff --git a/objectstore-service/docs/architecture.md b/objectstore-service/docs/architecture.md index f9e1e5f4..09be701d 100644 --- a/objectstore-service/docs/architecture.md +++ b/objectstore-service/docs/architecture.md @@ -161,7 +161,7 @@ A semaphore caps the total number of in-flight backend operations across all callers. A permit is acquired before each operation is spawned and held until the task completes — including on panic — so the limit counts *running* operations, not queued ones. When no permits are available, the operation fails -with [`Error::AtCapacity`](error::Error::AtCapacity). +with [`ConcurrencyError::AtCapacity`](concurrency::ConcurrencyError::AtCapacity). The default limit is [`DEFAULT_CONCURRENCY_LIMIT`](service::DEFAULT_CONCURRENCY_LIMIT). Callers can override it via [`StorageService::with_concurrency_limit`]. diff --git a/objectstore-service/src/lib.rs b/objectstore-service/src/lib.rs index f2e2eb5c..525a1d5d 100644 --- a/objectstore-service/src/lib.rs +++ b/objectstore-service/src/lib.rs @@ -12,5 +12,6 @@ pub mod service; pub mod stream; pub mod streaming; +pub use concurrency::ConcurrencyError; pub use service::StorageService; pub use stream::{ClientStream, PayloadStream}; diff --git a/objectstore-service/src/service.rs b/objectstore-service/src/service.rs index be42123f..fc299838 100644 --- a/objectstore-service/src/service.rs +++ b/objectstore-service/src/service.rs @@ -156,8 +156,8 @@ impl StorageService { /// Spawns a future in a separate task and awaits its result. /// /// Returns [`ConcurrencyError::AtCapacity`] if the concurrency limit is - /// reached, or an [`ErrorKind::Internal`] error if the spawned task panics - /// or is dropped before sending its result. + /// reached, or a [`ConcurrencyError::Panic`] / [`ConcurrencyError::Dropped`] + /// error if the spawned task panics or is dropped before sending its result. /// /// Emits `service.task.start` (counter) after acquiring a permit and /// `service.task.duration` (distribution) when the task completes, tagged diff --git a/objectstore-service/src/stream.rs b/objectstore-service/src/stream.rs index efaf9c43..a1fd6b41 100644 --- a/objectstore-service/src/stream.rs +++ b/objectstore-service/src/stream.rs @@ -66,7 +66,7 @@ impl From for io::Error { /// Uses [`ClientError`] as the error type so that a dropped or interrupted /// client connection is distinguishable from a backend I/O failure. Backends /// that detect a [`ClientError`] (via [`unpack_client_error`]) can surface it -/// as [`crate::error::Error::Client`], which the server maps to HTTP 400 rather +/// as [`ErrorKind::ClientStream`](crate::error::ErrorKind::ClientStream), which the server maps to HTTP 400 rather /// than 500. /// /// Use [`single`] to construct a single-chunk `ClientStream` from an owned value. @@ -81,7 +81,7 @@ pub type ClientStream = BoxStream<'static, Result>; /// value is a `ClientError`. /// /// Use this in `put_object` implementations to reclassify body-stream errors -/// as [`crate::error::Error::Client`] instead of an opaque server error. +/// as [`ErrorKind::ClientStream`](crate::error::ErrorKind::ClientStream) instead of an opaque server error. pub fn unpack_client_error(err: &E) -> Option where E: Error + 'static, diff --git a/objectstore-service/src/streaming.rs b/objectstore-service/src/streaming.rs index cf765441..0cf102e7 100644 --- a/objectstore-service/src/streaming.rs +++ b/objectstore-service/src/streaming.rs @@ -16,7 +16,7 @@ //! //! This means: //! - If the service is at capacity, [`StorageService::stream`](crate::service::StorageService::stream) fails immediately with -//! [`ConcurrencyError::AtCapacity`](crate::concurrency::ConcurrencyError::AtCapacity) before any operations are read. +//! [`ConcurrencyError::AtCapacity`](crate::ConcurrencyError::AtCapacity) before any operations are read. //! - During execution, operations call the storage backend directly without acquiring //! additional per-operation permits. //! From 62161b66ad376aa80f03a4bc67fdd106868ac513 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 5 Jun 2026 16:13:53 +0200 Subject: [PATCH 5/7] improve --- objectstore-server/src/endpoints/batch.rs | 2 +- objectstore-server/src/endpoints/multipart.rs | 4 +- objectstore-server/src/endpoints/objects.rs | 15 +- objectstore-service/src/backend/gcs.rs | 24 +-- objectstore-service/src/backend/local_fs.rs | 204 +++++++++++++----- .../src/backend/s3_compatible.rs | 14 +- objectstore-service/src/error.rs | 47 ++-- objectstore-service/src/stream.rs | 4 +- 8 files changed, 211 insertions(+), 103 deletions(-) diff --git a/objectstore-server/src/endpoints/batch.rs b/objectstore-server/src/endpoints/batch.rs index 13d20ac0..d2dac0f9 100644 --- a/objectstore-server/src/endpoints/batch.rs +++ b/objectstore-server/src/endpoints/batch.rs @@ -234,7 +234,7 @@ async fn got_to_part( .meter_stream(stream, context) .try_collect::() .await - .map_err(|e| ApiError::Service(e.into()))? + .map_err(|e| ApiError::Internal(format!("stream read failed: {e}")))? .freeze(); let mut metadata_headers = metadata.to_headers("").map_err(|err| { diff --git a/objectstore-server/src/endpoints/multipart.rs b/objectstore-server/src/endpoints/multipart.rs index c9d40d26..98b9d282 100644 --- a/objectstore-server/src/endpoints/multipart.rs +++ b/objectstore-server/src/endpoints/multipart.rs @@ -16,7 +16,6 @@ use bytes::Bytes; use futures::StreamExt; use http::HeaderValue; use http::header; -use objectstore_service::error::Error as ServiceError; use objectstore_service::id::{ObjectContext, ObjectId}; use objectstore_service::multipart::{CompletedPart, PartNumber, UploadId}; use objectstore_types::metadata::Metadata; @@ -95,7 +94,8 @@ async fn initiate_inner( id: ObjectId, headers: HeaderMap, ) -> ApiResult { - let mut metadata = Metadata::from_headers(&headers, "").map_err(ServiceError::from)?; + let mut metadata = Metadata::from_headers(&headers, "") + .map_err(|e| ApiError::Client(format!("invalid metadata headers: {e}")))?; // TODO: Do this in `complete` instead, when we have a Service API to mutate metadata. metadata.time_created = Some(SystemTime::now()); diff --git a/objectstore-server/src/endpoints/objects.rs b/objectstore-server/src/endpoints/objects.rs index e5d0a8ae..35110643 100644 --- a/objectstore-server/src/endpoints/objects.rs +++ b/objectstore-server/src/endpoints/objects.rs @@ -6,7 +6,6 @@ use axum::http::{HeaderMap, StatusCode}; use axum::response::{IntoResponse, Response}; use axum::routing; use axum::{Json, Router}; -use objectstore_service::error::Error as ServiceError; use objectstore_service::id::{ObjectContext, ObjectId}; use objectstore_types::metadata::Metadata; use serde::Serialize; @@ -43,7 +42,8 @@ async fn objects_post( headers: HeaderMap, MeteredBody(body): MeteredBody, ) -> ApiResult { - let mut metadata = Metadata::from_headers(&headers, "").map_err(ServiceError::from)?; + let mut metadata = Metadata::from_headers(&headers, "") + .map_err(|e| ApiError::Client(format!("invalid metadata headers: {e}")))?; metadata.time_created = Some(SystemTime::now()); state @@ -71,7 +71,9 @@ async fn object_get( }; let stream = state.meter_stream(stream, &context); - let headers = metadata.to_headers("").map_err(ServiceError::from)?; + let headers = metadata + .to_headers("") + .map_err(|e| ApiError::Internal(format!("failed to serialize metadata headers: {e}")))?; Ok((headers, Body::from_stream(stream)).into_response()) } @@ -80,7 +82,9 @@ async fn object_head(service: AuthAwareService, Xt(id): Xt) -> ApiResu return Ok(StatusCode::NOT_FOUND.into_response()); }; - let headers = metadata.to_headers("").map_err(ServiceError::from)?; + let headers = metadata + .to_headers("") + .map_err(|e| ApiError::Internal(format!("failed to serialize metadata headers: {e}")))?; Ok((StatusCode::NO_CONTENT, headers).into_response()) } @@ -92,7 +96,8 @@ async fn object_put( headers: HeaderMap, MeteredBody(body): MeteredBody, ) -> ApiResult { - let mut metadata = Metadata::from_headers(&headers, "").map_err(ServiceError::from)?; + let mut metadata = Metadata::from_headers(&headers, "") + .map_err(|e| ApiError::Client(format!("invalid metadata headers: {e}")))?; metadata.time_created = Some(SystemTime::now()); let ObjectId { context, key } = id; diff --git a/objectstore-service/src/backend/gcs.rs b/objectstore-service/src/backend/gcs.rs index 128125ad..581665ec 100644 --- a/objectstore-service/src/backend/gcs.rs +++ b/objectstore-service/src/backend/gcs.rs @@ -519,7 +519,7 @@ impl GcsBackend { .await? .send() .await - .map_err(|e| Error::internal("GCS: get metadata request", e))?; + .map_err(|e| Error::from_reqwest("GCS: get metadata request", e))?; if resp.status() == StatusCode::NOT_FOUND { return Ok(None); @@ -527,7 +527,7 @@ impl GcsBackend { let metadata: GcsObject = resp .error_for_status() - .map_err(|e| Error::internal("GCS: get metadata status", e))? + .map_err(|e| Error::from_reqwest("GCS: get metadata status", e))? .json() .await .map_err(|e| Error::internal("GCS: get metadata parse", e))?; @@ -580,7 +580,7 @@ impl GcsBackend { .send() .await .and_then(|r| r.error_for_status()) - .map_err(|e| Error::internal("GCS: update custom time", e))?; + .map_err(|e| Error::from_reqwest("GCS: update custom time", e))?; Ok(()) }) .await @@ -654,7 +654,7 @@ impl Backend for GcsBackend { .and_then(|r| r.error_for_status()) .map_err(|e| match stream::unpack_client_error(&e) { Some(ce) => Error::client_stream(ce), - _ => Error::internal("error uploading upload object", e), + _ => Error::from_reqwest("GCS: put object", e), })?; Ok(()) @@ -679,7 +679,7 @@ impl Backend for GcsBackend { .send() .await .and_then(|r| r.error_for_status()) - .map_err(|e| Error::internal("GCS: get payload", e)) + .map_err(|e| Error::from_reqwest("GCS: get payload", e)) }) .await?; @@ -709,7 +709,7 @@ impl Backend for GcsBackend { .await? .send() .await - .map_err(|e| Error::internal("GCS: delete object", e))?; + .map_err(|e| Error::from_reqwest("GCS: delete object", e))?; // Do not error for objects that do not exist if resp.status() == StatusCode::NOT_FOUND { @@ -717,7 +717,7 @@ impl Backend for GcsBackend { } resp.error_for_status() - .map_err(|e| Error::internal("GCS: delete object", e))?; + .map_err(|e| Error::from_reqwest("GCS: delete object", e))?; Ok(()) }) @@ -860,7 +860,7 @@ impl MultipartUploadBackend for GcsBackend { .send() .await .and_then(|r| r.error_for_status()) - .map_err(|e| Error::internal("GCS: initiate multipart upload", e))?; + .map_err(|e| Error::from_reqwest("GCS: initiate multipart upload", e))?; let body = resp .bytes() @@ -903,7 +903,7 @@ impl MultipartUploadBackend for GcsBackend { .send() .await .and_then(|r| r.error_for_status()) - .map_err(|e| Error::internal("GCS: upload part", e))?; + .map_err(|e| Error::from_reqwest("GCS: upload part", e))?; let etag = resp .headers() @@ -942,7 +942,7 @@ impl MultipartUploadBackend for GcsBackend { .send() .await .and_then(|r| r.error_for_status()) - .map_err(|e| Error::internal("GCS: list parts", e))?; + .map_err(|e| Error::from_reqwest("GCS: list parts", e))?; let body = resp .bytes() @@ -970,7 +970,7 @@ impl MultipartUploadBackend for GcsBackend { .send() .await .and_then(|r| r.error_for_status()) - .map_err(|e| Error::internal("GCS: abort multipart upload", e))?; + .map_err(|e| Error::from_reqwest("GCS: abort multipart upload", e))?; Ok(()) } @@ -999,7 +999,7 @@ impl MultipartUploadBackend for GcsBackend { .send() .await .and_then(|r| r.error_for_status()) - .map_err(|e| Error::internal("GCS: complete multipart upload", e))?; + .map_err(|e| Error::from_reqwest("GCS: complete multipart upload", e))?; let body = resp .bytes() diff --git a/objectstore-service/src/backend/local_fs.rs b/objectstore-service/src/backend/local_fs.rs index fa5c85f1..f71a523c 100644 --- a/objectstore-service/src/backend/local_fs.rs +++ b/objectstore-service/src/backend/local_fs.rs @@ -85,31 +85,46 @@ impl Backend for LocalFsBackend { ) -> Result { let path = self.path.join(id.as_storage_path().to_string()); objectstore_log::debug!(path=%path.display(), "Writing to local_fs backend"); - tokio::fs::create_dir_all(path.parent().unwrap()).await?; + tokio::fs::create_dir_all(path.parent().unwrap()) + .await + .map_err(|e| Error::internal("local-fs: create dir failed", e))?; let file = OpenOptions::new() .create(true) .write(true) .truncate(true) .open(path) - .await?; + .await + .map_err(|e| Error::internal("local-fs: open for write failed", e))?; let mut reader = pin!(StreamReader::new(stream)); let mut writer = BufWriter::new(file); - let metadata_json = serde_json::to_string(metadata)?; - writer.write_all(metadata_json.as_bytes()).await?; - writer.write_all(b"\n").await?; + let metadata_json = serde_json::to_string(metadata) + .map_err(|e| Error::internal("local-fs: serialize metadata failed", e))?; + writer + .write_all(metadata_json.as_bytes()) + .await + .map_err(|e| Error::internal("local-fs: write metadata failed", e))?; + writer + .write_all(b"\n") + .await + .map_err(|e| Error::internal("local-fs: write failed", e))?; tokio::io::copy(&mut reader, &mut writer) .await .map_err(|e| match stream::unpack_client_error(&e) { Some(ce) => Error::client_stream(ce), - None => Error::from(e), + None => Error::internal("local-fs: copy stream failed", e), })?; - writer.flush().await?; + writer + .flush() + .await + .map_err(|e| Error::internal("local-fs: flush failed", e))?; let file = writer.into_inner(); - file.sync_data().await?; + file.sync_data() + .await + .map_err(|e| Error::internal("local-fs: sync failed", e))?; drop(file); Ok(()) @@ -126,14 +141,23 @@ impl Backend for LocalFsBackend { objectstore_log::debug!("Object not found"); return Ok(None); } - err => err?, + err => err.map_err(|e| Error::internal("local-fs: open for read failed", e))?, }; let mut reader = BufReader::new(file); let mut metadata_line = String::new(); - reader.read_line(&mut metadata_line).await?; - let file_len = reader.get_ref().metadata().await?.len(); - let mut metadata: Metadata = serde_json::from_str(metadata_line.trim_end())?; + reader + .read_line(&mut metadata_line) + .await + .map_err(|e| Error::internal("local-fs: read metadata line failed", e))?; + let file_len = reader + .get_ref() + .metadata() + .await + .map_err(|e| Error::internal("local-fs: read file metadata failed", e))? + .len(); + let mut metadata: Metadata = serde_json::from_str(metadata_line.trim_end()) + .map_err(|e| Error::internal("local-fs: parse metadata failed", e))?; let payload_size = file_len .checked_sub(metadata_line.len() as u64) .ok_or_else(|| Error::internal_msg("local-fs file corrupted: shorter than header"))?; @@ -153,7 +177,7 @@ impl Backend for LocalFsBackend { { objectstore_log::debug!("Object not found"); } - Ok(result?) + Ok(result.map_err(|e| Error::internal("local-fs: delete failed", e))?) } } @@ -176,11 +200,16 @@ impl MultipartUploadBackend for LocalFsBackend { let upload_id = UploadId::new(uuid::Uuid::now_v7().to_string()) .map_err(|e| Error::bad_request("invalid upload ID", e))?; let dir = self.multipart_dir(id, &upload_id); - tokio::fs::create_dir_all(&dir).await?; + tokio::fs::create_dir_all(&dir) + .await + .map_err(|e| Error::internal("local-fs: create multipart dir failed", e))?; let meta_path = dir.join("metadata.json"); - let metadata_json = serde_json::to_string(metadata)?; - tokio::fs::write(meta_path, metadata_json).await?; + let metadata_json = serde_json::to_string(metadata) + .map_err(|e| Error::internal("local-fs: serialize metadata failed", e))?; + tokio::fs::write(meta_path, metadata_json) + .await + .map_err(|e| Error::internal("local-fs: write metadata failed", e))?; Ok(upload_id) } @@ -195,7 +224,10 @@ impl MultipartUploadBackend for LocalFsBackend { body: ClientStream, ) -> Result { let dir = self.multipart_dir(id, upload_id); - if !tokio::fs::try_exists(&dir).await? { + if !tokio::fs::try_exists(&dir) + .await + .map_err(|e| Error::internal("local-fs: check multipart dir failed", e))? + { return Err(Error::bad_request_msg("multipart upload not found")); } @@ -206,7 +238,8 @@ impl MultipartUploadBackend for LocalFsBackend { "uploaded_at": SystemTime::now(), "size": content_length, }); - let header_line = serde_json::to_string(&header)?; + let header_line = serde_json::to_string(&header) + .map_err(|e| Error::internal("local-fs: serialize part header failed", e))?; let part_path = dir.join(format!("{part_number}.part")); let file = OpenOptions::new() @@ -214,18 +247,25 @@ impl MultipartUploadBackend for LocalFsBackend { .write(true) .truncate(true) .open(part_path) - .await?; + .await + .map_err(|e| Error::internal("local-fs: open part for write failed", e))?; let mut reader = pin!(StreamReader::new(body)); let mut writer = BufWriter::new(file); - writer.write_all(header_line.as_bytes()).await?; - writer.write_all(b"\n").await?; + writer + .write_all(header_line.as_bytes()) + .await + .map_err(|e| Error::internal("local-fs: write part header failed", e))?; + writer + .write_all(b"\n") + .await + .map_err(|e| Error::internal("local-fs: write failed", e))?; let bytes_copied = tokio::io::copy(&mut reader, &mut writer) .await .map_err(|e| match stream::unpack_client_error(&e) { Some(ce) => Error::client_stream(ce), - None => Error::from(e), + None => Error::internal("local-fs: copy stream failed", e), })?; if bytes_copied != content_length { @@ -234,9 +274,14 @@ impl MultipartUploadBackend for LocalFsBackend { ))); } - writer.flush().await?; + writer + .flush() + .await + .map_err(|e| Error::internal("local-fs: flush failed", e))?; let file = writer.into_inner(); - file.sync_data().await?; + file.sync_data() + .await + .map_err(|e| Error::internal("local-fs: sync failed", e))?; drop(file); Ok(etag) @@ -250,14 +295,23 @@ impl MultipartUploadBackend for LocalFsBackend { part_number_marker: Option, ) -> Result { let dir = self.multipart_dir(id, upload_id); - if !tokio::fs::try_exists(&dir).await? { + if !tokio::fs::try_exists(&dir) + .await + .map_err(|e| Error::internal("local-fs: check multipart dir failed", e))? + { return Err(Error::bad_request_msg("multipart upload not found")); } - let mut entries = tokio::fs::read_dir(&dir).await?; + let mut entries = tokio::fs::read_dir(&dir) + .await + .map_err(|e| Error::internal("local-fs: read multipart dir failed", e))?; let mut parts = Vec::new(); - while let Some(entry) = entries.next_entry().await? { + while let Some(entry) = entries + .next_entry() + .await + .map_err(|e| Error::internal("local-fs: read dir entry failed", e))? + { let name = entry.file_name(); let name_str = name.to_string_lossy(); let Some(pn_str) = name_str.strip_suffix(".part") else { @@ -271,11 +325,17 @@ impl MultipartUploadBackend for LocalFsBackend { continue; } - let file = tokio::fs::File::open(entry.path()).await?; + let file = tokio::fs::File::open(entry.path()) + .await + .map_err(|e| Error::internal("local-fs: open part for read failed", e))?; let mut reader = BufReader::new(file); let mut header_line = String::new(); - reader.read_line(&mut header_line).await?; - let header: serde_json::Value = serde_json::from_str(header_line.trim_end())?; + reader + .read_line(&mut header_line) + .await + .map_err(|e| Error::internal("local-fs: read part header failed", e))?; + let header: serde_json::Value = serde_json::from_str(header_line.trim_end()) + .map_err(|e| Error::internal("local-fs: parse part header failed", e))?; parts.push(Part { part_number: pn, @@ -311,8 +371,13 @@ impl MultipartUploadBackend for LocalFsBackend { upload_id: &UploadId, ) -> Result { let dir = self.multipart_dir(id, upload_id); - if tokio::fs::try_exists(&dir).await? { - tokio::fs::remove_dir_all(dir).await?; + if tokio::fs::try_exists(&dir) + .await + .map_err(|e| Error::internal("local-fs: check multipart dir failed", e))? + { + tokio::fs::remove_dir_all(dir) + .await + .map_err(|e| Error::internal("local-fs: remove multipart dir failed", e))?; } Ok(()) } @@ -324,14 +389,20 @@ impl MultipartUploadBackend for LocalFsBackend { parts: Vec, ) -> Result { let dir = self.multipart_dir(id, upload_id); - if !tokio::fs::try_exists(&dir).await? { + if !tokio::fs::try_exists(&dir) + .await + .map_err(|e| Error::internal("local-fs: check multipart dir failed", e))? + { return Err(Error::bad_request_msg("multipart upload not found")); } // Read metadata let meta_path = dir.join("metadata.json"); - let meta_bytes = tokio::fs::read(&meta_path).await?; - let metadata: Metadata = serde_json::from_slice(&meta_bytes)?; + let meta_bytes = tokio::fs::read(&meta_path) + .await + .map_err(|e| Error::internal("local-fs: read metadata failed", e))?; + let metadata: Metadata = serde_json::from_slice(&meta_bytes) + .map_err(|e| Error::internal("local-fs: parse metadata failed", e))?; // TODO: validate that parts are in ascending part_number order and reject with // InvalidPartOrder if not (matches S3/GCS behavior). Needs a proper client error variant. @@ -339,18 +410,27 @@ impl MultipartUploadBackend for LocalFsBackend { // Validate all parts (headers only) before writing anything for completed in &parts { let part_path = dir.join(format!("{}.part", completed.part_number)); - if !tokio::fs::try_exists(&part_path).await? { + if !tokio::fs::try_exists(&part_path) + .await + .map_err(|e| Error::internal("local-fs: check part file failed", e))? + { return Ok(Some(crate::multipart::CompleteMultipartError { code: "InvalidPart".into(), message: format!("part number {} was not uploaded", completed.part_number), })); } - let file = tokio::fs::File::open(&part_path).await?; + let file = tokio::fs::File::open(&part_path) + .await + .map_err(|e| Error::internal("local-fs: open part for read failed", e))?; let mut reader = BufReader::new(file); let mut header_line = String::new(); - reader.read_line(&mut header_line).await?; - let header: serde_json::Value = serde_json::from_str(header_line.trim_end())?; + reader + .read_line(&mut header_line) + .await + .map_err(|e| Error::internal("local-fs: read part header failed", e))?; + let header: serde_json::Value = serde_json::from_str(header_line.trim_end()) + .map_err(|e| Error::internal("local-fs: parse part header failed", e))?; let stored_etag = header["etag"].as_str().unwrap_or(""); if stored_etag != completed.etag { @@ -366,35 +446,59 @@ impl MultipartUploadBackend for LocalFsBackend { // Stream parts directly to the final object file let path = self.path.join(id.as_storage_path().to_string()); - tokio::fs::create_dir_all(path.parent().unwrap()).await?; + tokio::fs::create_dir_all(path.parent().unwrap()) + .await + .map_err(|e| Error::internal("local-fs: create dir failed", e))?; let file = OpenOptions::new() .create(true) .write(true) .truncate(true) .open(path) - .await?; + .await + .map_err(|e| Error::internal("local-fs: open for write failed", e))?; let mut writer = BufWriter::new(file); - let metadata_json = serde_json::to_string(&metadata)?; - writer.write_all(metadata_json.as_bytes()).await?; - writer.write_all(b"\n").await?; + let metadata_json = serde_json::to_string(&metadata) + .map_err(|e| Error::internal("local-fs: serialize metadata failed", e))?; + writer + .write_all(metadata_json.as_bytes()) + .await + .map_err(|e| Error::internal("local-fs: write metadata failed", e))?; + writer + .write_all(b"\n") + .await + .map_err(|e| Error::internal("local-fs: write failed", e))?; for completed in &parts { let part_path = dir.join(format!("{}.part", completed.part_number)); - let file = tokio::fs::File::open(&part_path).await?; + let file = tokio::fs::File::open(&part_path) + .await + .map_err(|e| Error::internal("local-fs: open part for read failed", e))?; let mut reader = BufReader::new(file); let mut header_line = String::new(); - reader.read_line(&mut header_line).await?; - tokio::io::copy(&mut reader, &mut writer).await?; + reader + .read_line(&mut header_line) + .await + .map_err(|e| Error::internal("local-fs: read part header failed", e))?; + tokio::io::copy(&mut reader, &mut writer) + .await + .map_err(|e| Error::internal("local-fs: copy part data failed", e))?; } - writer.flush().await?; + writer + .flush() + .await + .map_err(|e| Error::internal("local-fs: flush failed", e))?; let file = writer.into_inner(); - file.sync_data().await?; + file.sync_data() + .await + .map_err(|e| Error::internal("local-fs: sync failed", e))?; drop(file); // Clean up multipart state - tokio::fs::remove_dir_all(dir).await?; + tokio::fs::remove_dir_all(dir) + .await + .map_err(|e| Error::internal("local-fs: remove multipart dir failed", e))?; Ok(None) } diff --git a/objectstore-service/src/backend/s3_compatible.rs b/objectstore-service/src/backend/s3_compatible.rs index 5a37ef51..6b1b0452 100644 --- a/objectstore-service/src/backend/s3_compatible.rs +++ b/objectstore-service/src/backend/s3_compatible.rs @@ -178,7 +178,7 @@ where .await? .send() .await - .map_err(|e| Error::internal("S3: failed to send request", e))?; + .map_err(|e| Error::from_reqwest("S3: failed to send request", e))?; if response.status() == StatusCode::NOT_FOUND { objectstore_log::debug!("Object not found"); @@ -187,7 +187,7 @@ where let response = response .error_for_status() - .map_err(|e| Error::internal("S3: failed to get object", e))?; + .map_err(|e| Error::from_reqwest("S3: failed to get object", e))?; let headers = response.headers(); let mut metadata = Metadata::from_headers(headers, GCS_CUSTOM_PREFIX) @@ -227,10 +227,10 @@ where .headers(metadata_to_gcs_headers(metadata, GCS_CUSTOM_PREFIX)?) .send() .await - .map_err(|e| Error::internal("S3: failed to send TTI update request", e))? + .map_err(|e| Error::from_reqwest("S3: failed to send TTI update request", e))? .error_for_status() .map_err(|e| { - Error::internal( + Error::from_reqwest( "S3: failed to update expiration time for object with TTI", e, ) @@ -285,7 +285,7 @@ impl Backend for S3CompatibleBackend { .and_then(|response| response.error_for_status()) .map_err(|e| match stream::unpack_client_error(&e) { Some(ce) => Error::client_stream(ce), - _ => Error::internal("S3: failed to put object", e), + _ => Error::from_reqwest("S3: failed to put object", e), })?; Ok(()) @@ -318,14 +318,14 @@ impl Backend for S3CompatibleBackend { .await? .send() .await - .map_err(|e| Error::internal("S3: failed to send delete request", e))?; + .map_err(|e| Error::from_reqwest("S3: failed to send delete request", e))?; // Do not error for objects that do not exist. if response.status() != StatusCode::NOT_FOUND { objectstore_log::debug!("Object not found"); response .error_for_status() - .map_err(|e| Error::internal("S3: failed to delete object", e))?; + .map_err(|e| Error::from_reqwest("S3: failed to delete object", e))?; } Ok(()) diff --git a/objectstore-service/src/error.rs b/objectstore-service/src/error.rs index 388f28af..a4939f45 100644 --- a/objectstore-service/src/error.rs +++ b/objectstore-service/src/error.rs @@ -107,6 +107,29 @@ impl Error { source: None, } } + + pub(crate) fn from_reqwest( + description: impl Into>, + cause: reqwest::Error, + ) -> Self { + // NOTE: 404 (Not Found) and 409 (Conflict) don't map cleanly to any current ErrorKind. + // Consider adding ErrorKind::NotFound and ErrorKind::Conflict if backends need to + // distinguish these from generic internal errors. + let kind = match cause.status() { + Some(status) => match status.as_u16() { + 400 => ErrorKind::BadRequest, + 408 | 429 => ErrorKind::TooManyRequests, + 500 | 502 | 503 | 504 => ErrorKind::Transient, + _ => ErrorKind::Internal, + }, + None => ErrorKind::Transient, + }; + Self { + kind, + description: Some(description.into()), + source: Some(Box::new(cause)), + } + } } impl From for Error { @@ -115,30 +138,6 @@ impl From for Error { } } -macro_rules! impl_from_internal { - ($($ty:ty),+ $(,)?) => { - $( - impl From<$ty> for Error { - fn from(source: $ty) -> Self { - Self { - kind: ErrorKind::Internal, - description: None, - source: Some(Box::new(source)), - } - } - } - )+ - }; -} - -impl_from_internal!( - std::io::Error, - serde_json::Error, - reqwest::Error, - gcp_auth::Error, - objectstore_types::metadata::Error, -); - /// Classification of a service error. #[derive(Debug, PartialEq, Eq)] pub enum ErrorKind { diff --git a/objectstore-service/src/stream.rs b/objectstore-service/src/stream.rs index a1fd6b41..f89904c7 100644 --- a/objectstore-service/src/stream.rs +++ b/objectstore-service/src/stream.rs @@ -302,11 +302,11 @@ pub fn single( pub(crate) async fn read_to_vec(mut stream: S) -> crate::error::Result> where S: Stream> + Unpin, - E: Into, + E: std::error::Error + Send + Sync + 'static, { let mut payload = Vec::new(); while let Some(result) = stream.next().await { - let chunk = result.map_err(Into::into)?; + let chunk = result.map_err(|e| crate::error::Error::internal("stream read failed", e))?; payload.extend(&chunk); } Ok(payload) From 6c17b556e78d61907faee0bda8813df96409fc1d Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 5 Jun 2026 16:40:57 +0200 Subject: [PATCH 6/7] improve --- objectstore-service/src/backend/common.rs | 4 +- objectstore-service/src/backend/gcs.rs | 2 +- objectstore-service/src/backend/local_fs.rs | 4 +- .../src/backend/s3_compatible.rs | 2 +- objectstore-service/src/error.rs | 97 +++++++++---------- 5 files changed, 51 insertions(+), 58 deletions(-) diff --git a/objectstore-service/src/backend/common.rs b/objectstore-service/src/backend/common.rs index 8d3f2e1d..f4cc3bf2 100644 --- a/objectstore-service/src/backend/common.rs +++ b/objectstore-service/src/backend/common.rs @@ -69,7 +69,9 @@ pub trait Backend: fmt::Debug + Send + Sync + 'static { /// The default returns [`ErrorKind::NotImplemented`](crate::error::ErrorKind::NotImplemented). Backends that implement /// [`MultipartUploadBackend`] should override this to return `Ok(self)`. fn as_multipart_upload_backend(self: Arc) -> Result> { - Err(Error::not_implemented()) + Err(Error::not_implemented_msg( + "the backend doesn't support multipart uploads", + )) } } diff --git a/objectstore-service/src/backend/gcs.rs b/objectstore-service/src/backend/gcs.rs index 581665ec..f80fa121 100644 --- a/objectstore-service/src/backend/gcs.rs +++ b/objectstore-service/src/backend/gcs.rs @@ -653,7 +653,7 @@ impl Backend for GcsBackend { .await .and_then(|r| r.error_for_status()) .map_err(|e| match stream::unpack_client_error(&e) { - Some(ce) => Error::client_stream(ce), + Some(ce) => ce.into(), _ => Error::from_reqwest("GCS: put object", e), })?; diff --git a/objectstore-service/src/backend/local_fs.rs b/objectstore-service/src/backend/local_fs.rs index f71a523c..b4b51418 100644 --- a/objectstore-service/src/backend/local_fs.rs +++ b/objectstore-service/src/backend/local_fs.rs @@ -113,7 +113,7 @@ impl Backend for LocalFsBackend { tokio::io::copy(&mut reader, &mut writer) .await .map_err(|e| match stream::unpack_client_error(&e) { - Some(ce) => Error::client_stream(ce), + Some(ce) => ce.into(), None => Error::internal("local-fs: copy stream failed", e), })?; @@ -264,7 +264,7 @@ impl MultipartUploadBackend for LocalFsBackend { let bytes_copied = tokio::io::copy(&mut reader, &mut writer) .await .map_err(|e| match stream::unpack_client_error(&e) { - Some(ce) => Error::client_stream(ce), + Some(ce) => ce.into(), None => Error::internal("local-fs: copy stream failed", e), })?; diff --git a/objectstore-service/src/backend/s3_compatible.rs b/objectstore-service/src/backend/s3_compatible.rs index 6b1b0452..3e3dffdb 100644 --- a/objectstore-service/src/backend/s3_compatible.rs +++ b/objectstore-service/src/backend/s3_compatible.rs @@ -284,7 +284,7 @@ impl Backend for S3CompatibleBackend { .await .and_then(|response| response.error_for_status()) .map_err(|e| match stream::unpack_client_error(&e) { - Some(ce) => Error::client_stream(ce), + Some(ce) => ce.into(), _ => Error::from_reqwest("S3: failed to put object", e), })?; diff --git a/objectstore-service/src/error.rs b/objectstore-service/src/error.rs index a4939f45..560211e4 100644 --- a/objectstore-service/src/error.rs +++ b/objectstore-service/src/error.rs @@ -53,61 +53,48 @@ impl Display for Error { } } -impl Error { - pub(crate) fn internal( - description: impl Into>, - source: impl std::error::Error + Send + Sync + 'static, - ) -> Self { - Self { - kind: ErrorKind::Internal, - description: Some(description.into()), - source: Some(Box::new(source)), - } - } - - pub(crate) fn internal_msg(description: impl Into>) -> Self { - Self { - kind: ErrorKind::Internal, - description: Some(description.into()), - source: None, +macro_rules! impl_converter { + ($kind:path, $fn_name:ident, $msg_fn_name:ident) => { + #[allow(dead_code)] + impl Error { + pub(crate) fn $fn_name( + description: impl Into>, + source: impl std::error::Error + Send + Sync + 'static, + ) -> Self { + Self { + kind: $kind, + description: Some(description.into()), + source: Some(Box::new(source)), + } + } + + pub(crate) fn $msg_fn_name(description: impl Into>) -> Self { + Self { + kind: $kind, + description: Some(description.into()), + source: None, + } + } } - } - - pub(crate) fn bad_request( - description: impl Into>, - source: impl std::error::Error + Send + Sync + 'static, - ) -> Self { - Self { - kind: ErrorKind::BadRequest, - description: Some(description.into()), - source: Some(Box::new(source)), - } - } - - pub(crate) fn bad_request_msg(description: impl Into>) -> Self { - Self { - kind: ErrorKind::BadRequest, - description: Some(description.into()), - source: None, - } - } + }; +} - pub(crate) fn client_stream(source: ClientError) -> Self { - Self { - kind: ErrorKind::ClientStream, - description: None, - source: Some(Box::new(source)), - } - } - - pub(crate) fn not_implemented() -> Self { - Self { - kind: ErrorKind::NotImplemented, - description: None, - source: None, - } - } +impl_converter!(ErrorKind::ClientStream, client_stream, client_stream_msg); +impl_converter!(ErrorKind::Transient, transient, transient_msg); +impl_converter!(ErrorKind::BadRequest, bad_request, bad_request_msg); +impl_converter!( + ErrorKind::NotImplemented, + not_implemented, + not_implemented_msg +); +impl_converter!( + ErrorKind::TooManyRequests, + too_many_requests, + too_many_requests_msg +); +impl_converter!(ErrorKind::Internal, internal, internal_msg); +impl Error { pub(crate) fn from_reqwest( description: impl Into>, cause: reqwest::Error, @@ -134,7 +121,11 @@ impl Error { impl From for Error { fn from(source: ClientError) -> Self { - Self::client_stream(source) + Self { + kind: ErrorKind::ClientStream, + source: Some(Box::new(source)), + description: None, + } } } From f921bd7f1376136d43b5c86badb750a2d973695f Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 5 Jun 2026 16:44:37 +0200 Subject: [PATCH 7/7] improve --- objectstore-service/src/error.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/objectstore-service/src/error.rs b/objectstore-service/src/error.rs index 560211e4..bdcd3132 100644 --- a/objectstore-service/src/error.rs +++ b/objectstore-service/src/error.rs @@ -136,13 +136,13 @@ pub enum ErrorKind { ClientStream, /// Transient failure that may succeed on retry. Transient, - /// Malformed or invalid client request. + /// Malformed or invalid input. BadRequest, - /// Functionality not implemented by this backend. + /// Functionality not implemented. NotImplemented, - /// Service is at capacity. + /// Cannot handle the request due to too much load. TooManyRequests, - /// Internal service or backend failure. + /// Internal service or storage backend failure. Internal, }