Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -187,6 +188,9 @@ func run() error {
if collector := metricsRegistry.DispatchCollector(); collector != nil {
collector.Start(runCtx, dispatchMonitorSources(runtimes), raftMetricsObserveInterval)
}
if collector := metricsRegistry.PebbleCollector(); collector != nil {
collector.Start(runCtx, pebbleMonitorSources(runtimes), raftMetricsObserveInterval)
}
compactor := kv.NewFSMCompactor(
fsmCompactionRuntimes(runtimes),
kv.WithFSMCompactorActiveTimestampTracker(readTracker),
Expand Down Expand Up @@ -447,6 +451,31 @@ func raftMonitorRuntimes(runtimes []*raftGroupRuntime) []monitoring.RaftRuntime
return out
}

// pebbleMonitorSources extracts the MVCC stores that expose
// *pebble.DB.Metrics() so monitoring can poll LSM internals (L0
// sublevels, compaction debt, memtable, block cache) for the
// elastickv_pebble_* metrics family. Stores that do not satisfy the
// interface (non-Pebble backends, if any are added later) are skipped
// silently.
func pebbleMonitorSources(runtimes []*raftGroupRuntime) []monitoring.PebbleSource {
out := make([]monitoring.PebbleSource, 0, len(runtimes))
for _, runtime := range runtimes {
if runtime == nil || runtime.store == nil {
continue
}
src, ok := runtime.store.(monitoring.PebbleMetricsSource)
if !ok {
continue
}
out = append(out, monitoring.PebbleSource{
GroupID: runtime.spec.id,
GroupIDStr: strconv.FormatUint(runtime.spec.id, 10),
Source: src,
})
Comment on lines +470 to +474
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

If the PebbleSource struct is updated to include a pre-calculated GroupIDStr, populate it here during the extraction loop.

out = append(out, monitoring.PebbleSource{
	GroupID:    runtime.spec.id,
	GroupIDStr: strconv.FormatUint(runtime.spec.id, 10),
	Source:     src,
})

}
return out
}

// dispatchMonitorSources extracts the raft engines that expose etcd
// dispatch counters so monitoring can poll them for the hot-path
// dashboard. Engines that do not satisfy the interface (hashicorp
Expand Down
279 changes: 279 additions & 0 deletions monitoring/pebble.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,279 @@
package monitoring

import (
"context"
"sync"
"time"

"github.com/cockroachdb/pebble/v2"
"github.com/prometheus/client_golang/prometheus"
)

// Pebble LSM metrics. These mirror the most operationally useful
// fields from *pebble.DB.Metrics() so operators can graph/alert on
// write-stall signals (L0 sublevels, compaction debt) and capacity
// trends (memtable, block cache) without importing Pebble from every
// dashboard.
//
// The point-in-time fields (Sublevels, NumFiles, EstimatedDebt,
// MemTable.*, NumInProgress, BlockCache.Size) are exposed as
// Prometheus GAUGES — each poll overwrites the previous value.
// Monotonic fields (Compact.Count, BlockCache.Hits/Misses) are exposed
// as COUNTERS; the collector emits only the positive delta against the
// last snapshot so a store reset (Restore/swap) does not produce
// negative values.
//
// Name convention: elastickv_pebble_* to keep a consistent node_id /
// node_address label prefix with the rest of the registry.

const defaultPebblePollInterval = 5 * time.Second

// PebbleMetrics owns the Prometheus vectors for Pebble LSM internals.
// One instance per registry; shared by all groups (labelled by group
// ID + level where relevant). Every vector carries a single "group"
// label; construction and registration happen in newPebbleMetrics.
type PebbleMetrics struct {
	// L0 pressure: incident signals (the canonical write-stall precursors).
	l0Sublevels *prometheus.GaugeVec
	l0NumFiles  *prometheus.GaugeVec

	// Compaction queue depth / debt. compactCountTotal is monotonic and
	// advanced by delta in the collector; the other two are point-in-time.
	compactEstimatedDebt *prometheus.GaugeVec
	compactInProgress    *prometheus.GaugeVec
	compactCountTotal    *prometheus.CounterVec

	// Memtable footprint (active + flush-queued + zombie).
	memtableCount       *prometheus.GaugeVec
	memtableSizeBytes   *prometheus.GaugeVec
	memtableZombieCount *prometheus.GaugeVec

	// Block cache: size is a gauge; hits/misses are monotonic counters
	// advanced by delta in the collector.
	blockCacheSizeBytes   *prometheus.GaugeVec
	blockCacheHitsTotal   *prometheus.CounterVec
	blockCacheMissesTotal *prometheus.CounterVec
}

// newPebbleMetrics builds every elastickv_pebble_* vector, registers
// all of them on registerer (panicking on duplicate registration, per
// MustRegister), and returns the populated container.
func newPebbleMetrics(registerer prometheus.Registerer) *PebbleMetrics {
	// All Pebble vectors share the single "group" label; these local
	// constructors reduce each declaration to a name/help pair.
	labels := []string{"group"}
	gauge := func(name, help string) *prometheus.GaugeVec {
		return prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: name, Help: help}, labels)
	}
	counter := func(name, help string) *prometheus.CounterVec {
		return prometheus.NewCounterVec(prometheus.CounterOpts{Name: name, Help: help}, labels)
	}

	m := &PebbleMetrics{
		l0Sublevels: gauge(
			"elastickv_pebble_l0_sublevels",
			"Current L0 sublevel count reported by Pebble. Climbing sublevels are the canonical precursor to a write stall; alert when this exceeds the L0CompactionThreshold for a sustained period.",
		),
		l0NumFiles: gauge(
			"elastickv_pebble_l0_num_files",
			"Current number of sstables in L0 reported by Pebble. Paired with elastickv_pebble_l0_sublevels to diagnose L0 pressure.",
		),
		compactEstimatedDebt: gauge(
			"elastickv_pebble_compact_estimated_debt_bytes",
			"Estimated number of bytes Pebble still needs to compact for the LSM to reach a stable state. Growth indicates compactions are falling behind ingest.",
		),
		compactInProgress: gauge(
			"elastickv_pebble_compact_in_progress",
			"Number of compactions currently in progress.",
		),
		compactCountTotal: counter(
			"elastickv_pebble_compact_count_total",
			"Cumulative number of compactions completed by Pebble since the process started.",
		),
		memtableCount: gauge(
			"elastickv_pebble_memtable_count",
			"Current count of memtables (active + queued for flush).",
		),
		memtableSizeBytes: gauge(
			"elastickv_pebble_memtable_size_bytes",
			"Current bytes allocated by memtables and large flushable batches.",
		),
		memtableZombieCount: gauge(
			"elastickv_pebble_memtable_zombie_count",
			"Current count of zombie memtables (no longer referenced by the DB but pinned by open iterators).",
		),
		blockCacheSizeBytes: gauge(
			"elastickv_pebble_block_cache_size_bytes",
			"Current bytes in use by Pebble's block cache.",
		),
		blockCacheHitsTotal: counter(
			"elastickv_pebble_block_cache_hits_total",
			"Cumulative block cache hits reported by Pebble.",
		),
		blockCacheMissesTotal: counter(
			"elastickv_pebble_block_cache_misses_total",
			"Cumulative block cache misses reported by Pebble.",
		),
	}

	registerer.MustRegister(
		m.l0Sublevels,
		m.l0NumFiles,
		m.compactEstimatedDebt,
		m.compactInProgress,
		m.compactCountTotal,
		m.memtableCount,
		m.memtableSizeBytes,
		m.memtableZombieCount,
		m.blockCacheSizeBytes,
		m.blockCacheHitsTotal,
		m.blockCacheMissesTotal,
	)
	return m
}

