Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 4 additions & 21 deletions crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::module_host::{EventStatus, ModuleHost, ModuleInfo, NoSuchModule};
use super::scheduler::SchedulerStarter;
use super::v8::V8HeapMetrics;
use super::wasmtime::WasmtimeRuntime;
use super::{Scheduler, UpdateDatabaseResult};
use crate::client::{ClientActorId, ClientName};
Expand Down Expand Up @@ -1411,26 +1412,8 @@ where
.data_size_blob_store_bytes_used_by_blobs
.remove_label_values(db);
let _ = WORKER_METRICS.wasm_memory_bytes.remove_label_values(db);
let worker_kind = crate::host::v8::V8_WORKER_KIND_MAIN;
let _ = WORKER_METRICS
.v8_total_heap_size_bytes
.remove_label_values(db, worker_kind);
let _ = WORKER_METRICS
.v8_total_physical_size_bytes
.remove_label_values(db, worker_kind);
let _ = WORKER_METRICS
.v8_used_global_handles_size_bytes
.remove_label_values(db, worker_kind);
let _ = WORKER_METRICS
.v8_used_heap_size_bytes
.remove_label_values(db, worker_kind);
let _ = WORKER_METRICS
.v8_heap_size_limit_bytes
.remove_label_values(db, worker_kind);
let _ = WORKER_METRICS
.v8_external_memory_bytes
.remove_label_values(db, worker_kind);
let _ = WORKER_METRICS.v8_native_contexts.remove_label_values(db, worker_kind);
let _ = WORKER_METRICS.v8_detached_contexts.remove_label_values(db, worker_kind);

V8HeapMetrics::remove_all_metric_label_values_for_database(db);

