From 675d63769d762a993a4169cfe8d9836865f88912 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 22 Apr 2026 04:13:30 +0900 Subject: [PATCH 1/4] obs(pebble): expose LSM internals as prometheus metrics Add an elastickv_pebble_* metrics family that periodically polls each raft group's *pebble.DB.Metrics() snapshot and mirrors the most operationally useful fields into Prometheus so operators can graph and alert on write-stall precursors without restarting the process for a heap dump. Fields exposed (per-group label): Gauges (overwritten each tick): l0_sublevels - canonical write-stall precursor l0_num_files - L0 fan-in compact_estimated_debt_bytes - compactions falling behind ingest compact_in_progress memtable_count / memtable_size_bytes / memtable_zombie_count block_cache_size_bytes Counters (positive-delta only, reset-safe): compact_count_total block_cache_hits_total / block_cache_misses_total Rationale: L0 sublevels and compaction debt are the incident signals for write stalls; memtable size + zombies catch memtable-related stalls and long-lived iterators; block cache size/hit/miss track working-set fit. Note: pebble v2.1.4 does not expose WriteStallCount / WriteStallDuration on the Metrics struct (only as EventListener callbacks), so those are omitted from this pass. L0 sublevels is the more actionable proxy. Wiring mirrors the existing DispatchCollector shape: - store.pebbleStore.Metrics() accessor (guarded by dbMu so a concurrent Restore/Close cannot race the DB swap) - monitoring.PebbleMetricsSource interface satisfied by *pebbleStore - monitoring.PebbleCollector started from main.go next to the DispatchCollector, on the same 5 s cadence Tests use a fakePebbleSource returning canned *pebble.Metrics and assert both the gauge-overwrite semantics and the reset-safe counter deltas. 
--- main.go | 27 ++++ monitoring/pebble.go | 276 ++++++++++++++++++++++++++++++++++++++ monitoring/pebble_test.go | 202 ++++++++++++++++++++++++++++ monitoring/registry.go | 14 ++ store/lsm_store.go | 19 +++ 5 files changed, 538 insertions(+) create mode 100644 monitoring/pebble.go create mode 100644 monitoring/pebble_test.go diff --git a/main.go b/main.go index 2e5b43f6..bffafa08 100644 --- a/main.go +++ b/main.go @@ -187,6 +187,9 @@ func run() error { if collector := metricsRegistry.DispatchCollector(); collector != nil { collector.Start(runCtx, dispatchMonitorSources(runtimes), raftMetricsObserveInterval) } + if collector := metricsRegistry.PebbleCollector(); collector != nil { + collector.Start(runCtx, pebbleMonitorSources(runtimes), raftMetricsObserveInterval) + } compactor := kv.NewFSMCompactor( fsmCompactionRuntimes(runtimes), kv.WithFSMCompactorActiveTimestampTracker(readTracker), @@ -447,6 +450,30 @@ func raftMonitorRuntimes(runtimes []*raftGroupRuntime) []monitoring.RaftRuntime return out } +// pebbleMonitorSources extracts the MVCC stores that expose +// *pebble.DB.Metrics() so monitoring can poll LSM internals (L0 +// sublevels, compaction debt, memtable, block cache) for the +// elastickv_pebble_* metrics family. Stores that do not satisfy the +// interface (non-Pebble backends, if any are added later) are skipped +// silently. +func pebbleMonitorSources(runtimes []*raftGroupRuntime) []monitoring.PebbleSource { + out := make([]monitoring.PebbleSource, 0, len(runtimes)) + for _, runtime := range runtimes { + if runtime == nil || runtime.store == nil { + continue + } + src, ok := runtime.store.(monitoring.PebbleMetricsSource) + if !ok { + continue + } + out = append(out, monitoring.PebbleSource{ + GroupID: runtime.spec.id, + Source: src, + }) + } + return out +} + // dispatchMonitorSources extracts the raft engines that expose etcd // dispatch counters so monitoring can poll them for the hot-path // dashboard. 
Engines that do not satisfy the interface (hashicorp diff --git a/monitoring/pebble.go b/monitoring/pebble.go new file mode 100644 index 00000000..750849c8 --- /dev/null +++ b/monitoring/pebble.go @@ -0,0 +1,276 @@ +package monitoring + +import ( + "context" + "strconv" + "sync" + "time" + + "github.com/cockroachdb/pebble/v2" + "github.com/prometheus/client_golang/prometheus" +) + +// Pebble LSM metrics. These mirror the most operationally useful +// fields from *pebble.DB.Metrics() so operators can graph/alert on +// write-stall signals (L0 sublevels, compaction debt) and capacity +// trends (memtable, block cache) without importing Pebble from every +// dashboard. +// +// The point-in-time fields (Sublevels, NumFiles, EstimatedDebt, +// MemTable.*, NumInProgress, BlockCache.Size/Count) are exposed as +// Prometheus GAUGES — each poll overwrites the previous value. +// Monotonic fields (Compact.Count, BlockCache.Hits/Misses) are exposed +// as COUNTERS; the collector emits only the positive delta against the +// last snapshot so a store reset (Restore/swap) does not produce +// negative values. +// +// Name convention: elastickv_pebble_* to keep a consistent node_id / +// node_address label prefix with the rest of the registry. + +const defaultPebblePollInterval = 5 * time.Second + +// PebbleMetrics owns the Prometheus vectors for Pebble LSM internals. +// One instance per registry; shared by all groups (labelled by group +// ID + level where relevant). +type PebbleMetrics struct { + // L0 pressure: incident signals. + l0Sublevels *prometheus.GaugeVec + l0NumFiles *prometheus.GaugeVec + + // Compaction queue depth / debt. + compactEstimatedDebt *prometheus.GaugeVec + compactInProgress *prometheus.GaugeVec + compactCountTotal *prometheus.CounterVec + + // Memtable footprint. + memtableCount *prometheus.GaugeVec + memtableSizeBytes *prometheus.GaugeVec + memtableZombieCount *prometheus.GaugeVec + + // Block cache. 
+ blockCacheSizeBytes *prometheus.GaugeVec + blockCacheHitsTotal *prometheus.CounterVec + blockCacheMissTotal *prometheus.CounterVec +} + +func newPebbleMetrics(registerer prometheus.Registerer) *PebbleMetrics { + m := &PebbleMetrics{ + l0Sublevels: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "elastickv_pebble_l0_sublevels", + Help: "Current L0 sublevel count reported by Pebble. Climbing sublevels are the canonical precursor to a write stall; alert when this exceeds the L0CompactionThreshold for a sustained period.", + }, + []string{"group"}, + ), + l0NumFiles: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "elastickv_pebble_l0_num_files", + Help: "Current number of sstables in L0 reported by Pebble. Paired with elastickv_pebble_l0_sublevels to diagnose L0 pressure.", + }, + []string{"group"}, + ), + compactEstimatedDebt: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "elastickv_pebble_compact_estimated_debt_bytes", + Help: "Estimated number of bytes Pebble still needs to compact for the LSM to reach a stable state. 
Growth indicates compactions are falling behind ingest.", + }, + []string{"group"}, + ), + compactInProgress: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "elastickv_pebble_compact_in_progress", + Help: "Number of compactions currently in progress.", + }, + []string{"group"}, + ), + compactCountTotal: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "elastickv_pebble_compact_count_total", + Help: "Cumulative number of compactions completed by Pebble since the process started.", + }, + []string{"group"}, + ), + memtableCount: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "elastickv_pebble_memtable_count", + Help: "Current count of memtables (active + queued for flush).", + }, + []string{"group"}, + ), + memtableSizeBytes: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "elastickv_pebble_memtable_size_bytes", + Help: "Current bytes allocated by memtables and large flushable batches.", + }, + []string{"group"}, + ), + memtableZombieCount: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "elastickv_pebble_memtable_zombie_count", + Help: "Current count of zombie memtables (no longer referenced by the DB but pinned by open iterators).", + }, + []string{"group"}, + ), + blockCacheSizeBytes: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "elastickv_pebble_block_cache_size_bytes", + Help: "Current bytes in use by Pebble's block cache.", + }, + []string{"group"}, + ), + blockCacheHitsTotal: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "elastickv_pebble_block_cache_hits_total", + Help: "Cumulative block cache hits reported by Pebble.", + }, + []string{"group"}, + ), + blockCacheMissTotal: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "elastickv_pebble_block_cache_misses_total", + Help: "Cumulative block cache misses reported by Pebble.", + }, + []string{"group"}, + ), + } + + registerer.MustRegister( + m.l0Sublevels, + m.l0NumFiles, + m.compactEstimatedDebt, + m.compactInProgress, + 
m.compactCountTotal, + m.memtableCount, + m.memtableSizeBytes, + m.memtableZombieCount, + m.blockCacheSizeBytes, + m.blockCacheHitsTotal, + m.blockCacheMissTotal, + ) + return m +} + +// PebbleMetricsSource abstracts the per-group access to a Pebble DB's +// Metrics(). The concrete *store pebbleStore satisfies this via its +// Metrics() accessor. Returning nil (e.g. store closed mid-restore) is +// allowed; the collector will skip that group for the tick. +type PebbleMetricsSource interface { + Metrics() *pebble.Metrics +} + +// PebbleSource binds a raft group ID to its Pebble store. Multiple +// groups can be polled by a single collector on a sharded node. +type PebbleSource struct { + GroupID uint64 + Source PebbleMetricsSource +} + +// PebbleCollector polls each registered Pebble store on a fixed +// interval and mirrors the snapshot into the Prometheus vectors. +// Gauges are overwritten; counters advance by the positive delta +// against the previous snapshot. +type PebbleCollector struct { + metrics *PebbleMetrics + + mu sync.Mutex + previous map[uint64]pebbleSnapshot +} + +type pebbleSnapshot struct { + compactCount int64 + blockCacheHits int64 + blockCacheMisse int64 +} + +func newPebbleCollector(metrics *PebbleMetrics) *PebbleCollector { + return &PebbleCollector{ + metrics: metrics, + previous: map[uint64]pebbleSnapshot{}, + } +} + +// Start begins polling sources on interval until ctx is canceled. +// Passing interval <= 0 uses defaultPebblePollInterval (5 s), matching +// the DispatchCollector cadence so operators see consistent refresh +// rates across dashboards. Pebble.Metrics() acquires internal mutexes +// but is not expensive; 5 s gives ample headroom. 
+func (c *PebbleCollector) Start(ctx context.Context, sources []PebbleSource, interval time.Duration) { + if c == nil || c.metrics == nil || len(sources) == 0 { + return + } + if interval <= 0 { + interval = defaultPebblePollInterval + } + c.observeOnce(sources) + ticker := time.NewTicker(interval) + go func() { + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + c.observeOnce(sources) + } + } + }() +} + +// ObserveOnce is exposed for tests and single-shot callers. +func (c *PebbleCollector) ObserveOnce(sources []PebbleSource) { + c.observeOnce(sources) +} + +func (c *PebbleCollector) observeOnce(sources []PebbleSource) { + if c == nil || c.metrics == nil { + return + } + c.mu.Lock() + defer c.mu.Unlock() + for _, src := range sources { + if src.Source == nil { + continue + } + snap := src.Source.Metrics() + if snap == nil { + continue + } + group := strconv.FormatUint(src.GroupID, 10) + + // L0 pressure: gauges, overwritten each tick. + c.metrics.l0Sublevels.WithLabelValues(group).Set(float64(snap.Levels[0].Sublevels)) + c.metrics.l0NumFiles.WithLabelValues(group).Set(float64(snap.Levels[0].TablesCount)) + + // Compaction. + c.metrics.compactEstimatedDebt.WithLabelValues(group).Set(float64(snap.Compact.EstimatedDebt)) + c.metrics.compactInProgress.WithLabelValues(group).Set(float64(snap.Compact.NumInProgress)) + + // Memtable. + c.metrics.memtableCount.WithLabelValues(group).Set(float64(snap.MemTable.Count)) + c.metrics.memtableSizeBytes.WithLabelValues(group).Set(float64(snap.MemTable.Size)) + c.metrics.memtableZombieCount.WithLabelValues(group).Set(float64(snap.MemTable.ZombieCount)) + + // Block cache gauge. + c.metrics.blockCacheSizeBytes.WithLabelValues(group).Set(float64(snap.BlockCache.Size)) + + // Monotonic counters: emit only the positive delta. A smaller + // value means the source was reset (store reopened); rebase + // silently without emitting negative. 
+ prev := c.previous[src.GroupID] + curr := pebbleSnapshot{ + compactCount: snap.Compact.Count, + blockCacheHits: snap.BlockCache.Hits, + blockCacheMisse: snap.BlockCache.Misses, + } + if curr.compactCount > prev.compactCount { + c.metrics.compactCountTotal.WithLabelValues(group).Add(float64(curr.compactCount - prev.compactCount)) + } + if curr.blockCacheHits > prev.blockCacheHits { + c.metrics.blockCacheHitsTotal.WithLabelValues(group).Add(float64(curr.blockCacheHits - prev.blockCacheHits)) + } + if curr.blockCacheMisse > prev.blockCacheMisse { + c.metrics.blockCacheMissTotal.WithLabelValues(group).Add(float64(curr.blockCacheMisse - prev.blockCacheMisse)) + } + c.previous[src.GroupID] = curr + } +} diff --git a/monitoring/pebble_test.go b/monitoring/pebble_test.go new file mode 100644 index 00000000..8b8f757e --- /dev/null +++ b/monitoring/pebble_test.go @@ -0,0 +1,202 @@ +package monitoring + +import ( + "strings" + "sync" + "testing" + + "github.com/cockroachdb/pebble/v2" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" +) + +// fakePebbleSource implements PebbleMetricsSource with canned values +// so tests can exercise the collector without opening a real Pebble +// DB. A nil stored value makes Metrics() return nil (exercising the +// "store closed / mid-swap" skip path). +type fakePebbleSource struct { + mu sync.Mutex + metrics *pebble.Metrics +} + +func (f *fakePebbleSource) set(m *pebble.Metrics) { + f.mu.Lock() + defer f.mu.Unlock() + f.metrics = m +} + +func (f *fakePebbleSource) Metrics() *pebble.Metrics { + f.mu.Lock() + defer f.mu.Unlock() + return f.metrics +} + +// newFakeMetrics builds a *pebble.Metrics populated only with the +// fields the collector reads. Other fields stay at their zero value. 
+func newFakeMetrics(l0Sub int32, l0Files int64, debt uint64, inProg int64, compactCount int64, + memCount int64, memSize uint64, memZombie int64, + cacheSize int64, hits int64, misses int64, +) *pebble.Metrics { + m := &pebble.Metrics{} + m.Levels[0].Sublevels = l0Sub + m.Levels[0].TablesCount = l0Files + m.Compact.EstimatedDebt = debt + m.Compact.NumInProgress = inProg + m.Compact.Count = compactCount + m.MemTable.Count = memCount + m.MemTable.Size = memSize + m.MemTable.ZombieCount = memZombie + m.BlockCache.Size = cacheSize + m.BlockCache.Hits = hits + m.BlockCache.Misses = misses + return m +} + +func TestPebbleCollectorMirrorsGaugesAndCounters(t *testing.T) { + registry := NewRegistry("n1", "10.0.0.1:50051") + collector := registry.PebbleCollector() + require.NotNil(t, collector) + + src := &fakePebbleSource{} + sources := []PebbleSource{{GroupID: 1, Source: src}} + + // Baseline tick: initial counter values establish the delta + // baseline, gauges reflect the snapshot immediately. + src.set(newFakeMetrics( + 2, 7, 1024, 1, 10, + 3, 2048, 1, + 8192, 100, 20, + )) + collector.ObserveOnce(sources) + + // Advance: gauges change, monotonic counters grow. + src.set(newFakeMetrics( + 5, 12, 4096, 2, 15, + 4, 8192, 2, + 16384, 150, 25, + )) + collector.ObserveOnce(sources) + + // Idempotent second pass with the same snapshot must not + // double-count the counters. + collector.ObserveOnce(sources) + + err := testutil.GatherAndCompare( + registry.Gatherer(), + strings.NewReader(` +# HELP elastickv_pebble_block_cache_hits_total Cumulative block cache hits reported by Pebble. +# TYPE elastickv_pebble_block_cache_hits_total counter +elastickv_pebble_block_cache_hits_total{group="1",node_address="10.0.0.1:50051",node_id="n1"} 150 +# HELP elastickv_pebble_block_cache_misses_total Cumulative block cache misses reported by Pebble. 
+# TYPE elastickv_pebble_block_cache_misses_total counter +elastickv_pebble_block_cache_misses_total{group="1",node_address="10.0.0.1:50051",node_id="n1"} 25 + +# HELP elastickv_pebble_block_cache_size_bytes Current bytes in use by Pebble's block cache. +# TYPE elastickv_pebble_block_cache_size_bytes gauge +elastickv_pebble_block_cache_size_bytes{group="1",node_address="10.0.0.1:50051",node_id="n1"} 16384 +# HELP elastickv_pebble_compact_count_total Cumulative number of compactions completed by Pebble since the process started. +# TYPE elastickv_pebble_compact_count_total counter +elastickv_pebble_compact_count_total{group="1",node_address="10.0.0.1:50051",node_id="n1"} 15 +# HELP elastickv_pebble_compact_estimated_debt_bytes Estimated number of bytes Pebble still needs to compact for the LSM to reach a stable state. Growth indicates compactions are falling behind ingest. +# TYPE elastickv_pebble_compact_estimated_debt_bytes gauge +elastickv_pebble_compact_estimated_debt_bytes{group="1",node_address="10.0.0.1:50051",node_id="n1"} 4096 +# HELP elastickv_pebble_compact_in_progress Number of compactions currently in progress. +# TYPE elastickv_pebble_compact_in_progress gauge +elastickv_pebble_compact_in_progress{group="1",node_address="10.0.0.1:50051",node_id="n1"} 2 +# HELP elastickv_pebble_l0_num_files Current number of sstables in L0 reported by Pebble. Paired with elastickv_pebble_l0_sublevels to diagnose L0 pressure. +# TYPE elastickv_pebble_l0_num_files gauge +elastickv_pebble_l0_num_files{group="1",node_address="10.0.0.1:50051",node_id="n1"} 12 +# HELP elastickv_pebble_l0_sublevels Current L0 sublevel count reported by Pebble. Climbing sublevels are the canonical precursor to a write stall; alert when this exceeds the L0CompactionThreshold for a sustained period. 
+# TYPE elastickv_pebble_l0_sublevels gauge +elastickv_pebble_l0_sublevels{group="1",node_address="10.0.0.1:50051",node_id="n1"} 5 +# HELP elastickv_pebble_memtable_count Current count of memtables (active + queued for flush). +# TYPE elastickv_pebble_memtable_count gauge +elastickv_pebble_memtable_count{group="1",node_address="10.0.0.1:50051",node_id="n1"} 4 +# HELP elastickv_pebble_memtable_size_bytes Current bytes allocated by memtables and large flushable batches. +# TYPE elastickv_pebble_memtable_size_bytes gauge +elastickv_pebble_memtable_size_bytes{group="1",node_address="10.0.0.1:50051",node_id="n1"} 8192 +# HELP elastickv_pebble_memtable_zombie_count Current count of zombie memtables (no longer referenced by the DB but pinned by open iterators). +# TYPE elastickv_pebble_memtable_zombie_count gauge +elastickv_pebble_memtable_zombie_count{group="1",node_address="10.0.0.1:50051",node_id="n1"} 2 +`), + "elastickv_pebble_l0_sublevels", + "elastickv_pebble_l0_num_files", + "elastickv_pebble_compact_estimated_debt_bytes", + "elastickv_pebble_compact_in_progress", + "elastickv_pebble_compact_count_total", + "elastickv_pebble_memtable_count", + "elastickv_pebble_memtable_size_bytes", + "elastickv_pebble_memtable_zombie_count", + "elastickv_pebble_block_cache_size_bytes", + "elastickv_pebble_block_cache_hits_total", + "elastickv_pebble_block_cache_misses_total", + ) + require.NoError(t, err) +} + +func TestPebbleCollectorHandlesSourceReset(t *testing.T) { + // If the underlying DB is replaced (Restore reopens it) the + // monotonic counters may go DOWN. The collector must not emit + // negative deltas; instead, it rebases silently. 
+ registry := NewRegistry("n1", "10.0.0.1:50051") + collector := registry.PebbleCollector() + require.NotNil(t, collector) + + src := &fakePebbleSource{} + sources := []PebbleSource{{GroupID: 7, Source: src}} + + src.set(newFakeMetrics(0, 0, 0, 0, 10, 0, 0, 0, 0, 100, 5)) + collector.ObserveOnce(sources) // baseline: 10 compactions, 100 hits + + src.set(newFakeMetrics(0, 0, 0, 0, 3, 0, 0, 0, 0, 20, 5)) // simulated reset + collector.ObserveOnce(sources) + + src.set(newFakeMetrics(0, 0, 0, 0, 5, 0, 0, 0, 0, 30, 5)) // +2 compactions, +10 hits from post-reset baseline + collector.ObserveOnce(sources) + + // Expected: baseline (10 / 100) + 0 + post-reset delta (2 / 10). + err := testutil.GatherAndCompare( + registry.Gatherer(), + strings.NewReader(` +# HELP elastickv_pebble_block_cache_hits_total Cumulative block cache hits reported by Pebble. +# TYPE elastickv_pebble_block_cache_hits_total counter +elastickv_pebble_block_cache_hits_total{group="7",node_address="10.0.0.1:50051",node_id="n1"} 110 +# HELP elastickv_pebble_compact_count_total Cumulative number of compactions completed by Pebble since the process started. +# TYPE elastickv_pebble_compact_count_total counter +elastickv_pebble_compact_count_total{group="7",node_address="10.0.0.1:50051",node_id="n1"} 12 +`), + "elastickv_pebble_compact_count_total", + "elastickv_pebble_block_cache_hits_total", + ) + require.NoError(t, err) +} + +func TestPebbleCollectorSkipsNilSnapshot(t *testing.T) { + // A source that returns nil (store closed mid-restore) must not + // panic and must not populate any series for that group. + registry := NewRegistry("n1", "10.0.0.1:50051") + collector := registry.PebbleCollector() + require.NotNil(t, collector) + + src := &fakePebbleSource{} + sources := []PebbleSource{{GroupID: 9, Source: src}} + + // Both nil-source and nil-snapshot should be safe. 
+ require.NotPanics(t, func() { collector.ObserveOnce(sources) }) + require.NotPanics(t, func() { collector.ObserveOnce([]PebbleSource{{GroupID: 1, Source: nil}}) }) + + // No gauges or counters should exist yet. + require.Equal(t, 0, testutil.CollectAndCount(registry.pebble.l0Sublevels)) + require.Equal(t, 0, testutil.CollectAndCount(registry.pebble.compactCountTotal)) +} + +func TestPebbleCollectorZeroRegistryIsSafe(t *testing.T) { + // Code paths that bypass the registry (tests, bootstrap helpers) + // must tolerate a nil collector / empty sources without panicking. + var c *PebbleCollector + require.NotPanics(t, func() { c.ObserveOnce(nil) }) + + registry := NewRegistry("n1", "10.0.0.1:50051") + collector := registry.PebbleCollector() + require.NotPanics(t, func() { collector.ObserveOnce(nil) }) +} diff --git a/monitoring/registry.go b/monitoring/registry.go index 2145f979..3c8ed09e 100644 --- a/monitoring/registry.go +++ b/monitoring/registry.go @@ -19,6 +19,7 @@ type Registry struct { raft *RaftMetrics lua *LuaMetrics hotPath *HotPathMetrics + pebble *PebbleMetrics } // NewRegistry builds a registry with constant labels that identify the local node. @@ -39,6 +40,7 @@ func NewRegistry(nodeID string, nodeAddress string) *Registry { r.raft = newRaftMetrics(registerer) r.lua = newLuaMetrics(registerer) r.hotPath = newHotPathMetrics(registerer) + r.pebble = newPebbleMetrics(registerer) return r } @@ -131,3 +133,15 @@ func (r *Registry) DispatchCollector() *DispatchCollector { } return newDispatchCollector(r.hotPath) } + +// PebbleCollector returns a collector that polls each Pebble store's +// Metrics() snapshot and mirrors the operationally useful fields +// (L0 sublevels, compaction debt, memtable, block cache) into +// Prometheus. Start it with the node's Pebble sources after the +// stores have been opened. 
+func (r *Registry) PebbleCollector() *PebbleCollector { + if r == nil || r.pebble == nil { + return nil + } + return newPebbleCollector(r.pebble) +} diff --git a/store/lsm_store.go b/store/lsm_store.go index c593c19d..9a200d59 100644 --- a/store/lsm_store.go +++ b/store/lsm_store.go @@ -1790,3 +1790,22 @@ func (s *pebbleStore) Close() error { defer s.dbMu.Unlock() return errors.WithStack(s.db.Close()) } + +// Metrics returns a snapshot of the underlying Pebble DB's operational +// metrics (LSM shape, compaction debt, memtable, block cache). The +// return value is a freshly allocated *pebble.Metrics owned by the +// caller. +// +// Returns nil while the store is closed or between a Restore/swap (no +// current DB). Callers must handle nil. +// +// Safe for concurrent use: takes the dbMu read lock to protect against +// Restore/Close swapping the DB pointer. +func (s *pebbleStore) Metrics() *pebble.Metrics { + s.dbMu.RLock() + defer s.dbMu.RUnlock() + if s.db == nil { + return nil + } + return s.db.Metrics() +} From b8a8b67e72200a432f3e84f41c533233c554a969 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 22 Apr 2026 05:46:51 +0900 Subject: [PATCH 2/4] fix(pebble-metrics): rename blockCacheMisse -> blockCacheMisses --- monitoring/pebble.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/monitoring/pebble.go b/monitoring/pebble.go index 750849c8..ffe48b40 100644 --- a/monitoring/pebble.go +++ b/monitoring/pebble.go @@ -177,9 +177,9 @@ type PebbleCollector struct { } type pebbleSnapshot struct { - compactCount int64 - blockCacheHits int64 - blockCacheMisse int64 + compactCount int64 + blockCacheHits int64 + blockCacheMisses int64 } func newPebbleCollector(metrics *PebbleMetrics) *PebbleCollector { @@ -258,9 +258,9 @@ func (c *PebbleCollector) observeOnce(sources []PebbleSource) { // silently without emitting negative. 
prev := c.previous[src.GroupID] curr := pebbleSnapshot{ - compactCount: snap.Compact.Count, - blockCacheHits: snap.BlockCache.Hits, - blockCacheMisse: snap.BlockCache.Misses, + compactCount: snap.Compact.Count, + blockCacheHits: snap.BlockCache.Hits, + blockCacheMisses: snap.BlockCache.Misses, } if curr.compactCount > prev.compactCount { c.metrics.compactCountTotal.WithLabelValues(group).Add(float64(curr.compactCount - prev.compactCount)) @@ -268,8 +268,8 @@ func (c *PebbleCollector) observeOnce(sources []PebbleSource) { if curr.blockCacheHits > prev.blockCacheHits { c.metrics.blockCacheHitsTotal.WithLabelValues(group).Add(float64(curr.blockCacheHits - prev.blockCacheHits)) } - if curr.blockCacheMisse > prev.blockCacheMisse { - c.metrics.blockCacheMissTotal.WithLabelValues(group).Add(float64(curr.blockCacheMisse - prev.blockCacheMisse)) + if curr.blockCacheMisses > prev.blockCacheMisses { + c.metrics.blockCacheMissTotal.WithLabelValues(group).Add(float64(curr.blockCacheMisses - prev.blockCacheMisses)) } c.previous[src.GroupID] = curr } From 2af6f78f560924e9f08cbac86fa28a8ec0d89d70 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 22 Apr 2026 14:11:48 +0900 Subject: [PATCH 3/4] perf(pebble-metrics): precompute GroupIDStr to avoid per-tick alloc Addresses gemini-code-assist medium-priority review on PR #578: the polling loop formatted src.GroupID with strconv.FormatUint on every tick (every 5s per group), causing avoidable allocations. Store the pre-computed decimal string on PebbleSource at construction time and reuse it as the "group" Prometheus label in observeOnce. - monitoring/pebble.go: add PebbleSource.GroupIDStr; consume it in observeOnce; drop now-unused strconv import. - main.go: populate GroupIDStr in pebbleMonitorSources. - monitoring/pebble_test.go: populate GroupIDStr in test fixtures. 
--- main.go | 6 ++++-- monitoring/pebble.go | 11 +++++++---- monitoring/pebble_test.go | 8 ++++---- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/main.go b/main.go index bffafa08..f2f66e59 100644 --- a/main.go +++ b/main.go @@ -8,6 +8,7 @@ import ( "net/http" "os" "path/filepath" + "strconv" "strings" "sync" "time" @@ -467,8 +468,9 @@ func pebbleMonitorSources(runtimes []*raftGroupRuntime) []monitoring.PebbleSourc continue } out = append(out, monitoring.PebbleSource{ - GroupID: runtime.spec.id, - Source: src, + GroupID: runtime.spec.id, + GroupIDStr: strconv.FormatUint(runtime.spec.id, 10), + Source: src, }) } return out diff --git a/monitoring/pebble.go b/monitoring/pebble.go index ffe48b40..9832b341 100644 --- a/monitoring/pebble.go +++ b/monitoring/pebble.go @@ -2,7 +2,6 @@ package monitoring import ( "context" - "strconv" "sync" "time" @@ -160,9 +159,13 @@ type PebbleMetricsSource interface { // PebbleSource binds a raft group ID to its Pebble store. Multiple // groups can be polled by a single collector on a sharded node. +// GroupIDStr is the pre-formatted decimal form of GroupID used as the +// "group" Prometheus label; pre-computing it avoids a per-tick +// strconv.FormatUint allocation in observeOnce. type PebbleSource struct { - GroupID uint64 - Source PebbleMetricsSource + GroupID uint64 + GroupIDStr string + Source PebbleMetricsSource } // PebbleCollector polls each registered Pebble store on a fixed @@ -235,7 +238,7 @@ func (c *PebbleCollector) observeOnce(sources []PebbleSource) { if snap == nil { continue } - group := strconv.FormatUint(src.GroupID, 10) + group := src.GroupIDStr // L0 pressure: gauges, overwritten each tick. 
c.metrics.l0Sublevels.WithLabelValues(group).Set(float64(snap.Levels[0].Sublevels)) diff --git a/monitoring/pebble_test.go b/monitoring/pebble_test.go index 8b8f757e..93b05a72 100644 --- a/monitoring/pebble_test.go +++ b/monitoring/pebble_test.go @@ -58,7 +58,7 @@ func TestPebbleCollectorMirrorsGaugesAndCounters(t *testing.T) { require.NotNil(t, collector) src := &fakePebbleSource{} - sources := []PebbleSource{{GroupID: 1, Source: src}} + sources := []PebbleSource{{GroupID: 1, GroupIDStr: "1", Source: src}} // Baseline tick: initial counter values establish the delta // baseline, gauges reflect the snapshot immediately. @@ -143,7 +143,7 @@ func TestPebbleCollectorHandlesSourceReset(t *testing.T) { require.NotNil(t, collector) src := &fakePebbleSource{} - sources := []PebbleSource{{GroupID: 7, Source: src}} + sources := []PebbleSource{{GroupID: 7, GroupIDStr: "7", Source: src}} src.set(newFakeMetrics(0, 0, 0, 0, 10, 0, 0, 0, 0, 100, 5)) collector.ObserveOnce(sources) // baseline: 10 compactions, 100 hits @@ -179,11 +179,11 @@ func TestPebbleCollectorSkipsNilSnapshot(t *testing.T) { require.NotNil(t, collector) src := &fakePebbleSource{} - sources := []PebbleSource{{GroupID: 9, Source: src}} + sources := []PebbleSource{{GroupID: 9, GroupIDStr: "9", Source: src}} // Both nil-source and nil-snapshot should be safe. require.NotPanics(t, func() { collector.ObserveOnce(sources) }) - require.NotPanics(t, func() { collector.ObserveOnce([]PebbleSource{{GroupID: 1, Source: nil}}) }) + require.NotPanics(t, func() { collector.ObserveOnce([]PebbleSource{{GroupID: 1, GroupIDStr: "1", Source: nil}}) }) // No gauges or counters should exist yet. 
require.Equal(t, 0, testutil.CollectAndCount(registry.pebble.l0Sublevels)) From be3c19b4cfac8b0ea40ebcb7e64ba60a7d9e57c1 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 22 Apr 2026 16:36:06 +0900 Subject: [PATCH 4/4] fix(pebble-metrics): doc accuracy + blockCacheMissesTotal rename - Metrics() docstring now matches actual behavior (nil only pre-Open or after a failed Open; Restore callers block on dbMu). - Drop "Count" from the collector header comment; only BlockCache.Size is exported. - Rename blockCacheMissTotal -> blockCacheMissesTotal for consistency with blockCacheHitsTotal, the snapshot field blockCacheMisses, and the exported metric name (..._misses_total). --- monitoring/pebble.go | 14 +++++++------- store/lsm_store.go | 9 ++++++--- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/monitoring/pebble.go b/monitoring/pebble.go index 9832b341..7f4e3be7 100644 --- a/monitoring/pebble.go +++ b/monitoring/pebble.go @@ -16,7 +16,7 @@ import ( // dashboard. // // The point-in-time fields (Sublevels, NumFiles, EstimatedDebt, -// MemTable.*, NumInProgress, BlockCache.Size/Count) are exposed as +// MemTable.*, NumInProgress, BlockCache.Size) are exposed as // Prometheus GAUGES — each poll overwrites the previous value. // Monotonic fields (Compact.Count, BlockCache.Hits/Misses) are exposed // as COUNTERS; the collector emits only the positive delta against the @@ -47,9 +47,9 @@ type PebbleMetrics struct { memtableZombieCount *prometheus.GaugeVec // Block cache. 
- blockCacheSizeBytes *prometheus.GaugeVec - blockCacheHitsTotal *prometheus.CounterVec - blockCacheMissTotal *prometheus.CounterVec + blockCacheSizeBytes *prometheus.GaugeVec + blockCacheHitsTotal *prometheus.CounterVec + blockCacheMissesTotal *prometheus.CounterVec } func newPebbleMetrics(registerer prometheus.Registerer) *PebbleMetrics { @@ -124,7 +124,7 @@ func newPebbleMetrics(registerer prometheus.Registerer) *PebbleMetrics { }, []string{"group"}, ), - blockCacheMissTotal: prometheus.NewCounterVec( + blockCacheMissesTotal: prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "elastickv_pebble_block_cache_misses_total", Help: "Cumulative block cache misses reported by Pebble.", @@ -144,7 +144,7 @@ func newPebbleMetrics(registerer prometheus.Registerer) *PebbleMetrics { m.memtableZombieCount, m.blockCacheSizeBytes, m.blockCacheHitsTotal, - m.blockCacheMissTotal, + m.blockCacheMissesTotal, ) return m } @@ -272,7 +272,7 @@ func (c *PebbleCollector) observeOnce(sources []PebbleSource) { c.metrics.blockCacheHitsTotal.WithLabelValues(group).Add(float64(curr.blockCacheHits - prev.blockCacheHits)) } if curr.blockCacheMisses > prev.blockCacheMisses { - c.metrics.blockCacheMissTotal.WithLabelValues(group).Add(float64(curr.blockCacheMisses - prev.blockCacheMisses)) + c.metrics.blockCacheMissesTotal.WithLabelValues(group).Add(float64(curr.blockCacheMisses - prev.blockCacheMisses)) } c.previous[src.GroupID] = curr } diff --git a/store/lsm_store.go b/store/lsm_store.go index 9a200d59..4572d980 100644 --- a/store/lsm_store.go +++ b/store/lsm_store.go @@ -1796,11 +1796,14 @@ func (s *pebbleStore) Close() error { // return value is a freshly allocated *pebble.Metrics owned by the // caller. // -// Returns nil while the store is closed or between a Restore/swap (no -// current DB). Callers must handle nil. 
+// Returns nil only before the first Open has installed a DB or after a +// failed Open left s.db unset; callers during an in-flight Restore block +// on dbMu (which Restore holds exclusively) rather than observing nil, +// and Close() does not clear s.db. Callers must still handle nil for the +// pre-Open case. // // Safe for concurrent use: takes the dbMu read lock to protect against -// Restore/Close swapping the DB pointer. +// Restore swapping the DB pointer. func (s *pebbleStore) Metrics() *pebble.Metrics { s.dbMu.RLock() defer s.dbMu.RUnlock()