// PebbleMetricsSource abstracts the per-group access to a Pebble DB's
// Metrics(). The concrete *store pebbleStore satisfies this via its
// Metrics() accessor. Returning nil (e.g. store closed mid-restore) is
// allowed; the collector will skip that group for the tick.
//
// NOTE(review): implementations may block — the pebbleStore accessor
// reportedly takes the store's internal lock, so Metrics() can stall
// while a Restore or large Compact holds that lock exclusively.
// Callers should avoid holding shared locks across a call; confirm
// the exact blocking semantics against the pebbleStore implementation.
type PebbleMetricsSource interface {
	Metrics() *pebble.Metrics
}

// PebbleSource binds a raft group ID to its Pebble store. Multiple
// groups can be polled by a single collector on a sharded node.
// GroupIDStr is the pre-formatted decimal form of GroupID used as the
// "group" Prometheus label; pre-computing it avoids a per-tick
// strconv.FormatUint allocation in observeOnce. Callers are expected
// to keep GroupIDStr consistent with GroupID (see pebbleMonitorSources).
type PebbleSource struct {
	GroupID    uint64
	GroupIDStr string
	Source     PebbleMetricsSource
}
Comment on lines +165 to +169
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To avoid repeated allocations during the polling loop, consider pre-calculating the string representation of the GroupID and storing it in the PebbleSource struct. strconv.FormatUint is called for every group on every 5-second tick.