let _ = WORKER_METRICS.v8_request_queue_length.remove_label_values(db);
}
87 changes: 62 additions & 25 deletions crates/core/src/host/v8/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,17 +169,19 @@ impl V8Runtime {
static V8_RUNTIME_GLOBAL: LazyLock<V8RuntimeInner> = LazyLock::new(V8RuntimeInner::init);
const REDUCER_ARGS_BUFFER_SIZE: usize = 4_096;
const JS_PROCEDURE_INSTANCE_QUEUE_CAPACITY: usize = 1;
pub(crate) const V8_WORKER_KIND_MAIN: &str = "main";

#[derive(Copy, Clone)]
enum JsWorkerKind {
pub(crate) enum JsWorkerKind {
Main,
Procedure,
}

impl JsWorkerKind {
const fn checks_heap(self) -> bool {
matches!(self, Self::Main)
impl AsRef<str> for JsWorkerKind {
fn as_ref(&self) -> &str {
match self {
Self::Main => "main",
Self::Procedure => "procedure",
}
}
}

Expand Down Expand Up @@ -946,7 +948,7 @@ fn handle_detached_worker_request(
}
}

struct V8HeapMetrics {
pub(in crate::host) struct V8HeapMetrics {
total_heap_size_bytes: IntGauge,
total_physical_size_bytes: IntGauge,
used_global_handles_size_bytes: IntGauge,
Expand All @@ -955,6 +957,16 @@ struct V8HeapMetrics {
external_memory_bytes: IntGauge,
native_contexts: IntGauge,
detached_contexts: IntGauge,

/// Previous values observed by this instance.
///
/// In [`Self::observe`], we use this to compute deltas against the new instance's values,
/// then increment/decrement the metric values by those deltas.
/// We do this rather than `set`ting the metric values as multiple instances may coexist
/// and share the same metric label values.
/// This happens when a database has multiple procedure workers running,
/// and during a module update, as there is a period when the new version has already been created
/// but the old version has not yet shut down.
last_observed: V8HeapSnapshot,
Comment thread
gefjon marked this conversation as resolved.
}

Expand Down Expand Up @@ -986,32 +998,61 @@ impl V8HeapSnapshot {
}

impl V8HeapMetrics {
fn new(database_identity: &Identity) -> Self {
pub(in crate::host) fn remove_all_metric_label_values_for_database(database_identity: &Identity) {
for worker_kind in [JsWorkerKind::Main, JsWorkerKind::Procedure] {
let _ = WORKER_METRICS
.v8_total_heap_size_bytes
.remove_label_values(database_identity, &worker_kind);
let _ = WORKER_METRICS
.v8_total_physical_size_bytes
.remove_label_values(database_identity, &worker_kind);
let _ = WORKER_METRICS
.v8_used_global_handles_size_bytes
.remove_label_values(database_identity, &worker_kind);
let _ = WORKER_METRICS
.v8_used_heap_size_bytes
.remove_label_values(database_identity, &worker_kind);
let _ = WORKER_METRICS
.v8_heap_size_limit_bytes
.remove_label_values(database_identity, &worker_kind);
let _ = WORKER_METRICS
.v8_external_memory_bytes
.remove_label_values(database_identity, &worker_kind);
let _ = WORKER_METRICS
.v8_native_contexts
.remove_label_values(database_identity, &worker_kind);
let _ = WORKER_METRICS
.v8_detached_contexts
.remove_label_values(database_identity, &worker_kind);
}
}

fn new(database_identity: &Identity, worker_kind: JsWorkerKind) -> Self {
Self {
total_heap_size_bytes: WORKER_METRICS
.v8_total_heap_size_bytes
.with_label_values(database_identity, V8_WORKER_KIND_MAIN),
.with_label_values(database_identity, &worker_kind),
total_physical_size_bytes: WORKER_METRICS
.v8_total_physical_size_bytes
.with_label_values(database_identity, V8_WORKER_KIND_MAIN),
.with_label_values(database_identity, &worker_kind),
used_global_handles_size_bytes: WORKER_METRICS
.v8_used_global_handles_size_bytes
.with_label_values(database_identity, V8_WORKER_KIND_MAIN),
.with_label_values(database_identity, &worker_kind),
used_heap_size_bytes: WORKER_METRICS
.v8_used_heap_size_bytes
.with_label_values(database_identity, V8_WORKER_KIND_MAIN),
.with_label_values(database_identity, &worker_kind),
heap_size_limit_bytes: WORKER_METRICS
.v8_heap_size_limit_bytes
.with_label_values(database_identity, V8_WORKER_KIND_MAIN),
.with_label_values(database_identity, &worker_kind),
external_memory_bytes: WORKER_METRICS
.v8_external_memory_bytes
.with_label_values(database_identity, V8_WORKER_KIND_MAIN),
.with_label_values(database_identity, &worker_kind),
native_contexts: WORKER_METRICS
.v8_native_contexts
.with_label_values(database_identity, V8_WORKER_KIND_MAIN),
.with_label_values(database_identity, &worker_kind),
detached_contexts: WORKER_METRICS
.v8_detached_contexts
.with_label_values(database_identity, V8_WORKER_KIND_MAIN),
.with_label_values(database_identity, &worker_kind),
last_observed: V8HeapSnapshot::default(),
}
}
Expand All @@ -1031,6 +1072,8 @@ impl V8HeapMetrics {
}

fn observe(&mut self, stats: &v8::HeapStatistics) {
// See doc comment on `Self::last_observed` for why we compute a delta and apply it to the metrics value
// rather than directly calling `set`.
let next = V8HeapSnapshot::from_stats(stats);
self.adjust_by(V8HeapSnapshot {
total_heap_size_bytes: next.total_heap_size_bytes - self.last_observed.total_heap_size_bytes,
Expand Down Expand Up @@ -1625,9 +1668,7 @@ where
let info = &module_common.info();
let mut instance_common = InstanceCommon::new(&module_common);
let replica_ctx: &Arc<ReplicaContext> = module_common.replica_ctx();
let mut heap_metrics = worker_kind
.checks_heap()
.then(|| V8HeapMetrics::new(&info.database_identity));
let mut heap_metrics = V8HeapMetrics::new(&info.database_identity, worker_kind);

let mut inst = V8Instance {
scope,
Expand All @@ -1639,9 +1680,7 @@ where
.with_label_values(&info.database_identity),
initial_heap_limit: heap_policy.heap_limit_bytes,
};
if let Some(heap_metrics) = heap_metrics.as_mut() {
let _initial_heap_stats = sample_heap_stats(inst.scope, heap_metrics);
}
let _initial_heap_stats = sample_heap_stats(inst.scope, &mut heap_metrics);

// Process requests to the worker.
//
Expand All @@ -1655,9 +1694,7 @@ where
let mut outcome =
W::handle_request(request, &mut instance_common, &mut inst, &module_common, replica_ctx);

if let WorkerRequestOutcome::Continue = outcome
&& let Some(heap_metrics) = heap_metrics.as_mut()
{
if let WorkerRequestOutcome::Continue = outcome {
let request_check_due = heap_policy.heap_check_request_interval.is_some_and(|interval| {
requests_since_heap_check += 1;
requests_since_heap_check >= interval
Expand All @@ -1669,7 +1706,7 @@ where
requests_since_heap_check = 0;
last_heap_check_at = Instant::now();
if let Some((used, limit)) =
should_retire_worker_for_heap(inst.scope, heap_metrics, heap_policy)
should_retire_worker_for_heap(inst.scope, &mut heap_metrics, heap_policy)
{
outcome = outcome.recreate_instance();
log::warn!(
Expand Down
34 changes: 17 additions & 17 deletions crates/core/src/worker_metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::hash::Hash;
use crate::messages::control_db::HostType;
use crate::subscription::row_list_builder_pool::BsatnRowListBuilderPool;
use crate::{hash::Hash, host::v8::JsWorkerKind};
use once_cell::sync::Lazy;
use prometheus::{GaugeVec, HistogramVec, IntCounterVec, IntGaugeVec};
use spacetimedb_datastore::execution_context::WorkloadType;
Expand Down Expand Up @@ -288,28 +288,28 @@ metrics_group!(
pub wasm_memory_bytes: IntGaugeVec,

#[name = spacetime_worker_v8_total_heap_size_bytes]
#[help = "The total size of the V8 heap for a database's tracked JS worker kind (currently main only)"]
#[labels(database_identity: Identity, worker_kind: str)]
#[help = "The total size of the V8 heap for a database's JS workers"]
#[labels(database_identity: Identity, worker_kind: JsWorkerKind)]
pub v8_total_heap_size_bytes: IntGaugeVec,

#[name = spacetime_worker_v8_total_physical_size_bytes]
#[help = "The total committed physical V8 heap memory for a database's tracked JS worker kind (currently main only)"]
#[labels(database_identity: Identity, worker_kind: str)]
#[help = "The total committed physical V8 heap memory for a database's JS workers"]
#[labels(database_identity: Identity, worker_kind: JsWorkerKind)]
pub v8_total_physical_size_bytes: IntGaugeVec,

#[name = spacetime_worker_v8_used_global_handles_size_bytes]
#[help = "The used size of V8 global handles for a database's tracked JS worker kind (currently main only)"]
#[labels(database_identity: Identity, worker_kind: str)]
#[help = "The used size of V8 global handles for a database's JS workers"]
#[labels(database_identity: Identity, worker_kind: JsWorkerKind)]
pub v8_used_global_handles_size_bytes: IntGaugeVec,

#[name = spacetime_worker_v8_used_heap_size_bytes]
#[help = "The live V8 heap size for a database's tracked JS worker kind (currently main only)"]
#[labels(database_identity: Identity, worker_kind: str)]
#[help = "The live V8 heap size for a database's JS workers"]
#[labels(database_identity: Identity, worker_kind: JsWorkerKind)]
pub v8_used_heap_size_bytes: IntGaugeVec,

#[name = spacetime_worker_v8_heap_size_limit_bytes]
#[help = "The V8 heap size limit for a database's tracked JS worker kind (currently main only)"]
#[labels(database_identity: Identity, worker_kind: str)]
#[help = "The V8 heap size limit for a database's JS workers"]
#[labels(database_identity: Identity, worker_kind: JsWorkerKind)]
pub v8_heap_size_limit_bytes: IntGaugeVec,

#[name = spacetime_worker_v8_heap_limit_hit]
Expand All @@ -318,18 +318,18 @@ metrics_group!(
pub v8_heap_limit_hit: IntCounterVec,

#[name = spacetime_worker_v8_external_memory_bytes]
#[help = "The external memory tracked by V8 for a database's tracked JS worker kind (currently main only)"]
#[labels(database_identity: Identity, worker_kind: str)]
#[help = "The external memory tracked by V8 for a database's JS workers"]
#[labels(database_identity: Identity, worker_kind: JsWorkerKind)]
pub v8_external_memory_bytes: IntGaugeVec,

#[name = spacetime_worker_v8_native_contexts]
#[help = "The number of native V8 contexts for a database's tracked JS worker kind (currently main only)"]
#[labels(database_identity: Identity, worker_kind: str)]
#[help = "The number of native V8 contexts for a database's JS workers"]
#[labels(database_identity: Identity, worker_kind: JsWorkerKind)]
pub v8_native_contexts: IntGaugeVec,

#[name = spacetime_worker_v8_detached_contexts]
#[help = "The number of detached V8 contexts for a database's tracked JS worker kind (currently main only)"]
#[labels(database_identity: Identity, worker_kind: str)]
#[help = "The number of detached V8 contexts for a database's JS workers"]
#[labels(database_identity: Identity, worker_kind: JsWorkerKind)]
pub v8_detached_contexts: IntGaugeVec,

#[name = spacetime_worker_v8_request_queue_length]
Expand Down
Loading