diff --git a/objectstore-server/src/auth/service.rs b/objectstore-server/src/auth/service.rs index b0e5d3bc..89c0c040 100644 --- a/objectstore-server/src/auth/service.rs +++ b/objectstore-server/src/auth/service.rs @@ -7,6 +7,7 @@ use objectstore_service::service::{DeleteResponse, GetResponse, InsertResponse, use objectstore_service::{ClientStream, StorageService}; use objectstore_types::auth::Permission; use objectstore_types::metadata::Metadata; +use objectstore_types::range::ByteRange; use crate::auth::{AuthContext, AuthError}; use crate::endpoints::common::ApiResult; @@ -110,9 +111,13 @@ impl AuthAwareService { } /// Auth-aware wrapper around [`StorageService::get_object`]. - pub async fn get_object(&self, id: ObjectId) -> ApiResult { + pub async fn get_object( + &self, + id: ObjectId, + range: Option, + ) -> ApiResult { self.assert_authorized(Permission::ObjectRead, id.context())?; - Ok(self.service.get_object(id).await?) + Ok(self.service.get_object(id, range).await?) } /// Auth-aware wrapper around [`StorageService::delete_object`]. 
diff --git a/objectstore-server/src/endpoints/batch.rs b/objectstore-server/src/endpoints/batch.rs index 13d20ac0..c75ece11 100644 --- a/objectstore-server/src/endpoints/batch.rs +++ b/objectstore-server/src/endpoints/batch.rs @@ -147,7 +147,7 @@ async fn convert_to_part( match result { Ok(OpResponse::Got { key, - response: Some((metadata, stream)), + response: Some((metadata, _content_range, stream)), }) => got_to_part(idx, key, metadata, stream, state, context) .await .unwrap_or_else(|e| create_error_part(idx, &e)), diff --git a/objectstore-server/src/endpoints/common.rs b/objectstore-server/src/endpoints/common.rs index ab6a8240..06b6014e 100644 --- a/objectstore-server/src/endpoints/common.rs +++ b/objectstore-server/src/endpoints/common.rs @@ -5,6 +5,7 @@ use std::error::Error; use axum::Json; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; +use http::HeaderValue; use objectstore_service::error::Error as ServiceError; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -93,6 +94,9 @@ impl ApiError { ApiError::Service(ServiceError::Client(_)) => StatusCode::BAD_REQUEST, ApiError::Service(ServiceError::Metadata(_)) => StatusCode::BAD_REQUEST, + ApiError::Service(ServiceError::RangeNotSatisfiable { .. }) => { + StatusCode::RANGE_NOT_SATISFIABLE + } ApiError::Service(ServiceError::InvalidUploadId(_)) => StatusCode::BAD_REQUEST, ApiError::Service(ServiceError::AtCapacity) => StatusCode::TOO_MANY_REQUESTS, ApiError::Service(ServiceError::NotImplemented) => StatusCode::NOT_IMPLEMENTED, @@ -115,3 +119,11 @@ impl IntoResponse for ApiError { (self.status(), Json(body)).into_response() } } + +/// Inserts `Accept-Ranges: bytes` into the response headers. 
+pub fn insert_accept_ranges(response: &mut Response) { + response.headers_mut().insert( + http::header::ACCEPT_RANGES, + HeaderValue::from_static("bytes"), + ); +} diff --git a/objectstore-server/src/endpoints/objects.rs b/objectstore-server/src/endpoints/objects.rs index e5d0a8ae..8f172e31 100644 --- a/objectstore-server/src/endpoints/objects.rs +++ b/objectstore-server/src/endpoints/objects.rs @@ -9,10 +9,12 @@ use axum::{Json, Router}; use objectstore_service::error::Error as ServiceError; use objectstore_service::id::{ObjectContext, ObjectId}; use objectstore_types::metadata::Metadata; +use objectstore_types::range::ContentRange; use serde::Serialize; use crate::auth::AuthAwareService; -use crate::endpoints::common::{ApiError, ApiResult}; +use crate::endpoints::common::{ApiError, ApiResult, insert_accept_ranges}; +use crate::extractors::byte_range::OptionalByteRange; use crate::extractors::{Xt, body::MeteredBody}; use crate::state::ServiceState; @@ -64,15 +66,55 @@ async fn object_get( service: AuthAwareService, State(state): State, Xt(id): Xt, + OptionalByteRange(byte_range): OptionalByteRange, + _headers: HeaderMap, ) -> ApiResult { let context = id.context().clone(); - let Some((metadata, stream)) = service.get_object(id).await? 
else { - return Ok(StatusCode::NOT_FOUND.into_response()); + let result = service.get_object(id, byte_range).await; + + let (metadata, content_range, stream) = match result { + Ok(Some(result)) => result, + Ok(None) => return Ok(StatusCode::NOT_FOUND.into_response()), + Err(ApiError::Service(ServiceError::RangeNotSatisfiable { total })) => { + let mut response = ( + StatusCode::RANGE_NOT_SATISFIABLE, + [( + http::header::CONTENT_RANGE, + ContentRange::unsatisfiable_total_to_header_value(total), + )], + ) + .into_response(); + insert_accept_ranges(&mut response); + return Ok(response); + } + Err(e) => return Err(e), }; + let stream = state.meter_stream(stream, &context); + let metadata_headers = metadata.to_headers("").map_err(ServiceError::from)?; + + let mut response = match content_range { + Some(ref content_range) => { + let mut resp = ( + StatusCode::PARTIAL_CONTENT, + metadata_headers, + Body::from_stream(stream), + ) + .into_response(); + let headers = resp.headers_mut(); + headers.insert( + http::header::CONTENT_LENGTH, + content_range.len_to_header_value(), + ); + headers.insert(http::header::CONTENT_RANGE, content_range.to_header_value()); + resp + } + None => (StatusCode::OK, metadata_headers, Body::from_stream(stream)).into_response(), + }; - let headers = metadata.to_headers("").map_err(ServiceError::from)?; - Ok((headers, Body::from_stream(stream)).into_response()) + insert_accept_ranges(&mut response); + + Ok(response) } async fn object_head(service: AuthAwareService, Xt(id): Xt) -> ApiResult { @@ -82,7 +124,9 @@ async fn object_head(service: AuthAwareService, Xt(id): Xt) -> ApiResu let headers = metadata.to_headers("").map_err(ServiceError::from)?; - Ok((StatusCode::NO_CONTENT, headers).into_response()) + let mut response = (StatusCode::NO_CONTENT, headers).into_response(); + insert_accept_ranges(&mut response); + Ok(response) } async fn object_put( diff --git a/objectstore-server/src/extractors/byte_range.rs 
b/objectstore-server/src/extractors/byte_range.rs new file mode 100644 index 00000000..9f39dc77 --- /dev/null +++ b/objectstore-server/src/extractors/byte_range.rs @@ -0,0 +1,49 @@ +//! Axum extractor for range requests. + +use axum::extract::FromRequestParts; +use http::request::Parts; +use objectstore_types::range::{ByteRange, RangeError}; + +use crate::{endpoints::common::ApiError, state::ServiceState}; + +/// Extractor that parses the `Range` request header into an optional [`ByteRange`]. +#[derive(Debug, Clone)] +pub struct OptionalByteRange(pub Option); + +impl FromRequestParts for OptionalByteRange { + type Rejection = ApiError; + + async fn from_request_parts( + parts: &mut Parts, + _state: &ServiceState, + ) -> Result { + let headers = &parts.headers; + let Some(range) = headers.get(http::header::RANGE) else { + return Ok(Self(None)); + }; + let range = range + .to_str() + .map_err(|_| ApiError::Client("invalid Range header".into()))?; + + match range.parse::() { + Ok(range) => Ok(Self(Some(range))), + // Per RFC 9110: + // > A server that supports range requests MAY ignore or reject a Range header + // field that contains an invalid ranges-specifier [...] + // + // If the client wants multiple ranges, fall back to returning the whole object. + // We might support multiple ranges in the future, so log a warning to let us know + // clients are trying to do this. + Err(RangeError::MultiRange) => { + objectstore_log::warn!( + "received range request with multiple range specifiers, ignoring" + ); + Ok(Self(None)) + } + // The client requested an invalid unit or sent a malformed header. + // We could fall back, but better fail hard and let them know they sent something + // invalid. 
+ Err(err) => Err(ApiError::Client(format!("invalid Range header: {err}"))), + } + } +} diff --git a/objectstore-server/src/extractors/mod.rs b/objectstore-server/src/extractors/mod.rs index 74ec0ba9..6a5e760d 100644 --- a/objectstore-server/src/extractors/mod.rs +++ b/objectstore-server/src/extractors/mod.rs @@ -2,6 +2,7 @@ pub mod batch; pub mod body; +pub mod byte_range; pub mod downstream_service; mod id; mod service; diff --git a/objectstore-server/tests/range_requests.rs b/objectstore-server/tests/range_requests.rs new file mode 100644 index 00000000..ca5afd4d --- /dev/null +++ b/objectstore-server/tests/range_requests.rs @@ -0,0 +1,235 @@ +//! End-to-end tests for HTTP Range request support. + +use anyhow::Result; +use objectstore_server::config::{AuthZ, Config}; +use objectstore_test::server::TestServer; + +async fn setup() -> (TestServer, String) { + let server = TestServer::with_config(Config { + auth: AuthZ { + enforce: false, + ..Default::default() + }, + ..Default::default() + }) + .await; + + let client = reqwest::Client::new(); + let payload = "Hello, Range Requests!"; // 22 bytes + + let resp = client + .post(server.url("/v1/objects/test/org=1/")) + .header("content-type", "text/plain") + .body(payload) + .send() + .await + .unwrap(); + assert_eq!(resp.status(), reqwest::StatusCode::CREATED); + let body: serde_json::Value = resp.json().await.unwrap(); + let key = body["key"].as_str().unwrap().to_string(); + + (server, key) +} + +#[tokio::test] +async fn no_range_returns_200_with_accept_ranges() -> Result<()> { + let (server, key) = setup().await; + let client = reqwest::Client::new(); + + let resp = client + .get(server.url(&format!("/v1/objects/test/org=1/{key}"))) + .send() + .await?; + + assert_eq!(resp.status(), reqwest::StatusCode::OK); + assert_eq!( + resp.headers().get("accept-ranges").unwrap().to_str()?, + "bytes" + ); + assert!( + resp.headers().get("content-range").is_none(), + "200 response must not include Content-Range" + ); + + let 
body = resp.text().await?; + assert_eq!(body, "Hello, Range Requests!"); + Ok(()) +} + +#[tokio::test] +async fn head_returns_accept_ranges() -> Result<()> { + let (server, key) = setup().await; + let client = reqwest::Client::new(); + + let resp = client + .head(server.url(&format!("/v1/objects/test/org=1/{key}"))) + .send() + .await?; + + assert_eq!(resp.status(), reqwest::StatusCode::NO_CONTENT); + assert_eq!( + resp.headers().get("accept-ranges").unwrap().to_str()?, + "bytes" + ); + Ok(()) +} + +#[tokio::test] +async fn range_prefix_returns_206() -> Result<()> { + let (server, key) = setup().await; + let client = reqwest::Client::new(); + + let resp = client + .get(server.url(&format!("/v1/objects/test/org=1/{key}"))) + .header("range", "bytes=0-4") + .send() + .await?; + + assert_eq!(resp.status(), reqwest::StatusCode::PARTIAL_CONTENT); + assert_eq!( + resp.headers().get("accept-ranges").unwrap().to_str()?, + "bytes" + ); + assert_eq!( + resp.headers().get("content-range").unwrap().to_str()?, + "bytes 0-4/22" + ); + assert_eq!(resp.headers().get("content-length").unwrap().to_str()?, "5"); + + let body = resp.text().await?; + assert_eq!(body, "Hello"); + Ok(()) +} + +#[tokio::test] +async fn range_suffix_returns_206() -> Result<()> { + let (server, key) = setup().await; + let client = reqwest::Client::new(); + + let resp = client + .get(server.url(&format!("/v1/objects/test/org=1/{key}"))) + .header("range", "bytes=-9") + .send() + .await?; + + assert_eq!(resp.status(), reqwest::StatusCode::PARTIAL_CONTENT); + assert_eq!( + resp.headers().get("content-range").unwrap().to_str()?, + "bytes 13-21/22" + ); + + let body = resp.text().await?; + assert_eq!(body, "Requests!"); + Ok(()) +} + +#[tokio::test] +async fn range_from_offset_returns_206() -> Result<()> { + let (server, key) = setup().await; + let client = reqwest::Client::new(); + + let resp = client + .get(server.url(&format!("/v1/objects/test/org=1/{key}"))) + .header("range", "bytes=7-") + .send() + 
.await?; + + assert_eq!(resp.status(), reqwest::StatusCode::PARTIAL_CONTENT); + assert_eq!( + resp.headers().get("content-range").unwrap().to_str()?, + "bytes 7-21/22" + ); + assert_eq!( + resp.headers().get("content-length").unwrap().to_str()?, + "15" + ); + + let body = resp.text().await?; + assert_eq!(body, "Range Requests!"); + Ok(()) +} + +#[tokio::test] +async fn unknown_range_unit_returns_400() -> Result<()> { + let (server, key) = setup().await; + let client = reqwest::Client::new(); + + let resp = client + .get(server.url(&format!("/v1/objects/test/org=1/{key}"))) + .header("range", "items=0-10") + .send() + .await?; + + assert_eq!(resp.status(), reqwest::StatusCode::BAD_REQUEST); + Ok(()) +} + +#[tokio::test] +async fn invalid_bytes_range_returns_400() -> Result<()> { + let (server, key) = setup().await; + let client = reqwest::Client::new(); + + let resp = client + .get(server.url(&format!("/v1/objects/test/org=1/{key}"))) + .header("range", "bytes=abc-def") + .send() + .await?; + + assert_eq!(resp.status(), reqwest::StatusCode::BAD_REQUEST); + Ok(()) +} + +#[tokio::test] +async fn multi_range_falls_back_to_full_body() -> Result<()> { + let (server, key) = setup().await; + let client = reqwest::Client::new(); + + let resp = client + .get(server.url(&format!("/v1/objects/test/org=1/{key}"))) + .header("range", "bytes=0-4, 10-14") + .send() + .await?; + + assert_eq!(resp.status(), reqwest::StatusCode::OK); + let body = resp.text().await?; + assert_eq!(body, "Hello, Range Requests!"); + Ok(()) +} + +#[tokio::test] +async fn unsatisfiable_range_returns_416() -> Result<()> { + let (server, key) = setup().await; + let client = reqwest::Client::new(); + + let resp = client + .get(server.url(&format!("/v1/objects/test/org=1/{key}"))) + .header("range", "bytes=100-200") + .send() + .await?; + + assert_eq!(resp.status(), reqwest::StatusCode::RANGE_NOT_SATISFIABLE); + assert_eq!( + resp.headers().get("content-range").unwrap().to_str()?, + "bytes */22" + ); + 
assert_eq!( + resp.headers().get("accept-ranges").unwrap().to_str()?, + "bytes" + ); + Ok(()) +} + +#[tokio::test] +async fn range_on_nonexistent_object_returns_404() -> Result<()> { + let (server, _key) = setup().await; + let client = reqwest::Client::new(); + + let resp = client + .get(server.url("/v1/objects/test/org=1/nonexistent")) + .header("range", "bytes=0-10") + .send() + .await?; + + assert_eq!(resp.status(), reqwest::StatusCode::NOT_FOUND); + Ok(()) +} diff --git a/objectstore-service/src/backend/bigtable.rs b/objectstore-service/src/backend/bigtable.rs index 56b609fb..480bc7cc 100644 --- a/objectstore-service/src/backend/bigtable.rs +++ b/objectstore-service/src/backend/bigtable.rs @@ -38,6 +38,7 @@ use bigtable_rs::google::bigtable::v2::{self, mutation}; use bytes::Bytes; use futures_util::TryStreamExt; use objectstore_types::metadata::{ExpirationPolicy, Metadata}; +use objectstore_types::range::ByteRange; use serde::{Deserialize, Serialize}; use tonic::Code; @@ -909,9 +910,11 @@ impl Backend for BigTableBackend { } #[tracing::instrument(level = "trace", fields(?id), skip_all)] - async fn get_object(&self, id: &ObjectId) -> Result { - match self.get_tiered_object(id).await? { - TieredGet::Object(metadata, payload) => Ok(Some((metadata, payload))), + async fn get_object(&self, id: &ObjectId, range: Option) -> Result { + match self.get_tiered_object(id, range).await? 
{ + TieredGet::Object(metadata, content_range, payload) => { + Ok(Some((metadata, content_range, payload))) + } TieredGet::Tombstone(_) => Err(Error::UnexpectedTombstone), TieredGet::NotFound => Ok(None), } @@ -988,7 +991,11 @@ impl HighVolumeBackend for BigTableBackend { } #[tracing::instrument(level = "trace", fields(?id), skip_all)] - async fn get_tiered_object(&self, id: &ObjectId) -> Result { + async fn get_tiered_object( + &self, + id: &ObjectId, + _range: Option, + ) -> Result { objectstore_log::debug!("Reading from Bigtable backend"); let path = id.as_storage_path().to_string().into_bytes(); @@ -1008,7 +1015,7 @@ impl HighVolumeBackend for BigTableBackend { RowData::Object { metadata, payload } => { let mut metadata = metadata; metadata.size = Some(payload.len()); - TieredGet::Object(metadata, crate::stream::single(payload)) + TieredGet::Object(metadata, None, crate::stream::single(payload)) } }) } @@ -1351,7 +1358,7 @@ mod tests { .put_object(&id, &metadata, stream::single("hello, world")) .await?; - let (obj_meta, stream) = backend.get_object(&id).await?.unwrap(); + let (obj_meta, _, stream) = backend.get_object(&id, None).await?.unwrap(); let payload = stream::read_to_vec(stream).await?; assert_eq!(payload, b"hello, world"); assert_eq!(obj_meta.content_type, metadata.content_type); @@ -1370,7 +1377,7 @@ mod tests { let backend = create_test_backend().await?; let id = make_id(); - assert!(backend.get_object(&id).await?.is_none()); + assert!(backend.get_object(&id, None).await?.is_none()); assert!(backend.get_metadata(&id).await?.is_none()); backend.delete_object(&id).await?; @@ -1396,7 +1403,7 @@ mod tests { .put_object(&id, &second_metadata, stream::single("world")) .await?; - let (meta, stream) = backend.get_object(&id).await?.unwrap(); + let (meta, _, stream) = backend.get_object(&id, None).await?.unwrap(); let payload = stream::read_to_vec(stream).await?; assert_eq!(payload, b"world"); assert_eq!(meta.custom, second_metadata.custom); @@ -1413,7 
+1420,7 @@ mod tests { create_object(&backend, &id, &metadata, b"hello", SystemTime::now()).await?; backend.delete_object(&id).await?; - assert!(backend.get_object(&id).await?.is_none()); + assert!(backend.get_object(&id, None).await?.is_none()); Ok(()) } @@ -1442,7 +1449,7 @@ mod tests { create_object(&backend, &id1, &metadata, b"hello, world", past_now).await?; // get_object reads the stale row, triggers bump, and returns the pre-bump metadata. - let (pre_obj_meta, _) = backend.get_object(&id1).await?.unwrap(); + let (pre_obj_meta, _, _) = backend.get_object(&id1, None).await?.unwrap(); let pre_obj_expiry = pre_obj_meta.time_expires.unwrap(); // A second get_metadata reads the freshly bumped row. @@ -1467,7 +1474,7 @@ mod tests { assert!(post_expiry > pre_expiry, "bump should extend expiry"); // Payload must be intact after the loaded=false bump (which re-fetches the payload). - let (_, stream) = backend.get_object(&id2).await?.unwrap(); + let (_, _, stream) = backend.get_object(&id2, None).await?.unwrap(); let payload = stream::read_to_vec(stream).await?; assert_eq!(payload, b"hello, world"); @@ -1517,7 +1524,7 @@ mod tests { }; create_object(&backend, &id, &metadata, b"hello, world", SystemTime::now()).await?; - assert!(backend.get_object(&id).await?.is_none()); + assert!(backend.get_object(&id, None).await?.is_none()); Ok(()) } @@ -1536,7 +1543,7 @@ mod tests { }; create_object(&backend, &id, &metadata, b"hello, world", SystemTime::now()).await?; - assert!(backend.get_object(&id).await?.is_none()); + assert!(backend.get_object(&id, None).await?.is_none()); Ok(()) } @@ -1556,7 +1563,7 @@ mod tests { // empty let id = make_id(); assert!(matches!( - backend.get_tiered_object(&id).await?, + backend.get_tiered_object(&id, None).await?, TieredGet::NotFound )); assert!(matches!( @@ -1573,7 +1580,9 @@ mod tests { }; create_object(&backend, &id, &put_meta, b"payload", SystemTime::now()).await?; - let TieredGet::Object(obj_meta, obj_stream) = 
backend.get_tiered_object(&id).await? else { + let TieredGet::Object(obj_meta, _, obj_stream) = + backend.get_tiered_object(&id, None).await? + else { panic!("expected TieredGet::Object"); }; let obj_payload = stream::read_to_vec(obj_stream).await?; @@ -1596,7 +1605,7 @@ mod tests { }; create_tombstone(&backend, &hv_id, &tombstone, SystemTime::now()).await?; - match backend.get_tiered_object(&hv_id).await? { + match backend.get_tiered_object(&hv_id, None).await? { TieredGet::Tombstone(get_t) => assert_eq!(get_t.target, lt_id), other => panic!("expected TieredGet::Tombstone, got {other:?}"), } @@ -1624,7 +1633,7 @@ mod tests { .put_non_tombstone(&id, &metadata, Bytes::from_static(b"first")) .await?; assert_eq!(result, None, "expected None on empty row"); - let (_, stream) = backend.get_object(&id).await?.unwrap(); + let (_, _, stream) = backend.get_object(&id, None).await?.unwrap(); assert_eq!(&stream::read_to_vec(stream).await?, b"first"); // object: put_non_tombstone on existing object replaces payload, returns None. @@ -1634,7 +1643,7 @@ mod tests { .put_non_tombstone(&id, &metadata, Bytes::from_static(b"new")) .await?; assert_eq!(result, None, "expected None when overwriting object"); - let (_, stream) = backend.get_object(&id).await?.unwrap(); + let (_, _, stream) = backend.get_object(&id, None).await?.unwrap(); assert_eq!(&stream::read_to_vec(stream).await?, b"new"); // tombstone: put_non_tombstone returns Some(Tombstone) and leaves tombstone intact. 
@@ -1682,7 +1691,7 @@ mod tests { let metadata = Metadata::default(); create_object(&backend, &id, &metadata, b"hello, world", SystemTime::now()).await?; assert_eq!(backend.delete_non_tombstone(&id).await?, None); - assert!(backend.get_object(&id).await?.is_none()); + assert!(backend.get_object(&id, None).await?.is_none()); // tombstone let id = make_id(); @@ -1736,14 +1745,14 @@ mod tests { }; assert_eq!(t.target, lt_id, "target must round-trip via r column"); assert_eq!(t.expiration_policy, expiration_policy); - match backend.get_tiered_object(&hv_id).await? { + match backend.get_tiered_object(&hv_id, None).await? { TieredGet::Tombstone(t) => assert_eq!(t.target, lt_id, "round-trip via r column"), other => panic!("expected TieredGet::Tombstone, got {other:?}"), } // Legacy reads must error rather than leak tombstone data. assert!(matches!( - backend.get_object(&hv_id).await, + backend.get_object(&hv_id, None).await, Err(Error::UnexpectedTombstone) )); assert!(matches!( @@ -1842,7 +1851,7 @@ mod tests { .compare_and_write(&id, Some(<_id), write.clone()) .await?; assert!(swapped, "expected CAS success with correct target"); - let TieredGet::Object(_, stream) = backend.get_tiered_object(&id).await? else { + let TieredGet::Object(_, _, stream) = backend.get_tiered_object(&id, None).await? else { panic!("expected inline object after swap"); }; assert_eq!(&stream::read_to_vec(stream).await?, payload.as_ref()); @@ -1865,7 +1874,7 @@ mod tests { let committed = backend.compare_and_write(&id, None, write).await?; assert!(committed, "expected CAS success on empty row"); - let TieredGet::Object(_, stream) = backend.get_tiered_object(&id).await? else { + let TieredGet::Object(_, _, stream) = backend.get_tiered_object(&id, None).await? 
else { panic!("expected Object after CAS-create"); }; assert_eq!(&stream::read_to_vec(stream).await?, payload.as_ref()); @@ -1946,7 +1955,7 @@ mod tests { }; assert_eq!(t.expiration_policy, ExpirationPolicy::Manual); assert!(matches!( - backend.get_tiered_object(&id).await?, + backend.get_tiered_object(&id, None).await?, TieredGet::Tombstone(_) )); @@ -2137,7 +2146,7 @@ mod tests { "put_non_tombstone must succeed (return None) over an expired tombstone" ); - let (_, stream) = backend.get_object(&id).await?.unwrap(); + let (_, _, stream) = backend.get_object(&id, None).await?.unwrap(); assert_eq!(&stream::read_to_vec(stream).await?, b"data"); Ok(()) diff --git a/objectstore-service/src/backend/common.rs b/objectstore-service/src/backend/common.rs index 8efad5da..87705b63 100644 --- a/objectstore-service/src/backend/common.rs +++ b/objectstore-service/src/backend/common.rs @@ -4,6 +4,7 @@ use std::fmt; use std::sync::Arc; use objectstore_types::metadata::{ExpirationPolicy, Metadata}; +use objectstore_types::range::{ByteRange, ContentRange}; use bytes::Bytes; @@ -23,7 +24,7 @@ pub const USER_AGENT: &str = concat!("sentry-objectstore/", env!("CARGO_PKG_VERS /// Backend response for put operations. pub type PutResponse = (); /// Backend response for get operations. -pub type GetResponse = Option<(Metadata, PayloadStream)>; +pub type GetResponse = Option<(Metadata, Option, PayloadStream)>; /// Backend response for metadata-only get operations. pub type MetadataResponse = Option; /// Backend response for delete operations. @@ -43,15 +44,16 @@ pub trait Backend: fmt::Debug + Send + Sync + 'static { stream: ClientStream, ) -> Result; - /// Retrieves an object at the given path, returning its metadata and a stream of bytes. - async fn get_object(&self, id: &ObjectId) -> Result; + /// Retrieves (part of) an object at the given path, returning its metadata, a description of + /// the part being returned, and the payload. 
+ async fn get_object(&self, id: &ObjectId, range: Option) -> Result; /// Retrieves only the metadata for an object, without the payload. async fn get_metadata(&self, id: &ObjectId) -> Result { Ok(self - .get_object(id) + .get_object(id, None) .await? - .map(|(metadata, _stream)| metadata)) + .map(|(metadata, _range, _stream)| metadata)) } /// Deletes the object at the given path. @@ -150,11 +152,12 @@ pub trait HighVolumeBackend: Backend { payload: Bytes, ) -> Result>; - /// Retrieves an object with explicit tombstone awareness. + /// Retrieves (part of) an object with explicit tombstone awareness. /// /// Returns [`TieredGet::Tombstone`] instead of synthesizing a tombstone /// object, making the caller's routing logic a compile-time distinction. - async fn get_tiered_object(&self, id: &ObjectId) -> Result; + async fn get_tiered_object(&self, id: &ObjectId, range: Option) + -> Result; /// Retrieves only metadata with explicit tombstone awareness. /// @@ -209,7 +212,7 @@ pub struct Tombstone { /// Typed response from [`HighVolumeBackend::get_tiered_object`]. pub enum TieredGet { /// A real object was found. - Object(Metadata, PayloadStream), + Object(Metadata, Option, PayloadStream), /// A redirect tombstone was found; the real object lives in the long-term backend. Tombstone(Tombstone), /// No entry exists at this key. 
@@ -219,9 +222,10 @@ pub enum TieredGet { impl fmt::Debug for TieredGet { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - TieredGet::Object(metadata, _stream) => f + TieredGet::Object(metadata, content_range, _stream) => f .debug_tuple("Object") .field(metadata) + .field(content_range) .finish_non_exhaustive(), TieredGet::Tombstone(info) => f.debug_tuple("Tombstone").field(info).finish(), TieredGet::NotFound => write!(f, "NotFound"), diff --git a/objectstore-service/src/backend/gcs.rs b/objectstore-service/src/backend/gcs.rs index 43cc45c1..70f7b73a 100644 --- a/objectstore-service/src/backend/gcs.rs +++ b/objectstore-service/src/backend/gcs.rs @@ -11,6 +11,7 @@ use anyhow::Context; use futures_util::{StreamExt, TryStreamExt}; use gcp_auth::TokenProvider; use objectstore_types::metadata::{ExpirationPolicy, Metadata}; +use objectstore_types::range::{ByteRange, ContentRange}; use reqwest::header::HeaderName; use reqwest::{Body, IntoUrl, Method, RequestBuilder, StatusCode, Url, header, multipart}; use serde::{Deserialize, Serialize}; @@ -659,7 +660,7 @@ impl Backend for GcsBackend { } #[tracing::instrument(level = "trace", fields(?id), skip_all)] - async fn get_object(&self, id: &ObjectId) -> Result { + async fn get_object(&self, id: &ObjectId, range: Option) -> Result { objectstore_log::debug!("Reading from GCS backend"); let object_url = self.object_url(id)?; @@ -672,21 +673,62 @@ impl Backend for GcsBackend { let payload_response = self .with_retry("get_payload", || async { - self.request(Method::GET, download_url.clone()) - .await? 
+ let mut req = self.request(Method::GET, download_url.clone()).await?; + if let Some(r) = range { + req = req.header(header::RANGE, r.to_header_value()); + } + let resp = req .send() .await - .and_then(|r| r.error_for_status()) + .map_err(|e| Error::reqwest("GCS: get payload", e))?; + + if resp.status() == StatusCode::RANGE_NOT_SATISFIABLE { + let raw = resp + .headers() + .get(header::CONTENT_RANGE) + .and_then(|v| v.to_str().ok()); + let total = raw.and_then(ContentRange::parse_unsatisfiable_total); + match total { + Some(total) => return Err(Error::RangeNotSatisfiable { total }), + None => { + return Err(Error::Generic { + context: format!( + "GCS: 416 response with invalid Content-Range: {:?}", + raw + ), + cause: None, + }); + } + } + } + + resp.error_for_status() .map_err(|e| Error::reqwest("GCS: get payload", e)) }) .await?; + let content_range = if payload_response.status() == StatusCode::PARTIAL_CONTENT { + Some( + payload_response + .headers() + .get(header::CONTENT_RANGE) + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse::().ok()) + .ok_or_else(|| Error::Generic { + context: "GCS: 206 response missing valid Content-Range header".to_owned(), + cause: None, + })?, + ) + } else { + None + }; + let stream = payload_response .bytes_stream() .map_err(io::Error::other) .boxed(); - Ok(Some((metadata, stream))) + Ok(Some((metadata, content_range, stream))) } #[tracing::instrument(level = "trace", fields(?id), skip_all)] @@ -1071,7 +1113,7 @@ mod tests { .put_object(&id, &metadata, stream::single("hello, world")) .await?; - let (meta, stream) = backend.get_object(&id).await?.unwrap(); + let (meta, _, stream) = backend.get_object(&id, None).await?.unwrap(); let payload = stream::read_to_vec(stream).await?; let str_payload = str::from_utf8(&payload).unwrap(); @@ -1089,7 +1131,7 @@ mod tests { let backend = create_test_backend().await?; let id = make_id(); - let result = backend.get_object(&id).await?; + let result = backend.get_object(&id, None).await?; 
assert!(result.is_none()); Ok(()) @@ -1128,7 +1170,7 @@ mod tests { .put_object(&id, &metadata, stream::single("world")) .await?; - let (meta, stream) = backend.get_object(&id).await?.unwrap(); + let (meta, _, stream) = backend.get_object(&id, None).await?.unwrap(); let payload = stream::read_to_vec(stream).await?; let str_payload = str::from_utf8(&payload).unwrap(); @@ -1151,7 +1193,7 @@ mod tests { backend.delete_object(&id).await?; - let result = backend.get_object(&id).await?; + let result = backend.get_object(&id, None).await?; assert!(result.is_none()); Ok(()) @@ -1174,7 +1216,7 @@ mod tests { .put_object(&id, &metadata, stream::single("hello, world")) .await?; - let result = backend.get_object(&id).await?; + let result = backend.get_object(&id, None).await?; assert!(result.is_none()); Ok(()) @@ -1197,7 +1239,7 @@ mod tests { .put_object(&id, &metadata, stream::single("hello, world")) .await?; - let result = backend.get_object(&id).await?; + let result = backend.get_object(&id, None).await?; assert!(result.is_none()); Ok(()) @@ -1274,7 +1316,7 @@ mod tests { ); // Verify the payload is still intact after the bump. 
- let (_, stream) = backend.get_object(&id).await?.unwrap(); + let (_, _, stream) = backend.get_object(&id, None).await?.unwrap(); let payload = stream::read_to_vec(stream).await?; assert_eq!(&payload, b"hello, world"); @@ -1334,7 +1376,7 @@ mod tests { .put_object(&id, &metadata, stream::single(compressed.clone())) .await?; - let (meta, stream) = backend.get_object(&id).await?.unwrap(); + let (meta, _, stream) = backend.get_object(&id, None).await?.unwrap(); let payload = stream::read_to_vec(stream).await?; assert_eq!(meta.compression, Some(Compression::Zstd)); @@ -1384,7 +1426,7 @@ mod tests { .await?; assert!(result.is_none(), "expected no error on complete"); - let (meta, stream) = backend.get_object(&id).await?.unwrap(); + let (meta, _, stream) = backend.get_object(&id, None).await?.unwrap(); let payload = stream::read_to_vec(stream).await?; assert_eq!(payload, data); assert_eq!(meta.content_type, "text/plain".to_string()); @@ -1469,7 +1511,7 @@ mod tests { assert!(result.is_none(), "expected no error on complete"); // Object exists after complete - let (_meta, stream) = backend.get_object(&id).await?.unwrap(); + let (_meta, _, stream) = backend.get_object(&id, None).await?.unwrap(); let payload = stream::read_to_vec(stream).await?; let mut expected = Vec::new(); expected.extend_from_slice(&part1); @@ -1550,7 +1592,7 @@ mod tests { assert!(result.is_none(), "expected no error on complete"); // Verify reassembly order matches part numbers, not upload order. - let (_meta, stream) = backend.get_object(&id).await?.unwrap(); + let (_meta, _, stream) = backend.get_object(&id, None).await?.unwrap(); let payload = stream::read_to_vec(stream).await?; let mut expected = Vec::new(); expected.extend_from_slice(&part1); @@ -1641,7 +1683,7 @@ mod tests { backend.abort_multipart(&id, &upload_id).await?; // Object should not exist after abort. 
- let result = backend.get_object(&id).await?; + let result = backend.get_object(&id, None).await?; assert!(result.is_none(), "object should not exist after abort"); Ok(()) @@ -1693,7 +1735,7 @@ mod tests { multipart_put(&backend, &id, &metadata, "hello, world").await?; - let result = backend.get_object(&id).await?; + let result = backend.get_object(&id, None).await?; assert!(result.is_none()); Ok(()) @@ -1710,7 +1752,7 @@ mod tests { multipart_put(&backend, &id, &metadata, "hello, world").await?; - let result = backend.get_object(&id).await?; + let result = backend.get_object(&id, None).await?; assert!(result.is_none()); Ok(()) @@ -1734,7 +1776,7 @@ mod tests { multipart_put(&backend, &id, &metadata, compressed.clone()).await?; - let (meta, stream) = backend.get_object(&id).await?.unwrap(); + let (meta, _, stream) = backend.get_object(&id, None).await?.unwrap(); let payload = stream::read_to_vec(stream).await?; assert_eq!(meta.compression, Some(Compression::Zstd)); diff --git a/objectstore-service/src/backend/in_memory.rs b/objectstore-service/src/backend/in_memory.rs index 9fc1499a..4e855c22 100644 --- a/objectstore-service/src/backend/in_memory.rs +++ b/objectstore-service/src/backend/in_memory.rs @@ -9,6 +9,8 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::{Arc, Mutex}; use std::time::SystemTime; +use objectstore_types::range::ByteRange; + use bytes::{Bytes, BytesMut}; use futures_util::TryStreamExt; use objectstore_types::metadata::Metadata; @@ -122,15 +124,30 @@ impl super::common::Backend for InMemoryBackend { Ok(()) } - async fn get_object(&self, id: &ObjectId) -> Result { + async fn get_object(&self, id: &ObjectId, range: Option) -> Result { let entry = self.store.lock().unwrap().get(id).cloned(); match entry { None => Ok(None), Some(StoreEntry::Tombstone(_)) => Err(Error::UnexpectedTombstone), Some(StoreEntry::Object(mut metadata, bytes)) => { + let total = bytes.len() as u64; metadata.size = Some(bytes.len()); - let stream = 
crate::stream::single(bytes); - Ok(Some((metadata, stream))) + let (content_range, payload) = match range { + Some(range) => { + let content_range = range + .resolve(total) + .ok_or(Error::RangeNotSatisfiable { total })?; + let sliced = + bytes.slice(content_range.start as usize..=content_range.end as usize); + (Some(content_range), sliced) + } + None => (None, bytes), + }; + Ok(Some(( + metadata, + content_range, + crate::stream::single(payload), + ))) } } } @@ -160,14 +177,30 @@ impl HighVolumeBackend for InMemoryBackend { Ok(None) } - async fn get_tiered_object(&self, id: &ObjectId) -> Result { + async fn get_tiered_object( + &self, + id: &ObjectId, + range: Option, + ) -> Result { let entry = self.store.lock().unwrap().get(id).cloned(); Ok(match entry { None => TieredGet::NotFound, Some(StoreEntry::Tombstone(tombstone)) => TieredGet::Tombstone(tombstone), Some(StoreEntry::Object(mut metadata, bytes)) => { + let total = bytes.len() as u64; metadata.size = Some(bytes.len()); - TieredGet::Object(metadata, crate::stream::single(bytes)) + let (content_range, payload) = match range { + Some(range) => { + let content_range = range + .resolve(total) + .ok_or(Error::RangeNotSatisfiable { total })?; + let sliced = + bytes.slice(content_range.start as usize..=content_range.end as usize); + (Some(content_range), sliced) + } + None => (None, bytes), + }; + TieredGet::Object(metadata, content_range, crate::stream::single(payload)) } }) } @@ -517,7 +550,7 @@ mod tests { .unwrap(); assert!(result.is_none(), "expected no error on complete"); - let (meta, body) = backend.get_object(&id).await.unwrap().unwrap(); + let (meta, _, body) = backend.get_object(&id, None).await.unwrap().unwrap(); let payload = stream::read_to_vec(body).await.unwrap(); assert_eq!(payload, data); assert_eq!(meta.content_type, "text/plain".to_string()); @@ -598,7 +631,7 @@ mod tests { .unwrap(); assert!(result.is_none()); - let (_, body) = backend.get_object(&id).await.unwrap().unwrap(); + let (_, _, body) 
= backend.get_object(&id, None).await.unwrap().unwrap(); let payload = stream::read_to_vec(body).await.unwrap(); assert_eq!(payload, b"aaaabbbbcc"); } @@ -688,7 +721,7 @@ mod tests { backend.abort_multipart(&id, &upload_id).await.unwrap(); - let result = backend.get_object(&id).await.unwrap(); + let result = backend.get_object(&id, None).await.unwrap(); assert!(result.is_none(), "object should not exist after abort"); } diff --git a/objectstore-service/src/backend/local_fs.rs b/objectstore-service/src/backend/local_fs.rs index 533b616e..2a221a79 100644 --- a/objectstore-service/src/backend/local_fs.rs +++ b/objectstore-service/src/backend/local_fs.rs @@ -8,8 +8,9 @@ use std::time::SystemTime; use futures_util::StreamExt; use objectstore_types::metadata::Metadata; +use objectstore_types::range::ByteRange; use tokio::fs::OpenOptions; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; +use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, BufWriter}; use tokio_util::io::{ReaderStream, StreamReader}; use crate::backend::common::{ @@ -120,7 +121,7 @@ impl Backend for LocalFsBackend { // TODO: Return `Ok(None)` if object is found but past expiry #[tracing::instrument(level = "trace", fields(?id), skip_all)] - async fn get_object(&self, id: &ObjectId) -> Result { + async fn get_object(&self, id: &ObjectId, range: Option) -> Result { objectstore_log::debug!("Reading from local_fs backend"); let path = self.path.join(id.as_storage_path().to_string()); let file = match OpenOptions::new().read(true).open(path).await { @@ -146,8 +147,23 @@ impl Backend for LocalFsBackend { .ok_or_else(|| Error::generic("local-fs file corrupted: shorter than header"))?; metadata.size = Some(payload_size as usize); - let stream = ReaderStream::new(reader); - Ok(Some((metadata, stream.boxed()))) + let (content_range, stream) = match range { + Some(byte_range) => { + let content_range = + byte_range + .resolve(payload_size) + 
.ok_or(Error::RangeNotSatisfiable { + total: payload_size, + })?; + reader + .seek(std::io::SeekFrom::Current(content_range.start as i64)) + .await?; + let limited = reader.take(content_range.len()); + (Some(content_range), ReaderStream::new(limited).boxed()) + } + None => (None, ReaderStream::new(reader).boxed()), + }; + Ok(Some((metadata, content_range, stream))) } #[tracing::instrument(level = "trace", fields(?id), skip_all)] @@ -466,7 +482,7 @@ mod tests { .await .unwrap(); - let (read_metadata, stream) = backend.get_object(&id).await.unwrap().unwrap(); + let (read_metadata, _, stream) = backend.get_object(&id, None).await.unwrap().unwrap(); let file_contents: BytesMut = stream.try_collect().await.unwrap(); assert_eq!( @@ -584,7 +600,7 @@ mod tests { .unwrap(); assert!(result.is_none(), "expected no error on complete"); - let (meta, body) = backend.get_object(&id).await.unwrap().unwrap(); + let (meta, _, body) = backend.get_object(&id, None).await.unwrap().unwrap(); let payload: BytesMut = body.try_collect().await.unwrap(); assert_eq!(payload.as_ref(), data); assert_eq!(meta.content_type, "text/plain".to_string()); @@ -665,7 +681,7 @@ mod tests { .unwrap(); assert!(result.is_none()); - let (_, body) = backend.get_object(&id).await.unwrap().unwrap(); + let (_, _, body) = backend.get_object(&id, None).await.unwrap().unwrap(); let payload: BytesMut = body.try_collect().await.unwrap(); assert_eq!(payload.as_ref(), b"aaaabbbbcc"); } @@ -733,6 +749,105 @@ mod tests { backend.abort_multipart(&id, &upload_id).await.unwrap(); } + #[tokio::test] + async fn get_object_range_bounded() { + let (_tempdir, backend) = make_backend(); + let id = make_id(); + let metadata = Metadata::default(); + + let payload = b"Hello, range requests!"; + backend + .put_object(&id, &metadata, stream::single(payload.to_vec())) + .await + .unwrap(); + + // Request bytes 7-11 → "range" + let (_, content_range, body) = backend + .get_object(&id, Some(ByteRange::Bounded(7, 11))) + .await + 
.unwrap() + .unwrap(); + let data: BytesMut = body.try_collect().await.unwrap(); + + assert_eq!(data.as_ref(), b"range"); + let content_range = content_range.unwrap(); + assert_eq!(content_range.start, 7); + assert_eq!(content_range.end, 11); + assert_eq!(content_range.total, payload.len() as u64); + } + + #[tokio::test] + async fn get_object_range_from() { + let (_tempdir, backend) = make_backend(); + let id = make_id(); + let metadata = Metadata::default(); + + let payload = b"Hello, range requests!"; + backend + .put_object(&id, &metadata, stream::single(payload.to_vec())) + .await + .unwrap(); + + // Request bytes 7- → "range requests!" + let (_, content_range, body) = backend + .get_object(&id, Some(ByteRange::From(7))) + .await + .unwrap() + .unwrap(); + let data: BytesMut = body.try_collect().await.unwrap(); + + assert_eq!(data.as_ref(), b"range requests!"); + let content_range = content_range.unwrap(); + assert_eq!(content_range.start, 7); + assert_eq!(content_range.end, 21); + assert_eq!(content_range.total, payload.len() as u64); + } + + #[tokio::test] + async fn get_object_range_last() { + let (_tempdir, backend) = make_backend(); + let id = make_id(); + let metadata = Metadata::default(); + + let payload = b"Hello, range requests!"; + backend + .put_object(&id, &metadata, stream::single(payload.to_vec())) + .await + .unwrap(); + + // Request last 9 bytes → "requests!" 
+ let (_, content_range, body) = backend + .get_object(&id, Some(ByteRange::Last(9))) + .await + .unwrap() + .unwrap(); + let data: BytesMut = body.try_collect().await.unwrap(); + + assert_eq!(data.as_ref(), b"requests!"); + let content_range = content_range.unwrap(); + assert_eq!(content_range.start, 13); + assert_eq!(content_range.end, 21); + assert_eq!(content_range.total, payload.len() as u64); + } + + #[tokio::test] + async fn get_object_range_unsatisfiable() { + let (_tempdir, backend) = make_backend(); + let id = make_id(); + let metadata = Metadata::default(); + + backend + .put_object(&id, &metadata, stream::single(b"short".to_vec())) + .await + .unwrap(); + + match backend.get_object(&id, Some(ByteRange::From(100))).await { + Err(Error::RangeNotSatisfiable { total: 5 }) => {} + Err(other) => panic!("expected RangeNotSatisfiable, got: {other:?}"), + Ok(_) => panic!("expected RangeNotSatisfiable, got Ok"), + } + } + #[tokio::test] async fn multipart_abort() { let (_tempdir, backend) = make_backend(); @@ -755,7 +870,7 @@ mod tests { backend.abort_multipart(&id, &upload_id).await.unwrap(); - let result = backend.get_object(&id).await.unwrap(); + let result = backend.get_object(&id, None).await.unwrap(); assert!(result.is_none(), "object should not exist after abort"); } diff --git a/objectstore-service/src/backend/s3_compatible.rs b/objectstore-service/src/backend/s3_compatible.rs index 51069406..9818597c 100644 --- a/objectstore-service/src/backend/s3_compatible.rs +++ b/objectstore-service/src/backend/s3_compatible.rs @@ -5,6 +5,7 @@ use std::{fmt, io}; use futures_util::{StreamExt, TryStreamExt}; use objectstore_types::metadata::{ExpirationPolicy, Metadata}; +use objectstore_types::range::{ByteRange, ContentRange}; use reqwest::header::HeaderMap; use reqwest::{Body, IntoUrl, Method, RequestBuilder, StatusCode}; @@ -163,24 +164,41 @@ where &self, method: Method, id: &ObjectId, - ) -> Result> { + range: Option, + ) -> Result, reqwest::Response)>> { let 
object_url = self.object_url(id); - let response = self - .request(method, &object_url) - .await? - .send() - .await - .map_err(|cause| Error::Reqwest { - context: "S3: failed to send request".to_string(), - cause, - })?; + let mut builder = self.request(method, &object_url).await?; + if let Some(r) = range { + builder = builder.header(reqwest::header::RANGE, r.to_header_value()); + } + let response = builder.send().await.map_err(|cause| Error::Reqwest { + context: "S3: failed to send request".to_string(), + cause, + })?; if response.status() == StatusCode::NOT_FOUND { objectstore_log::debug!("Object not found"); return Ok(None); } + if response.status() == StatusCode::RANGE_NOT_SATISFIABLE { + let raw = response + .headers() + .get(reqwest::header::CONTENT_RANGE) + .and_then(|v| v.to_str().ok()); + let total = raw.and_then(ContentRange::parse_unsatisfiable_total); + match total { + Some(total) => return Err(Error::RangeNotSatisfiable { total }), + None => { + return Err(Error::Generic { + context: format!("S3: 416 response with invalid Content-Range: {:?}", raw), + cause: None, + }); + } + } + } + let response = response .error_for_status() .map_err(|cause| Error::Reqwest { @@ -190,7 +208,26 @@ where let headers = response.headers(); let mut metadata = Metadata::from_headers(headers, GCS_CUSTOM_PREFIX)?; - metadata.size = response.content_length().map(|len| len as usize); + + let content_range = if response.status() == StatusCode::PARTIAL_CONTENT { + let range = headers + .get(reqwest::header::CONTENT_RANGE) + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse::().ok()) + .ok_or_else(|| Error::Generic { + context: "S3: 206 response missing valid Content-Range header".to_owned(), + cause: None, + })?; + metadata.size = Some(range.total as usize); + Some(range) + } else { + if let Some(len) = response.content_length() { + metadata.size = Some(len as usize); + } else { + objectstore_log::warn!("S3: 200 response missing Content-Length header"); + } + None + }; // 
TODO: Schedule into background persistently so this doesn't get lost on restarts if let ExpirationPolicy::TimeToIdle(tti) = metadata.expiration_policy { @@ -208,7 +245,7 @@ where } } - Ok(Some((metadata, response))) + Ok(Some((metadata, content_range, response))) } /// Issues a request to update the metadata for the given object. @@ -294,22 +331,24 @@ impl Backend for S3CompatibleBackend { } #[tracing::instrument(level = "trace", fields(?id), skip_all)] - async fn get_object(&self, id: &ObjectId) -> Result { + async fn get_object(&self, id: &ObjectId, range: Option) -> Result { objectstore_log::debug!("Reading from s3_compatible backend"); - let Some((metadata, response)) = self.request_object(Method::GET, id).await? else { + let Some((metadata, content_range, response)) = + self.request_object(Method::GET, id, range).await? + else { return Ok(None); }; let stream = response.bytes_stream().map_err(io::Error::other); - Ok(Some((metadata, stream.boxed()))) + Ok(Some((metadata, content_range, stream.boxed()))) } #[tracing::instrument(level = "trace", fields(?id), skip_all)] async fn get_metadata(&self, id: &ObjectId) -> Result { objectstore_log::debug!("Reading metadata from s3_compatible backend"); - let response = self.request_object(Method::HEAD, id).await?; - Ok(response.map(|(metadata, _)| metadata)) + let response = self.request_object(Method::HEAD, id, None).await?; + Ok(response.map(|(metadata, _, _)| metadata)) } #[tracing::instrument(level = "trace", fields(?id), skip_all)] diff --git a/objectstore-service/src/backend/testing.rs b/objectstore-service/src/backend/testing.rs index ac709fec..f1cca265 100644 --- a/objectstore-service/src/backend/testing.rs +++ b/objectstore-service/src/backend/testing.rs @@ -40,6 +40,8 @@ use std::sync::Arc; use bytes::Bytes; use objectstore_types::metadata::Metadata; +use objectstore_types::range::ByteRange; + use crate::backend::common::{ Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, 
MultipartUploadBackend, PutResponse, TieredGet, TieredMetadata, TieredWrite, Tombstone, @@ -82,8 +84,13 @@ pub trait Hooks: fmt::Debug + Send + Sync + 'static { } /// Intercepts [`Backend::get_object`]. Default delegates to `inner`. - async fn get_object(&self, inner: &InMemoryBackend, id: &ObjectId) -> Result { - inner.get_object(id).await + async fn get_object( + &self, + inner: &InMemoryBackend, + id: &ObjectId, + range: Option, + ) -> Result { + inner.get_object(id, range).await } /// Intercepts [`Backend::get_metadata`]. Default delegates to `inner`. @@ -123,8 +130,13 @@ pub trait Hooks: fmt::Debug + Send + Sync + 'static { } /// Intercepts [`HighVolumeBackend::get_tiered_object`]. Default delegates to `inner`. - async fn get_tiered_object(&self, inner: &InMemoryBackend, id: &ObjectId) -> Result { - inner.get_tiered_object(id).await + async fn get_tiered_object( + &self, + inner: &InMemoryBackend, + id: &ObjectId, + range: Option, + ) -> Result { + inner.get_tiered_object(id, range).await } /// Intercepts [`HighVolumeBackend::get_tiered_metadata`]. Default delegates to `inner`. 
@@ -285,8 +297,8 @@ impl Backend for TestBackend { .await } - async fn get_object(&self, id: &ObjectId) -> Result { - self.hooks.get_object(&self.inner, id).await + async fn get_object(&self, id: &ObjectId, range: Option) -> Result { + self.hooks.get_object(&self.inner, id, range).await } async fn get_metadata(&self, id: &ObjectId) -> Result { @@ -315,8 +327,12 @@ impl HighVolumeBackend for TestBackend { .await } - async fn get_tiered_object(&self, id: &ObjectId) -> Result { - self.hooks.get_tiered_object(&self.inner, id).await + async fn get_tiered_object( + &self, + id: &ObjectId, + range: Option, + ) -> Result { + self.hooks.get_tiered_object(&self.inner, id, range).await } async fn get_tiered_metadata(&self, id: &ObjectId) -> Result { diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index 1385e01d..ccb8d5ee 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -105,6 +105,7 @@ use base64::Engine as _; use bytes::Bytes; use futures_util::{Stream, StreamExt}; use objectstore_types::metadata::Metadata; +use objectstore_types::range::ByteRange; use serde::{Deserialize, Serialize}; use crate::backend::changelog::{Change, ChangeGuard, ChangeLog, ChangeManager, ChangePhase}; @@ -415,17 +416,21 @@ impl Backend for TieredStorage { Ok(()) } - async fn get_object(&self, id: &ObjectId) -> Result { + async fn get_object(&self, id: &ObjectId, range: Option) -> Result { let start = Instant::now(); - let hv_result = self.inner.high_volume.get_tiered_object(id).await?; + let hv_result = self.inner.high_volume.get_tiered_object(id, range).await?; let (result, backend_choice) = match hv_result { TieredGet::NotFound => (None, BackendChoice::HighVolume), - TieredGet::Object(metadata, stream) => { - (Some((metadata, stream)), BackendChoice::HighVolume) - } + TieredGet::Object(metadata, content_range, stream) => ( + Some((metadata, content_range, stream)), + 
BackendChoice::HighVolume, + ), TieredGet::Tombstone(tombstone) => ( - self.inner.long_term.get_object(&tombstone.target).await?, + self.inner + .long_term + .get_object(&tombstone.target, range) + .await?, BackendChoice::LongTerm, ), }; @@ -438,16 +443,15 @@ impl Backend for TieredStorage { backend_type = backend_type, ); - if let Some((ref metadata, _)) = result { - if let Some(size) = metadata.size { + if let Some((ref metadata, ref content_range, _)) = result { + let size = content_range.map(|cr| cr.len() as usize).or(metadata.size); + if let Some(size) = size { objectstore_metrics::record!( "get.size" = size, usecase = id.usecase().to_owned(), backend_choice = backend_choice.as_str(), backend_type = backend_type, ); - } else { - objectstore_log::warn!(backend_type, "Missing object size"); } } @@ -878,7 +882,7 @@ mod tests { let (storage, _hv, _lt, _) = make_tiered_storage(); let id = make_id("does-not-exist"); - assert!(storage.get_object(&id).await.unwrap().is_none()); + assert!(storage.get_object(&id, None).await.unwrap().is_none()); assert!(storage.get_metadata(&id).await.unwrap().is_none()); } @@ -906,7 +910,7 @@ mod tests { assert!(hv.contains(&id), "expected in high-volume"); assert!(!lt.contains(&id), "leaked to long-term"); - let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let (_, _, s) = storage.get_object(&id, None).await.unwrap().unwrap(); let body = stream::read_to_vec(s).await.unwrap(); assert_eq!(body, payload); @@ -949,7 +953,7 @@ mod tests { assert_eq!(lt_meta.expiration_policy, metadata_in.expiration_policy); // get_object follows the tombstone and returns the correct payload. 
- let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let (_, _, s) = storage.get_object(&id, None).await.unwrap().unwrap(); let body = stream::read_to_vec(s).await.unwrap(); assert_eq!(body, payload); @@ -1022,7 +1026,7 @@ mod tests { lt.get(<_id_1).expect_not_found(); lt.get(<_id_2).expect_object(); - let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let (_, _, s) = storage.get_object(&id, None).await.unwrap().unwrap(); let body = stream::read_to_vec(s).await.unwrap(); assert_eq!(body, payload2); } @@ -1042,7 +1046,7 @@ mod tests { storage.delete_object(&id).await.unwrap(); hv.get(&id).expect_not_found(); - assert!(storage.get_object(&id).await.unwrap().is_none()); + assert!(storage.get_object(&id, None).await.unwrap().is_none()); } #[tokio::test] @@ -1114,7 +1118,7 @@ mod tests { // The orphaned GCS blob remains but the object is unreachable through the service. assert!( - storage.get_object(&id).await.unwrap().is_none(), + storage.get_object(&id, None).await.unwrap().is_none(), "object should be unreachable after tombstone is deleted" ); } @@ -1267,7 +1271,7 @@ mod tests { lt.remove(<_id); assert!( - storage.get_object(&id).await.unwrap().is_none(), + storage.get_object(&id, None).await.unwrap().is_none(), "orphan tombstone should resolve to None on get_object" ); assert!( @@ -1304,7 +1308,7 @@ mod tests { .unwrap(); // get_object must follow the tombstone and find the object via the lt_id target. - let (_, s) = storage.get_object(&hv_id).await.unwrap().unwrap(); + let (_, _, s) = storage.get_object(&hv_id, None).await.unwrap().unwrap(); let body = stream::read_to_vec(s).await.unwrap(); assert_eq!(body, payload); @@ -1600,7 +1604,7 @@ mod tests { ); // get_object should follow the tombstone and return the payload. 
- let (got_meta, s) = storage.get_object(&id).await.unwrap().unwrap(); + let (got_meta, _, s) = storage.get_object(&id, None).await.unwrap().unwrap(); let body = stream::read_to_vec(s).await.unwrap(); assert_eq!(body, payload); assert_eq!(got_meta.content_type, "application/octet-stream"); @@ -1685,7 +1689,7 @@ mod tests { .unwrap(); assert!(error.is_none()); - let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let (_, _, s) = storage.get_object(&id, None).await.unwrap().unwrap(); let body = stream::read_to_vec(s).await.unwrap(); let mut expected = Vec::new(); @@ -1725,7 +1729,7 @@ mod tests { hv.get(&id).expect_not_found(); // The object should not be reachable. - assert!(storage.get_object(&id).await.unwrap().is_none()); + assert!(storage.get_object(&id, None).await.unwrap().is_none()); } #[tokio::test] @@ -1836,7 +1840,7 @@ mod tests { lt.get(&new_lt_id).expect_object(); // Assert the contents of the new revision. - let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let (_, _, s) = storage.get_object(&id, None).await.unwrap().unwrap(); let body = stream::read_to_vec(s).await.unwrap(); assert_eq!(body, payload2); } @@ -2066,7 +2070,7 @@ mod tests { storage.join().await; // The object is there. - let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let (_, _, s) = storage.get_object(&id, None).await.unwrap().unwrap(); let body = stream::read_to_vec(s).await.unwrap(); assert_eq!(body, payload); @@ -2089,7 +2093,7 @@ mod tests { assert!(remaining.is_empty()); // The object is still there after recovery. - let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let (_, _, s) = storage.get_object(&id, None).await.unwrap().unwrap(); let body = stream::read_to_vec(s).await.unwrap(); assert_eq!(body, payload); } @@ -2203,7 +2207,7 @@ mod tests { storage.join().await; // The object is there. 
- let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let (_, _, s) = storage.get_object(&id, None).await.unwrap().unwrap(); let body = stream::read_to_vec(s).await.unwrap(); assert_eq!(body, payload); @@ -2226,7 +2230,7 @@ mod tests { assert!(remaining.is_empty()); // The object is there after recovery. - let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let (_, _, s) = storage.get_object(&id, None).await.unwrap().unwrap(); let body = stream::read_to_vec(s).await.unwrap(); assert_eq!(body, payload); } diff --git a/objectstore-service/src/error.rs b/objectstore-service/src/error.rs index 46e38e28..93c18818 100644 --- a/objectstore-service/src/error.rs +++ b/objectstore-service/src/error.rs @@ -73,6 +73,13 @@ pub enum Error { #[error("unexpected tombstone")] UnexpectedTombstone, + /// The requested byte range is not satisfiable for the object's size. + #[error("range not satisfiable (object size: {total} bytes)")] + RangeNotSatisfiable { + /// Total size of the object in bytes. + total: u64, + }, + /// The service has reached its concurrency limit and cannot accept more operations. #[error("concurrency limit reached")] AtCapacity, @@ -140,6 +147,7 @@ impl Error { // Malformed client input at DEBUG level Self::Client(_) => Level::DEBUG, Self::Metadata(_) => Level::DEBUG, + Self::RangeNotSatisfiable { .. 
} => Level::DEBUG, // Like rate limits, we treat capacity errors as warnings Self::AtCapacity => Level::WARN, // All other errors are service or backend failures diff --git a/objectstore-service/src/service.rs b/objectstore-service/src/service.rs index 8505da44..d705cbf6 100644 --- a/objectstore-service/src/service.rs +++ b/objectstore-service/src/service.rs @@ -9,6 +9,7 @@ use std::future::Future; use std::sync::Arc; use objectstore_types::metadata::Metadata; +use objectstore_types::range::{ByteRange, ContentRange}; use crate::backend::common::Backend; use crate::concurrency::ConcurrencyLimiter; @@ -22,7 +23,7 @@ use crate::stream::{ClientStream, PayloadStream}; use crate::streaming::StreamExecutor; /// Service response for [`StorageService::get_object`]. -pub type GetResponse = Option<(Metadata, PayloadStream)>; +pub type GetResponse = Option<(Metadata, Option, PayloadStream)>; /// Service response for [`StorageService::get_metadata`]. pub type MetadataResponse = Option; /// Service response for [`StorageService::insert_object`]. @@ -209,10 +210,10 @@ impl StorageService { .await } - /// Streams the contents of an object. - pub async fn get_object(&self, id: ObjectId) -> Result { + /// Streams (part of) the contents of an object. 
+ pub async fn get_object(&self, id: ObjectId, range: Option) -> Result { let inner = Arc::clone(&self.inner); - self.spawn("get", async move { inner.get_object(&id).await }) + self.spawn("get", async move { inner.get_object(&id, range).await }) .await } @@ -339,6 +340,7 @@ mod tests { use bytes::BytesMut; use futures_util::TryStreamExt; use objectstore_types::metadata::Metadata; + use objectstore_types::range::ByteRange; use objectstore_types::scope::{Scope, Scopes}; use super::*; @@ -394,7 +396,7 @@ mod tests { .await .unwrap(); - let (_metadata, stream) = service.get_object(key).await.unwrap().unwrap(); + let (_metadata, _, stream) = service.get_object(key, None).await.unwrap().unwrap(); let file_contents: BytesMut = stream.try_collect().await.unwrap(); assert_eq!(file_contents.as_ref(), b"oh hai!"); @@ -420,7 +422,7 @@ mod tests { .await .unwrap(); - let (_metadata, stream) = service.get_object(key).await.unwrap().unwrap(); + let (_metadata, _, stream) = service.get_object(key, None).await.unwrap().unwrap(); let file_contents: BytesMut = stream.try_collect().await.unwrap(); assert_eq!(file_contents.as_ref(), b"oh hai!"); @@ -463,7 +465,7 @@ mod tests { .unwrap(); // Sanity: the object is readable through the service (follows the tombstone). - let (_, stream) = service.get_object(id.clone()).await.unwrap().unwrap(); + let (_, _, stream) = service.get_object(id.clone(), None).await.unwrap().unwrap(); let body: BytesMut = stream.try_collect().await.unwrap(); assert_eq!(body.len(), payload_len); @@ -471,11 +473,11 @@ mod tests { service.delete_object(id.clone()).await.unwrap(); // The tombstone in BigTable should be gone, so the service returns None. - let after_delete = service.get_object(id.clone()).await.unwrap(); + let after_delete = service.get_object(id.clone(), None).await.unwrap(); assert!(after_delete.is_none(), "tombstone not deleted"); // The real object in GCS must also be gone — no orphan. 
- let orphan = gcs_backend.get_object(&id).await.unwrap(); + let orphan = gcs_backend.get_object(&id, None).await.unwrap(); assert!(orphan.is_none(), "object leaked"); } @@ -495,7 +497,7 @@ mod tests { .await .unwrap(); - let (_, stream) = service.get_object(id).await.unwrap().unwrap(); + let (_, _, stream) = service.get_object(id, None).await.unwrap().unwrap(); let body: BytesMut = stream.try_collect().await.unwrap(); assert_eq!(body.as_ref(), b"hello world"); } @@ -519,7 +521,7 @@ mod tests { service.delete_object(id.clone()).await.unwrap(); - let after = service.get_object(id).await.unwrap(); + let after = service.get_object(id, None).await.unwrap(); assert!(after.is_none()); } @@ -532,6 +534,7 @@ mod tests { &self, _inner: &InMemoryBackend, _id: &ObjectId, + _range: Option, ) -> Result { panic!("intentional panic in get_object"); } @@ -542,7 +545,7 @@ mod tests { let service = StorageService::new(Box::new(TestBackend::new(PanicOnGet))); let id = ObjectId::new(make_context(), "panic-test".into()); - let result = service.get_object(id).await; + let result = service.get_object(id, None).await; let Err(Error::Panic(msg)) = result else { panic!("expected Panic error"); @@ -738,11 +741,11 @@ mod tests { // First operation panics — the permit must still be released. let id = ObjectId::new(make_context(), "panic-permit".into()); - let result = service.get_object(id.clone()).await; + let result = service.get_object(id.clone(), None).await; assert!(matches!(result, Err(Error::Panic(_)))); // Second operation should succeed in acquiring the permit (not AtCapacity). 
- let result = service.get_object(id).await; + let result = service.get_object(id, None).await; assert!( !matches!(result, Err(Error::AtCapacity)), "permit was not released after panic" diff --git a/objectstore-service/src/streaming.rs b/objectstore-service/src/streaming.rs index 6bd4ed15..597f4a53 100644 --- a/objectstore-service/src/streaming.rs +++ b/objectstore-service/src/streaming.rs @@ -289,7 +289,7 @@ async fn execute_operation( match op { Operation::Get(get) => { let id = ObjectId::new(context, get.key); - let response = backend.get_object(&id).await?; + let response = backend.get_object(&id, None).await?; Ok(OpResponse::Got { key: id.key, response, diff --git a/objectstore-types/src/lib.rs b/objectstore-types/src/lib.rs index 13d2946b..559aff36 100644 --- a/objectstore-types/src/lib.rs +++ b/objectstore-types/src/lib.rs @@ -33,4 +33,5 @@ pub mod auth; pub mod metadata; pub mod multipart; +pub mod range; pub mod scope; diff --git a/objectstore-types/src/range.rs b/objectstore-types/src/range.rs new file mode 100644 index 00000000..531aa48b --- /dev/null +++ b/objectstore-types/src/range.rs @@ -0,0 +1,300 @@ +//! Types for HTTP range requests. + +use std::fmt; +use std::str::FromStr; + +use http::header::HeaderValue; +use thiserror::Error; + +/// Specifier for a single byte range. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ByteRange { + /// Bounded range with start and end, inclusive + Bounded(u64, u64), + /// From offset X onwards + From(u64), + /// Last X bytes + Last(u64), +} + +impl ByteRange { + /// Formats this range for a `Range` request header. 
+    pub fn to_header_value(&self) -> HeaderValue {
+        // The three forms of a single `bytes=` range: `a-b`, `n-` and `-n`.
+        let s = match self {
+            ByteRange::Bounded(a, b) => format!("bytes={a}-{b}"),
+            ByteRange::From(n) => format!("bytes={n}-"),
+            ByteRange::Last(n) => format!("bytes=-{n}"),
+        };
+        // Only ASCII letters, digits, `=` and `-` are produced above.
+        HeaderValue::from_str(&s).expect("always a valid header value")
+    }
+
+    /// Resolves this range against a known total size.
+    ///
+    /// An end offset past the last byte is clamped to the last byte, and a
+    /// suffix longer than the object selects the whole object.
+    ///
+    /// Returns `None` if the range is unsatisfiable:
+    /// - the object is empty,
+    /// - the start offset is at or past the end of the object,
+    /// - the bounds are inverted (`start > end`), or
+    /// - a suffix of zero bytes was requested.
+    ///
+    /// A returned [`ContentRange`] therefore always satisfies
+    /// `start <= end < total`.
+    pub fn resolve(self, total: u64) -> Option<ContentRange> {
+        // Offset of the final byte; `None` for an empty object, which has no
+        // satisfiable range at all.
+        let last = total.checked_sub(1)?;
+
+        let (start, end) = match self {
+            // Clamp an over-long end to the final byte.
+            ByteRange::Bounded(start, end) => (start, end.min(last)),
+            ByteRange::From(start) => (start, last),
+            // A zero-length suffix selects no bytes. Without this guard the
+            // result would be `start == total > end`, and `ContentRange::len`
+            // would underflow.
+            ByteRange::Last(0) => return None,
+            ByteRange::Last(suffix) => (total.saturating_sub(suffix), last),
+        };
+
+        // Rejects start offsets past the end as well as inverted bounds from a
+        // directly constructed `Bounded(a, b)` with `a > b`.
+        (start <= end).then_some(ContentRange { start, end, total })
+    }
+}
+
+/// Errors that can occur when parsing a `Range` header.
+#[derive(Debug, Clone, PartialEq, Eq, Error)]
+pub enum RangeError {
+    /// The value could not be parsed as a valid byte range.
+    #[error("invalid byte range")]
+    Invalid,
+    /// The value contained multiple range specifiers separated by commas.
+    #[error("expected single byte range, found multipart range")]
+    MultiRange,
+    /// The range unit is invalid; only `bytes` is supported.
+    #[error("invalid range unit: {0}, expected: bytes")]
+    InvalidUnit(String),
+}
+
+/// Parses a `Range` request header value into a [`ByteRange`].
+///
+/// Only `bytes=` ranges with a single specifier are accepted.
+/// Multiple ranges and non-`bytes` units are rejected.
+impl FromStr for ByteRange {
+    type Err = RangeError;
+
+    fn from_str(value: &str) -> Result<Self, Self::Err> {
+        /// Parses one offset or suffix length.
+        ///
+        /// Only plain ASCII digits are allowed. `u64::from_str` on its own
+        /// also accepts a leading `+`, which would let `bytes=+1-+2` through.
+        /// Empty input and overflow are rejected by the integer parser.
+        fn offset(digits: &str) -> Result<u64, RangeError> {
+            if !digits.bytes().all(|b| b.is_ascii_digit()) {
+                return Err(RangeError::Invalid);
+            }
+            digits.parse().map_err(|_| RangeError::Invalid)
+        }
+
+        // The unit is matched case-insensitively, so normalise first.
+        let lower = value.to_ascii_lowercase();
+        let Some(spec) = lower.strip_prefix("bytes=") else {
+            // Report whatever precedes `=` (or the whole value) as the unit.
+            let unit = lower.split_once('=').map_or(&*lower, |(u, _)| u);
+            return Err(RangeError::InvalidUnit(unit.to_owned()));
+        };
+        if spec.contains(',') {
+            return Err(RangeError::MultiRange);
+        }
+
+        match spec.split_once('-').ok_or(RangeError::Invalid)? {
+            // `bytes=N-`; a bare `-` lands here too and fails on the empty start.
+            (start, "") => Ok(ByteRange::From(offset(start)?)),
+            // `bytes=-N`; a zero-length suffix selects nothing and is rejected.
+            ("", suffix) => match offset(suffix)? {
+                0 => Err(RangeError::Invalid),
+                suffix_len => Ok(ByteRange::Last(suffix_len)),
+            },
+            // `bytes=A-B`; both ends inclusive, and A must not exceed B.
+            (start, end) => {
+                let (start, end) = (offset(start)?, offset(end)?);
+                if start > end {
+                    return Err(RangeError::Invalid);
+                }
+                Ok(ByteRange::Bounded(start, end))
+            }
+        }
+    }
+}
+
+/// Describes which bytes of the full object are present in the response body.
+///
+/// Values parsed from a `Content-Range` header satisfy `start <= end < total`.
+/// The fields are public, so hand-built values are not checked.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct ContentRange {
+    /// Byte offset of the first byte in the body (inclusive).
+    pub start: u64,
+    /// Byte offset of the last byte in the body (inclusive).
+    pub end: u64,
+    /// Total size of the complete object in bytes.
+    pub total: u64,
+}
+
+/// Parses a `Content-Range` response header value into a [`ContentRange`].
+impl FromStr for ContentRange { + type Err = RangeError; + + fn from_str(s: &str) -> Result { + let parse = || { + let rest = s.strip_prefix("bytes ")?; + let (range_part, total_str) = rest.split_once('/')?; + let total: u64 = total_str.parse().ok()?; + let (start_str, end_str) = range_part.split_once('-')?; + let start: u64 = start_str.parse().ok()?; + let end: u64 = end_str.parse().ok()?; + if start > end || end >= total { + return None; + } + Some(Self { start, end, total }) + }; + parse().ok_or(RangeError::Invalid) + } +} + +#[expect( + clippy::len_without_is_empty, + reason = "A valid ContentRange is never empty" +)] +impl ContentRange { + /// Returns the number of bytes in this range. + pub fn len(&self) -> u64 { + self.end - self.start + 1 + } + + /// Formats this range for a `Content-Range` response header. + /// + /// The returned value is always valid ASCII and can be inserted directly + /// into an HTTP header map. + pub fn to_header_value(&self) -> HeaderValue { + HeaderValue::from_str(&self.to_string()).expect("always a valid header value") + } + + /// Formats the length of this range for a `Content-Length` response header. + pub fn len_to_header_value(&self) -> HeaderValue { + HeaderValue::from_str(&self.len().to_string()).expect("always a valid header value") + } + + /// Parses the total from an unsatisfiable `Content-Range` response header value. + /// + /// An unsatisfiable `Content-Range` header value is of the form `bytes */1234`, where `1234` + /// represents the total size of the object. + /// This is communicated back to the client, so that it can make requests that make sense for + /// that total. + pub fn parse_unsatisfiable_total(header: &str) -> Option { + let rest = header.strip_prefix("bytes */")?; + rest.parse().ok() + } + + /// Formats `total` as the total size of the object in an unsatisfiable `Content-Range` response header. 
+ pub fn unsatisfiable_total_to_header_value(total: u64) -> HeaderValue { + HeaderValue::from_str(format!("bytes */{total}").as_str()) + .expect("always a valid header value") + } +} + +impl fmt::Display for ContentRange { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "bytes {}-{}/{}", self.start, self.end, self.total) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_valid_ranges() { + assert_eq!( + "bytes=0-499".parse::(), + Ok(ByteRange::Bounded(0, 499)) + ); + assert_eq!("bytes=500-".parse::(), Ok(ByteRange::From(500))); + assert_eq!("bytes=-100".parse::(), Ok(ByteRange::Last(100))); + // Case insensitive + assert_eq!( + "Bytes=0-499".parse::(), + Ok(ByteRange::Bounded(0, 499)) + ); + assert_eq!("BYTES=100-".parse::(), Ok(ByteRange::From(100))); + } + + #[test] + fn parse_invalid_ranges() { + assert_eq!( + "bytes=0-10, 20-30".parse::(), + Err(RangeError::MultiRange) + ); + assert_eq!( + "items=0-10".parse::(), + Err(RangeError::InvalidUnit("items".into())) + ); + assert_eq!( + "bytes=500-100".parse::(), + Err(RangeError::Invalid) + ); + assert_eq!("bytes=-0".parse::(), Err(RangeError::Invalid)); + } + + #[test] + fn resolve_satisfiable() { + let cr = |start, end, total| Some(ContentRange { start, end, total }); + assert_eq!(ByteRange::Bounded(0, 499).resolve(1000), cr(0, 499, 1000)); + assert_eq!(ByteRange::Bounded(0, 9999).resolve(500), cr(0, 499, 500)); + assert_eq!(ByteRange::From(500).resolve(1000), cr(500, 999, 1000)); + assert_eq!(ByteRange::Last(100).resolve(1000), cr(900, 999, 1000)); + assert_eq!(ByteRange::Last(2000).resolve(1000), cr(0, 999, 1000)); + } + + #[test] + fn resolve_unsatisfiable() { + assert_eq!(ByteRange::Bounded(1000, 2000).resolve(500), None); + assert_eq!(ByteRange::From(500).resolve(500), None); + assert_eq!(ByteRange::Bounded(0, 0).resolve(0), None); + } + + #[test] + fn content_range_len() { + let full = ContentRange { + start: 0, + end: 999, + total: 1000, + }; + 
assert_eq!(full.len(), 1000); + + let partial = ContentRange { + start: 0, + end: 499, + total: 1000, + }; + assert_eq!(partial.len(), 500); + } + + #[test] + fn parse_unsatisfiable_total() { + assert_eq!( + ContentRange::parse_unsatisfiable_total("bytes */1234"), + Some(1234) + ); + assert_eq!( + ContentRange::parse_unsatisfiable_total("bytes 0-499/1234"), + None + ); + assert_eq!(ContentRange::parse_unsatisfiable_total("invalid"), None); + } + + #[test] + fn header_value_roundtrips() { + assert_eq!(ByteRange::Bounded(0, 499).to_header_value(), "bytes=0-499"); + assert_eq!(ByteRange::From(500).to_header_value(), "bytes=500-"); + assert_eq!(ByteRange::Last(100).to_header_value(), "bytes=-100"); + + let cr = ContentRange { + start: 0, + end: 499, + total: 1234, + }; + assert_eq!(cr.to_header_value(), "bytes 0-499/1234"); + assert_eq!("bytes 0-499/1234".parse::(), Ok(cr)); + assert!("bytes */1234".parse::().is_err()); + assert!("invalid".parse::().is_err()); + // Inverted bounds + assert!("bytes 499-0/1234".parse::().is_err()); + // End beyond total + assert!("bytes 0-1234/1234".parse::().is_err()); + assert_eq!( + ContentRange::unsatisfiable_total_to_header_value(1234), + "bytes */1234" + ); + } +}