Suggested change
type PebbleSource struct {
GroupID uint64
Source PebbleMetricsSource
}
type PebbleSource struct {
GroupID uint64
GroupIDStr string
Source PebbleMetricsSource
}


// PebbleCollector polls each registered Pebble store on a fixed
// interval and mirrors the snapshot into the Prometheus vectors.
// Gauges are overwritten; counters advance by the positive delta
// against the previous snapshot.
type PebbleCollector struct {
	metrics *PebbleMetrics

	// mu guards previous, the per-group last-seen values of the
	// monotonic Pebble counters used to compute positive deltas.
	mu       sync.Mutex
	previous map[uint64]pebbleSnapshot
}

// pebbleSnapshot records the last-observed values of the monotonic
// Pebble counters for one group, so observeOnce can emit only the
// positive delta and rebase silently after a store reset.
type pebbleSnapshot struct {
	compactCount     int64
	blockCacheHits   int64
	blockCacheMisses int64
}

// newPebbleCollector wires a collector to the shared metric vectors
// and starts it with an empty per-group snapshot table.
func newPebbleCollector(metrics *PebbleMetrics) *PebbleCollector {
	collector := &PebbleCollector{metrics: metrics}
	collector.previous = make(map[uint64]pebbleSnapshot)
	return collector
}

// Start begins polling sources on interval until ctx is canceled.
// Passing interval <= 0 uses defaultPebblePollInterval (5 s), matching
// the DispatchCollector cadence so operators see consistent refresh
// rates across dashboards. Pebble.Metrics() acquires internal mutexes
// but is not expensive; 5 s gives ample headroom.
func (c *PebbleCollector) Start(ctx context.Context, sources []PebbleSource, interval time.Duration) {
	if c == nil || c.metrics == nil {
		return
	}
	if len(sources) == 0 {
		return
	}
	if interval <= 0 {
		interval = defaultPebblePollInterval
	}

	// Prime the vectors synchronously so dashboards are not empty for
	// the first interval.
	c.observeOnce(sources)

	go func() {
		tick := time.NewTicker(interval)
		defer tick.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-tick.C:
				c.observeOnce(sources)
			}
		}
	}()
}

// ObserveOnce is exposed for tests and single-shot callers. It runs
// exactly one polling pass over sources; see observeOnce for the
// gauge/counter update semantics.
func (c *PebbleCollector) ObserveOnce(sources []PebbleSource) {
	c.observeOnce(sources)
}

