diff --git a/main.go b/main.go index 484869c3..4306388a 100644 --- a/main.go +++ b/main.go @@ -139,6 +139,15 @@ func run() error { return err } + // Record the active FSM apply sync mode so operators can see on the + // /metrics endpoint which durability posture this node is running in. + // The label is resolved per-pebbleStore from ELASTICKV_FSM_SYNC_MODE + // in NewPebbleStore; read it off the first constructed store (all + // shards share the same env and therefore the same label). + if label := fsmApplySyncModeLabelFromRuntimes(runtimes); label != "" { + metricsRegistry.SetFSMApplySyncMode(label) + } + cleanup := internalutil.CleanupStack{} defer cleanup.Run() @@ -430,6 +439,35 @@ func raftMonitorRuntimes(runtimes []*raftGroupRuntime) []monitoring.RaftRuntime return out } +// fsmApplySyncModeLabeler narrows an MVCCStore to those implementations +// that can report the resolved ELASTICKV_FSM_SYNC_MODE label. The +// pebble-backed store satisfies this today; alternate backends (none +// yet) would either implement it or be skipped. +type fsmApplySyncModeLabeler interface { + FSMApplySyncModeLabel() string +} + +// fsmApplySyncModeLabelFromRuntimes returns the FSM apply sync-mode +// label resolved by the first shard store that exposes it. All shards +// on a node read the same ELASTICKV_FSM_SYNC_MODE env var at +// construction time so the label is uniform across the runtimes; +// returning the first one suffices. Returns "" when no runtime +// exposes the accessor, in which case the caller skips emitting the +// gauge to avoid publishing a misleading default. +func fsmApplySyncModeLabelFromRuntimes(runtimes []*raftGroupRuntime) string { + for _, runtime := range runtimes { + if runtime == nil || runtime.store == nil { + continue + } + src, ok := runtime.store.(fsmApplySyncModeLabeler) + if !ok { + continue + } + return src.FSMApplySyncModeLabel() + } + return "" +} + // pebbleMonitorSources extracts the MVCC stores that expose // *pebble.DB.Metrics() so monitoring can poll LSM internals (L0 // sublevels, compaction debt, memtable, block cache) for the diff --git a/monitoring/pebble.go b/monitoring/pebble.go index 9313077a..59133099 100644 --- a/monitoring/pebble.go +++ b/monitoring/pebble.go @@ -51,6 +51,13 @@ type PebbleMetrics struct { blockCacheCapacityBytes *prometheus.GaugeVec blockCacheHitsTotal *prometheus.CounterVec blockCacheMissesTotal *prometheus.CounterVec + + // FSM apply sync mode. Resolved once from ELASTICKV_FSM_SYNC_MODE at + // process start (see store/lsm_store.go). The label-scoped gauge is + // set to 1 for the active mode (either "sync" or "nosync") and 0 for + // the other, so dashboards can alert on unexpected posture changes + // (e.g. a rolling deploy that accidentally drops durability). + fsmApplySyncMode *prometheus.GaugeVec } func newPebbleMetrics(registerer prometheus.Registerer) *PebbleMetrics { @@ -139,6 +146,13 @@ func newPebbleMetrics(registerer prometheus.Registerer) *PebbleMetrics { }, []string{"group"}, ), + fsmApplySyncMode: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "elastickv_fsm_apply_sync_mode", + Help: "Active ELASTICKV_FSM_SYNC_MODE on this node. Gauge is 1 for the active mode and 0 for the other. \"sync\" means every FSM apply issues a Pebble fsync; \"nosync\" relies on raft-log replay for crash recovery of the FSM state.", + }, + []string{"mode"}, + ), } registerer.MustRegister( @@ -154,10 +168,39 @@ func newPebbleMetrics(registerer prometheus.Registerer) *PebbleMetrics { m.blockCacheCapacityBytes, m.blockCacheHitsTotal, m.blockCacheMissesTotal, + m.fsmApplySyncMode, ) return m } +// SetFSMApplySyncMode records which ELASTICKV_FSM_SYNC_MODE is active. +// activeLabel must be "sync" or "nosync"; any other value is coerced to +// "sync" to match the store resolver's fallback behaviour for unknown +// ELASTICKV_FSM_SYNC_MODE values (see store.resolveFSMApplyWriteOpts). +// This keeps the gauge's two-row shape stable: exactly one of +// {"sync","nosync"} is 1 at any time and the other is 0. +// +// Call this once at startup after the store package has resolved the +// env var. Invoking again is safe and idempotent: the new label goes to +// 1 and the other known label goes to 0. +func (m *PebbleMetrics) SetFSMApplySyncMode(activeLabel string) { + if m == nil || m.fsmApplySyncMode == nil { + return + } + // Coerce unknown labels to "sync" so the gauge never leaks a third + // row and so a prior stale label cannot stay pinned at 1. This + // mirrors store.resolveFSMApplyWriteOpts, which also falls back to + // sync on unrecognised input. + if activeLabel != "sync" && activeLabel != "nosync" { + activeLabel = "sync" + } + // Zero both known labels before setting the active one so the gauge + // has a stable two-row shape regardless of call ordering. + m.fsmApplySyncMode.WithLabelValues("sync").Set(0) + m.fsmApplySyncMode.WithLabelValues("nosync").Set(0) + m.fsmApplySyncMode.WithLabelValues(activeLabel).Set(1) +} + // PebbleMetricsSource abstracts the per-group access to a Pebble DB's // Metrics(). The concrete *store pebbleStore satisfies this via its // Metrics() accessor. Returning nil (e.g. store closed mid-restore) is diff --git a/monitoring/pebble_test.go b/monitoring/pebble_test.go index f3edc52e..dc9cb270 100644 --- a/monitoring/pebble_test.go +++ b/monitoring/pebble_test.go @@ -265,3 +265,76 @@ func TestPebbleCollectorZeroRegistryIsSafe(t *testing.T) { collector := registry.PebbleCollector() require.NotPanics(t, func() { collector.ObserveOnce(nil) }) } + +// TestSetFSMApplySyncMode_LabelsAreMutuallyExclusive verifies that +// calling SetFSMApplySyncMode("nosync") drives the sync label to 0 +// and the nosync label to 1 (and vice versa). Operators rely on this +// gauge to alert on unexpected durability posture changes, so the +// two-row shape must stay stable across successive calls. +func TestSetFSMApplySyncMode_LabelsAreMutuallyExclusive(t *testing.T) { + registry := NewRegistry("n1", "10.0.0.1:50051") + + registry.SetFSMApplySyncMode("sync") + require.InEpsilon(t, + float64(1), + testutil.ToFloat64(registry.pebble.fsmApplySyncMode.WithLabelValues("sync")), + 0.0, + ) + require.InDelta(t, + float64(0), + testutil.ToFloat64(registry.pebble.fsmApplySyncMode.WithLabelValues("nosync")), + 0.0, + ) + + registry.SetFSMApplySyncMode("nosync") + require.InDelta(t, + float64(0), + testutil.ToFloat64(registry.pebble.fsmApplySyncMode.WithLabelValues("sync")), + 0.0, + ) + require.InEpsilon(t, + float64(1), + testutil.ToFloat64(registry.pebble.fsmApplySyncMode.WithLabelValues("nosync")), + 0.0, + ) +} + +// TestSetFSMApplySyncMode_NilRegistryIsSafe matches the pattern of +// other monitoring helpers: bootstrap paths that construct an engine +// without a registry (tests, the redis-proxy binary) must not panic. +func TestSetFSMApplySyncMode_NilRegistryIsSafe(t *testing.T) { + var r *Registry + require.NotPanics(t, func() { r.SetFSMApplySyncMode("sync") }) +} + +// TestSetFSMApplySyncMode_UnknownLabelCoercesToSync verifies that an +// unrecognised label is coerced to "sync" rather than pinning a third +// row on the gauge. This mirrors store.resolveFSMApplyWriteOpts, which +// also falls back to sync on unknown input; the two paths must agree +// or the gauge will disagree with the actual WriteOptions the store +// uses. +func TestSetFSMApplySyncMode_UnknownLabelCoercesToSync(t *testing.T) { + registry := NewRegistry("n1", "10.0.0.1:50051") + + // Prime the gauge with a legitimate nosync posture so we can prove + // the follow-up unknown-label call flips sync to 1 (not leaves it + // at its prior value). + registry.SetFSMApplySyncMode("nosync") + require.InEpsilon(t, + float64(1), + testutil.ToFloat64(registry.pebble.fsmApplySyncMode.WithLabelValues("nosync")), + 0.0, + ) + + registry.SetFSMApplySyncMode("batch") // never implemented, treated as sync + require.InEpsilon(t, + float64(1), + testutil.ToFloat64(registry.pebble.fsmApplySyncMode.WithLabelValues("sync")), + 0.0, + ) + require.InDelta(t, + float64(0), + testutil.ToFloat64(registry.pebble.fsmApplySyncMode.WithLabelValues("nosync")), + 0.0, + ) +} diff --git a/monitoring/registry.go b/monitoring/registry.go index 07e06927..9f73d979 100644 --- a/monitoring/registry.go +++ b/monitoring/registry.go @@ -148,6 +148,16 @@ func (r *Registry) PebbleCollector() *PebbleCollector { return newPebbleCollector(r.pebble) } +// SetFSMApplySyncMode forwards the resolved ELASTICKV_FSM_SYNC_MODE +// label to the PebbleMetrics gauge so operators can observe the active +// durability posture on this node. Safe to call with a nil registry. +func (r *Registry) SetFSMApplySyncMode(activeLabel string) { + if r == nil || r.pebble == nil { + return + } + r.pebble.SetFSMApplySyncMode(activeLabel) +} + // WriteConflictCollector returns a collector that polls each MVCC // store's per-(kind, key_prefix) OCC conflict counters and mirrors // them into the elastickv_store_write_conflict_total Prometheus diff --git a/store/lsm_store.go b/store/lsm_store.go index d888d4b3..2bbe12b3 100644 --- a/store/lsm_store.go +++ b/store/lsm_store.go @@ -13,6 +13,7 @@ import ( "os" "path/filepath" "strconv" + "strings" "sync" "github.com/cockroachdb/errors" @@ -61,6 +62,31 @@ const ( // mebibyteShift converts MiB to bytes via x << mebibyteShift. Named to // avoid a magic-number lint violation on the shift amount. mebibyteShift = 20 + + // fsmSyncModeEnv selects the Pebble WriteOptions used on the FSM + // commit path (ApplyMutations, DeletePrefixAt). Values: + // + // "sync" (default) — b.Commit(pebble.Sync); every committed raft + // entry triggers an fsync on the Pebble WAL. + // Strongest local durability; slowest. + // "nosync" — b.Commit(pebble.NoSync); the Pebble WAL + // still records the write, but is not fsynced. + // Durability still holds because the raft WAL + // (etcd/raft) fsyncs the committed entry + // upstream, and on restart the raft log is + // replayed from the last FSM-snapshot index; + // any apply that did not reach Pebble's + // fsync'd region is re-applied. + // + // The default is "sync" so production behaviour is unchanged without + // an explicit opt-in. See docs/fsm_sync_mode.md (or the PR body) for + // the full durability argument. + fsmSyncModeEnv = "ELASTICKV_FSM_SYNC_MODE" + + // fsmSyncModeSync / fsmSyncModeNoSync are the accepted values for + // fsmSyncModeEnv. Any other value falls back to the default. + fsmSyncModeSync = "sync" + fsmSyncModeNoSync = "nosync" ) // pebbleCacheBytes is the effective per-store Pebble block-cache capacity, @@ -78,6 +104,23 @@ func init() { pebbleCacheBytes = resolvePebbleCacheBytes(os.Getenv(pebbleCacheMBEnv)) } +// resolveFSMApplyWriteOpts parses an ELASTICKV_FSM_SYNC_MODE value and +// returns both the *pebble.WriteOptions used on the FSM commit path and +// the canonical label name. Case is normalised. Empty, malformed, or +// unrecognised values fall back to the default ("sync"). +// +// Exported via package-internal calls only; tests use it directly. +func resolveFSMApplyWriteOpts(envVal string) (*pebble.WriteOptions, string) { + switch strings.ToLower(strings.TrimSpace(envVal)) { + case fsmSyncModeNoSync: + return pebble.NoSync, fsmSyncModeNoSync + case "", fsmSyncModeSync: + return pebble.Sync, fsmSyncModeSync + default: + return pebble.Sync, fsmSyncModeSync + } +} + // resolvePebbleCacheBytes parses an ELASTICKV_PEBBLE_CACHE_MB value and // returns the resolved cache size in bytes. Empty, malformed, or // out-of-range values are rejected and fall back to the default rather @@ -127,6 +170,18 @@ type pebbleStore struct { // detected inside ApplyMutations. Polled by the monitoring // WriteConflictCollector; not part of the authoritative OCC path. writeConflicts *writeConflictCounter + // fsmApplyWriteOpts is the Pebble WriteOptions value applied on the + // FSM commit path (ApplyMutations, DeletePrefixAt). Resolved once + // from ELASTICKV_FSM_SYNC_MODE in NewPebbleStore and then treated + // as read-only for the store's lifetime. The default is pebble.Sync; + // operators may opt into pebble.NoSync when the raft WAL's + // durability is considered sufficient. + fsmApplyWriteOpts *pebble.WriteOptions + // fsmApplySyncModeLabel is the human-readable label corresponding + // to fsmApplyWriteOpts ("sync" or "nosync"). Kept alongside the + // write-options pointer so monitoring (elastickv_fsm_apply_sync_mode) + // and log lines stay in sync with the resolved mode. + fsmApplySyncModeLabel string } // Ensure pebbleStore implements MVCCStore and RetentionController. @@ -173,12 +228,15 @@ func defaultPebbleOptionsWithCache() (*pebble.Options, *pebble.Cache) { // NewPebbleStore creates a new Pebble-backed MVCC store. func NewPebbleStore(dir string, opts ...PebbleStoreOption) (MVCCStore, error) { + fsmOpts, fsmLabel := resolveFSMApplyWriteOpts(os.Getenv(fsmSyncModeEnv)) s := &pebbleStore{ dir: dir, log: slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ Level: slog.LevelWarn, })), - writeConflicts: newWriteConflictCounter(), + writeConflicts: newWriteConflictCounter(), + fsmApplyWriteOpts: fsmOpts, + fsmApplySyncModeLabel: fsmLabel, } for _, opt := range opts { opt(s) @@ -424,6 +482,15 @@ func (s *pebbleStore) LastCommitTS() uint64 { return s.lastCommitTS } +// FSMApplySyncModeLabel returns the resolved FSM sync-mode label +// ("sync" or "nosync") for this store. Consumed by monitoring to +// surface the current durability posture as a gauge with a mode label. +// The value is fixed for the store's lifetime (resolved once from +// ELASTICKV_FSM_SYNC_MODE in NewPebbleStore) so no locking is needed. +func (s *pebbleStore) FSMApplySyncModeLabel() string { + return s.fsmApplySyncModeLabel +} + func (s *pebbleStore) MinRetainedTS() uint64 { s.mtx.RLock() defer s.mtx.RUnlock() @@ -1053,7 +1120,11 @@ func (s *pebbleStore) ApplyMutations(ctx context.Context, mutations []*KVPairMut s.mtx.Unlock() return err } - if err := b.Commit(pebble.Sync); err != nil { + // s.fsmApplyWriteOpts is Sync by default. Operators may opt in to NoSync + // via ELASTICKV_FSM_SYNC_MODE=nosync when the raft WAL's durability is + // considered sufficient (raft-log replay from the last FSM snapshot + // re-applies any entries lost from Pebble after a crash). + if err := b.Commit(s.fsmApplyWriteOpts); err != nil { s.mtx.Unlock() return errors.WithStack(err) } @@ -1105,7 +1176,9 @@ func (s *pebbleStore) DeletePrefixAt(ctx context.Context, prefix []byte, exclude if err := setPebbleUint64InBatch(batch, metaLastCommitTSBytes, newLastTS); err != nil { return err } - if err := batch.Commit(pebble.Sync); err != nil { + // See ApplyMutations for the durability argument behind + // s.fsmApplyWriteOpts (ELASTICKV_FSM_SYNC_MODE). + if err := batch.Commit(s.fsmApplyWriteOpts); err != nil { return errors.WithStack(err) } s.updateLastCommitTS(newLastTS) diff --git a/store/lsm_store_env_test.go b/store/lsm_store_env_test.go index 92b4b454..addf7d8d 100644 --- a/store/lsm_store_env_test.go +++ b/store/lsm_store_env_test.go @@ -3,6 +3,7 @@ package store import ( "testing" + "github.com/cockroachdb/pebble/v2" "github.com/stretchr/testify/require" ) @@ -88,3 +89,93 @@ func TestDefaultPebbleOptionsCarriesCache(t *testing.T) { require.Equal(t, int64(16)<<20, cache.MaxSize()) cache.Unref() } + +// newPebbleStoreWithFSMApplyWriteOptsForTest constructs a pebbleStore +// (not the MVCCStore interface) with an explicit *pebble.WriteOptions +// and sync-mode label for the FSM commit path, bypassing the +// ELASTICKV_FSM_SYNC_MODE env resolution. Tests use this to exercise +// both sync and nosync modes deterministically without mutating +// os.Environ (which would leak into parallel test binaries). +// +// The store is created via NewPebbleStore and then the relevant +// fields are overridden; this keeps the full init path (cache, +// metadata scans, etc.) identical to production while only swapping +// the write-options that govern commit-time durability. +func newPebbleStoreWithFSMApplyWriteOptsForTest(t *testing.T, dir string, opts *pebble.WriteOptions, label string) *pebbleStore { + t.Helper() + s, err := NewPebbleStore(dir) + require.NoError(t, err) + ps, ok := s.(*pebbleStore) + require.True(t, ok, "NewPebbleStore returned non-*pebbleStore type") + ps.fsmApplyWriteOpts = opts + ps.fsmApplySyncModeLabel = label + return ps +} + +// TestFSMApplySyncModeEnvOverride covers the ELASTICKV_FSM_SYNC_MODE +// parsing contract directly against resolveFSMApplyWriteOpts, which is +// what NewPebbleStore calls. Mirrors the approach taken for +// ELASTICKV_PEBBLE_CACHE_MB to avoid mutating os.Environ at runtime. +func TestFSMApplySyncModeEnvOverride(t *testing.T) { + t.Run("empty string uses sync default", func(t *testing.T) { + opts, label := resolveFSMApplyWriteOpts("") + require.Same(t, pebble.Sync, opts) + require.Equal(t, fsmSyncModeSync, label) + }) + + t.Run("explicit sync is accepted", func(t *testing.T) { + opts, label := resolveFSMApplyWriteOpts("sync") + require.Same(t, pebble.Sync, opts) + require.Equal(t, fsmSyncModeSync, label) + }) + + t.Run("explicit nosync is accepted", func(t *testing.T) { + opts, label := resolveFSMApplyWriteOpts("nosync") + require.Same(t, pebble.NoSync, opts) + require.Equal(t, fsmSyncModeNoSync, label) + }) + + t.Run("mixed-case nosync is accepted", func(t *testing.T) { + opts, label := resolveFSMApplyWriteOpts("NoSync") + require.Same(t, pebble.NoSync, opts) + require.Equal(t, fsmSyncModeNoSync, label) + }) + + t.Run("whitespace is trimmed", func(t *testing.T) { + opts, label := resolveFSMApplyWriteOpts(" nosync\n") + require.Same(t, pebble.NoSync, opts) + require.Equal(t, fsmSyncModeNoSync, label) + }) + + t.Run("unknown value falls back to sync", func(t *testing.T) { + // "batch" was considered in the design discussion but never + // implemented; the resolver must not crash or silently enable + // NoSync if an operator sets an unsupported value. + opts, label := resolveFSMApplyWriteOpts("batch") + require.Same(t, pebble.Sync, opts) + require.Equal(t, fsmSyncModeSync, label) + }) + + t.Run("garbage falls back to sync", func(t *testing.T) { + opts, label := resolveFSMApplyWriteOpts("garbage") + require.Same(t, pebble.Sync, opts) + require.Equal(t, fsmSyncModeSync, label) + }) +} + +// TestFSMApplySyncModeLabelAccessor verifies that a constructed +// pebbleStore exposes its resolved sync-mode label via the per-instance +// accessor, so monitoring can read the mode off a concrete store +// instead of a package global. +func TestFSMApplySyncModeLabelAccessor(t *testing.T) { + t.Run("nosync", func(t *testing.T) { + ps := newPebbleStoreWithFSMApplyWriteOptsForTest(t, t.TempDir(), pebble.NoSync, fsmSyncModeNoSync) + defer ps.Close() + require.Equal(t, fsmSyncModeNoSync, ps.FSMApplySyncModeLabel()) + }) + t.Run("sync", func(t *testing.T) { + ps := newPebbleStoreWithFSMApplyWriteOptsForTest(t, t.TempDir(), pebble.Sync, fsmSyncModeSync) + defer ps.Close() + require.Equal(t, fsmSyncModeSync, ps.FSMApplySyncModeLabel()) + }) +} diff --git a/store/lsm_store_sync_mode_benchmark_test.go b/store/lsm_store_sync_mode_benchmark_test.go new file mode 100644 index 00000000..cc293118 --- /dev/null +++ b/store/lsm_store_sync_mode_benchmark_test.go @@ -0,0 +1,76 @@ +package store + +import ( + "context" + "fmt" + "testing" + + "github.com/cockroachdb/pebble/v2" +) + +// BenchmarkApplyMutations_SyncMode measures per-op latency and +// throughput of the FSM commit path under each +// ELASTICKV_FSM_SYNC_MODE value. The benchmark is write-heavy and +// serial: each iteration issues one ApplyMutations on a fresh (key, +// commitTS) pair with a single Put mutation, exercising the single- +// fsync hot path. +// +// Run with: +// +// go test ./store -run='^$' -bench='BenchmarkApplyMutations_SyncMode' -benchtime=2s -benchmem +// +// The sync/nosync ratio (not absolute numbers, which are disk- +// dependent) is the signal of interest. On a laptop SSD, nosync +// typically runs 10-50x faster per op; the exact multiplier reflects +// how cheap the platform's fsync is on a freshly-created WAL file. +func BenchmarkApplyMutations_SyncMode(b *testing.B) { + cases := []struct { + name string + opts *pebble.WriteOptions + }{ + {name: "sync", opts: pebble.Sync}, + {name: "nosync", opts: pebble.NoSync}, + } + for _, tc := range cases { + b.Run(tc.name, func(b *testing.B) { + dir := b.TempDir() + s, err := NewPebbleStore(dir) + if err != nil { + b.Fatalf("NewPebbleStore: %v", err) + } + defer s.Close() + ps, ok := s.(*pebbleStore) + if !ok { + b.Fatalf("NewPebbleStore returned non-*pebbleStore type: %T", s) + } + ps.fsmApplyWriteOpts = tc.opts + if tc.opts == pebble.NoSync { + ps.fsmApplySyncModeLabel = fsmSyncModeNoSync + } else { + ps.fsmApplySyncModeLabel = fsmSyncModeSync + } + + ctx := context.Background() + val := make([]byte, 64) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + key := []byte(fmt.Sprintf("bench-%010d", i)) + muts := []*KVPairMutation{{Op: OpTypePut, Key: key, Value: val}} + // startTS must be strictly < commitTS and distinct across + // iterations to avoid MVCC write-conflict. Guard the + // int -> uint64 conversion to satisfy gosec G115; i is + // non-negative here by construction (loop from 0) but + // the linter cannot prove it. + if i < 0 { + b.Fatalf("unexpected negative iteration counter: %d", i) + } + startTS := uint64(i) * 2 + commitTS := startTS + 1 + if err := s.ApplyMutations(ctx, muts, nil, startTS, commitTS); err != nil { + b.Fatalf("ApplyMutations: %v", err) + } + } + }) + } +} diff --git a/store/lsm_store_sync_mode_test.go b/store/lsm_store_sync_mode_test.go new file mode 100644 index 00000000..3d95ca0c --- /dev/null +++ b/store/lsm_store_sync_mode_test.go @@ -0,0 +1,79 @@ +package store + +import ( + "context" + "testing" + + "github.com/cockroachdb/pebble/v2" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestApplyMutations_NoSyncFunctionalEquivalence verifies that the +// NoSync FSM commit mode produces the same observable state as the +// default Sync mode for a well-behaved (no-crash) workload. This is +// the "happy-path" contract: operators turning on NoSync must not see +// correctness regressions in steady-state operation; the durability +// trade-off only surfaces on crash. +// +// The crash-recovery half of the contract (raft-log replay re-applying +// any entries lost from Pebble after fsync was skipped) is handled by +// kv/fsm.applyCommitWithIdempotencyFallback, which treats +// already-applied keys (LatestCommitTS >= commitTS) as idempotent +// retries. At this layer we do not exercise the raft engine; we only +// verify that switching write options does not change the visible +// store state. +func TestApplyMutations_NoSyncFunctionalEquivalence(t *testing.T) { + dir := t.TempDir() + s := newPebbleStoreWithFSMApplyWriteOptsForTest(t, dir, pebble.NoSync, fsmSyncModeNoSync) + defer s.Close() + + ctx := context.Background() + + mutations := []*KVPairMutation{ + {Op: OpTypePut, Key: []byte("k1"), Value: []byte("v1")}, + {Op: OpTypePut, Key: []byte("k2"), Value: []byte("v2")}, + } + require.NoError(t, s.ApplyMutations(ctx, mutations, nil, 0, 10)) + + val, err := s.GetAt(ctx, []byte("k1"), 10) + require.NoError(t, err) + assert.Equal(t, []byte("v1"), val) + + val, err = s.GetAt(ctx, []byte("k2"), 10) + require.NoError(t, err) + assert.Equal(t, []byte("v2"), val) +} + +// TestApplyMutations_NoSyncReopenVisibility verifies that after a +// clean Close and reopen of the Pebble DB in NoSync mode, writes that +// completed before Close remain visible. A clean Close drives a final +// Pebble flush, so this exercises the common graceful-shutdown path: +// NoSync only impacts crash recovery of un-fsynced tail WAL entries, +// not orderly shutdowns. +// +// The deliberately-unfsynced crash case (kill -9 mid-apply, where +// Pebble's WAL tail is lost and must be reconstructed from raft) is +// inherently an OS-level scenario; it lives in the Jepsen / integration +// suite (see JEPSEN_TODO.md) rather than as a Go unit test, because +// simulating an fsync loss without actually yanking power requires a +// custom VFS shim that is not currently wired into Pebble at this +// layer. +func TestApplyMutations_NoSyncReopenVisibility(t *testing.T) { + dir := t.TempDir() + s := newPebbleStoreWithFSMApplyWriteOptsForTest(t, dir, pebble.NoSync, fsmSyncModeNoSync) + + ctx := context.Background() + mutations := []*KVPairMutation{ + {Op: OpTypePut, Key: []byte("keep"), Value: []byte("after-reopen")}, + } + require.NoError(t, s.ApplyMutations(ctx, mutations, nil, 0, 10)) + require.NoError(t, s.Close()) + + reopened := newPebbleStoreWithFSMApplyWriteOptsForTest(t, dir, pebble.NoSync, fsmSyncModeNoSync) + defer reopened.Close() + + val, err := reopened.GetAt(ctx, []byte("keep"), 10) + require.NoError(t, err) + assert.Equal(t, []byte("after-reopen"), val) +}