Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,15 @@ func run() error {
return err
}

// Record the active FSM apply sync mode so operators can see on the
// /metrics endpoint which durability posture this node is running in.
// The label is resolved per-pebbleStore from ELASTICKV_FSM_SYNC_MODE
// in NewPebbleStore; read it off the first constructed store (all
// shards share the same env and therefore the same label).
if label := fsmApplySyncModeLabelFromRuntimes(runtimes); label != "" {
metricsRegistry.SetFSMApplySyncMode(label)
}

cleanup := internalutil.CleanupStack{}
defer cleanup.Run()

Expand Down Expand Up @@ -430,6 +439,35 @@ func raftMonitorRuntimes(runtimes []*raftGroupRuntime) []monitoring.RaftRuntime
return out
}

// fsmApplySyncModeLabeler narrows an MVCCStore to those implementations
// that can report the resolved ELASTICKV_FSM_SYNC_MODE label. The
// pebble-backed store satisfies this today; alternate backends (none
// yet) would either implement it or be skipped.
//
// It is consumed by fsmApplySyncModeLabelFromRuntimes, which
// type-asserts each runtime's store against it and ignores stores that
// do not implement it.
type fsmApplySyncModeLabeler interface {
	// FSMApplySyncModeLabel returns the sync-mode label the store
	// resolved for its FSM commit path. The pebble-backed store reports
	// "sync" or "nosync".
	FSMApplySyncModeLabel() string
}

// fsmApplySyncModeLabelFromRuntimes reports the FSM apply sync-mode
// label of the first runtime whose store implements
// fsmApplySyncModeLabeler. Nil runtimes, runtimes with a nil store, and
// stores without the accessor are skipped.
//
// Every shard on a node resolves the label from the same
// ELASTICKV_FSM_SYNC_MODE env var when its store is constructed, so the
// first match is representative and the scan stops there.
//
// The result is "" when no runtime exposes the accessor; the caller then
// skips the gauge rather than publishing a misleading default.
func fsmApplySyncModeLabelFromRuntimes(runtimes []*raftGroupRuntime) string {
	for _, rt := range runtimes {
		if rt != nil && rt.store != nil {
			if labeler, ok := rt.store.(fsmApplySyncModeLabeler); ok {
				return labeler.FSMApplySyncModeLabel()
			}
		}
	}
	return ""
}

// pebbleMonitorSources extracts the MVCC stores that expose
// *pebble.DB.Metrics() so monitoring can poll LSM internals (L0
// sublevels, compaction debt, memtable, block cache) for the
Expand Down
43 changes: 43 additions & 0 deletions monitoring/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ type PebbleMetrics struct {
blockCacheCapacityBytes *prometheus.GaugeVec
blockCacheHitsTotal *prometheus.CounterVec
blockCacheMissesTotal *prometheus.CounterVec

// FSM apply sync mode. Resolved once from ELASTICKV_FSM_SYNC_MODE at
// process start (see store/lsm_store.go). The label-scoped gauge is
// set to 1 for the active mode (either "sync" or "nosync") and 0 for
// the other, so dashboards can alert on unexpected posture changes
// (e.g. a rolling deploy that accidentally drops durability).
fsmApplySyncMode *prometheus.GaugeVec
}

func newPebbleMetrics(registerer prometheus.Registerer) *PebbleMetrics {
Expand Down Expand Up @@ -139,6 +146,13 @@ func newPebbleMetrics(registerer prometheus.Registerer) *PebbleMetrics {
},
[]string{"group"},
),
fsmApplySyncMode: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "elastickv_fsm_apply_sync_mode",
Help: "Active ELASTICKV_FSM_SYNC_MODE on this node. Gauge is 1 for the active mode and 0 for the other. \"sync\" means every FSM apply issues a Pebble fsync; \"nosync\" relies on raft-log replay for crash recovery of the FSM state.",
},
[]string{"mode"},
),
}

registerer.MustRegister(
Expand All @@ -154,10 +168,39 @@ func newPebbleMetrics(registerer prometheus.Registerer) *PebbleMetrics {
m.blockCacheCapacityBytes,
m.blockCacheHitsTotal,
m.blockCacheMissesTotal,
m.fsmApplySyncMode,
)
return m
}

