From bdefa80357faca0df565f5d2de2dcec96170d519 Mon Sep 17 00:00:00 2001
From: Matt Hammerly
Date: Fri, 5 Jun 2026 16:53:54 -0700
Subject: [PATCH] feat(cogs): objectstore.cogs.usage datadog metric

---
Reviewer notes (ignored by `git am` / `git apply`):
- This patch was reconstructed from a whitespace-collapsed copy. Hunk line
  counts match the `@@` headers, but the indentation of context lines is
  assumed to follow rustfmt defaults. If context does not match the tree,
  apply with `git apply --ignore-whitespace`.
- Generic arguments that were missing from the collapsed copy were restored
  where the code requires them: `extract_parts::<RawPathParams>()`,
  `Vec<String>` (assumed element type, TODO confirm) and `Request<Body>`.
- The assertion message in `no_cogs_usage_for_batch_routes` now says
  "batch route" instead of the copy-pasted "internal route".

 objectstore-server/src/cogs.rs            |  33 ++++++
 objectstore-server/src/endpoints/batch.rs |  18 +++-
 objectstore-server/src/endpoints/mod.rs   |   7 ++
 objectstore-server/src/lib.rs             |   1 +
 objectstore-server/src/web/middleware.rs  | 117 +++++++++++++++++++++-
 5 files changed, 169 insertions(+), 7 deletions(-)
 create mode 100644 objectstore-server/src/cogs.rs

diff --git a/objectstore-server/src/cogs.rs b/objectstore-server/src/cogs.rs
new file mode 100644
index 00000000..afaa23a8
--- /dev/null
+++ b/objectstore-server/src/cogs.rs
@@ -0,0 +1,33 @@
+//! Cost of Goods Sold (COGS) instrumentation utilities for Objectstore.
+
+/// Converts an Objectstore `usecase` to an appropriate `app_feature` value for use in our COGS
+/// pipelines.
+fn usecase_to_appfeature(usecase: &str) -> &'static str {
+    // TODO: Flesh out this mapping
+    match usecase {
+        "attachments" => "attachments",
+        _ => {
+            objectstore_log::warn!(?usecase, "COGS: Can't convert usecase to app_feature");
+            "shared"
+        }
+    }
+}
+
+/// Increment `objectstore.cogs.usage` by `incr` for the given `usecase`.
+///
+/// Under the hood, `usecase` is converted to the appropriate `app_feature` value expected in our
+/// COGS pipeline.
+pub fn count_by(usecase: &str, incr: usize) {
+    objectstore_metrics::count!(
+        "objectstore.cogs.usage" += incr,
+        app_feature = usecase_to_appfeature(usecase),
+    );
+}
+
+/// Increment `objectstore.cogs.usage` by 1 for the given `usecase`.
+///
+/// Under the hood, `usecase` is converted to the appropriate `app_feature` value expected in our
+/// COGS pipeline.
+pub fn count(usecase: &str) {
+    count_by(usecase, 1)
+}
diff --git a/objectstore-server/src/endpoints/batch.rs b/objectstore-server/src/endpoints/batch.rs
index 13d20ac0..9200a3a6 100644
--- a/objectstore-server/src/endpoints/batch.rs
+++ b/objectstore-server/src/endpoints/batch.rs
@@ -18,6 +18,7 @@ use crate::auth::AuthAwareService;
 use crate::batch::{
     HEADER_BATCH_OPERATION_INDEX, HEADER_BATCH_OPERATION_KEY, HEADER_BATCH_OPERATION_KIND,
 };
+use crate::cogs;
 use crate::endpoints::common::{ApiError, ApiErrorResponse};
 use crate::extractors::Xt;
 use crate::extractors::batch::{BatchError, BatchOperationStream};
@@ -27,6 +28,8 @@ use crate::state::ServiceState;
 const MAX_BODY_SIZE: usize = 1024 * 1024 * 1024; // 1 GB
 const HEADER_BATCH_OPERATION_STATUS: &str = "x-sn-batch-operation-status";
 
+pub(crate) const ROUTE_PREFIX: &str = "objects:batch";
+
 pub fn router() -> Router {
     Router::new()
         .route("/objects:batch/{usecase}/{scopes}/", routing::post(batch))
@@ -109,8 +112,17 @@ async fn batch(
         }
     });
 
