From 88910f9b7ea4c1714cdf92e4ce374790bb6e1c39 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 23 Apr 2026 21:34:51 +0900 Subject: [PATCH 1/4] perf(store): add ELASTICKV_FSM_SYNC_MODE for FSM apply fsync opt-out Every committed raft entry triggered a pebble.Sync in ApplyMutations / DeletePrefixAt, on top of the raft WAL fsync that etcd upstream already performs per Ready batch. Under write-heavy workloads this per-proposal fsync dominates p50 latency (Apple M1 Max: 16 ms/op on an APFS tempdir vs 16 us/op with NoSync -- three orders of magnitude). Because raft re-delivers all committed entries past the last FSM snapshot on restart (Config.Applied defaults to snapshot.Metadata.Index and kv/fsm.applyCommitWithIdempotencyFallback treats already-committed writes as idempotent retries), Pebble durability on the FSM commit path is redundant with raft-WAL durability: a crash that loses the tail of the Pebble WAL is recoverable via raft-log replay from the fsynced FSM snapshot. This CL exposes that lever behind ELASTICKV_FSM_SYNC_MODE, defaulting to "sync" so production behaviour is unchanged. Operators can opt in to "nosync" to trade FSM-state crash durability (recoverable via raft-log replay) for write throughput. Other pebble.Sync call sites (snapshot commit, metadata boundary writes, compaction) are untouched: their durability contract is orthogonal to the raft log. Also adds an elastickv_fsm_apply_sync_mode gauge so dashboards can alert on unexpected posture changes, wired in main.go immediately after NewRegistry via store.FSMApplySyncModeLabel(). Tests: env-var parsing (sync / nosync / case / whitespace / unknown fallback), functional equivalence of both modes, and clean-shutdown reopen visibility. Crash recovery after un-fsynced WAL tail loss is an OS-level scenario tracked in JEPSEN_TODO and not a Go unit test. 
--- main.go | 4 + monitoring/pebble.go | 37 +++++++++ monitoring/pebble_test.go | 41 ++++++++++ monitoring/registry.go | 10 +++ store/lsm_store.go | 75 +++++++++++++++++- store/lsm_store_env_test.go | 81 +++++++++++++++++++ store/lsm_store_sync_mode_benchmark_test.go | 74 ++++++++++++++++++ store/lsm_store_sync_mode_test.go | 86 +++++++++++++++++++++ 8 files changed, 406 insertions(+), 2 deletions(-) create mode 100644 store/lsm_store_sync_mode_benchmark_test.go create mode 100644 store/lsm_store_sync_mode_test.go diff --git a/main.go b/main.go index 484869c3..ebd11eb5 100644 --- a/main.go +++ b/main.go @@ -117,6 +117,10 @@ func run() error { var lc net.ListenConfig metricsRegistry := monitoring.NewRegistry(*raftId, *myAddr) + // Record the active FSM apply sync mode so operators can see on the + // /metrics endpoint which durability posture this node is running in. + // See store.FSMApplySyncModeLabel for the underlying env resolution. + metricsRegistry.SetFSMApplySyncMode(store.FSMApplySyncModeLabel()) // Create the shared HLC before building shard groups so every FSM can update // physicalCeiling when HLC lease entries are applied to the Raft log. diff --git a/monitoring/pebble.go b/monitoring/pebble.go index 9313077a..1c8d58b9 100644 --- a/monitoring/pebble.go +++ b/monitoring/pebble.go @@ -51,6 +51,13 @@ type PebbleMetrics struct { blockCacheCapacityBytes *prometheus.GaugeVec blockCacheHitsTotal *prometheus.CounterVec blockCacheMissesTotal *prometheus.CounterVec + + // FSM apply sync mode. Resolved once from ELASTICKV_FSM_SYNC_MODE at + // process start (see store/lsm_store.go). The label-scoped gauge is + // set to 1 for the active mode (either "sync" or "nosync") and 0 for + // the other, so dashboards can alert on unexpected posture changes + // (e.g. a rolling deploy that accidentally drops durability). 
+ fsmApplySyncMode *prometheus.GaugeVec } func newPebbleMetrics(registerer prometheus.Registerer) *PebbleMetrics { @@ -139,6 +146,13 @@ func newPebbleMetrics(registerer prometheus.Registerer) *PebbleMetrics { }, []string{"group"}, ), + fsmApplySyncMode: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "elastickv_fsm_apply_sync_mode", + Help: "Active ELASTICKV_FSM_SYNC_MODE on this node. Gauge is 1 for the active mode and 0 for the other. \"sync\" means every FSM apply issues a Pebble fsync; \"nosync\" relies on raft-log replay for crash recovery of the FSM state.", + }, + []string{"mode"}, + ), } registerer.MustRegister( @@ -154,10 +168,33 @@ func newPebbleMetrics(registerer prometheus.Registerer) *PebbleMetrics { m.blockCacheCapacityBytes, m.blockCacheHitsTotal, m.blockCacheMissesTotal, + m.fsmApplySyncMode, ) return m } +// SetFSMApplySyncMode records which ELASTICKV_FSM_SYNC_MODE is active. +// activeLabel is expected to be "sync" or "nosync"; any other value is +// still accepted (operator observability trumps enum strictness) and +// leaves the previously-recorded mode labels untouched at 0. +// +// Call this once at startup after the store package has resolved the +// env var. Invoking again is safe and idempotent: the new label goes to +// 1 and all previously-set labels go to 0. +func (m *PebbleMetrics) SetFSMApplySyncMode(activeLabel string) { + if m == nil || m.fsmApplySyncMode == nil { + return + } + // Zero both known labels before setting the active one so the gauge + // has a stable two-row shape regardless of call ordering. Unknown + // labels received via an earlier SetFSMApplySyncMode call remain in + // place at their prior value; they are not part of the documented + // mode set and exist only as an escape hatch for future modes. 
+ m.fsmApplySyncMode.WithLabelValues("sync").Set(0) + m.fsmApplySyncMode.WithLabelValues("nosync").Set(0) + m.fsmApplySyncMode.WithLabelValues(activeLabel).Set(1) +} + // PebbleMetricsSource abstracts the per-group access to a Pebble DB's // Metrics(). The concrete *store pebbleStore satisfies this via its // Metrics() accessor. Returning nil (e.g. store closed mid-restore) is diff --git a/monitoring/pebble_test.go b/monitoring/pebble_test.go index f3edc52e..8d0bb42e 100644 --- a/monitoring/pebble_test.go +++ b/monitoring/pebble_test.go @@ -265,3 +265,44 @@ func TestPebbleCollectorZeroRegistryIsSafe(t *testing.T) { collector := registry.PebbleCollector() require.NotPanics(t, func() { collector.ObserveOnce(nil) }) } + +// TestSetFSMApplySyncMode_LabelsAreMutuallyExclusive verifies that +// calling SetFSMApplySyncMode("nosync") drives the sync label to 0 +// and the nosync label to 1 (and vice versa). Operators rely on this +// gauge to alert on unexpected durability posture changes, so the +// two-row shape must stay stable across successive calls. +func TestSetFSMApplySyncMode_LabelsAreMutuallyExclusive(t *testing.T) { + registry := NewRegistry("n1", "10.0.0.1:50051") + + registry.SetFSMApplySyncMode("sync") + require.InEpsilon(t, + float64(1), + testutil.ToFloat64(registry.pebble.fsmApplySyncMode.WithLabelValues("sync")), + 0.0, + ) + require.InDelta(t, + float64(0), + testutil.ToFloat64(registry.pebble.fsmApplySyncMode.WithLabelValues("nosync")), + 0.0, + ) + + registry.SetFSMApplySyncMode("nosync") + require.InDelta(t, + float64(0), + testutil.ToFloat64(registry.pebble.fsmApplySyncMode.WithLabelValues("sync")), + 0.0, + ) + require.InEpsilon(t, + float64(1), + testutil.ToFloat64(registry.pebble.fsmApplySyncMode.WithLabelValues("nosync")), + 0.0, + ) +} + +// TestSetFSMApplySyncMode_NilRegistryIsSafe matches the pattern of +// other monitoring helpers: bootstrap paths that construct an engine +// without a registry (tests, the redis-proxy binary) must not panic. 
+func TestSetFSMApplySyncMode_NilRegistryIsSafe(t *testing.T) { + var r *Registry + require.NotPanics(t, func() { r.SetFSMApplySyncMode("sync") }) +} diff --git a/monitoring/registry.go b/monitoring/registry.go index 07e06927..9f73d979 100644 --- a/monitoring/registry.go +++ b/monitoring/registry.go @@ -148,6 +148,16 @@ func (r *Registry) PebbleCollector() *PebbleCollector { return newPebbleCollector(r.pebble) } +// SetFSMApplySyncMode forwards the resolved ELASTICKV_FSM_SYNC_MODE +// label to the PebbleMetrics gauge so operators can observe the active +// durability posture on this node. Safe to call with a nil registry. +func (r *Registry) SetFSMApplySyncMode(activeLabel string) { + if r == nil || r.pebble == nil { + return + } + r.pebble.SetFSMApplySyncMode(activeLabel) +} + // WriteConflictCollector returns a collector that polls each MVCC // store's per-(kind, key_prefix) OCC conflict counters and mirrors // them into the elastickv_store_write_conflict_total Prometheus diff --git a/store/lsm_store.go b/store/lsm_store.go index d888d4b3..bce4324b 100644 --- a/store/lsm_store.go +++ b/store/lsm_store.go @@ -13,6 +13,7 @@ import ( "os" "path/filepath" "strconv" + "strings" "sync" "github.com/cockroachdb/errors" @@ -61,6 +62,31 @@ const ( // mebibyteShift converts MiB to bytes via x << mebibyteShift. Named to // avoid a magic-number lint violation on the shift amount. mebibyteShift = 20 + + // fsmSyncModeEnv selects the Pebble WriteOptions used on the FSM + // commit path (ApplyMutations, DeletePrefixAt). Values: + // + // "sync" (default) — b.Commit(pebble.Sync); every committed raft + // entry triggers an fsync on the Pebble WAL. + // Strongest local durability; slowest. + // "nosync" — b.Commit(pebble.NoSync); the Pebble WAL + // still records the write, but is not fsynced. 
+ // Durability still holds because the raft WAL + // (etcd/raft) fsyncs the committed entry + // upstream, and on restart the raft log is + // replayed from the last FSM-snapshot index; + // any apply that did not reach Pebble's + // fsync'd region is re-applied. + // + // The default is "sync" so production behaviour is unchanged without + // an explicit opt-in. See docs/fsm_sync_mode.md (or the PR body) for + // the full durability argument. + fsmSyncModeEnv = "ELASTICKV_FSM_SYNC_MODE" + + // fsmSyncModeSync / fsmSyncModeNoSync are the accepted values for + // fsmSyncModeEnv. Any other value falls back to the default. + fsmSyncModeSync = "sync" + fsmSyncModeNoSync = "nosync" ) // pebbleCacheBytes is the effective per-store Pebble block-cache capacity, @@ -74,8 +100,47 @@ const ( // NewPebbleStore's signature and is deferred to a follow-up PR. var pebbleCacheBytes = defaultPebbleCacheBytes +// fsmApplyWriteOpts is the Pebble WriteOptions value applied on the FSM +// commit path. Resolved once from ELASTICKV_FSM_SYNC_MODE at init() and +// then treated as read-only. Exposed as a package variable so tests can +// swap it via setFSMApplyWriteOptsForTest; production code must not +// mutate it after init(). +// +// The zero (unset) state is `pebble.Sync`, preserving legacy behaviour. +var fsmApplyWriteOpts = pebble.Sync + +// fsmApplySyncModeLabel is the human-readable label corresponding to the +// resolved fsmApplyWriteOpts. Kept alongside the write-options pointer so +// monitoring (elastickv_fsm_apply_sync_mode) and log lines stay in sync. +var fsmApplySyncModeLabel = fsmSyncModeSync + func init() { pebbleCacheBytes = resolvePebbleCacheBytes(os.Getenv(pebbleCacheMBEnv)) + fsmApplyWriteOpts, fsmApplySyncModeLabel = resolveFSMApplyWriteOpts(os.Getenv(fsmSyncModeEnv)) +} + +// FSMApplySyncModeLabel returns the resolved FSM sync-mode label +// ("sync" or "nosync"). Consumed by monitoring to surface the current +// durability posture as a gauge with a mode label. 
+func FSMApplySyncModeLabel() string { + return fsmApplySyncModeLabel +} + +// resolveFSMApplyWriteOpts parses an ELASTICKV_FSM_SYNC_MODE value and +// returns both the *pebble.WriteOptions used on the FSM commit path and +// the canonical label name. Case is normalised. Empty, malformed, or +// unrecognised values fall back to the default ("sync"). +// +// Exported via package-internal calls only; tests use it directly. +func resolveFSMApplyWriteOpts(envVal string) (*pebble.WriteOptions, string) { + switch strings.ToLower(strings.TrimSpace(envVal)) { + case fsmSyncModeNoSync: + return pebble.NoSync, fsmSyncModeNoSync + case "", fsmSyncModeSync: + return pebble.Sync, fsmSyncModeSync + default: + return pebble.Sync, fsmSyncModeSync + } } // resolvePebbleCacheBytes parses an ELASTICKV_PEBBLE_CACHE_MB value and @@ -1053,7 +1118,11 @@ func (s *pebbleStore) ApplyMutations(ctx context.Context, mutations []*KVPairMut s.mtx.Unlock() return err } - if err := b.Commit(pebble.Sync); err != nil { + // fsmApplyWriteOpts is Sync by default. Operators may opt in to NoSync + // via ELASTICKV_FSM_SYNC_MODE=nosync when the raft WAL's durability is + // considered sufficient (raft-log replay from the last FSM snapshot + // re-applies any entries lost from Pebble after a crash). + if err := b.Commit(fsmApplyWriteOpts); err != nil { s.mtx.Unlock() return errors.WithStack(err) } @@ -1105,7 +1174,9 @@ func (s *pebbleStore) DeletePrefixAt(ctx context.Context, prefix []byte, exclude if err := setPebbleUint64InBatch(batch, metaLastCommitTSBytes, newLastTS); err != nil { return err } - if err := batch.Commit(pebble.Sync); err != nil { + // See ApplyMutations for the durability argument behind + // fsmApplyWriteOpts (ELASTICKV_FSM_SYNC_MODE). 
+ if err := batch.Commit(fsmApplyWriteOpts); err != nil { return errors.WithStack(err) } s.updateLastCommitTS(newLastTS) diff --git a/store/lsm_store_env_test.go b/store/lsm_store_env_test.go index 92b4b454..fd491d14 100644 --- a/store/lsm_store_env_test.go +++ b/store/lsm_store_env_test.go @@ -3,6 +3,7 @@ package store import ( "testing" + "github.com/cockroachdb/pebble/v2" "github.com/stretchr/testify/require" ) @@ -88,3 +89,83 @@ func TestDefaultPebbleOptionsCarriesCache(t *testing.T) { require.Equal(t, int64(16)<<20, cache.MaxSize()) cache.Unref() } + +// setFSMApplyWriteOptsForTest swaps the package-level fsmApplyWriteOpts +// and fsmApplySyncModeLabel for the duration of a single test and restores +// both during t.Cleanup. Tests use this instead of mutating os.Environ so +// parallel binaries remain isolated. +func setFSMApplyWriteOptsForTest(t *testing.T, opts *pebble.WriteOptions, label string) { + t.Helper() + prevOpts := fsmApplyWriteOpts + prevLabel := fsmApplySyncModeLabel + fsmApplyWriteOpts = opts + fsmApplySyncModeLabel = label + t.Cleanup(func() { + fsmApplyWriteOpts = prevOpts + fsmApplySyncModeLabel = prevLabel + }) +} + +// TestFSMApplySyncModeEnvOverride covers the ELASTICKV_FSM_SYNC_MODE +// parsing contract directly against resolveFSMApplyWriteOpts, which is +// what init() calls. Mirrors the approach taken for +// ELASTICKV_PEBBLE_CACHE_MB to avoid mutating os.Environ at runtime. 
+func TestFSMApplySyncModeEnvOverride(t *testing.T) { + t.Run("empty string uses sync default", func(t *testing.T) { + opts, label := resolveFSMApplyWriteOpts("") + require.Same(t, pebble.Sync, opts) + require.Equal(t, fsmSyncModeSync, label) + }) + + t.Run("explicit sync is accepted", func(t *testing.T) { + opts, label := resolveFSMApplyWriteOpts("sync") + require.Same(t, pebble.Sync, opts) + require.Equal(t, fsmSyncModeSync, label) + }) + + t.Run("explicit nosync is accepted", func(t *testing.T) { + opts, label := resolveFSMApplyWriteOpts("nosync") + require.Same(t, pebble.NoSync, opts) + require.Equal(t, fsmSyncModeNoSync, label) + }) + + t.Run("mixed-case nosync is accepted", func(t *testing.T) { + opts, label := resolveFSMApplyWriteOpts("NoSync") + require.Same(t, pebble.NoSync, opts) + require.Equal(t, fsmSyncModeNoSync, label) + }) + + t.Run("whitespace is trimmed", func(t *testing.T) { + opts, label := resolveFSMApplyWriteOpts(" nosync\n") + require.Same(t, pebble.NoSync, opts) + require.Equal(t, fsmSyncModeNoSync, label) + }) + + t.Run("unknown value falls back to sync", func(t *testing.T) { + // "batch" was considered in the design discussion but never + // implemented; the resolver must not crash or silently enable + // NoSync if an operator sets an unsupported value. + opts, label := resolveFSMApplyWriteOpts("batch") + require.Same(t, pebble.Sync, opts) + require.Equal(t, fsmSyncModeSync, label) + }) + + t.Run("garbage falls back to sync", func(t *testing.T) { + opts, label := resolveFSMApplyWriteOpts("garbage") + require.Same(t, pebble.Sync, opts) + require.Equal(t, fsmSyncModeSync, label) + }) +} + +// TestFSMApplySyncModeLabelAccessor verifies the read-only public +// accessor returns the current label (so monitoring never reads the +// package var directly) and tracks the test helper. 
+func TestFSMApplySyncModeLabelAccessor(t *testing.T) { + before := FSMApplySyncModeLabel() + t.Run("nosync", func(t *testing.T) { + setFSMApplyWriteOptsForTest(t, pebble.NoSync, fsmSyncModeNoSync) + require.Equal(t, fsmSyncModeNoSync, FSMApplySyncModeLabel()) + }) + // Cleanup via t.Cleanup should restore the prior label. + require.Equal(t, before, FSMApplySyncModeLabel()) +} diff --git a/store/lsm_store_sync_mode_benchmark_test.go b/store/lsm_store_sync_mode_benchmark_test.go new file mode 100644 index 00000000..a8965cd3 --- /dev/null +++ b/store/lsm_store_sync_mode_benchmark_test.go @@ -0,0 +1,74 @@ +package store + +import ( + "context" + "fmt" + "testing" + + "github.com/cockroachdb/pebble/v2" +) + +// BenchmarkApplyMutations_SyncMode measures per-op latency and +// throughput of the FSM commit path under each +// ELASTICKV_FSM_SYNC_MODE value. The benchmark is write-heavy and +// serial: each iteration issues one ApplyMutations on a fresh (key, +// commitTS) pair with a single Put mutation, exercising the single- +// fsync hot path. +// +// Run with: +// +// go test ./store -run='^$' -bench='BenchmarkApplyMutations_SyncMode' -benchtime=2s -benchmem +// +// The sync/nosync ratio (not absolute numbers, which are disk- +// dependent) is the signal of interest. On a laptop SSD, nosync +// typically runs 10-50x faster per op; the exact multiplier reflects +// how cheap the platform's fsync is on a freshly-created WAL file. 
+func BenchmarkApplyMutations_SyncMode(b *testing.B) { + cases := []struct { + name string + opts *pebble.WriteOptions + }{ + {name: "sync", opts: pebble.Sync}, + {name: "nosync", opts: pebble.NoSync}, + } + for _, tc := range cases { + b.Run(tc.name, func(b *testing.B) { + setBenchFSMApplyWriteOpts(b, tc.opts) + + dir := b.TempDir() + s, err := NewPebbleStore(dir) + if err != nil { + b.Fatalf("NewPebbleStore: %v", err) + } + defer s.Close() + + ctx := context.Background() + val := make([]byte, 64) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + key := []byte(fmt.Sprintf("bench-%010d", i)) + muts := []*KVPairMutation{{Op: OpTypePut, Key: key, Value: val}} + // startTS must be strictly < commitTS and distinct across + // iterations to avoid MVCC write-conflict. + startTS := uint64(i) * 2 + commitTS := startTS + 1 + if err := s.ApplyMutations(ctx, muts, nil, startTS, commitTS); err != nil { + b.Fatalf("ApplyMutations: %v", err) + } + } + }) + } +} + +// setBenchFSMApplyWriteOpts is the benchmark counterpart to +// setFSMApplyWriteOptsForTest. It accepts *testing.B (which does not +// satisfy the testing.TB-only t.Helper() pattern used by the test +// helper) so the benchmark can swap the package-level write options +// around each sub-run. 
+func setBenchFSMApplyWriteOpts(b *testing.B, opts *pebble.WriteOptions) { + b.Helper() + prev := fsmApplyWriteOpts + fsmApplyWriteOpts = opts + b.Cleanup(func() { fsmApplyWriteOpts = prev }) +} diff --git a/store/lsm_store_sync_mode_test.go b/store/lsm_store_sync_mode_test.go new file mode 100644 index 00000000..804dd5af --- /dev/null +++ b/store/lsm_store_sync_mode_test.go @@ -0,0 +1,86 @@ +package store + +import ( + "context" + "testing" + + "github.com/cockroachdb/pebble/v2" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestApplyMutations_NoSyncFunctionalEquivalence verifies that the +// NoSync FSM commit mode produces the same observable state as the +// default Sync mode for a well-behaved (no-crash) workload. This is +// the "happy-path" contract: operators turning on NoSync must not see +// correctness regressions in steady-state operation; the durability +// trade-off only surfaces on crash. +// +// The crash-recovery half of the contract (raft-log replay re-applying +// any entries lost from Pebble after fsync was skipped) is handled by +// kv/fsm.applyCommitWithIdempotencyFallback, which treats +// already-applied keys (LatestCommitTS >= commitTS) as idempotent +// retries. At this layer we do not exercise the raft engine; we only +// verify that switching write options does not change the visible +// store state. 
+func TestApplyMutations_NoSyncFunctionalEquivalence(t *testing.T) { + setFSMApplyWriteOptsForTest(t, pebble.NoSync, fsmSyncModeNoSync) + + dir := t.TempDir() + s, err := NewPebbleStore(dir) + require.NoError(t, err) + defer s.Close() + + ctx := context.Background() + + mutations := []*KVPairMutation{ + {Op: OpTypePut, Key: []byte("k1"), Value: []byte("v1")}, + {Op: OpTypePut, Key: []byte("k2"), Value: []byte("v2")}, + } + require.NoError(t, s.ApplyMutations(ctx, mutations, nil, 0, 10)) + + val, err := s.GetAt(ctx, []byte("k1"), 10) + require.NoError(t, err) + assert.Equal(t, []byte("v1"), val) + + val, err = s.GetAt(ctx, []byte("k2"), 10) + require.NoError(t, err) + assert.Equal(t, []byte("v2"), val) +} + +// TestApplyMutations_NoSyncReopenVisibility verifies that after a +// clean Close and reopen of the Pebble DB in NoSync mode, writes that +// completed before Close remain visible. A clean Close drives a final +// Pebble flush, so this exercises the common graceful-shutdown path: +// NoSync only impacts crash recovery of un-fsynced tail WAL entries, +// not orderly shutdowns. +// +// The deliberately-unfsynced crash case (kill -9 mid-apply, where +// Pebble's WAL tail is lost and must be reconstructed from raft) is +// inherently an OS-level scenario; it lives in the Jepsen / integration +// suite (see JEPSEN_TODO.md) rather than as a Go unit test, because +// simulating an fsync loss without actually yanking power requires a +// custom VFS shim that is not currently wired into Pebble at this +// layer. 
+func TestApplyMutations_NoSyncReopenVisibility(t *testing.T) { + setFSMApplyWriteOptsForTest(t, pebble.NoSync, fsmSyncModeNoSync) + + dir := t.TempDir() + s, err := NewPebbleStore(dir) + require.NoError(t, err) + + ctx := context.Background() + mutations := []*KVPairMutation{ + {Op: OpTypePut, Key: []byte("keep"), Value: []byte("after-reopen")}, + } + require.NoError(t, s.ApplyMutations(ctx, mutations, nil, 0, 10)) + require.NoError(t, s.Close()) + + reopened, err := NewPebbleStore(dir) + require.NoError(t, err) + defer reopened.Close() + + val, err := reopened.GetAt(ctx, []byte("keep"), 10) + require.NoError(t, err) + assert.Equal(t, []byte("after-reopen"), val) +} From 56af1db00f4c89caec0a30cc30ebdc652d32ed12 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 23 Apr 2026 23:08:59 +0900 Subject: [PATCH 2/4] fix(store): guard int-to-uint64 conversion in sync-mode benchmark gosec G115 flagged uint64(i) * 2 on the benchmark's MVCC timestamp math. The loop counter is non-negative by construction, but the linter cannot prove it. Add an explicit if i < 0 guard so the conversion is provably safe and the reviewdog warning clears. Addresses PR #592 review feedback from coderabbit / reviewdog. --- store/lsm_store_sync_mode_benchmark_test.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/store/lsm_store_sync_mode_benchmark_test.go b/store/lsm_store_sync_mode_benchmark_test.go index a8965cd3..d672fb5a 100644 --- a/store/lsm_store_sync_mode_benchmark_test.go +++ b/store/lsm_store_sync_mode_benchmark_test.go @@ -50,7 +50,13 @@ func BenchmarkApplyMutations_SyncMode(b *testing.B) { key := []byte(fmt.Sprintf("bench-%010d", i)) muts := []*KVPairMutation{{Op: OpTypePut, Key: key, Value: val}} // startTS must be strictly < commitTS and distinct across - // iterations to avoid MVCC write-conflict. + // iterations to avoid MVCC write-conflict. 
Guard the + // int -> uint64 conversion to satisfy gosec G115; i is + // non-negative here by construction (loop from 0) but + // the linter cannot prove it. + if i < 0 { + b.Fatalf("unexpected negative iteration counter: %d", i) + } startTS := uint64(i) * 2 commitTS := startTS + 1 if err := s.ApplyMutations(ctx, muts, nil, startTS, commitTS); err != nil { From 1236dae7b840ff4d6ce6d3984ec8bddca23a7cc1 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 23 Apr 2026 23:09:08 +0900 Subject: [PATCH 3/4] fix(monitoring): coerce unknown FSM sync-mode labels to sync SetFSMApplySyncMode previously accepted any label verbatim, which broke the promised two-row gauge shape in two ways: (1) a prior unknown label was left at 1 because only sync/nosync were zeroed explicitly, and (2) the store's resolver maps unknown ELASTICKV_FSM_SYNC_MODE values to sync, so the monitoring gauge could disagree with the actual WriteOptions in use. Coerce any label outside {sync, nosync} to sync before setting, matching store.resolveFSMApplyWriteOpts's fallback. Update the docstring and add a dedicated test that exercises the coercion against a primed nosync posture. Addresses PR #592 review feedback from coderabbit and gemini. --- monitoring/pebble.go | 22 ++++++++++++++-------- monitoring/pebble_test.go | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 8 deletions(-) diff --git a/monitoring/pebble.go b/monitoring/pebble.go index 1c8d58b9..59133099 100644 --- a/monitoring/pebble.go +++ b/monitoring/pebble.go @@ -174,22 +174,28 @@ func newPebbleMetrics(registerer prometheus.Registerer) *PebbleMetrics { } // SetFSMApplySyncMode records which ELASTICKV_FSM_SYNC_MODE is active. -// activeLabel is expected to be "sync" or "nosync"; any other value is -// still accepted (operator observability trumps enum strictness) and -// leaves the previously-recorded mode labels untouched at 0. 
+// activeLabel must be "sync" or "nosync"; any other value is coerced to +// "sync" to match the store resolver's fallback behaviour for unknown +// ELASTICKV_FSM_SYNC_MODE values (see store.resolveFSMApplyWriteOpts). +// This keeps the gauge's two-row shape stable: exactly one of +// {"sync","nosync"} is 1 at any time and the other is 0. // // Call this once at startup after the store package has resolved the // env var. Invoking again is safe and idempotent: the new label goes to -// 1 and all previously-set labels go to 0. +// 1 and the other known label goes to 0. func (m *PebbleMetrics) SetFSMApplySyncMode(activeLabel string) { if m == nil || m.fsmApplySyncMode == nil { return } + // Coerce unknown labels to "sync" so the gauge never leaks a third + // row and so a prior stale label cannot stay pinned at 1. This + // mirrors store.resolveFSMApplyWriteOpts, which also falls back to + // sync on unrecognised input. + if activeLabel != "sync" && activeLabel != "nosync" { + activeLabel = "sync" + } // Zero both known labels before setting the active one so the gauge - // has a stable two-row shape regardless of call ordering. Unknown - // labels received via an earlier SetFSMApplySyncMode call remain in - // place at their prior value; they are not part of the documented - // mode set and exist only as an escape hatch for future modes. + // has a stable two-row shape regardless of call ordering. 
m.fsmApplySyncMode.WithLabelValues("sync").Set(0) m.fsmApplySyncMode.WithLabelValues("nosync").Set(0) m.fsmApplySyncMode.WithLabelValues(activeLabel).Set(1) diff --git a/monitoring/pebble_test.go b/monitoring/pebble_test.go index 8d0bb42e..dc9cb270 100644 --- a/monitoring/pebble_test.go +++ b/monitoring/pebble_test.go @@ -306,3 +306,35 @@ func TestSetFSMApplySyncMode_NilRegistryIsSafe(t *testing.T) { var r *Registry require.NotPanics(t, func() { r.SetFSMApplySyncMode("sync") }) } + +// TestSetFSMApplySyncMode_UnknownLabelCoercesToSync verifies that an +// unrecognised label is coerced to "sync" rather than pinning a third +// row on the gauge. This mirrors store.resolveFSMApplyWriteOpts, which +// also falls back to sync on unknown input; the two paths must agree +// or the gauge will disagree with the actual WriteOptions the store +// uses. +func TestSetFSMApplySyncMode_UnknownLabelCoercesToSync(t *testing.T) { + registry := NewRegistry("n1", "10.0.0.1:50051") + + // Prime the gauge with a legitimate nosync posture so we can prove + // the follow-up unknown-label call flips sync to 1 (not leaves it + // at its prior value). 
+ registry.SetFSMApplySyncMode("nosync") + require.InEpsilon(t, + float64(1), + testutil.ToFloat64(registry.pebble.fsmApplySyncMode.WithLabelValues("nosync")), + 0.0, + ) + + registry.SetFSMApplySyncMode("batch") // never implemented, treated as sync + require.InEpsilon(t, + float64(1), + testutil.ToFloat64(registry.pebble.fsmApplySyncMode.WithLabelValues("sync")), + 0.0, + ) + require.InDelta(t, + float64(0), + testutil.ToFloat64(registry.pebble.fsmApplySyncMode.WithLabelValues("nosync")), + 0.0, + ) +} From de873f5478b56d37882afb8b78633652758d03d3 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Fri, 24 Apr 2026 01:15:54 +0900 Subject: [PATCH 4/4] refactor(store): move fsmApplyWriteOpts from package var to pebbleStore field Per the gemini review on PR #592, migrate the FSM apply write options (and its sync-mode label) from package-level globals onto the pebbleStore struct so each store instance owns its resolved ELASTICKV_FSM_SYNC_MODE state. NewPebbleStore now calls resolveFSMApplyWriteOpts directly and stores the result; the ApplyMutations and DeletePrefixAt commit paths read s.fsmApplyWriteOpts instead of the package var. - Remove the package-level fsmApplyWriteOpts / fsmApplySyncModeLabel vars and the init() assignment that populated them. - Add fsmApplyWriteOpts and fsmApplySyncModeLabel fields on pebbleStore plus a (*pebbleStore).FSMApplySyncModeLabel() accessor. - Drop setFSMApplyWriteOptsForTest / setBenchFSMApplyWriteOpts helpers; replace with newPebbleStoreWithFSMApplyWriteOptsForTest which builds a real store and overrides the fields, so tests exercise the full construction path without mutating globals. - Main wiring now reads the label off the first built shard store via a fsmApplySyncModeLabeler type assertion after buildShardGroups, skipping the gauge update if no store exposes the accessor. 
Env-var parsing coverage (sync / nosync / mixed-case / whitespace / unknown / empty / garbage) remains exercised directly against resolveFSMApplyWriteOpts, and the NoSync functional-equivalence and reopen-visibility tests continue to validate the commit-path behaviour through the per-store override. --- main.go | 42 ++++++++++++++-- store/lsm_store.go | 56 +++++++++++---------- store/lsm_store_env_test.go | 54 ++++++++++++-------- store/lsm_store_sync_mode_benchmark_test.go | 24 ++++----- store/lsm_store_sync_mode_test.go | 13 ++--- 5 files changed, 112 insertions(+), 77 deletions(-) diff --git a/main.go b/main.go index ebd11eb5..4306388a 100644 --- a/main.go +++ b/main.go @@ -117,10 +117,6 @@ func run() error { var lc net.ListenConfig metricsRegistry := monitoring.NewRegistry(*raftId, *myAddr) - // Record the active FSM apply sync mode so operators can see on the - // /metrics endpoint which durability posture this node is running in. - // See store.FSMApplySyncModeLabel for the underlying env resolution. - metricsRegistry.SetFSMApplySyncMode(store.FSMApplySyncModeLabel()) // Create the shared HLC before building shard groups so every FSM can update // physicalCeiling when HLC lease entries are applied to the Raft log. @@ -143,6 +139,15 @@ func run() error { return err } + // Record the active FSM apply sync mode so operators can see on the + // /metrics endpoint which durability posture this node is running in. + // The label is resolved per-pebbleStore from ELASTICKV_FSM_SYNC_MODE + // in NewPebbleStore; read it off the first constructed store (all + // shards share the same env and therefore the same label). 
+ if label := fsmApplySyncModeLabelFromRuntimes(runtimes); label != "" { + metricsRegistry.SetFSMApplySyncMode(label) + } + cleanup := internalutil.CleanupStack{} defer cleanup.Run() @@ -434,6 +439,35 @@ func raftMonitorRuntimes(runtimes []*raftGroupRuntime) []monitoring.RaftRuntime return out } +// fsmApplySyncModeLabeler narrows an MVCCStore to those implementations +// that can report the resolved ELASTICKV_FSM_SYNC_MODE label. The +// pebble-backed store satisfies this today; alternate backends (none +// yet) would either implement it or be skipped. +type fsmApplySyncModeLabeler interface { + FSMApplySyncModeLabel() string +} + +// fsmApplySyncModeLabelFromRuntimes returns the FSM apply sync-mode +// label resolved by the first shard store that exposes it. All shards +// on a node read the same ELASTICKV_FSM_SYNC_MODE env var at +// construction time so the label is uniform across the runtimes; +// returning the first one suffices. Returns "" when no runtime +// exposes the accessor, in which case the caller skips emitting the +// gauge to avoid publishing a misleading default. +func fsmApplySyncModeLabelFromRuntimes(runtimes []*raftGroupRuntime) string { + for _, runtime := range runtimes { + if runtime == nil || runtime.store == nil { + continue + } + src, ok := runtime.store.(fsmApplySyncModeLabeler) + if !ok { + continue + } + return src.FSMApplySyncModeLabel() + } + return "" +} + // pebbleMonitorSources extracts the MVCC stores that expose // *pebble.DB.Metrics() so monitoring can poll LSM internals (L0 // sublevels, compaction debt, memtable, block cache) for the diff --git a/store/lsm_store.go b/store/lsm_store.go index bce4324b..2bbe12b3 100644 --- a/store/lsm_store.go +++ b/store/lsm_store.go @@ -100,30 +100,8 @@ const ( // NewPebbleStore's signature and is deferred to a follow-up PR. var pebbleCacheBytes = defaultPebbleCacheBytes -// fsmApplyWriteOpts is the Pebble WriteOptions value applied on the FSM -// commit path. 
Resolved once from ELASTICKV_FSM_SYNC_MODE at init() and -// then treated as read-only. Exposed as a package variable so tests can -// swap it via setFSMApplyWriteOptsForTest; production code must not -// mutate it after init(). -// -// The zero (unset) state is `pebble.Sync`, preserving legacy behaviour. -var fsmApplyWriteOpts = pebble.Sync - -// fsmApplySyncModeLabel is the human-readable label corresponding to the -// resolved fsmApplyWriteOpts. Kept alongside the write-options pointer so -// monitoring (elastickv_fsm_apply_sync_mode) and log lines stay in sync. -var fsmApplySyncModeLabel = fsmSyncModeSync - func init() { pebbleCacheBytes = resolvePebbleCacheBytes(os.Getenv(pebbleCacheMBEnv)) - fsmApplyWriteOpts, fsmApplySyncModeLabel = resolveFSMApplyWriteOpts(os.Getenv(fsmSyncModeEnv)) -} - -// FSMApplySyncModeLabel returns the resolved FSM sync-mode label -// ("sync" or "nosync"). Consumed by monitoring to surface the current -// durability posture as a gauge with a mode label. -func FSMApplySyncModeLabel() string { - return fsmApplySyncModeLabel } // resolveFSMApplyWriteOpts parses an ELASTICKV_FSM_SYNC_MODE value and @@ -192,6 +170,18 @@ type pebbleStore struct { // detected inside ApplyMutations. Polled by the monitoring // WriteConflictCollector; not part of the authoritative OCC path. writeConflicts *writeConflictCounter + // fsmApplyWriteOpts is the Pebble WriteOptions value applied on the + // FSM commit path (ApplyMutations, DeletePrefixAt). Resolved once + // from ELASTICKV_FSM_SYNC_MODE in NewPebbleStore and then treated + // as read-only for the store's lifetime. The default is pebble.Sync; + // operators may opt into pebble.NoSync when the raft WAL's + // durability is considered sufficient. + fsmApplyWriteOpts *pebble.WriteOptions + // fsmApplySyncModeLabel is the human-readable label corresponding + // to fsmApplyWriteOpts ("sync" or "nosync"). 
Kept alongside the + // write-options pointer so monitoring (elastickv_fsm_apply_sync_mode) + // and log lines stay in sync with the resolved mode. + fsmApplySyncModeLabel string } // Ensure pebbleStore implements MVCCStore and RetentionController. @@ -238,12 +228,15 @@ func defaultPebbleOptionsWithCache() (*pebble.Options, *pebble.Cache) { // NewPebbleStore creates a new Pebble-backed MVCC store. func NewPebbleStore(dir string, opts ...PebbleStoreOption) (MVCCStore, error) { + fsmOpts, fsmLabel := resolveFSMApplyWriteOpts(os.Getenv(fsmSyncModeEnv)) s := &pebbleStore{ dir: dir, log: slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ Level: slog.LevelWarn, })), - writeConflicts: newWriteConflictCounter(), + writeConflicts: newWriteConflictCounter(), + fsmApplyWriteOpts: fsmOpts, + fsmApplySyncModeLabel: fsmLabel, } for _, opt := range opts { opt(s) @@ -489,6 +482,15 @@ func (s *pebbleStore) LastCommitTS() uint64 { return s.lastCommitTS } +// FSMApplySyncModeLabel returns the resolved FSM sync-mode label +// ("sync" or "nosync") for this store. Consumed by monitoring to +// surface the current durability posture as a gauge with a mode label. +// The value is fixed for the store's lifetime (resolved once from +// ELASTICKV_FSM_SYNC_MODE in NewPebbleStore) so no locking is needed. +func (s *pebbleStore) FSMApplySyncModeLabel() string { + return s.fsmApplySyncModeLabel +} + func (s *pebbleStore) MinRetainedTS() uint64 { s.mtx.RLock() defer s.mtx.RUnlock() @@ -1118,11 +1120,11 @@ func (s *pebbleStore) ApplyMutations(ctx context.Context, mutations []*KVPairMut s.mtx.Unlock() return err } - // fsmApplyWriteOpts is Sync by default. Operators may opt in to NoSync + // s.fsmApplyWriteOpts is Sync by default. Operators may opt in to NoSync // via ELASTICKV_FSM_SYNC_MODE=nosync when the raft WAL's durability is // considered sufficient (raft-log replay from the last FSM snapshot // re-applies any entries lost from Pebble after a crash). 
- if err := b.Commit(fsmApplyWriteOpts); err != nil { + if err := b.Commit(s.fsmApplyWriteOpts); err != nil { s.mtx.Unlock() return errors.WithStack(err) } @@ -1175,8 +1177,8 @@ func (s *pebbleStore) DeletePrefixAt(ctx context.Context, prefix []byte, exclude return err } // See ApplyMutations for the durability argument behind - // fsmApplyWriteOpts (ELASTICKV_FSM_SYNC_MODE). - if err := batch.Commit(fsmApplyWriteOpts); err != nil { + // s.fsmApplyWriteOpts (ELASTICKV_FSM_SYNC_MODE). + if err := batch.Commit(s.fsmApplyWriteOpts); err != nil { return errors.WithStack(err) } s.updateLastCommitTS(newLastTS) diff --git a/store/lsm_store_env_test.go b/store/lsm_store_env_test.go index fd491d14..addf7d8d 100644 --- a/store/lsm_store_env_test.go +++ b/store/lsm_store_env_test.go @@ -90,25 +90,31 @@ func TestDefaultPebbleOptionsCarriesCache(t *testing.T) { cache.Unref() } -// setFSMApplyWriteOptsForTest swaps the package-level fsmApplyWriteOpts -// and fsmApplySyncModeLabel for the duration of a single test and restores -// both during t.Cleanup. Tests use this instead of mutating os.Environ so -// parallel binaries remain isolated. -func setFSMApplyWriteOptsForTest(t *testing.T, opts *pebble.WriteOptions, label string) { +// newPebbleStoreWithFSMApplyWriteOptsForTest constructs a pebbleStore +// (not the MVCCStore interface) with an explicit *pebble.WriteOptions +// and sync-mode label for the FSM commit path, bypassing the +// ELASTICKV_FSM_SYNC_MODE env resolution. Tests use this to exercise +// both sync and nosync modes deterministically without mutating +// os.Environ (which would leak into other tests in the same process). +// +// The store is created via NewPebbleStore and then the relevant +// fields are overridden; this keeps the full init path (cache, +// metadata scans, etc.) identical to production while only swapping +// the write-options that govern commit-time durability.
+func newPebbleStoreWithFSMApplyWriteOptsForTest(t *testing.T, dir string, opts *pebble.WriteOptions, label string) *pebbleStore { t.Helper() - prevOpts := fsmApplyWriteOpts - prevLabel := fsmApplySyncModeLabel - fsmApplyWriteOpts = opts - fsmApplySyncModeLabel = label - t.Cleanup(func() { - fsmApplyWriteOpts = prevOpts - fsmApplySyncModeLabel = prevLabel - }) + s, err := NewPebbleStore(dir) + require.NoError(t, err) + ps, ok := s.(*pebbleStore) + require.True(t, ok, "NewPebbleStore returned non-*pebbleStore type") + ps.fsmApplyWriteOpts = opts + ps.fsmApplySyncModeLabel = label + return ps } // TestFSMApplySyncModeEnvOverride covers the ELASTICKV_FSM_SYNC_MODE // parsing contract directly against resolveFSMApplyWriteOpts, which is -// what init() calls. Mirrors the approach taken for +// what NewPebbleStore calls. Mirrors the approach taken for // ELASTICKV_PEBBLE_CACHE_MB to avoid mutating os.Environ at runtime. func TestFSMApplySyncModeEnvOverride(t *testing.T) { t.Run("empty string uses sync default", func(t *testing.T) { @@ -157,15 +163,19 @@ func TestFSMApplySyncModeEnvOverride(t *testing.T) { }) } -// TestFSMApplySyncModeLabelAccessor verifies the read-only public -// accessor returns the current label (so monitoring never reads the -// package var directly) and tracks the test helper. +// TestFSMApplySyncModeLabelAccessor verifies that a constructed +// pebbleStore exposes its resolved sync-mode label via the per-instance +// accessor, so monitoring can read the mode off a concrete store +// instead of a package global. 
func TestFSMApplySyncModeLabelAccessor(t *testing.T) { - before := FSMApplySyncModeLabel() t.Run("nosync", func(t *testing.T) { - setFSMApplyWriteOptsForTest(t, pebble.NoSync, fsmSyncModeNoSync) - require.Equal(t, fsmSyncModeNoSync, FSMApplySyncModeLabel()) + ps := newPebbleStoreWithFSMApplyWriteOptsForTest(t, t.TempDir(), pebble.NoSync, fsmSyncModeNoSync) + defer ps.Close() + require.Equal(t, fsmSyncModeNoSync, ps.FSMApplySyncModeLabel()) + }) + t.Run("sync", func(t *testing.T) { + ps := newPebbleStoreWithFSMApplyWriteOptsForTest(t, t.TempDir(), pebble.Sync, fsmSyncModeSync) + defer ps.Close() + require.Equal(t, fsmSyncModeSync, ps.FSMApplySyncModeLabel()) }) - // Cleanup via t.Cleanup should restore the prior label. - require.Equal(t, before, FSMApplySyncModeLabel()) } diff --git a/store/lsm_store_sync_mode_benchmark_test.go b/store/lsm_store_sync_mode_benchmark_test.go index d672fb5a..cc293118 100644 --- a/store/lsm_store_sync_mode_benchmark_test.go +++ b/store/lsm_store_sync_mode_benchmark_test.go @@ -33,14 +33,22 @@ func BenchmarkApplyMutations_SyncMode(b *testing.B) { } for _, tc := range cases { b.Run(tc.name, func(b *testing.B) { - setBenchFSMApplyWriteOpts(b, tc.opts) - dir := b.TempDir() s, err := NewPebbleStore(dir) if err != nil { b.Fatalf("NewPebbleStore: %v", err) } defer s.Close() + ps, ok := s.(*pebbleStore) + if !ok { + b.Fatalf("NewPebbleStore returned non-*pebbleStore type: %T", s) + } + ps.fsmApplyWriteOpts = tc.opts + if tc.opts == pebble.NoSync { + ps.fsmApplySyncModeLabel = fsmSyncModeNoSync + } else { + ps.fsmApplySyncModeLabel = fsmSyncModeSync + } ctx := context.Background() val := make([]byte, 64) @@ -66,15 +74,3 @@ func BenchmarkApplyMutations_SyncMode(b *testing.B) { }) } } - -// setBenchFSMApplyWriteOpts is the benchmark counterpart to -// setFSMApplyWriteOptsForTest. 
It accepts *testing.B (which does not -// satisfy the testing.TB-only t.Helper() pattern used by the test -// helper) so the benchmark can swap the package-level write options -// around each sub-run. -func setBenchFSMApplyWriteOpts(b *testing.B, opts *pebble.WriteOptions) { - b.Helper() - prev := fsmApplyWriteOpts - fsmApplyWriteOpts = opts - b.Cleanup(func() { fsmApplyWriteOpts = prev }) -} diff --git a/store/lsm_store_sync_mode_test.go b/store/lsm_store_sync_mode_test.go index 804dd5af..3d95ca0c 100644 --- a/store/lsm_store_sync_mode_test.go +++ b/store/lsm_store_sync_mode_test.go @@ -24,11 +24,8 @@ import ( // verify that switching write options does not change the visible // store state. func TestApplyMutations_NoSyncFunctionalEquivalence(t *testing.T) { - setFSMApplyWriteOptsForTest(t, pebble.NoSync, fsmSyncModeNoSync) - dir := t.TempDir() - s, err := NewPebbleStore(dir) - require.NoError(t, err) + s := newPebbleStoreWithFSMApplyWriteOptsForTest(t, dir, pebble.NoSync, fsmSyncModeNoSync) defer s.Close() ctx := context.Background() @@ -63,11 +60,8 @@ func TestApplyMutations_NoSyncFunctionalEquivalence(t *testing.T) { // custom VFS shim that is not currently wired into Pebble at this // layer. func TestApplyMutations_NoSyncReopenVisibility(t *testing.T) { - setFSMApplyWriteOptsForTest(t, pebble.NoSync, fsmSyncModeNoSync) - dir := t.TempDir() - s, err := NewPebbleStore(dir) - require.NoError(t, err) + s := newPebbleStoreWithFSMApplyWriteOptsForTest(t, dir, pebble.NoSync, fsmSyncModeNoSync) ctx := context.Background() mutations := []*KVPairMutation{ @@ -76,8 +70,7 @@ func TestApplyMutations_NoSyncReopenVisibility(t *testing.T) { require.NoError(t, s.ApplyMutations(ctx, mutations, nil, 0, 10)) require.NoError(t, s.Close()) - reopened, err := NewPebbleStore(dir) - require.NoError(t, err) + reopened := newPebbleStoreWithFSMApplyWriteOptsForTest(t, dir, pebble.NoSync, fsmSyncModeNoSync) defer reopened.Close() val, err := reopened.GetAt(ctx, []byte("keep"), 10)