// SetFSMApplySyncMode records which ELASTICKV_FSM_SYNC_MODE is active.
// activeLabel must be "sync" or "nosync"; any other value is coerced to
// "sync" to match the store resolver's fallback behaviour for unknown
// ELASTICKV_FSM_SYNC_MODE values (see store.resolveFSMApplyWriteOpts).
// This keeps the gauge's two-row shape stable: exactly one of
// {"sync","nosync"} is 1 at any time and the other is 0.
//
// Call this once at startup after the store package has resolved the
// env var. Invoking again is safe and idempotent: the new label goes to
// 1 and the other known label goes to 0.
func (m *PebbleMetrics) SetFSMApplySyncMode(activeLabel string) {
if m == nil || m.fsmApplySyncMode == nil {
return
}
// Coerce unknown labels to "sync" so the gauge never leaks a third
// row and so a prior stale label cannot stay pinned at 1. This
// mirrors store.resolveFSMApplyWriteOpts, which also falls back to
// sync on unrecognised input.
if activeLabel != "sync" && activeLabel != "nosync" {
activeLabel = "sync"
}
// Zero both known labels before setting the active one so the gauge
// has a stable two-row shape regardless of call ordering.
m.fsmApplySyncMode.WithLabelValues("sync").Set(0)
m.fsmApplySyncMode.WithLabelValues("nosync").Set(0)
m.fsmApplySyncMode.WithLabelValues(activeLabel).Set(1)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

// PebbleMetricsSource abstracts the per-group access to a Pebble DB's
// Metrics(). The concrete *store pebbleStore satisfies this via its
// Metrics() accessor. Returning nil (e.g. store closed mid-restore) is
Expand Down
73 changes: 73 additions & 0 deletions monitoring/pebble_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,3 +265,76 @@ func TestPebbleCollectorZeroRegistryIsSafe(t *testing.T) {
collector := registry.PebbleCollector()
require.NotPanics(t, func() { collector.ObserveOnce(nil) })
}

// TestSetFSMApplySyncMode_LabelsAreMutuallyExclusive checks that the
// active mode's row reads 1 and the other row reads 0 after each call,
// first for "sync" and then after switching to "nosync". Operators
// alert on this gauge to catch unexpected durability posture changes,
// so the two-row shape has to survive successive calls.
func TestSetFSMApplySyncMode_LabelsAreMutuallyExclusive(t *testing.T) {
	registry := NewRegistry("n1", "10.0.0.1:50051")

	// modeValue reads the current gauge value for one mode label.
	modeValue := func(mode string) float64 {
		return testutil.ToFloat64(registry.pebble.fsmApplySyncMode.WithLabelValues(mode))
	}

	registry.SetFSMApplySyncMode("sync")
	require.InEpsilon(t, float64(1), modeValue("sync"), 0.0)
	require.InDelta(t, float64(0), modeValue("nosync"), 0.0)

	registry.SetFSMApplySyncMode("nosync")
	require.InDelta(t, float64(0), modeValue("sync"), 0.0)
	require.InEpsilon(t, float64(1), modeValue("nosync"), 0.0)
}

// TestSetFSMApplySyncMode_NilRegistryIsSafe follows the pattern of the
// other monitoring helpers: bootstrap paths that build an engine without
// a registry (tests, the redis-proxy binary) must be able to call the
// setter on a nil *Registry without panicking.
func TestSetFSMApplySyncMode_NilRegistryIsSafe(t *testing.T) {
	var nilRegistry *Registry
	call := func() { nilRegistry.SetFSMApplySyncMode("sync") }
	require.NotPanics(t, call)
}

// TestSetFSMApplySyncMode_UnknownLabelCoercesToSync checks that an
// unrecognised label is treated as "sync" instead of creating a third
// row on the gauge. store.resolveFSMApplyWriteOpts applies the same
// fallback to unknown input; if the two paths disagreed, the gauge would
// contradict the WriteOptions the store actually uses.
func TestSetFSMApplySyncMode_UnknownLabelCoercesToSync(t *testing.T) {
	registry := NewRegistry("n1", "10.0.0.1:50051")

	// modeValue reads the current gauge value for one mode label.
	modeValue := func(mode string) float64 {
		return testutil.ToFloat64(registry.pebble.fsmApplySyncMode.WithLabelValues(mode))
	}

	// Start from a genuine nosync posture so the unknown-label call
	// below has to flip sync to 1 rather than merely leave it alone.
	registry.SetFSMApplySyncMode("nosync")
	require.InEpsilon(t, float64(1), modeValue("nosync"), 0.0)

	registry.SetFSMApplySyncMode("batch") // never implemented, treated as sync
	require.InEpsilon(t, float64(1), modeValue("sync"), 0.0)
	require.InDelta(t, float64(0), modeValue("nosync"), 0.0)
}
10 changes: 10 additions & 0 deletions monitoring/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,16 @@ func (r *Registry) PebbleCollector() *PebbleCollector {
return newPebbleCollector(r.pebble)
}

// SetFSMApplySyncMode hands the resolved ELASTICKV_FSM_SYNC_MODE label
// to the registry's PebbleMetrics so the node's active durability
// posture is visible to operators. The call is a no-op when the
// registry itself or its pebble metrics are nil.
func (r *Registry) SetFSMApplySyncMode(activeLabel string) {
	if r != nil && r.pebble != nil {
		r.pebble.SetFSMApplySyncMode(activeLabel)
	}
}

// WriteConflictCollector returns a collector that polls each MVCC
// store's per-(kind, key_prefix) OCC conflict counters and mirrors
// them into the elastickv_store_write_conflict_total Prometheus
Expand Down
79 changes: 76 additions & 3 deletions store/lsm_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"os"
"path/filepath"
"strconv"
"strings"
"sync"

"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -61,6 +62,31 @@ const (
// mebibyteShift converts MiB to bytes via x << mebibyteShift. Named to
// avoid a magic-number lint violation on the shift amount.
mebibyteShift = 20

// fsmSyncModeEnv selects the Pebble WriteOptions used on the FSM
// commit path (ApplyMutations, DeletePrefixAt). Values:
//
// "sync" (default) — b.Commit(pebble.Sync); every committed raft
// entry triggers an fsync on the Pebble WAL.
// Strongest local durability; slowest.
// "nosync" — b.Commit(pebble.NoSync); the Pebble WAL
// still records the write, but is not fsynced.
// Durability still holds because the raft WAL
// (etcd/raft) fsyncs the committed entry
// upstream, and on restart the raft log is
// replayed from the last FSM-snapshot index;
// any apply that did not reach Pebble's
// fsync'd region is re-applied.
//
// The default is "sync" so production behaviour is unchanged without
// an explicit opt-in. See docs/fsm_sync_mode.md (or the PR body) for
// the full durability argument.
fsmSyncModeEnv = "ELASTICKV_FSM_SYNC_MODE"

// fsmSyncModeSync / fsmSyncModeNoSync are the accepted values for
// fsmSyncModeEnv. Any other value falls back to the default.
fsmSyncModeSync = "sync"
fsmSyncModeNoSync = "nosync"
)

