Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions objectstore-server/src/cogs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
//! Cost of Goods Sold (COGS) instrumentation utilities for Objectstore.

/// Converts an Objectstore `usecase` to an appropriate `app_feature` value for use in our COGS
/// pipelines.
fn usecase_to_appfeature(usecase: &str) -> &'static str {
// TODO: Flesh out this mapping
match usecase {
"attachments" => "attachments",
_ => {
objectstore_log::warn!(?usecase, "COGS: Can't convert usecase to app_feature");
"shared"
}
}
}

/// Increment `objectstore.cogs.usage` by `incr` for the given `usecase`.
///
/// Under the hood, `usecase` is converted to the appropriate `app_feature` value expected in our
/// COGS pipeline.
pub fn count_by(usecase: &str, incr: usize) {
objectstore_metrics::count!(
"objectstore.cogs.usage" += incr,
app_feature = usecase_to_appfeature(usecase),
);
}

/// Increment `objectstore.cogs.usage` by 1 for the given `usecase`.
///
/// Under the hood, `usecase` is converted to the appropriate `app_feature` value expected in our
/// COGS pipeline.
pub fn count(usecase: &str) {
count_by(usecase, 1)
}
18 changes: 15 additions & 3 deletions objectstore-server/src/endpoints/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::auth::AuthAwareService;
use crate::batch::{
HEADER_BATCH_OPERATION_INDEX, HEADER_BATCH_OPERATION_KEY, HEADER_BATCH_OPERATION_KIND,
};
use crate::cogs;
use crate::endpoints::common::{ApiError, ApiErrorResponse};
use crate::extractors::Xt;
use crate::extractors::batch::{BatchError, BatchOperationStream};
Expand All @@ -27,6 +28,8 @@ use crate::state::ServiceState;
const MAX_BODY_SIZE: usize = 1024 * 1024 * 1024; // 1 GB
const HEADER_BATCH_OPERATION_STATUS: &str = "x-sn-batch-operation-status";

pub(crate) const ROUTE_PREFIX: &str = "objects:batch";

pub fn router() -> Router<ServiceState> {
Router::new()
.route("/objects:batch/{usecase}/{scopes}/", routing::post(batch))
Expand Down Expand Up @@ -109,8 +112,17 @@ async fn batch(
}
});