-    // Step 5: stamp inserts with time_created and record bandwidth
-    let stamped = policy_checked.map({
+    // Step 5: Increment COGS counter for each operation
+    let cogs_counted = validate(policy_checked, {
+        let usecase = context.usecase.clone();
+        move |_op| {
+            cogs::count(&usecase);
+            Ok(())
+        }
+    });
+
+    // Step 6: stamp inserts with time_created and record bandwidth
+    let stamped = cogs_counted.map({
         let state = Arc::clone(&state);
         let context = context.clone();
         move |(idx, mut item)| {
@@ -122,7 +134,7 @@ async fn batch(
         }
     });
 
-    // Step 6: execute concurrently, then convert each result to a multipart Part
+    // Step 7: execute concurrently, then convert each result to a multipart Part
     let state_ref = Arc::clone(&state);
     let context_ref = context.clone();
     let responses = batch.execute(context, stamped).then(move |(idx, result)| {
diff --git a/objectstore-server/src/endpoints/mod.rs b/objectstore-server/src/endpoints/mod.rs
index a29f5060..f3647092 100644
--- a/objectstore-server/src/endpoints/mod.rs
+++ b/objectstore-server/src/endpoints/mod.rs
@@ -18,6 +18,13 @@ pub fn is_internal_route(route: &str) -> bool {
     matches!(route, "/health" | "/ready" | "/keda")
 }
 
+/// Returns `true` for batch routes which may need separate instrumentation for auth or metrics.
+pub fn is_batch_route(route: &str) -> bool {
+    let mut split = route.split('/');
+    split.next(); // Routes start with `/` so the first component will be empty
+    split.next() == Some("v1") && split.next() == Some(batch::ROUTE_PREFIX)
+}
+
 /// Returns a router with all objectstore HTTP endpoints mounted.
 ///
 /// Mounts health and KEDA endpoints at the root and all object/batch
diff --git a/objectstore-server/src/lib.rs b/objectstore-server/src/lib.rs
index 1fc6d724..1c8d34ef 100644
--- a/objectstore-server/src/lib.rs
+++ b/objectstore-server/src/lib.rs
@@ -5,6 +5,7 @@
 pub mod auth;
 pub mod batch;
 pub mod cli;
+pub mod cogs;
 pub mod config;
 pub mod endpoints;
 pub mod extractors;
diff --git a/objectstore-server/src/web/middleware.rs b/objectstore-server/src/web/middleware.rs
index 195b2985..93c710d1 100644
--- a/objectstore-server/src/web/middleware.rs
+++ b/objectstore-server/src/web/middleware.rs
@@ -2,7 +2,7 @@ use std::any::Any;
 use std::net::SocketAddr;
 
 use axum::RequestExt;
-use axum::extract::{ConnectInfo, MatchedPath, Request, State};
+use axum::extract::{ConnectInfo, MatchedPath, RawPathParams, Request, State};
 use axum::http::{HeaderValue, Method, StatusCode, header};
 use axum::middleware::Next;
 use axum::response::{IntoResponse, Response};
@@ -10,7 +10,8 @@ use objectstore_log::tracing;
 use tokio::time::Instant;
 use tower_http::set_header::SetResponseHeaderLayer;
 
-use crate::endpoints::is_internal_route;
+use crate::cogs;
+use crate::endpoints::{is_batch_route, is_internal_route};
 use crate::extractors::downstream_service::DownstreamService;
 use crate::web::RequestCounter;
 
@@ -85,7 +86,7 @@ pub fn handle_panic(err: Box) -> Response {
     response.into_response()
 }
 
-/// A middleware that logs web request timings as metrics.
+/// A middleware that logs web request counts and timings as metrics.
 ///
 /// Use this with [`from_fn`](axum::middleware::from_fn).
 ///
@@ -98,6 +99,8 @@ pub async fn emit_request_metrics(mut request: Request, next: Next) -> Response
     let should_emit = !is_internal_route(route);
     let guard = should_emit.then(|| EmitMetricsGuard::new(route, request.method(), service));
 
+    maybe_emit_cogs_usage(route, &mut request).await;
+
     let response = next.run(request).await;
 
     if let Some(guard) = guard {
@@ -106,6 +109,33 @@ pub async fn emit_request_metrics(mut request: Request, next: Next) -> Response
     response
 }
 
+/// Increment a COGS request counter. See [`cogs::count`]. Skipped for batch endpoints.
+///
+/// For COGS purposes we use request count as a proxy for compute cost under the assumption that
+/// each request we serve has a basically flat CPU cost. Large payloads take longer, but they can be
+/// streamed in the background while other requests are served so they don't really cost more. The
+/// exception to this assumption is batch requests: a 1000-object batch request is going to cost
+/// meaningfully more CPU than a single-object request. So, batch requests must implement COGS
+/// counters separately.
+async fn maybe_emit_cogs_usage(route: &str, request: &mut Request) {
+    if is_internal_route(route) || is_batch_route(route) {
+        return;
+    }
+
+    let Ok(params) = request.extract_parts::<RawPathParams>().await else {
+        return;
+    };
+
+    let Some(usecase) = params
+        .iter()
+        .find_map(|(key, value)| (key == "usecase").then_some(value))
+    else {
+        return;
+    };
+
+    cogs::count(usecase)
+}
+
 /// Helper for [`emit_request_metrics`].
 ///
 /// This tracks relevant generic request parameters and emits metrics. If the guard is dropped
@@ -162,7 +192,7 @@ mod tests {
     use axum::Router;
     use axum::body::Body;
     use axum::http::{Request, StatusCode};
-    use axum::middleware::from_fn_with_state;
+    use axum::middleware::{from_fn, from_fn_with_state};
     use axum::response::IntoResponse;
     use axum::routing::get;
     use tokio::sync::Notify;
@@ -170,6 +200,85 @@ mod tests {
 
     use super::*;
 
+    fn capture_metrics(fut: impl std::future::Future) -> Vec<String> {
+        objectstore_metrics::with_capturing_test_client(|| {
+            let rt = tokio::runtime::Builder::new_current_thread()
+                .enable_all()
+                .build()
+                .unwrap();
+            rt.block_on(fut);
+        })
+    }
+
+    #[test]
+    fn emits_cogs_usage_for_service_routes() {
+        let captured = capture_metrics(async {
+            let app = Router::new()
+                .route(
+                    "/v1/objects/{usecase}/{scopes}/{*key}",
+                    get(|| async { StatusCode::OK.into_response() }),
+                )
+                .layer(from_fn(emit_request_metrics));
+
+            let resp = app
+                .oneshot(make_request("/v1/objects/attachments/org=1/my-key"))
+                .await
+                .unwrap();
+            assert_eq!(resp.status(), StatusCode::OK);
+        });
+
+        assert!(
+            captured
+                .iter()
+                .any(|m| m == "objectstore.cogs.usage:+1|c|#app_feature:attachments"),
+            "missing cogs usage metric, captured: {captured:?}"
+        );
+    }
+
+    #[test]
+    fn no_cogs_usage_for_batch_routes() {
+        let captured = capture_metrics(async {
+            let app = Router::new()
+                .route(
+                    "/v1/objects:batch/{usecase}/{scopes}/{*key}",
+                    get(|| async { StatusCode::OK.into_response() }),
+                )
+                .layer(from_fn(emit_request_metrics));
+
+            let resp = app
+                .oneshot(make_request("/v1/objects:batch/attachments/org=1/my-key"))
+                .await
+                .unwrap();
+            assert_eq!(resp.status(), StatusCode::OK);
+        });
+
+        assert!(
+            !captured
+                .iter()
+                .any(|m| m.starts_with("objectstore.cogs.usage")),
+            "unexpected cogs usage metric for batch route, captured: {captured:?}"
+        );
+    }
+
+    #[test]
+    fn no_cogs_usage_for_internal_routes() {
+        let captured = capture_metrics(async {
+            let app = Router::new()
+                .route("/health", get(|| async { StatusCode::OK.into_response() }))
+                .layer(from_fn(emit_request_metrics));
+
+            let resp = app.oneshot(make_request("/health")).await.unwrap();
+            assert_eq!(resp.status(), StatusCode::OK);
+        });
+
+        assert!(
+            !captured
+                .iter()
+                .any(|m| m.starts_with("objectstore.cogs.usage")),
+            "unexpected cogs usage metric for internal route, captured: {captured:?}"
+        );
+    }
+
     fn make_request(uri: &str) -> Request<Body> {
         Request::builder().uri(uri).body(Body::empty()).unwrap()
     }