// pebbleCacheBytes is the effective per-store Pebble block-cache capacity,
Expand All @@ -78,6 +104,23 @@ func init() {
pebbleCacheBytes = resolvePebbleCacheBytes(os.Getenv(pebbleCacheMBEnv))
}

// resolveFSMApplyWriteOpts parses an ELASTICKV_FSM_SYNC_MODE value and
// returns both the *pebble.WriteOptions used on the FSM commit path and
// the canonical label name. Case is normalised. Empty, malformed, or
// unrecognised values fall back to the default ("sync").
//
// Exported via package-internal calls only; tests use it directly.
func resolveFSMApplyWriteOpts(envVal string) (*pebble.WriteOptions, string) {
switch strings.ToLower(strings.TrimSpace(envVal)) {
case fsmSyncModeNoSync:
return pebble.NoSync, fsmSyncModeNoSync
case "", fsmSyncModeSync:
return pebble.Sync, fsmSyncModeSync
default:
return pebble.Sync, fsmSyncModeSync
}
Comment on lines +114 to +121
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The switch statement can be simplified by merging the redundant cases that fall back to the default sync behavior. This improves readability and maintainability by removing unnecessary code paths.

func resolveFSMApplyWriteOpts(envVal string) (*pebble.WriteOptions, string) {
	switch strings.ToLower(strings.TrimSpace(envVal)) {
	case fsmSyncModeNoSync:
		return pebble.NoSync, fsmSyncModeNoSync
	default:
		return pebble.Sync, fsmSyncModeSync
	}
}
References
  1. Remove dead or unreachable code paths to improve code clarity and maintainability.

}

