diff --git a/objectstore-metrics/src/lib.rs b/objectstore-metrics/src/lib.rs index 094b9751..6f950906 100644 --- a/objectstore-metrics/src/lib.rs +++ b/objectstore-metrics/src/lib.rs @@ -2,7 +2,7 @@ //! //! This crate provides three things: //! -//! 1. [`count!`], [`gauge!`], and [`record!`] macros with rustfmt-friendly +//! 1. [`count!`], [`gauge!`], [`record!`], and [`timer!`] macros with rustfmt-friendly //! expression-based syntax. //! 2. [`MetricsConfig`] and [`init`] for wiring up a DogStatsD exporter. //! 3. [`with_capturing_test_client`] for asserting on emitted metrics in tests. @@ -11,7 +11,7 @@ //! //! ```rust //! use std::time::Duration; -//! use objectstore_metrics::{count, gauge, record}; +//! use objectstore_metrics::{count, gauge, record, timer}; //! //! let stored_size: u64 = 1024; //! let elapsed = Duration::from_secs(1); @@ -82,6 +82,91 @@ impl AsF64 for std::time::Duration { } } +/// A guard that measures elapsed time and records it as a distribution metric. +/// +/// Created by the [`timer!`] macro. Records with `success:true` when +/// [`record()`](TimerGuard::record) is called, or `success:false` when dropped +/// without calling `record()`. +/// Call [`success()`](TimerGuard::success) to override this behavior and record +/// with `success:true` even on drop. +/// +/// Tags can be added after creation via [`tag()`](TimerGuard::tag). +#[must_use = "timer! returns a guard that records the metric on guard.record() or on drop, bind it to a variable"] +pub struct TimerGuard { + start: std::time::Instant, + name: &'static str, + module_path: &'static str, + labels: Vec, + record_failure_on_drop: bool, + recorded: bool, +} + +impl TimerGuard { + #[doc(hidden)] + pub fn new(name: &'static str, module_path: &'static str, labels: Vec) -> Self { + Self { + start: std::time::Instant::now(), + name, + module_path, + labels, + record_failure_on_drop: true, + recorded: false, + } + } + + /// Returns the time elapsed since the guard was created. 
+ pub fn elapsed(&self) -> std::time::Duration { + self.start.elapsed() + } + + /// Adds a tag to the metric. + pub fn tag(mut self, key: &'static str, value: impl Into) -> Self { + self.labels.push(metrics::Label::new(key, value)); + self + } + + /// Changes the behavior of this guard to always record the metric + /// with `success:true`, even on drop. + pub fn success(mut self) -> Self { + self.record_failure_on_drop = false; + self + } + + /// Consumes the guard, recording the elapsed time with `success:true`. + pub fn record(mut self) { + self.emit("true"); + } + + fn emit(&mut self, success: &'static str) { + self.recorded = true; + let mut labels = std::mem::take(&mut self.labels); + labels.push(metrics::Label::new("success", success)); + let key = metrics::Key::from_parts(self.name, labels); + let metadata = metrics::Metadata::new( + self.module_path, + metrics::Level::INFO, + Some(self.module_path), + ); + metrics::with_recorder(|rec| { + rec.register_histogram(&key, &metadata) + .record(AsF64::as_f64(self.start.elapsed())); + }); + } +} + +impl Drop for TimerGuard { + fn drop(&mut self) { + if !self.recorded { + let success = if self.record_failure_on_drop { + "false" + } else { + "true" + }; + self.emit(success); + } + } +} + /// Re-exports used by macro expansion. Not part of the public API. #[doc(hidden)] pub mod _macro_support { @@ -343,3 +428,49 @@ macro_rules! record { .record($crate::_macro_support::AsF64::as_f64($value)); }; } + +/// Starts a timer that records elapsed time in fractional seconds as a +/// distribution metric. +/// +/// Returns a [`TimerGuard`] that captures `Instant::now()` at creation. +/// Call [`.record()`](TimerGuard::record) to record the metric with the +/// tag `success:true`, or let it drop to record with `success:false`. +/// +/// If you want to override this behavior and record the metric with +/// `success:true` even on drop, call [`.success()`](TimerGuard::success) +/// on the guard. 
/// Starts a timer whose elapsed time, in fractional seconds, is recorded as a
/// distribution metric.
///
/// The macro evaluates to a [`TimerGuard`], which captures `Instant::now()`
/// when it is created. Consume it with [`.record()`](TimerGuard::record) to
/// emit the metric tagged `success:true`; if the guard is simply dropped, the
/// metric is emitted tagged `success:false` instead.
///
/// To get `success:true` even when the guard is dropped, call
/// [`.success()`](TimerGuard::success) on it.
///
/// Further tags can be attached after creation with
/// [`.tag()`](TimerGuard::tag), which helps when a tag value is only known
/// once the timed operation has made progress.
///
/// # Syntax
///
/// ```rust
/// use objectstore_metrics::timer;
///
/// let guard = timer!("server.requests.duration");
/// let guard = timer!("server.requests.duration", route = "/v1/test");
/// // ... do work ...
/// guard.record(); // records elapsed time with success:true
/// ```
///
/// ```rust
/// use objectstore_metrics::timer;
///
/// let guard = timer!("server.requests.duration", route = "/v1/test");
/// // ... determine backend ...
/// let guard = guard.tag("backend", "gcs");
/// guard.record();
/// ```
///
/// Tag keys are identifiers; tag values must implement `Into<SharedString>`.
#[macro_export]
macro_rules! timer {
    ($name:literal $(, $tag:ident = $tv:expr)* $(,)?) => {{
        // Tag expressions are evaluated before the guard starts its clock.
        let initial_tags = vec![
            $($crate::_macro_support::metrics::Label::new(stringify!($tag), $tv),)*
        ];
        $crate::TimerGuard::new($name, module_path!(), initial_tags)
    }};
}

