Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion objectstore-server/src/endpoints/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ async fn got_to_part(
.meter_stream(stream, context)
.try_collect::<BytesMut>()
.await
.map_err(|e| ApiError::Service(e.into()))?
.map_err(|e| ApiError::Internal(format!("stream read failed: {e}")))?
.freeze();

let mut metadata_headers = metadata.to_headers("").map_err(|err| {
Expand Down
26 changes: 15 additions & 11 deletions objectstore-server/src/endpoints/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::error::Error;
use axum::Json;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use objectstore_service::error::Error as ServiceError;
use objectstore_service::error::{Error as ServiceError, ErrorKind};
use serde::{Deserialize, Serialize};
use thiserror::Error;

Expand All @@ -24,7 +24,7 @@ pub enum ApiError {
Auth(#[from] AuthError),

/// Service errors, indicating that something went wrong when receiving or executing a request.
#[error("service error: {0}")]
#[error(transparent)]
Service(#[from] ServiceError),

/// Errors encountered when parsing or executing a batch request.
Expand Down Expand Up @@ -91,15 +91,19 @@ impl ApiError {
StatusCode::INTERNAL_SERVER_ERROR
}

ApiError::Service(ServiceError::Client(_)) => StatusCode::BAD_REQUEST,
ApiError::Service(ServiceError::Metadata(_)) => StatusCode::BAD_REQUEST,
ApiError::Service(ServiceError::InvalidUploadId(_)) => StatusCode::BAD_REQUEST,
ApiError::Service(ServiceError::AtCapacity) => StatusCode::TOO_MANY_REQUESTS,
ApiError::Service(ServiceError::NotImplemented) => StatusCode::NOT_IMPLEMENTED,
ApiError::Service(_) => {
objectstore_log::error!(!!self, "error handling request");
StatusCode::INTERNAL_SERVER_ERROR
}
ApiError::Service(e) => match e.kind() {
ErrorKind::ClientStream | ErrorKind::BadRequest => StatusCode::BAD_REQUEST,
ErrorKind::TooManyRequests => StatusCode::TOO_MANY_REQUESTS,
ErrorKind::NotImplemented => StatusCode::NOT_IMPLEMENTED,
ErrorKind::Transient => {
objectstore_log::warn!(!!self, "transient error handling request");
StatusCode::SERVICE_UNAVAILABLE
}
ErrorKind::Internal => {
objectstore_log::error!(!!self, "internal error handling request");
StatusCode::INTERNAL_SERVER_ERROR
}
},

ApiError::Internal(_) => {
objectstore_log::error!(!!self, "internal error");
Expand Down
4 changes: 2 additions & 2 deletions objectstore-server/src/endpoints/multipart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use bytes::Bytes;
use futures::StreamExt;
use http::HeaderValue;
use http::header;
use objectstore_service::error::Error as ServiceError;
use objectstore_service::id::{ObjectContext, ObjectId};
use objectstore_service::multipart::{CompletedPart, PartNumber, UploadId};
use objectstore_types::metadata::Metadata;
Expand Down Expand Up @@ -95,7 +94,8 @@ async fn initiate_inner(
id: ObjectId,
headers: HeaderMap,
) -> ApiResult<Response> {
let mut metadata = Metadata::from_headers(&headers, "").map_err(ServiceError::from)?;
let mut metadata = Metadata::from_headers(&headers, "")
.map_err(|e| ApiError::Client(format!("invalid metadata headers: {e}")))?;
// TODO: Do this in `complete` instead, when we have a Service API to mutate metadata.
metadata.time_created = Some(SystemTime::now());

Expand Down
15 changes: 10 additions & 5 deletions objectstore-server/src/endpoints/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use axum::http::{HeaderMap, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::routing;
use axum::{Json, Router};
use objectstore_service::error::Error as ServiceError;
use objectstore_service::id::{ObjectContext, ObjectId};
use objectstore_types::metadata::Metadata;
use serde::Serialize;
Expand Down Expand Up @@ -43,7 +42,8 @@ async fn objects_post(
headers: HeaderMap,
MeteredBody(body): MeteredBody,
) -> ApiResult<Response> {
let mut metadata = Metadata::from_headers(&headers, "").map_err(ServiceError::from)?;
let mut metadata = Metadata::from_headers(&headers, "")
.map_err(|e| ApiError::Client(format!("invalid metadata headers: {e}")))?;
metadata.time_created = Some(SystemTime::now());

state
Expand Down Expand Up @@ -71,7 +71,9 @@ async fn object_get(
};
let stream = state.meter_stream(stream, &context);

let headers = metadata.to_headers("").map_err(ServiceError::from)?;
let headers = metadata
.to_headers("")
.map_err(|e| ApiError::Internal(format!("failed to serialize metadata headers: {e}")))?;
Ok((headers, Body::from_stream(stream)).into_response())
}

Expand All @@ -80,7 +82,9 @@ async fn object_head(service: AuthAwareService, Xt(id): Xt<ObjectId>) -> ApiResu
return Ok(StatusCode::NOT_FOUND.into_response());
};

let headers = metadata.to_headers("").map_err(ServiceError::from)?;
let headers = metadata
.to_headers("")
.map_err(|e| ApiError::Internal(format!("failed to serialize metadata headers: {e}")))?;

Ok((StatusCode::NO_CONTENT, headers).into_response())
}
Expand All @@ -92,7 +96,8 @@ async fn object_put(
headers: HeaderMap,
MeteredBody(body): MeteredBody,
) -> ApiResult<Response> {
let mut metadata = Metadata::from_headers(&headers, "").map_err(ServiceError::from)?;
let mut metadata = Metadata::from_headers(&headers, "")
.map_err(|e| ApiError::Client(format!("invalid metadata headers: {e}")))?;
metadata.time_created = Some(SystemTime::now());

let ObjectId { context, key } = id;
Expand Down
2 changes: 1 addition & 1 deletion objectstore-service/docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ A semaphore caps the total number of in-flight backend operations across all
callers. A permit is acquired before each operation is spawned and held until
the task completes — including on panic — so the limit counts *running*
operations, not queued ones. When no permits are available, the operation fails
with [`Error::AtCapacity`](error::Error::AtCapacity).
with [`ConcurrencyError::AtCapacity`](concurrency::ConcurrencyError::AtCapacity).

The default limit is [`DEFAULT_CONCURRENCY_LIMIT`](service::DEFAULT_CONCURRENCY_LIMIT). Callers can override it via
[`StorageService::with_concurrency_limit`].
Expand Down
76 changes: 39 additions & 37 deletions objectstore-service/src/backend/bigtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ fn object_mutations(
};

let metadata_bytes = serde_json::to_vec(metadata)
.map_err(|cause| Error::serde("failed to serialize metadata", cause))?;
.map_err(|e| Error::internal("failed to serialize metadata", e))?;

Ok([
// NB: We explicitly delete the row to clear metadata on overwrite.
Expand Down Expand Up @@ -520,7 +520,7 @@ fn tombstone_mutations(tombstone: &Tombstone, now: SystemTime) -> Result<[v2::Mu
column_qualifier: COLUMN_TOMBSTONE_META.to_owned(),
timestamp_micros,
value: serde_json::to_vec(&tombstone_meta)
.map_err(|cause| Error::serde("failed to serialize tombstone", cause))?,
.map_err(|e| Error::internal("failed to serialize tombstone", e))?,
})),
])
}
Expand Down Expand Up @@ -593,8 +593,8 @@ impl RowData {
}
COLUMN_TOMBSTONE_META => {
tombstone_meta_opt =
Some(serde_json::from_slice(&cell.value).map_err(|cause| {
Error::serde("failed to deserialize tombstone meta", cause)
Some(serde_json::from_slice(&cell.value).map_err(|e| {
Error::internal("failed to deserialize tombstone meta", e)
})?);
}
COLUMN_METADATA => {
Expand All @@ -609,8 +609,8 @@ impl RowData {
});
} else {
metadata_opt =
Some(serde_json::from_slice(&cell.value).map_err(|cause| {
Error::serde("failed to deserialize metadata", cause)
Some(serde_json::from_slice(&cell.value).map_err(|e| {
Error::internal("failed to deserialize metadata", e)
})?);
}
}
Expand Down Expand Up @@ -675,9 +675,9 @@ fn parse_redirect_target(redirect_path: &[u8], tombstone_id: &ObjectId) -> Resul
Ok(tombstone_id.clone())
} else {
let redirect_str = std::str::from_utf8(redirect_path)
.map_err(|_| Error::generic("invalid UTF-8 in redirect path"))?;
.map_err(|_| Error::internal_msg("invalid UTF-8 in redirect path"))?;
ObjectId::from_storage_path(redirect_str)
.ok_or_else(|| Error::generic("corrupt redirect path"))
.ok_or_else(|| Error::internal_msg("corrupt redirect path"))
}
}

Expand Down Expand Up @@ -912,7 +912,7 @@ impl Backend for BigTableBackend {
async fn get_object(&self, id: &ObjectId) -> Result<GetResponse> {
match self.get_tiered_object(id).await? {
TieredGet::Object(metadata, payload) => Ok(Some((metadata, payload))),
TieredGet::Tombstone(_) => Err(Error::UnexpectedTombstone),
TieredGet::Tombstone(_) => Err(Error::internal_msg("unexpected tombstone")),
TieredGet::NotFound => Ok(None),
}
}
Expand All @@ -921,7 +921,7 @@ impl Backend for BigTableBackend {
async fn get_metadata(&self, id: &ObjectId) -> Result<MetadataResponse> {
match self.get_tiered_metadata(id).await? {
TieredMetadata::Object(metadata) => Ok(Some(metadata)),
TieredMetadata::Tombstone(_) => Err(Error::UnexpectedTombstone),
TieredMetadata::Tombstone(_) => Err(Error::internal_msg("unexpected tombstone")),
TieredMetadata::NotFound => Ok(None),
}
}
Expand Down Expand Up @@ -984,7 +984,9 @@ impl HighVolumeBackend for BigTableBackend {
}
}

Err(Error::generic("BigTable: race loop in put_non_tombstone"))
Err(Error::internal_msg(
"BigTable: race loop in put_non_tombstone",
))
}

#[tracing::instrument(level = "trace", fields(?id), skip_all)]
Expand Down Expand Up @@ -1079,7 +1081,7 @@ impl HighVolumeBackend for BigTableBackend {
}
}

Err(Error::generic(
Err(Error::internal_msg(
"BigTable: race loop in delete_non_tombstone",
))
}
Expand Down Expand Up @@ -1120,27 +1122,30 @@ impl HighVolumeBackend for BigTableBackend {
/// required by BigTable, the resulting timestamp has millisecond precision, with the last digits at
/// 0.
fn ttl_to_micros(ttl: Duration, from: SystemTime) -> Result<i64> {
let deadline = from.checked_add(ttl).ok_or_else(|| Error::Generic {
context: format!(
let deadline = from.checked_add(ttl).ok_or_else(|| {
Error::internal_msg(format!(
"TTL duration overflow: {} plus {}s cannot be represented as SystemTime",
humantime::format_rfc3339_seconds(from),
ttl.as_secs()
),
cause: None,
))
})?;
let millis = deadline
.duration_since(SystemTime::UNIX_EPOCH)
.map_err(|e| Error::Generic {
context: format!(
"unable to get duration since UNIX_EPOCH for SystemTime {}",
humantime::format_rfc3339_seconds(deadline)
),
cause: Some(Box::new(e)),
.map_err(|e| {
Error::internal(
format!(
"unable to get duration since UNIX_EPOCH for SystemTime {}",
humantime::format_rfc3339_seconds(deadline)
),
e,
)
})?
.as_millis();
(millis * 1000).try_into().map_err(|e| Error::Generic {
context: format!("failed to convert {}ms to i64 microseconds", millis),
cause: Some(Box::new(e)),
(millis * 1000).try_into().map_err(|e| {
Error::internal(
format!("failed to convert {}ms to i64 microseconds", millis),
e,
)
})
}

Expand All @@ -1163,10 +1168,7 @@ where
Ok(res) => return Ok(res),
Err(e) if retry_count >= REQUEST_RETRY_COUNT || !is_retryable(&e) => {
objectstore_metrics::count!("bigtable.failures", action = context);
return Err(Error::Generic {
context: format!("Bigtable: `{context}` failed"),
cause: Some(Box::new(e)),
});
return Err(Error::internal(format!("Bigtable: `{context}` failed"), e));
}
Err(e) => {
retry_count += 1;
Expand Down Expand Up @@ -1742,14 +1744,14 @@ mod tests {
}

// Legacy reads must error rather than leak tombstone data.
assert!(matches!(
backend.get_object(&hv_id).await,
Err(Error::UnexpectedTombstone)
));
assert!(matches!(
backend.get_metadata(&hv_id).await,
Err(Error::UnexpectedTombstone)
));
let Err(err) = backend.get_object(&hv_id).await else {
panic!("expected error");
};
assert_eq!(err.description.as_deref(), Some("unexpected tombstone"));
let Err(err) = backend.get_metadata(&hv_id).await else {
panic!("expected error");
};
assert_eq!(err.description.as_deref(), Some("unexpected tombstone"));

// Idempotent retry: retry with the same target succeeds
let second = backend
Expand Down
6 changes: 4 additions & 2 deletions objectstore-service/src/backend/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,12 @@ pub trait Backend: fmt::Debug + Send + Sync + 'static {

/// Casts this backend into an [`Arc<dyn MultipartUploadBackend>`] if supported.
///
/// The default returns [`Error::NotImplemented`]. Backends that implement
/// The default returns [`ErrorKind::NotImplemented`](crate::error::ErrorKind::NotImplemented). Backends that implement
/// [`MultipartUploadBackend`] should override this to return `Ok(self)`.
fn as_multipart_upload_backend(self: Arc<Self>) -> Result<Arc<dyn MultipartUploadBackend>> {
Err(Error::NotImplemented)
Err(Error::not_implemented_msg(
"the backend doesn't support multipart uploads",
))
}
}

Expand Down
Loading
Loading