// Step 5: stamp inserts with time_created and record bandwidth
let stamped = policy_checked.map({
// Step 5: Increment COGS counter for each operation
let cogs_counted = validate(policy_checked, {
let usecase = context.usecase.clone();
move |_op| {
cogs::count(&usecase);
Ok(())
}
});
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Batch COGS skips pre-auth ops

Medium Severity

Per-operation objectstore.cogs.usage is incremented only after rate limiting, authorization, and use-case policy checks, while non-batch routes increment once in middleware before those checks. Batch operations that fail earlier in the pipeline emit no COGS, and batch HTTP requests rejected before the handler also emit none, unlike equivalent single-object requests.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit bdefa80. Configure here.


// Step 6: stamp inserts with time_created and record bandwidth
let stamped = cogs_counted.map({
let state = Arc::clone(&state);
let context = context.clone();
move |(idx, mut item)| {
Expand All @@ -122,7 +134,7 @@ async fn batch(
}
});

// Step 6: execute concurrently, then convert each result to a multipart Part
// Step 7: execute concurrently, then convert each result to a multipart Part
let state_ref = Arc::clone(&state);
let context_ref = context.clone();
let responses = batch.execute(context, stamped).then(move |(idx, result)| {
Expand Down
7 changes: 7 additions & 0 deletions objectstore-server/src/endpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ pub fn is_internal_route(route: &str) -> bool {
matches!(route, "/health" | "/ready" | "/keda")
}

/// Returns `true` for batch routes which may need separate instrumentation for auth or metrics.
pub fn is_batch_route(route: &str) -> bool {
let mut split = route.split('/');
split.next(); // Routes start with `/` so the first component will be empty
split.next() == Some("v1") && split.next() == Some(batch::ROUTE_PREFIX)
}

/// Returns a router with all objectstore HTTP endpoints mounted.
///
/// Mounts health and KEDA endpoints at the root and all object/batch
Expand Down
1 change: 1 addition & 0 deletions objectstore-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
pub mod auth;
pub mod batch;
pub mod cli;
pub mod cogs;
pub mod config;
pub mod endpoints;
pub mod extractors;
Expand Down
117 changes: 113 additions & 4 deletions objectstore-server/src/web/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ use std::any::Any;
use std::net::SocketAddr;

use axum::RequestExt;
use axum::extract::{ConnectInfo, MatchedPath, Request, State};
use axum::extract::{ConnectInfo, MatchedPath, RawPathParams, Request, State};
use axum::http::{HeaderValue, Method, StatusCode, header};
use axum::middleware::Next;
use axum::response::{IntoResponse, Response};
use objectstore_log::tracing;
use tokio::time::Instant;
use tower_http::set_header::SetResponseHeaderLayer;

use crate::endpoints::is_internal_route;
use crate::cogs;
use crate::endpoints::{is_batch_route, is_internal_route};
use crate::extractors::downstream_service::DownstreamService;
use crate::web::RequestCounter;

Expand Down Expand Up @@ -85,7 +86,7 @@ pub fn handle_panic(err: Box<dyn Any + Send + 'static>) -> Response {
response.into_response()
}

/// A middleware that logs web request timings as metrics.
/// A middleware that logs web request counts and timings as metrics.
///
/// Use this with [`from_fn`](axum::middleware::from_fn).
///
Expand All @@ -98,6 +99,8 @@ pub async fn emit_request_metrics(mut request: Request, next: Next) -> Response
let should_emit = !is_internal_route(route);
let guard = should_emit.then(|| EmitMetricsGuard::new(route, request.method(), service));

maybe_emit_cogs_usage(route, &mut request).await;

let response = next.run(request).await;

if let Some(guard) = guard {
Expand All @@ -106,6 +109,33 @@ pub async fn emit_request_metrics(mut request: Request, next: Next) -> Response
response
}

/// Increment a COGS request counter. See [cogs::count]. Skipped for batch endpoints.
///
/// For COGS purposes we use request count as a proxy for compute cost under the assumption that
/// each request we serve has a basically flat CPU cost. Large payloads take longer, but they can be
/// streamed in the background while other requests are served so they don't really cost more. The
/// exception to this assumption is batch requests: a 1000-object batch request is going to cost
/// meaningfully more CPU than a single-object request. So, batch requests must implement COGS
/// counters separately.
async fn maybe_emit_cogs_usage(route: &str, request: &mut Request) {
if is_internal_route(route) || is_batch_route(route) {
return;
}

let Ok(params) = request.extract_parts::<RawPathParams>().await else {
return;
};

let Some(usecase) = params
.iter()
.find_map(|(key, value)| (key == "usecase").then_some(value))
else {
return;
};

cogs::count(usecase)
}

/// Helper for [`emit_request_metrics`].
///
/// This tracks relevant generic request parameters and emits metrics. If the guard is dropped
Expand Down Expand Up @@ -162,14 +192,93 @@ mod tests {
use axum::Router;
use axum::body::Body;
use axum::http::{Request, StatusCode};
use axum::middleware::from_fn_with_state;
use axum::middleware::{from_fn, from_fn_with_state};
use axum::response::IntoResponse;
use axum::routing::get;
use tokio::sync::Notify;
use tower::ServiceExt;

use super::*;

fn capture_metrics(fut: impl std::future::Future<Output = ()>) -> Vec<String> {
objectstore_metrics::with_capturing_test_client(|| {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(fut);
})
}

#[test]
fn emits_cogs_usage_for_service_routes() {
let captured = capture_metrics(async {
let app = Router::new()
.route(
"/v1/objects/{usecase}/{scopes}/{*key}",
get(|| async { StatusCode::OK.into_response() }),
)
.layer(from_fn(emit_request_metrics));

let resp = app
.oneshot(make_request("/v1/objects/attachments/org=1/my-key"))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
});

assert!(
captured
.iter()
.any(|m| m == "objectstore.cogs.usage:+1|c|#app_feature:attachments"),
"missing cogs usage metric, captured: {captured:?}"
);
}

#[test]
fn no_cogs_usage_for_batch_routes() {
let captured = capture_metrics(async {
let app = Router::new()
.route(
"/v1/objects:batch/{usecase}/{scopes}/{*key}",
get(|| async { StatusCode::OK.into_response() }),
)
.layer(from_fn(emit_request_metrics));

let resp = app
.oneshot(make_request("/v1/objects:batch/attachments/org=1/my-key"))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
});

assert!(
!captured
.iter()
.any(|m| m.starts_with("objectstore.cogs.usage")),
"unexpected cogs usage metric for internal route, captured: {captured:?}"
);
}

#[test]
fn no_cogs_usage_for_internal_routes() {
let captured = capture_metrics(async {
let app = Router::new()
.route("/health", get(|| async { StatusCode::OK.into_response() }))
.layer(from_fn(emit_request_metrics));

let resp = app.oneshot(make_request("/health")).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
});

assert!(
!captured
.iter()
.any(|m| m.starts_with("objectstore.cogs.usage")),
"unexpected cogs usage metric for internal route, captured: {captured:?}"
);
}

fn make_request(uri: &str) -> Request<Body> {
Request::builder().uri(uri).body(Body::empty()).unwrap()
}
Expand Down
Loading