// The tests below are the ones the diff adds to the `tests` module of
// `objectstore-metrics/src/mock.rs`.

/// `record()` emits a single distribution sample tagged `success:true`.
#[test]
fn timer_record_emits_success_true() {
    let emitted = with_capturing_test_client(|| {
        crate::timer!("test.timer").record();
    });
    assert_eq!(emitted.len(), 1);
    let line = &emitted[0];
    assert!(line.starts_with("test.timer:"));
    assert!(line.contains("|d|#success:true"));
}

/// Dropping the guard without `record()` emits `success:false`.
#[test]
fn timer_drop_emits_success_false() {
    let emitted = with_capturing_test_client(|| {
        drop(crate::timer!("test.timer"));
    });
    assert_eq!(emitted.len(), 1);
    let line = &emitted[0];
    assert!(line.starts_with("test.timer:"));
    assert!(line.contains("|d|#success:false"));
}

/// After `success()`, dropping the guard emits `success:true`.
#[test]
fn timer_drop_with_success_emits_success_true() {
    let emitted = with_capturing_test_client(|| {
        drop(crate::timer!("test.timer").success());
    });
    assert_eq!(emitted.len(), 1);
    let line = &emitted[0];
    assert!(line.starts_with("test.timer:"));
    assert!(line.contains("|d|#success:true"));
}

/// Tags given to the macro are emitted alongside the `success` tag.
#[test]
fn timer_with_tags() {
    let emitted = with_capturing_test_client(|| {
        crate::timer!("test.timer", route = "/v1/test").record();
    });
    assert_eq!(emitted.len(), 1);
    let line = &emitted[0];
    assert!(line.starts_with("test.timer:"));
    assert!(line.contains("route:/v1/test"));
    assert!(line.contains("success:true"));
}

/// Tags given to the macro are kept when the guard is dropped.
#[test]
fn timer_drop_with_tags() {
    let emitted = with_capturing_test_client(|| {
        drop(crate::timer!("test.timer", op = "put"));
    });
    assert_eq!(emitted.len(), 1);
    let line = &emitted[0];
    assert!(line.contains("op:put"));
    assert!(line.contains("success:false"));
}

/// Tags added through `tag()` are emitted when the guard is recorded.
#[test]
fn timer_deferred_tag_on_record() {
    let emitted = with_capturing_test_client(|| {
        let started = crate::timer!("test.timer", usecase = "test");
        let tagged = started.tag("backend", "gcs");
        tagged.record();
    });
    assert_eq!(emitted.len(), 1);
    let line = &emitted[0];
    assert!(line.contains("usecase:test"));
    assert!(line.contains("backend:gcs"));
    assert!(line.contains("success:true"));
}