// resolvePebbleCacheBytes parses an ELASTICKV_PEBBLE_CACHE_MB value and
// returns the resolved cache size in bytes. Empty, malformed, or
// out-of-range values are rejected and fall back to the default rather
Expand Down Expand Up @@ -127,6 +170,18 @@ type pebbleStore struct {
// detected inside ApplyMutations. Polled by the monitoring
// WriteConflictCollector; not part of the authoritative OCC path.
writeConflicts *writeConflictCounter
// fsmApplyWriteOpts is the Pebble WriteOptions value applied on the
// FSM commit path (ApplyMutations, DeletePrefixAt). Resolved once
// from ELASTICKV_FSM_SYNC_MODE in NewPebbleStore and then treated
// as read-only for the store's lifetime. The default is pebble.Sync;
// operators may opt into pebble.NoSync when the raft WAL's
// durability is considered sufficient.
fsmApplyWriteOpts *pebble.WriteOptions
// fsmApplySyncModeLabel is the human-readable label corresponding
// to fsmApplyWriteOpts ("sync" or "nosync"). Kept alongside the
// write-options pointer so monitoring (elastickv_fsm_apply_sync_mode)
// and log lines stay in sync with the resolved mode.
fsmApplySyncModeLabel string
}

// Ensure pebbleStore implements MVCCStore and RetentionController.
Expand Down Expand Up @@ -173,12 +228,15 @@ func defaultPebbleOptionsWithCache() (*pebble.Options, *pebble.Cache) {

// NewPebbleStore creates a new Pebble-backed MVCC store.
func NewPebbleStore(dir string, opts ...PebbleStoreOption) (MVCCStore, error) {
fsmOpts, fsmLabel := resolveFSMApplyWriteOpts(os.Getenv(fsmSyncModeEnv))
s := &pebbleStore{
dir: dir,
log: slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelWarn,
})),
writeConflicts: newWriteConflictCounter(),
writeConflicts: newWriteConflictCounter(),
fsmApplyWriteOpts: fsmOpts,
fsmApplySyncModeLabel: fsmLabel,
}
for _, opt := range opts {
opt(s)
Expand Down Expand Up @@ -424,6 +482,15 @@ func (s *pebbleStore) LastCommitTS() uint64 {
return s.lastCommitTS
}

// FSMApplySyncModeLabel returns the resolved FSM sync-mode label
// ("sync" or "nosync") for this store. Consumed by monitoring to
// surface the current durability posture as a gauge with a mode label.
// The value is fixed for the store's lifetime (resolved once from
// ELASTICKV_FSM_SYNC_MODE in NewPebbleStore) so no locking is needed.
//
// A nil receiver yields "". Callers that hold the store behind an
// interface cannot detect a typed-nil pointer with a plain `== nil`
// check, and an empty label lets the monitoring path skip the gauge
// instead of panicking.
func (s *pebbleStore) FSMApplySyncModeLabel() string {
	if s == nil {
		return ""
	}
	return s.fsmApplySyncModeLabel
}

func (s *pebbleStore) MinRetainedTS() uint64 {
s.mtx.RLock()
defer s.mtx.RUnlock()
Expand Down Expand Up @@ -1053,7 +1120,11 @@ func (s *pebbleStore) ApplyMutations(ctx context.Context, mutations []*KVPairMut
s.mtx.Unlock()
return err
}
if err := b.Commit(pebble.Sync); err != nil {
// s.fsmApplyWriteOpts is Sync by default. Operators may opt in to NoSync
// via ELASTICKV_FSM_SYNC_MODE=nosync when the raft WAL's durability is
// considered sufficient (raft-log replay from the last FSM snapshot
// re-applies any entries lost from Pebble after a crash).
if err := b.Commit(s.fsmApplyWriteOpts); err != nil {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Keep non-Raft ApplyMutations commits synced

Switching ApplyMutations to s.fsmApplyWriteOpts makes ELASTICKV_FSM_SYNC_MODE=nosync affect callers that do not have Raft-log replay as a durability backstop. A concrete production path is distribution.EnsureCatalogSnapshot -> CatalogStore.Save -> store.ApplyMutations, which writes catalog state directly; if the process crashes before Pebble flushes, those acknowledged writes can be lost with no Raft entry to re-apply. This change should be scoped to true FSM/raft-apply call paths only, or direct callers must continue to force pebble.Sync.

Useful? React with 👍 / 👎.

s.mtx.Unlock()
return errors.WithStack(err)
}
Expand Down Expand Up @@ -1105,7 +1176,9 @@ func (s *pebbleStore) DeletePrefixAt(ctx context.Context, prefix []byte, exclude
if err := setPebbleUint64InBatch(batch, metaLastCommitTSBytes, newLastTS); err != nil {
return err
}
if err := batch.Commit(pebble.Sync); err != nil {
// See ApplyMutations for the durability argument behind
// s.fsmApplyWriteOpts (ELASTICKV_FSM_SYNC_MODE).
if err := batch.Commit(s.fsmApplyWriteOpts); err != nil {
return errors.WithStack(err)
}
s.updateLastCommitTS(newLastTS)
Expand Down
Loading
Loading