func (c *PebbleCollector) observeOnce(sources []PebbleSource) {
if c == nil || c.metrics == nil {
return
}
c.mu.Lock()
defer c.mu.Unlock()
Comment on lines +231 to +232
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Holding the PebbleCollector mutex across the entire loop is problematic because src.Source.Metrics() can block. In pebbleStore, Metrics() acquires dbMu.RLock(). If a raft group is performing a long-running operation that holds dbMu exclusively (like a Restore or a large Compact), this collector will block on that group while holding c.mu. This prevents metrics from being updated for all other raft groups on the same node, leading to a total loss of Pebble observability during critical recovery operations. Per repository guidelines, ensure that the blocking semantic of Metrics() is explicitly stated in its interface documentation. Additionally, when performing updates to the previous state, the lock must be held continuously from the read to the assignment to prevent race conditions.

func (c *PebbleCollector) observeOnce(sources []PebbleSource) {
	if c == nil || c.metrics == nil {
		return
	}
	for _, src := range sources {
		if src.Source == nil {
			continue
		}
		snap := src.Source.Metrics()
		if snap == nil {
			continue
		}
		group := src.GroupIDStr

		// L0 pressure: gauges, overwritten each tick.
		c.metrics.l0Sublevels.WithLabelValues(group).Set(float64(snap.Levels[0].Sublevels))
		c.metrics.l0NumFiles.WithLabelValues(group).Set(float64(snap.Levels[0].TablesCount))

		// Compaction.
		c.metrics.compactEstimatedDebt.WithLabelValues(group).Set(float64(snap.Compact.EstimatedDebt))
		c.metrics.compactInProgress.WithLabelValues(group).Set(float64(snap.Compact.NumInProgress))

		// Memtable.
		c.metrics.memtableCount.WithLabelValues(group).Set(float64(snap.MemTable.Count))
		c.metrics.memtableSizeBytes.WithLabelValues(group).Set(float64(snap.MemTable.Size))
		c.metrics.memtableZombieCount.WithLabelValues(group).Set(float64(snap.MemTable.ZombieCount))

		// Block cache gauge.
		c.metrics.blockCacheSizeBytes.WithLabelValues(group).Set(float64(snap.BlockCache.Size))

		// Monotonic counters: emit only the positive delta. A smaller
		// value means the source was reset (store reopened); rebase
		// silently without emitting negative.
		c.mu.Lock()
		prev := c.previous[src.GroupID]
		curr := pebbleSnapshot{
			compactCount:     snap.Compact.Count,
			blockCacheHits:   snap.BlockCache.Hits,
			blockCacheMisses: snap.BlockCache.Misses,
		}
		if curr.compactCount > prev.compactCount {
			c.metrics.compactCountTotal.WithLabelValues(group).Add(float64(curr.compactCount - prev.compactCount))
		}
		if curr.blockCacheHits > prev.blockCacheHits {
			c.metrics.blockCacheHitsTotal.WithLabelValues(group).Add(float64(curr.blockCacheHits - prev.blockCacheHits))
		}
		if curr.blockCacheMisses > prev.blockCacheMisses {
			c.metrics.blockCacheMissesTotal.WithLabelValues(group).Add(float64(curr.blockCacheMisses - prev.blockCacheMisses))
		}
		c.previous[src.GroupID] = curr
		c.mu.Unlock()
	}
}
References
  1. When an interface method is intended to block until a certain condition is met, explicitly state this blocking semantic in the documentation or interface description to avoid ambiguity for implementers.
  2. When performing atomic operations based on a version or state variable, do not release the lock between reading the variable and performing the conditional check to prevent race conditions and the use of stale data.

for _, src := range sources {
if src.Source == nil {
continue
}
snap := src.Source.Metrics()
if snap == nil {
continue
}
group := src.GroupIDStr

// L0 pressure: gauges, overwritten each tick.
c.metrics.l0Sublevels.WithLabelValues(group).Set(float64(snap.Levels[0].Sublevels))
c.metrics.l0NumFiles.WithLabelValues(group).Set(float64(snap.Levels[0].TablesCount))

// Compaction.
c.metrics.compactEstimatedDebt.WithLabelValues(group).Set(float64(snap.Compact.EstimatedDebt))
c.metrics.compactInProgress.WithLabelValues(group).Set(float64(snap.Compact.NumInProgress))

// Memtable.
c.metrics.memtableCount.WithLabelValues(group).Set(float64(snap.MemTable.Count))
c.metrics.memtableSizeBytes.WithLabelValues(group).Set(float64(snap.MemTable.Size))
c.metrics.memtableZombieCount.WithLabelValues(group).Set(float64(snap.MemTable.ZombieCount))

// Block cache gauge.
c.metrics.blockCacheSizeBytes.WithLabelValues(group).Set(float64(snap.BlockCache.Size))

// Monotonic counters: emit only the positive delta. A smaller
// value means the source was reset (store reopened); rebase
// silently without emitting negative.
prev := c.previous[src.GroupID]
curr := pebbleSnapshot{
compactCount: snap.Compact.Count,
blockCacheHits: snap.BlockCache.Hits,
blockCacheMisses: snap.BlockCache.Misses,
}
if curr.compactCount > prev.compactCount {
c.metrics.compactCountTotal.WithLabelValues(group).Add(float64(curr.compactCount - prev.compactCount))
}
if curr.blockCacheHits > prev.blockCacheHits {
c.metrics.blockCacheHitsTotal.WithLabelValues(group).Add(float64(curr.blockCacheHits - prev.blockCacheHits))
}
if curr.blockCacheMisses > prev.blockCacheMisses {
c.metrics.blockCacheMissesTotal.WithLabelValues(group).Add(float64(curr.blockCacheMisses - prev.blockCacheMisses))
}
c.previous[src.GroupID] = curr
}
}
Loading
Loading