From 14d8e8fddd3c1b69430b3f9b7ac5c7d85fcb6312 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Fri, 24 Apr 2026 02:28:35 +0900 Subject: [PATCH] fix(store): scope FSM_SYNC_MODE=nosync to raft-apply callers only Previously ApplyMutations/DeletePrefixAt used s.fsmApplyWriteOpts unconditionally, which made ELASTICKV_FSM_SYNC_MODE=nosync affect direct (non-raft) callers that do NOT have raft-log replay as a durability backstop. Concrete production path: distribution.EnsureCatalogSnapshot -> CatalogStore.Save -> store.ApplyMutations. If the process crashed before Pebble flushed, those acknowledged writes could be lost with no raft entry to re-apply. This change splits the API: - ApplyMutations / DeletePrefixAt: always pebble.Sync. Safe for any caller, including those that bypass raft (catalog bootstrap, admin snapshots, migrations, tests). - ApplyMutationsRaft / DeletePrefixAtRaft: governed by s.fsmApplyWriteOpts (ELASTICKV_FSM_SYNC_MODE). Intended solely for the FSM apply loop; only the raft WAL fsync makes pebble.NoSync safe. kv/fsm.go is the only caller of the new *Raft methods. CatalogStore and every adapter test retain the always-sync ApplyMutations, so the nosync opt-in can no longer silently weaken durability outside the raft-apply path. MVCCStore / ShardStore / LeaderRoutedStore all implement both variants; the in-memory mvccStore has no WAL so both delegate to the same body. Tests: - Added TestDirectApplyWriteOpts_AlwaysSync asserting the direct path always resolves to pebble.Sync even when the FSM-apply mode is nosync. - Added TestDirectApplyMutations_NoSyncConfigured_StillWritesDurably for functional coverage of the public entry points under a NoSync-configured store. - Existing sync-mode tests + benchmark renamed to *Raft to reflect that the knob now only governs the raft-apply path. Addresses codex P2 on #592. --- kv/fsm.go | 14 ++-- kv/leader_routed_store.go | 17 +++++ kv/shard_store.go | 47 ++++++++++-- store/lsm_store.go | 83 +++++++++++++++++---- store/lsm_store_sync_mode_benchmark_test.go | 14 ++-- store/lsm_store_sync_mode_test.go | 80 ++++++++++++++++++-- store/mvcc_store.go | 13 ++++ store/store.go | 13 ++++ 8 files changed, 239 insertions(+), 42 deletions(-) diff --git a/kv/fsm.go b/kv/fsm.go index 772e0607f..3b8efafd6 100644 --- a/kv/fsm.go +++ b/kv/fsm.go @@ -227,7 +227,7 @@ func (f *kvFSM) handleRawRequest(ctx context.Context, r *pb.Request, commitTS ui } // Raw requests always commit against the latest state; use commitTS as both // the validation snapshot and the commit timestamp. - return errors.WithStack(f.store.ApplyMutations(ctx, muts, nil, commitTS, commitTS)) + return errors.WithStack(f.store.ApplyMutationsRaft(ctx, muts, nil, commitTS, commitTS)) } // extractDelPrefix checks if the mutations contain a DEL_PREFIX operation. @@ -244,7 +244,7 @@ func extractDelPrefix(muts []*pb.Mutation) (bool, []byte) { // handleDelPrefix delegates prefix deletion to the store. Transaction-internal // keys are always excluded to preserve transactional integrity. func (f *kvFSM) handleDelPrefix(ctx context.Context, prefix []byte, commitTS uint64) error { - return errors.WithStack(f.store.DeletePrefixAt(ctx, prefix, txnCommonPrefix, commitTS)) + return errors.WithStack(f.store.DeletePrefixAtRaft(ctx, prefix, txnCommonPrefix, commitTS)) } var ErrNotImplemented = errors.New("not implemented") @@ -378,7 +378,7 @@ func (f *kvFSM) handlePrepareRequest(ctx context.Context, r *pb.Request) error { return err } - if err := f.store.ApplyMutations(ctx, storeMuts, r.ReadKeys, startTS, startTS); err != nil { + if err := f.store.ApplyMutationsRaft(ctx, storeMuts, r.ReadKeys, startTS, startTS); err != nil { return errors.WithStack(err) } return nil @@ -415,7 +415,7 @@ func (f *kvFSM) handleOnePhaseTxnRequest(ctx context.Context, r *pb.Request, com if err != nil { return err } - return errors.WithStack(f.store.ApplyMutations(ctx, storeMuts, r.ReadKeys, startTS, commitTS)) + return errors.WithStack(f.store.ApplyMutationsRaft(ctx, storeMuts, r.ReadKeys, startTS, commitTS)) } func (f *kvFSM) handleCommitRequest(ctx context.Context, r *pb.Request) error { @@ -500,7 +500,7 @@ func (f *kvFSM) commitApplyStartTS(ctx context.Context, primaryKey []byte, start // The secondary-shard LatestCommitTS scan is intentionally deferred to the // write-conflict path so the hot (first-time) commit path pays no extra cost. func (f *kvFSM) applyCommitWithIdempotencyFallback(ctx context.Context, storeMuts []*store.KVPairMutation, uniq []*pb.Mutation, applyStartTS, commitTS uint64) error { - err := f.store.ApplyMutations(ctx, storeMuts, nil, applyStartTS, commitTS) + err := f.store.ApplyMutationsRaft(ctx, storeMuts, nil, applyStartTS, commitTS) if err == nil { return nil } @@ -517,7 +517,7 @@ func (f *kvFSM) applyCommitWithIdempotencyFallback(ctx context.Context, storeMut return errors.WithStack(lErr) } if exists && latestTS >= commitTS { - return errors.WithStack(f.store.ApplyMutations(ctx, storeMuts, nil, commitTS, commitTS)) + return errors.WithStack(f.store.ApplyMutationsRaft(ctx, storeMuts, nil, commitTS, commitTS)) } } return errors.WithStack(err) @@ -568,7 +568,7 @@ func (f *kvFSM) handleAbortRequest(ctx context.Context, r *pb.Request, abortTS u if len(storeMuts) == 0 { return nil } - return errors.WithStack(f.store.ApplyMutations(ctx, storeMuts, nil, startTS, abortTS)) + return errors.WithStack(f.store.ApplyMutationsRaft(ctx, storeMuts, nil, startTS, abortTS)) } func (f *kvFSM) buildPrepareStoreMutations(ctx context.Context, muts []*pb.Mutation, primaryKey []byte, startTS, expireAt uint64) ([]*store.KVPairMutation, error) { diff --git a/kv/leader_routed_store.go b/kv/leader_routed_store.go index f595032e2..9ecc8bfa3 100644 --- a/kv/leader_routed_store.go +++ b/kv/leader_routed_store.go @@ -264,6 +264,15 @@ func (s *LeaderRoutedStore) ApplyMutations(ctx context.Context, mutations []*sto return errors.WithStack(s.local.ApplyMutations(ctx, mutations, readKeys, startTS, commitTS)) } +// ApplyMutationsRaft forwards to the local store's raft-apply variant. See +// store.MVCCStore for the durability contract. +func (s *LeaderRoutedStore) ApplyMutationsRaft(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error { + if s == nil || s.local == nil { + return errors.WithStack(store.ErrNotSupported) + } + return errors.WithStack(s.local.ApplyMutationsRaft(ctx, mutations, readKeys, startTS, commitTS)) +} + func (s *LeaderRoutedStore) DeletePrefixAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error { if s == nil || s.local == nil { return errors.WithStack(store.ErrNotSupported) @@ -271,6 +280,14 @@ func (s *LeaderRoutedStore) DeletePrefixAt(ctx context.Context, prefix []byte, e return errors.WithStack(s.local.DeletePrefixAt(ctx, prefix, excludePrefix, commitTS)) } +// DeletePrefixAtRaft forwards to the local store's raft-apply variant. +func (s *LeaderRoutedStore) DeletePrefixAtRaft(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error { + if s == nil || s.local == nil { + return errors.WithStack(store.ErrNotSupported) + } + return errors.WithStack(s.local.DeletePrefixAtRaft(ctx, prefix, excludePrefix, commitTS)) +} + func (s *LeaderRoutedStore) LastCommitTS() uint64 { if s == nil || s.local == nil { return 0 diff --git a/kv/shard_store.go b/kv/shard_store.go index 84d1d6eb7..419c028eb 100644 --- a/kv/shard_store.go +++ b/kv/shard_store.go @@ -1127,25 +1127,45 @@ func cleanupTSWithNow(startTS, now uint64) uint64 { // All mutations must belong to the same shard. Cross-shard mutation batches are // not supported. func (s *ShardStore) ApplyMutations(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error { + group, err := s.resolveSingleShardGroup(mutations) + if err != nil || group == nil { + return err + } + return errors.WithStack(group.Store.ApplyMutations(ctx, mutations, readKeys, startTS, commitTS)) +} + +// ApplyMutationsRaft is the raft-apply variant; see store.MVCCStore for the +// durability contract. Only the FSM may call this method. +func (s *ShardStore) ApplyMutationsRaft(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error { + group, err := s.resolveSingleShardGroup(mutations) + if err != nil || group == nil { + return err + } + return errors.WithStack(group.Store.ApplyMutationsRaft(ctx, mutations, readKeys, startTS, commitTS)) +} + +// resolveSingleShardGroup returns the shard group that owns every +// mutation in the batch, or an error if the batch is cross-shard or +// references an unknown group. A nil group with nil error means "empty +// batch — caller should no-op". +func (s *ShardStore) resolveSingleShardGroup(mutations []*store.KVPairMutation) (*ShardGroup, error) { if len(mutations) == 0 { - return nil + return nil, nil } - // Determine the shard group for the first mutation. firstGroup, ok := s.groupForKey(mutations[0].Key) if !ok || firstGroup == nil || firstGroup.Store == nil { - return store.ErrNotSupported + return nil, store.ErrNotSupported } - // Ensure that all mutations in the batch belong to the same shard. for i := 1; i < len(mutations); i++ { g, ok := s.groupForKey(mutations[i].Key) if !ok || g == nil || g.Store == nil { - return store.ErrNotSupported + return nil, store.ErrNotSupported } if g != firstGroup { - return errors.WithStack(ErrCrossShardMutationBatchNotSupported) + return nil, errors.WithStack(ErrCrossShardMutationBatchNotSupported) } } - return errors.WithStack(firstGroup.Store.ApplyMutations(ctx, mutations, readKeys, startTS, commitTS)) + return firstGroup, nil } // DeletePrefixAt applies a prefix delete to every shard in the store. @@ -1161,6 +1181,19 @@ func (s *ShardStore) DeletePrefixAt(ctx context.Context, prefix []byte, excludeP return nil } +// DeletePrefixAtRaft is the raft-apply variant of DeletePrefixAt. +func (s *ShardStore) DeletePrefixAtRaft(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error { + for _, g := range s.groups { + if g == nil || g.Store == nil { + continue + } + if err := g.Store.DeletePrefixAtRaft(ctx, prefix, excludePrefix, commitTS); err != nil { + return errors.WithStack(err) + } + } + return nil +} + func (s *ShardStore) LastCommitTS() uint64 { var max uint64 for _, g := range s.groups { diff --git a/store/lsm_store.go b/store/lsm_store.go index 2bbe12b38..07f86a984 100644 --- a/store/lsm_store.go +++ b/store/lsm_store.go @@ -63,8 +63,10 @@ const ( // avoid a magic-number lint violation on the shift amount. mebibyteShift = 20 - // fsmSyncModeEnv selects the Pebble WriteOptions used on the FSM - // commit path (ApplyMutations, DeletePrefixAt). Values: + // fsmSyncModeEnv selects the Pebble WriteOptions used on the + // raft-apply FSM commit path (ApplyMutationsRaft / DeletePrefixAtRaft). + // Direct (non-raft) callers of ApplyMutations / DeletePrefixAt are + // unaffected by this knob and always use pebble.Sync. Values: // // "sync" (default) — b.Commit(pebble.Sync); every committed raft // entry triggers an fsync on the Pebble WAL. @@ -171,11 +173,13 @@ type pebbleStore struct { // WriteConflictCollector; not part of the authoritative OCC path. writeConflicts *writeConflictCounter // fsmApplyWriteOpts is the Pebble WriteOptions value applied on the - // FSM commit path (ApplyMutations, DeletePrefixAt). Resolved once - // from ELASTICKV_FSM_SYNC_MODE in NewPebbleStore and then treated - // as read-only for the store's lifetime. The default is pebble.Sync; - // operators may opt into pebble.NoSync when the raft WAL's - // durability is considered sufficient. + // raft-apply FSM commit path (ApplyMutationsRaft / DeletePrefixAtRaft). + // Resolved once from ELASTICKV_FSM_SYNC_MODE in NewPebbleStore and + // then treated as read-only for the store's lifetime. The default is + // pebble.Sync; operators may opt into pebble.NoSync when the raft + // WAL's durability is considered sufficient. Direct (non-raft) + // callers of ApplyMutations / DeletePrefixAt use pebble.Sync + // unconditionally and are never affected by this field. fsmApplyWriteOpts *pebble.WriteOptions // fsmApplySyncModeLabel is the human-readable label corresponding // to fsmApplyWriteOpts ("sync" or "nosync"). Kept alongside the @@ -1084,7 +1088,46 @@ func (s *pebbleStore) applyMutationsBatch(b *pebble.Batch, mutations []*KVPairMu return nil } +// directApplyWriteOpts returns the Pebble WriteOptions used by the +// direct (non-raft) commit path. Always pebble.Sync: direct callers do +// not have raft-log replay as a durability backstop, so they must not +// be affected by ELASTICKV_FSM_SYNC_MODE=nosync. +// +// Exposed so tests can assert the non-raft path is always Sync even +// when the FSM-apply path has been reconfigured to NoSync. +func (s *pebbleStore) directApplyWriteOpts() *pebble.WriteOptions { + return pebble.Sync +} + +// raftApplyWriteOpts returns the Pebble WriteOptions used by the +// raft-apply commit path, as configured by ELASTICKV_FSM_SYNC_MODE. +func (s *pebbleStore) raftApplyWriteOpts() *pebble.WriteOptions { + return s.fsmApplyWriteOpts +} + +// ApplyMutations is the direct (non-raft) commit path. It unconditionally +// uses pebble.Sync so that callers without raft-log replay as a durability +// backstop (catalog bootstrap, admin snapshots, migrations, tests) are never +// affected by ELASTICKV_FSM_SYNC_MODE=nosync. func (s *pebbleStore) ApplyMutations(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error { + return s.applyMutationsWithOpts(ctx, mutations, readKeys, startTS, commitTS, s.directApplyWriteOpts()) +} + +// ApplyMutationsRaft is the raft-apply commit path. Durability is governed +// by ELASTICKV_FSM_SYNC_MODE (s.fsmApplyWriteOpts, default pebble.Sync): +// operators may opt into pebble.NoSync when the raft WAL's fsync is +// considered the authoritative durability boundary. On crash, raft-log +// replay from the last FSM snapshot re-applies any entries lost from +// Pebble's un-fsynced WAL tail. +// +// Must only be called from inside the FSM apply loop. All other call sites +// must use ApplyMutations so a nosync opt-in cannot silently drop +// acknowledged writes that have no raft backstop. +func (s *pebbleStore) ApplyMutationsRaft(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error { + return s.applyMutationsWithOpts(ctx, mutations, readKeys, startTS, commitTS, s.raftApplyWriteOpts()) +} + +func (s *pebbleStore) applyMutationsWithOpts(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte, startTS, commitTS uint64, writeOpts *pebble.WriteOptions) error { s.dbMu.RLock() defer s.dbMu.RUnlock() @@ -1120,11 +1163,7 @@ func (s *pebbleStore) ApplyMutations(ctx context.Context, mutations []*KVPairMut s.mtx.Unlock() return err } - // s.fsmApplyWriteOpts is Sync by default. Operators may opt in to NoSync - // via ELASTICKV_FSM_SYNC_MODE=nosync when the raft WAL's durability is - // considered sufficient (raft-log replay from the last FSM snapshot - // re-applies any entries lost from Pebble after a crash). - if err := b.Commit(s.fsmApplyWriteOpts); err != nil { + if err := b.Commit(writeOpts); err != nil { s.mtx.Unlock() return errors.WithStack(err) } @@ -1138,7 +1177,23 @@ func (s *pebbleStore) ApplyMutations(ctx context.Context, mutations []*KVPairMut // tombstone versions at commitTS. An empty prefix deletes all keys. Keys // matching excludePrefix are preserved. Uses an iterator-based approach that // avoids loading values into caller memory. +// +// Like ApplyMutations, this direct-commit entry point unconditionally uses +// pebble.Sync so non-raft callers are never affected by +// ELASTICKV_FSM_SYNC_MODE=nosync. Raft-apply callers must use +// DeletePrefixAtRaft instead. func (s *pebbleStore) DeletePrefixAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error { + return s.deletePrefixAtWithOpts(ctx, prefix, excludePrefix, commitTS, s.directApplyWriteOpts()) +} + +// DeletePrefixAtRaft is the raft-apply variant of DeletePrefixAt. Durability +// is governed by s.fsmApplyWriteOpts (ELASTICKV_FSM_SYNC_MODE). See +// ApplyMutationsRaft for the full durability argument. +func (s *pebbleStore) DeletePrefixAtRaft(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error { + return s.deletePrefixAtWithOpts(ctx, prefix, excludePrefix, commitTS, s.raftApplyWriteOpts()) +} + +func (s *pebbleStore) deletePrefixAtWithOpts(_ context.Context, prefix []byte, excludePrefix []byte, commitTS uint64, writeOpts *pebble.WriteOptions) error { s.dbMu.RLock() defer s.dbMu.RUnlock() @@ -1176,9 +1231,7 @@ func (s *pebbleStore) DeletePrefixAt(ctx context.Context, prefix []byte, exclude if err := setPebbleUint64InBatch(batch, metaLastCommitTSBytes, newLastTS); err != nil { return err } - // See ApplyMutations for the durability argument behind - // s.fsmApplyWriteOpts (ELASTICKV_FSM_SYNC_MODE). - if err := batch.Commit(s.fsmApplyWriteOpts); err != nil { + if err := batch.Commit(writeOpts); err != nil { return errors.WithStack(err) } s.updateLastCommitTS(newLastTS) diff --git a/store/lsm_store_sync_mode_benchmark_test.go b/store/lsm_store_sync_mode_benchmark_test.go index cc293118c..bb1f02662 100644 --- a/store/lsm_store_sync_mode_benchmark_test.go +++ b/store/lsm_store_sync_mode_benchmark_test.go @@ -8,22 +8,22 @@ import ( "github.com/cockroachdb/pebble/v2" ) -// BenchmarkApplyMutations_SyncMode measures per-op latency and -// throughput of the FSM commit path under each +// BenchmarkApplyMutationsRaft_SyncMode measures per-op latency and +// throughput of the raft-apply FSM commit path under each // ELASTICKV_FSM_SYNC_MODE value. The benchmark is write-heavy and -// serial: each iteration issues one ApplyMutations on a fresh (key, +// serial: each iteration issues one ApplyMutationsRaft on a fresh (key, // commitTS) pair with a single Put mutation, exercising the single- // fsync hot path. // // Run with: // -// go test ./store -run='^$' -bench='BenchmarkApplyMutations_SyncMode' -benchtime=2s -benchmem +// go test ./store -run='^$' -bench='BenchmarkApplyMutationsRaft_SyncMode' -benchtime=2s -benchmem // // The sync/nosync ratio (not absolute numbers, which are disk- // dependent) is the signal of interest. On a laptop SSD, nosync // typically runs 10-50x faster per op; the exact multiplier reflects // how cheap the platform's fsync is on a freshly-created WAL file. -func BenchmarkApplyMutations_SyncMode(b *testing.B) { +func BenchmarkApplyMutationsRaft_SyncMode(b *testing.B) { cases := []struct { name string opts *pebble.WriteOptions @@ -67,8 +67,8 @@ func BenchmarkApplyMutations_SyncMode(b *testing.B) { } startTS := uint64(i) * 2 commitTS := startTS + 1 - if err := s.ApplyMutations(ctx, muts, nil, startTS, commitTS); err != nil { - b.Fatalf("ApplyMutations: %v", err) + if err := s.ApplyMutationsRaft(ctx, muts, nil, startTS, commitTS); err != nil { + b.Fatalf("ApplyMutationsRaft: %v", err) } } }) diff --git a/store/lsm_store_sync_mode_test.go b/store/lsm_store_sync_mode_test.go index 3d95ca0c4..e5e065bfd 100644 --- a/store/lsm_store_sync_mode_test.go +++ b/store/lsm_store_sync_mode_test.go @@ -9,8 +9,8 @@ import ( "github.com/stretchr/testify/require" ) -// TestApplyMutations_NoSyncFunctionalEquivalence verifies that the -// NoSync FSM commit mode produces the same observable state as the +// TestApplyMutationsRaft_NoSyncFunctionalEquivalence verifies that the +// NoSync FSM-apply commit mode produces the same observable state as the // default Sync mode for a well-behaved (no-crash) workload. This is // the "happy-path" contract: operators turning on NoSync must not see // correctness regressions in steady-state operation; the durability @@ -23,7 +23,11 @@ import ( // retries. At this layer we do not exercise the raft engine; we only // verify that switching write options does not change the visible // store state. -func TestApplyMutations_NoSyncFunctionalEquivalence(t *testing.T) { +// +// NOTE: tests exercise ApplyMutationsRaft — the raft-apply entry point — +// because ApplyMutations is unconditionally pebble.Sync and therefore +// independent of ELASTICKV_FSM_SYNC_MODE. +func TestApplyMutationsRaft_NoSyncFunctionalEquivalence(t *testing.T) { dir := t.TempDir() s := newPebbleStoreWithFSMApplyWriteOptsForTest(t, dir, pebble.NoSync, fsmSyncModeNoSync) defer s.Close() @@ -34,7 +38,7 @@ func TestApplyMutations_NoSyncFunctionalEquivalence(t *testing.T) { {Op: OpTypePut, Key: []byte("k1"), Value: []byte("v1")}, {Op: OpTypePut, Key: []byte("k2"), Value: []byte("v2")}, } - require.NoError(t, s.ApplyMutations(ctx, mutations, nil, 0, 10)) + require.NoError(t, s.ApplyMutationsRaft(ctx, mutations, nil, 0, 10)) val, err := s.GetAt(ctx, []byte("k1"), 10) require.NoError(t, err) @@ -59,7 +63,7 @@ func TestApplyMutations_NoSyncFunctionalEquivalence(t *testing.T) { // simulating an fsync loss without actually yanking power requires a // custom VFS shim that is not currently wired into Pebble at this // layer. -func TestApplyMutations_NoSyncReopenVisibility(t *testing.T) { +func TestApplyMutationsRaft_NoSyncReopenVisibility(t *testing.T) { dir := t.TempDir() s := newPebbleStoreWithFSMApplyWriteOptsForTest(t, dir, pebble.NoSync, fsmSyncModeNoSync) @@ -67,7 +71,7 @@ func TestApplyMutations_NoSyncReopenVisibility(t *testing.T) { mutations := []*KVPairMutation{ {Op: OpTypePut, Key: []byte("keep"), Value: []byte("after-reopen")}, } - require.NoError(t, s.ApplyMutations(ctx, mutations, nil, 0, 10)) + require.NoError(t, s.ApplyMutationsRaft(ctx, mutations, nil, 0, 10)) require.NoError(t, s.Close()) reopened := newPebbleStoreWithFSMApplyWriteOptsForTest(t, dir, pebble.NoSync, fsmSyncModeNoSync) @@ -77,3 +81,67 @@ func TestApplyMutations_NoSyncReopenVisibility(t *testing.T) { require.NoError(t, err) assert.Equal(t, []byte("after-reopen"), val) } + +// TestDirectApplyWriteOpts_AlwaysSync verifies the correctness contract +// introduced by the ApplyMutations / ApplyMutationsRaft split: the +// direct (non-raft) commit path must always use pebble.Sync so callers +// that bypass raft (catalog bootstrap via CatalogStore.Save, +// EnsureCatalogSnapshot, admin snapshots, migrations) are never +// affected by ELASTICKV_FSM_SYNC_MODE=nosync. The raft-apply path +// (ApplyMutationsRaft / DeletePrefixAtRaft) observes the knob as +// before; the direct path must not. +func TestDirectApplyWriteOpts_AlwaysSync(t *testing.T) { + t.Run("nosync-configured store still syncs direct ApplyMutations", func(t *testing.T) { + ps := newPebbleStoreWithFSMApplyWriteOptsForTest(t, t.TempDir(), pebble.NoSync, fsmSyncModeNoSync) + defer ps.Close() + + // Raft-apply path observes the NoSync opt-in. + require.Same(t, pebble.NoSync, ps.raftApplyWriteOpts(), + "raft-apply path must observe ELASTICKV_FSM_SYNC_MODE=nosync") + + // Direct (non-raft) path must remain Sync regardless of the + // FSM-apply mode. This is the correctness guarantee: callers + // without a raft-log durability backstop must never have + // their fsync elided by the FSM-apply knob. + require.Same(t, pebble.Sync, ps.directApplyWriteOpts(), + "direct commit path must be pebble.Sync regardless of FSM-apply mode") + }) + + t.Run("sync-configured store: both paths use Sync", func(t *testing.T) { + ps := newPebbleStoreWithFSMApplyWriteOptsForTest(t, t.TempDir(), pebble.Sync, fsmSyncModeSync) + defer ps.Close() + + require.Same(t, pebble.Sync, ps.raftApplyWriteOpts()) + require.Same(t, pebble.Sync, ps.directApplyWriteOpts()) + }) +} + +// TestDirectApplyMutations_NoSyncConfigured_StillWritesDurably is the +// functional twin of TestDirectApplyWriteOpts_AlwaysSync: it exercises +// the public ApplyMutations and DeletePrefixAt entry points with a +// NoSync-configured FSM-apply mode and asserts the data is visible +// after a clean reopen. This does not assert fsync at the syscall +// level (that requires a VFS shim; see JEPSEN_TODO.md), but it +// guarantees the public direct-path API still behaves correctly +// under the split. +func TestDirectApplyMutations_NoSyncConfigured_StillWritesDurably(t *testing.T) { + dir := t.TempDir() + s := newPebbleStoreWithFSMApplyWriteOptsForTest(t, dir, pebble.NoSync, fsmSyncModeNoSync) + + ctx := context.Background() + // Direct ApplyMutations (the path CatalogStore.Save takes). + require.NoError(t, s.ApplyMutations(ctx, []*KVPairMutation{ + {Op: OpTypePut, Key: []byte("direct-key"), Value: []byte("direct-val")}, + }, nil, 0, 5)) + // Direct DeletePrefixAt. + require.NoError(t, s.DeletePrefixAt(ctx, []byte("nothing-here"), nil, 6)) + + require.NoError(t, s.Close()) + + reopened := newPebbleStoreWithFSMApplyWriteOptsForTest(t, dir, pebble.NoSync, fsmSyncModeNoSync) + defer reopened.Close() + + val, err := reopened.GetAt(ctx, []byte("direct-key"), 10) + require.NoError(t, err) + assert.Equal(t, []byte("direct-val"), val) +} diff --git a/store/mvcc_store.go b/store/mvcc_store.go index c96e15a51..fbd1a800f 100644 --- a/store/mvcc_store.go +++ b/store/mvcc_store.go @@ -464,6 +464,13 @@ func (s *mvccStore) LatestCommitTS(_ context.Context, key []byte) (uint64, bool, return ver.TS, true, nil } +// ApplyMutationsRaft is provided to satisfy the MVCCStore interface. The +// in-memory store has no WAL and therefore no sync-mode distinction; this +// method delegates to ApplyMutations. +func (s *mvccStore) ApplyMutationsRaft(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error { + return s.ApplyMutations(ctx, mutations, readKeys, startTS, commitTS) +} + func (s *mvccStore) ApplyMutations(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error { s.mtx.Lock() defer s.mtx.Unlock() @@ -527,6 +534,12 @@ func (s *mvccStore) WriteConflictCount() uint64 { // DeletePrefixAt deletes all visible keys matching prefix by writing tombstones // at commitTS. An empty prefix deletes all keys. Keys matching excludePrefix // are preserved. +// DeletePrefixAtRaft delegates to DeletePrefixAt; the in-memory store has +// no WAL and therefore no sync-mode distinction. +func (s *mvccStore) DeletePrefixAtRaft(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error { + return s.DeletePrefixAt(ctx, prefix, excludePrefix, commitTS) +} + func (s *mvccStore) DeletePrefixAt(_ context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error { s.mtx.Lock() defer s.mtx.Unlock() diff --git a/store/store.go b/store/store.go index 8c914acba..41f495cd1 100644 --- a/store/store.go +++ b/store/store.go @@ -150,12 +150,25 @@ type MVCCStore interface { // read barrier followed by LatestCommitTS outside the FSM lock. A // small TOCTOU window exists between the barrier and the check. ApplyMutations(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error + // ApplyMutationsRaft is the raft-apply variant of ApplyMutations. It + // carries identical MVCC semantics but is governed by the FSM-commit + // sync-mode knob (ELASTICKV_FSM_SYNC_MODE). Callers MUST only use this + // when the write is part of a raft-log apply — the raft WAL is the + // durability backstop that makes an un-fsynced Pebble write safe. + // + // Direct (non-raft) callers (catalog bootstrap, admin snapshots, + // migrations, tests) must use ApplyMutations, which is always + // pebble.Sync and therefore safe without raft-log replay. + ApplyMutationsRaft(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error // DeletePrefixAt atomically deletes all visible (non-tombstone, non-expired) // keys matching prefix at commitTS by writing tombstone versions. An empty // prefix means "all keys". Keys matching excludePrefix are preserved. // No conflict checking is performed; this is intended for bulk operations // such as FLUSHALL where the caller knows no conflict check is needed. DeletePrefixAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error + // DeletePrefixAtRaft is the raft-apply variant of DeletePrefixAt with + // the same durability contract as ApplyMutationsRaft. + DeletePrefixAtRaft(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error // LastCommitTS returns the highest commit timestamp applied on this node. LastCommitTS() uint64 // WriteConflictCountsByPrefix returns a snapshot of the MVCC