diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index fbed6f33ebb..2fec6f79ebf 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -1,5 +1,6 @@ use super::module_host::{EventStatus, ModuleHost, ModuleInfo, NoSuchModule}; use super::scheduler::SchedulerStarter; +use super::v8::V8HeapMetrics; use super::wasmtime::WasmtimeRuntime; use super::{Scheduler, UpdateDatabaseResult}; use crate::client::{ClientActorId, ClientName}; @@ -1411,26 +1412,8 @@ where .data_size_blob_store_bytes_used_by_blobs .remove_label_values(db); let _ = WORKER_METRICS.wasm_memory_bytes.remove_label_values(db); - let worker_kind = crate::host::v8::V8_WORKER_KIND_MAIN; - let _ = WORKER_METRICS - .v8_total_heap_size_bytes - .remove_label_values(db, worker_kind); - let _ = WORKER_METRICS - .v8_total_physical_size_bytes - .remove_label_values(db, worker_kind); - let _ = WORKER_METRICS - .v8_used_global_handles_size_bytes - .remove_label_values(db, worker_kind); - let _ = WORKER_METRICS - .v8_used_heap_size_bytes - .remove_label_values(db, worker_kind); - let _ = WORKER_METRICS - .v8_heap_size_limit_bytes - .remove_label_values(db, worker_kind); - let _ = WORKER_METRICS - .v8_external_memory_bytes - .remove_label_values(db, worker_kind); - let _ = WORKER_METRICS.v8_native_contexts.remove_label_values(db, worker_kind); - let _ = WORKER_METRICS.v8_detached_contexts.remove_label_values(db, worker_kind); + + V8HeapMetrics::remove_all_metric_label_values_for_database(db); + let _ = WORKER_METRICS.v8_request_queue_length.remove_label_values(db); } diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index db5b64bf5fc..63c8e409424 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -169,17 +169,19 @@ impl V8Runtime { static V8_RUNTIME_GLOBAL: LazyLock = LazyLock::new(V8RuntimeInner::init); const REDUCER_ARGS_BUFFER_SIZE: usize = 4_096; const JS_PROCEDURE_INSTANCE_QUEUE_CAPACITY: usize = 1; -pub(crate) const V8_WORKER_KIND_MAIN: &str = "main"; #[derive(Copy, Clone)] -enum JsWorkerKind { +pub(crate) enum JsWorkerKind { Main, Procedure, } -impl JsWorkerKind { - const fn checks_heap(self) -> bool { - matches!(self, Self::Main) +impl AsRef for JsWorkerKind { + fn as_ref(&self) -> &str { + match self { + Self::Main => "main", + Self::Procedure => "procedure", + } } } @@ -946,7 +948,7 @@ fn handle_detached_worker_request( } } -struct V8HeapMetrics { +pub(in crate::host) struct V8HeapMetrics { total_heap_size_bytes: IntGauge, total_physical_size_bytes: IntGauge, used_global_handles_size_bytes: IntGauge, @@ -955,6 +957,16 @@ struct V8HeapMetrics { external_memory_bytes: IntGauge, native_contexts: IntGauge, detached_contexts: IntGauge, + + /// Previous values observed by this instance. + /// + /// In [`Self::observe`], we use this to compute deltas against the new instance's values, + /// then increment/decrement the metric values by those deltas. + /// We do this rather than `set`ting the metric values as multiple instances may coexist + /// and share the same metric label values. + /// This happens when a database has multiple procedure workers running, + /// and during a module update, as there is a period when the new version has already been created + /// but the old version has not yet shut down. last_observed: V8HeapSnapshot, } @@ -986,32 +998,61 @@ impl V8HeapSnapshot { } impl V8HeapMetrics { - fn new(database_identity: &Identity) -> Self { + pub(in crate::host) fn remove_all_metric_label_values_for_database(database_identity: &Identity) { + for worker_kind in [JsWorkerKind::Main, JsWorkerKind::Procedure] { + let _ = WORKER_METRICS + .v8_total_heap_size_bytes + .remove_label_values(database_identity, &worker_kind); + let _ = WORKER_METRICS + .v8_total_physical_size_bytes + .remove_label_values(database_identity, &worker_kind); + let _ = WORKER_METRICS + .v8_used_global_handles_size_bytes + .remove_label_values(database_identity, &worker_kind); + let _ = WORKER_METRICS + .v8_used_heap_size_bytes + .remove_label_values(database_identity, &worker_kind); + let _ = WORKER_METRICS + .v8_heap_size_limit_bytes + .remove_label_values(database_identity, &worker_kind); + let _ = WORKER_METRICS + .v8_external_memory_bytes + .remove_label_values(database_identity, &worker_kind); + let _ = WORKER_METRICS + .v8_native_contexts + .remove_label_values(database_identity, &worker_kind); + let _ = WORKER_METRICS + .v8_detached_contexts + .remove_label_values(database_identity, &worker_kind); + } + } + + fn new(database_identity: &Identity, worker_kind: JsWorkerKind) -> Self { Self { total_heap_size_bytes: WORKER_METRICS .v8_total_heap_size_bytes - .with_label_values(database_identity, V8_WORKER_KIND_MAIN), + .with_label_values(database_identity, &worker_kind), total_physical_size_bytes: WORKER_METRICS .v8_total_physical_size_bytes - .with_label_values(database_identity, V8_WORKER_KIND_MAIN), + .with_label_values(database_identity, &worker_kind), used_global_handles_size_bytes: WORKER_METRICS .v8_used_global_handles_size_bytes - .with_label_values(database_identity, V8_WORKER_KIND_MAIN), + .with_label_values(database_identity, &worker_kind), used_heap_size_bytes: WORKER_METRICS .v8_used_heap_size_bytes - .with_label_values(database_identity, V8_WORKER_KIND_MAIN), + .with_label_values(database_identity, &worker_kind), heap_size_limit_bytes: WORKER_METRICS .v8_heap_size_limit_bytes - .with_label_values(database_identity, V8_WORKER_KIND_MAIN), + .with_label_values(database_identity, &worker_kind), external_memory_bytes: WORKER_METRICS .v8_external_memory_bytes - .with_label_values(database_identity, V8_WORKER_KIND_MAIN), + .with_label_values(database_identity, &worker_kind), native_contexts: WORKER_METRICS .v8_native_contexts - .with_label_values(database_identity, V8_WORKER_KIND_MAIN), + .with_label_values(database_identity, &worker_kind), detached_contexts: WORKER_METRICS .v8_detached_contexts - .with_label_values(database_identity, V8_WORKER_KIND_MAIN), + .with_label_values(database_identity, &worker_kind), last_observed: V8HeapSnapshot::default(), } } @@ -1031,6 +1072,8 @@ impl V8HeapMetrics { } fn observe(&mut self, stats: &v8::HeapStatistics) { + // See doc comment on `Self::last_observed` for why we compute a delta and apply it to the metrics value + // rather than directly calling `set`. let next = V8HeapSnapshot::from_stats(stats); self.adjust_by(V8HeapSnapshot { total_heap_size_bytes: next.total_heap_size_bytes - self.last_observed.total_heap_size_bytes, @@ -1625,9 +1668,7 @@ where let info = &module_common.info(); let mut instance_common = InstanceCommon::new(&module_common); let replica_ctx: &Arc = module_common.replica_ctx(); - let mut heap_metrics = worker_kind - .checks_heap() - .then(|| V8HeapMetrics::new(&info.database_identity)); + let mut heap_metrics = V8HeapMetrics::new(&info.database_identity, worker_kind); let mut inst = V8Instance { scope, @@ -1639,9 +1680,7 @@ where .with_label_values(&info.database_identity), initial_heap_limit: heap_policy.heap_limit_bytes, }; - if let Some(heap_metrics) = heap_metrics.as_mut() { - let _initial_heap_stats = sample_heap_stats(inst.scope, heap_metrics); - } + let _initial_heap_stats = sample_heap_stats(inst.scope, &mut heap_metrics); // Process requests to the worker. // @@ -1655,9 +1694,7 @@ where let mut outcome = W::handle_request(request, &mut instance_common, &mut inst, &module_common, replica_ctx); - if let WorkerRequestOutcome::Continue = outcome - && let Some(heap_metrics) = heap_metrics.as_mut() - { + if let WorkerRequestOutcome::Continue = outcome { let request_check_due = heap_policy.heap_check_request_interval.is_some_and(|interval| { requests_since_heap_check += 1; requests_since_heap_check >= interval @@ -1669,7 +1706,7 @@ where requests_since_heap_check = 0; last_heap_check_at = Instant::now(); if let Some((used, limit)) = - should_retire_worker_for_heap(inst.scope, heap_metrics, heap_policy) + should_retire_worker_for_heap(inst.scope, &mut heap_metrics, heap_policy) { outcome = outcome.recreate_instance(); log::warn!( diff --git a/crates/core/src/worker_metrics/mod.rs b/crates/core/src/worker_metrics/mod.rs index c1847fa6d1c..9246022ced2 100644 --- a/crates/core/src/worker_metrics/mod.rs +++ b/crates/core/src/worker_metrics/mod.rs @@ -1,6 +1,6 @@ -use crate::hash::Hash; use crate::messages::control_db::HostType; use crate::subscription::row_list_builder_pool::BsatnRowListBuilderPool; +use crate::{hash::Hash, host::v8::JsWorkerKind}; use once_cell::sync::Lazy; use prometheus::{GaugeVec, HistogramVec, IntCounterVec, IntGaugeVec}; use spacetimedb_datastore::execution_context::WorkloadType; @@ -288,28 +288,28 @@ metrics_group!( pub wasm_memory_bytes: IntGaugeVec, #[name = spacetime_worker_v8_total_heap_size_bytes] - #[help = "The total size of the V8 heap for a database's tracked JS worker kind (currently main only)"] - #[labels(database_identity: Identity, worker_kind: str)] + #[help = "The total size of the V8 heap for a database's JS workers"] + #[labels(database_identity: Identity, worker_kind: JsWorkerKind)] pub v8_total_heap_size_bytes: IntGaugeVec, #[name = spacetime_worker_v8_total_physical_size_bytes] - #[help = "The total committed physical V8 heap memory for a database's tracked JS worker kind (currently main only)"] - #[labels(database_identity: Identity, worker_kind: str)] + #[help = "The total committed physical V8 heap memory for a database's JS workers"] + #[labels(database_identity: Identity, worker_kind: JsWorkerKind)] pub v8_total_physical_size_bytes: IntGaugeVec, #[name = spacetime_worker_v8_used_global_handles_size_bytes] - #[help = "The used size of V8 global handles for a database's tracked JS worker kind (currently main only)"] - #[labels(database_identity: Identity, worker_kind: str)] + #[help = "The used size of V8 global handles for a database's JS workers"] + #[labels(database_identity: Identity, worker_kind: JsWorkerKind)] pub v8_used_global_handles_size_bytes: IntGaugeVec, #[name = spacetime_worker_v8_used_heap_size_bytes] - #[help = "The live V8 heap size for a database's tracked JS worker kind (currently main only)"] - #[labels(database_identity: Identity, worker_kind: str)] + #[help = "The live V8 heap size for a database's JS workers"] + #[labels(database_identity: Identity, worker_kind: JsWorkerKind)] pub v8_used_heap_size_bytes: IntGaugeVec, #[name = spacetime_worker_v8_heap_size_limit_bytes] - #[help = "The V8 heap size limit for a database's tracked JS worker kind (currently main only)"] - #[labels(database_identity: Identity, worker_kind: str)] + #[help = "The V8 heap size limit for a database's JS workers"] + #[labels(database_identity: Identity, worker_kind: JsWorkerKind)] pub v8_heap_size_limit_bytes: IntGaugeVec, #[name = spacetime_worker_v8_heap_limit_hit] @@ -318,18 +318,18 @@ metrics_group!( pub v8_heap_limit_hit: IntCounterVec, #[name = spacetime_worker_v8_external_memory_bytes] - #[help = "The external memory tracked by V8 for a database's tracked JS worker kind (currently main only)"] - #[labels(database_identity: Identity, worker_kind: str)] + #[help = "The external memory tracked by V8 for a database's JS workers"] + #[labels(database_identity: Identity, worker_kind: JsWorkerKind)] pub v8_external_memory_bytes: IntGaugeVec, #[name = spacetime_worker_v8_native_contexts] - #[help = "The number of native V8 contexts for a database's tracked JS worker kind (currently main only)"] - #[labels(database_identity: Identity, worker_kind: str)] + #[help = "The number of native V8 contexts for a database's JS workers"] + #[labels(database_identity: Identity, worker_kind: JsWorkerKind)] pub v8_native_contexts: IntGaugeVec, #[name = spacetime_worker_v8_detached_contexts] - #[help = "The number of detached V8 contexts for a database's tracked JS worker kind (currently main only)"] - #[labels(database_identity: Identity, worker_kind: str)] + #[help = "The number of detached V8 contexts for a database's JS workers"] + #[labels(database_identity: Identity, worker_kind: JsWorkerKind)] pub v8_detached_contexts: IntGaugeVec, #[name = spacetime_worker_v8_request_queue_length]