/// Tags added through `tag()` are emitted when the guard is dropped.
#[test]
fn timer_deferred_tag_on_drop() {
    let emitted = with_capturing_test_client(|| {
        let started = crate::timer!("test.timer", usecase = "test");
        drop(started.tag("backend", "gcs"));
    });
    assert_eq!(emitted.len(), 1);
    let line = &emitted[0];
    assert!(line.contains("usecase:test"));
    assert!(line.contains("backend:gcs"));
    assert!(line.contains("success:false"));
}
std::sync::atomic::{AtomicU64, Ordering}; -use std::time::{Duration, Instant, SystemTime}; +use std::time::{Duration, SystemTime}; use base64::Engine as _; use bytes::Bytes; @@ -375,14 +375,14 @@ impl Backend for TieredStorage { metadata: &Metadata, stream: ClientStream, ) -> Result { - let start = Instant::now(); + let timer = objectstore_metrics::timer!("put.latency", usecase = id.usecase().to_owned()); if metadata.origin.is_none() { objectstore_metrics::count!("put.origin_missing", usecase = id.usecase().to_owned()); } let peeked = SizedPeek::new(stream, BACKEND_SIZE_THRESHOLD).await?; objectstore_metrics::record!( - "put.first_chunk.latency" = start.elapsed(), + "put.first_chunk.latency" = timer.elapsed(), usecase = id.usecase().to_owned(), complete = if peeked.is_exhausted() { "yes" } else { "no" }, ); @@ -399,12 +399,10 @@ impl Backend for TieredStorage { }; let backend_ty = self.backend_type(&backend_choice); - objectstore_metrics::record!( - "put.latency" = start.elapsed(), - usecase = id.usecase().to_owned(), - backend_choice = backend_choice.as_str(), - backend_type = backend_ty, - ); + timer + .tag("backend_choice", backend_choice.as_str()) + .tag("backend_type", backend_ty) + .record(); objectstore_metrics::record!( "put.size" = stored_size, usecase = id.usecase().to_owned(), @@ -417,7 +415,10 @@ impl Backend for TieredStorage { } async fn get_object(&self, id: &ObjectId) -> Result { - let start = Instant::now(); + let timer = objectstore_metrics::timer!( + "get.latency.pre-response", + usecase = id.usecase().to_owned(), + ); let hv_result = self.inner.high_volume.get_tiered_object(id).await?; let (result, backend_choice) = match hv_result { @@ -432,12 +433,10 @@ impl Backend for TieredStorage { }; let backend_type = self.backend_type(&backend_choice); - objectstore_metrics::record!( - "get.latency.pre-response" = start.elapsed(), - usecase = id.usecase().to_owned(), - backend_choice = backend_choice.as_str(), - backend_type = backend_type, - ); + timer 
+ .tag("backend_choice", backend_choice.as_str()) + .tag("backend_type", backend_type) + .record(); if let Some((ref metadata, _)) = result { if let Some(size) = metadata.size { @@ -456,7 +455,7 @@ impl Backend for TieredStorage { } async fn get_metadata(&self, id: &ObjectId) -> Result { - let start = Instant::now(); + let timer = objectstore_metrics::timer!("head.latency", usecase = id.usecase().to_owned()); let hv_result = self.inner.high_volume.get_tiered_metadata(id).await?; let (result, backend_choice) = match hv_result { @@ -468,18 +467,17 @@ impl Backend for TieredStorage { ), }; - objectstore_metrics::record!( - "head.latency" = start.elapsed(), - usecase = id.usecase().to_owned(), - backend_choice = backend_choice.as_str(), - backend_type = self.backend_type(&backend_choice), - ); + timer + .tag("backend_choice", backend_choice.as_str()) + .tag("backend_type", self.backend_type(&backend_choice)) + .record(); Ok(result) } async fn delete_object(&self, id: &ObjectId) -> Result { - let start = Instant::now(); + let timer = + objectstore_metrics::timer!("delete.latency", usecase = id.usecase().to_owned()); let mut backend_choice = BackendChoice::HighVolume; @@ -507,12 +505,10 @@ impl Backend for TieredStorage { guard.advance(ChangePhase::compare_and_write(deleted)); } - objectstore_metrics::record!( - "delete.latency" = start.elapsed(), - usecase = id.usecase().to_owned(), - backend_choice = backend_choice.as_str(), - backend_type = self.backend_type(&backend_choice), - ); + timer + .tag("backend_choice", backend_choice.as_str()) + .tag("backend_type", self.backend_type(&backend_choice)) + .record(); Ok(()) } @@ -605,7 +601,10 @@ impl MultipartUploadBackend for TieredStorage { id: &ObjectId, metadata: &Metadata, ) -> Result { - let start = Instant::now(); + let timer = objectstore_metrics::timer!( + "multipart.initiate.latency", + usecase = id.usecase().to_owned(), + ); let physical = new_long_term_revision(id); let upload_id = self @@ -614,16 +613,14 @@ impl 
MultipartUploadBackend for TieredStorage { .initiate_multipart(&physical, metadata) .await?; - objectstore_metrics::record!( - "multipart.initiate.latency" = start.elapsed(), - usecase = id.usecase().to_owned(), - ); - let id = TieredUploadId { revision: physical.key, upload_id, }; - id.try_into() + let id = id.try_into()?; + + timer.record(); + Ok(id) } async fn upload_part( @@ -635,7 +632,10 @@ impl MultipartUploadBackend for TieredStorage { content_md5: Option<&str>, body: ClientStream, ) -> Result { - let start = Instant::now(); + let timer = objectstore_metrics::timer!( + "multipart.upload_part.latency", + usecase = id.usecase().to_owned(), + ); let tiered: TieredUploadId = upload_id.try_into()?; let physical = ObjectId { @@ -656,10 +656,7 @@ impl MultipartUploadBackend for TieredStorage { ) .await?; - objectstore_metrics::record!( - "multipart.upload_part.latency" = start.elapsed(), - usecase = id.usecase().to_owned(), - ); + timer.record(); objectstore_metrics::record!( "multipart.upload_part.size" = content_length, usecase = id.usecase().to_owned(), @@ -675,7 +672,10 @@ impl MultipartUploadBackend for TieredStorage { max_parts: Option, part_number_marker: Option, ) -> Result { - let start = Instant::now(); + let timer = objectstore_metrics::timer!( + "multipart.list_parts.latency", + usecase = id.usecase().to_owned(), + ); let tiered: TieredUploadId = upload_id.try_into()?; let physical = ObjectId { @@ -683,18 +683,14 @@ impl MultipartUploadBackend for TieredStorage { key: tiered.revision, }; - let result = self + let response = self .inner .long_term .list_parts(&physical, &tiered.upload_id, max_parts, part_number_marker) - .await; - - objectstore_metrics::record!( - "multipart.list_parts.latency" = start.elapsed(), - usecase = id.usecase().to_owned(), - ); + .await?; - result + timer.record(); + Ok(response) } async fn abort_multipart( @@ -702,7 +698,10 @@ impl MultipartUploadBackend for TieredStorage { id: &ObjectId, upload_id: &UploadId, ) -> Result { - 
let start = Instant::now(); + let timer = objectstore_metrics::timer!( + "multipart.abort.latency", + usecase = id.usecase().to_owned(), + ); let tiered: TieredUploadId = upload_id.try_into()?; let physical = ObjectId { @@ -710,18 +709,14 @@ impl MultipartUploadBackend for TieredStorage { key: tiered.revision, }; - let result = self + let () = self .inner .long_term .abort_multipart(&physical, &tiered.upload_id) - .await; - - objectstore_metrics::record!( - "multipart.abort.latency" = start.elapsed(), - usecase = id.usecase().to_owned(), - ); + .await?; - result + timer.record(); + Ok(()) } async fn complete_multipart( @@ -730,7 +725,10 @@ impl MultipartUploadBackend for TieredStorage { upload_id: &UploadId, parts: Vec, ) -> Result { - let start = Instant::now(); + let timer = objectstore_metrics::timer!( + "multipart.complete.latency", + usecase = id.usecase().to_owned(), + ); let part_count = parts.len(); let tiered: TieredUploadId = upload_id.try_into()?; @@ -742,7 +740,10 @@ impl MultipartUploadBackend for TieredStorage { // 1. Read current HV revision to establish the write precondition. let current = match self.inner.high_volume.get_tiered_metadata(id).await? { // Optimization: a previous attempt already finalized this revision and tombstone -- report success. - TieredMetadata::Tombstone(t) if t.target == physical => return Ok(None), + TieredMetadata::Tombstone(t) if t.target == physical => { + timer.record(); + return Ok(None); + } TieredMetadata::Tombstone(t) => Some(t.target), _ => None, }; @@ -830,10 +831,7 @@ impl MultipartUploadBackend for TieredStorage { // Update guard and let it schedule cleanup in the background. guard.advance(ChangePhase::compare_and_write(written)); - objectstore_metrics::record!( - "multipart.complete.latency" = start.elapsed(), - usecase = id.usecase().to_owned(), - ); + timer.record(); objectstore_metrics::record!( "multipart.complete.part_count" = part_count as u64, usecase = id.usecase().to_owned(),