Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions kv/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func (f *kvFSM) handleRawRequest(ctx context.Context, r *pb.Request, commitTS ui
}
// Raw requests always commit against the latest state; use commitTS as both
// the validation snapshot and the commit timestamp.
return errors.WithStack(f.store.ApplyMutations(ctx, muts, nil, commitTS, commitTS))
return errors.WithStack(f.store.ApplyMutationsRaft(ctx, muts, nil, commitTS, commitTS))
}

// extractDelPrefix checks if the mutations contain a DEL_PREFIX operation.
Expand All @@ -244,7 +244,7 @@ func extractDelPrefix(muts []*pb.Mutation) (bool, []byte) {
// handleDelPrefix delegates prefix deletion to the store. Transaction-internal
// keys are always excluded to preserve transactional integrity.
func (f *kvFSM) handleDelPrefix(ctx context.Context, prefix []byte, commitTS uint64) error {
return errors.WithStack(f.store.DeletePrefixAt(ctx, prefix, txnCommonPrefix, commitTS))
return errors.WithStack(f.store.DeletePrefixAtRaft(ctx, prefix, txnCommonPrefix, commitTS))
}

var ErrNotImplemented = errors.New("not implemented")
Expand Down Expand Up @@ -378,7 +378,7 @@ func (f *kvFSM) handlePrepareRequest(ctx context.Context, r *pb.Request) error {
return err
}

if err := f.store.ApplyMutations(ctx, storeMuts, r.ReadKeys, startTS, startTS); err != nil {
if err := f.store.ApplyMutationsRaft(ctx, storeMuts, r.ReadKeys, startTS, startTS); err != nil {
return errors.WithStack(err)
}
return nil
Expand Down Expand Up @@ -415,7 +415,7 @@ func (f *kvFSM) handleOnePhaseTxnRequest(ctx context.Context, r *pb.Request, com
if err != nil {
return err
}
return errors.WithStack(f.store.ApplyMutations(ctx, storeMuts, r.ReadKeys, startTS, commitTS))
return errors.WithStack(f.store.ApplyMutationsRaft(ctx, storeMuts, r.ReadKeys, startTS, commitTS))
}

func (f *kvFSM) handleCommitRequest(ctx context.Context, r *pb.Request) error {
Expand Down Expand Up @@ -500,7 +500,7 @@ func (f *kvFSM) commitApplyStartTS(ctx context.Context, primaryKey []byte, start
// The secondary-shard LatestCommitTS scan is intentionally deferred to the
// write-conflict path so the hot (first-time) commit path pays no extra cost.
func (f *kvFSM) applyCommitWithIdempotencyFallback(ctx context.Context, storeMuts []*store.KVPairMutation, uniq []*pb.Mutation, applyStartTS, commitTS uint64) error {
err := f.store.ApplyMutations(ctx, storeMuts, nil, applyStartTS, commitTS)
err := f.store.ApplyMutationsRaft(ctx, storeMuts, nil, applyStartTS, commitTS)
if err == nil {
return nil
}
Expand All @@ -517,7 +517,7 @@ func (f *kvFSM) applyCommitWithIdempotencyFallback(ctx context.Context, storeMut
return errors.WithStack(lErr)
}
if exists && latestTS >= commitTS {
return errors.WithStack(f.store.ApplyMutations(ctx, storeMuts, nil, commitTS, commitTS))
return errors.WithStack(f.store.ApplyMutationsRaft(ctx, storeMuts, nil, commitTS, commitTS))
}
}
return errors.WithStack(err)
Expand Down Expand Up @@ -568,7 +568,7 @@ func (f *kvFSM) handleAbortRequest(ctx context.Context, r *pb.Request, abortTS u
if len(storeMuts) == 0 {
return nil
}
return errors.WithStack(f.store.ApplyMutations(ctx, storeMuts, nil, startTS, abortTS))
return errors.WithStack(f.store.ApplyMutationsRaft(ctx, storeMuts, nil, startTS, abortTS))
}

func (f *kvFSM) buildPrepareStoreMutations(ctx context.Context, muts []*pb.Mutation, primaryKey []byte, startTS, expireAt uint64) ([]*store.KVPairMutation, error) {
Expand Down
17 changes: 17 additions & 0 deletions kv/leader_routed_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,13 +264,30 @@ func (s *LeaderRoutedStore) ApplyMutations(ctx context.Context, mutations []*sto
return errors.WithStack(s.local.ApplyMutations(ctx, mutations, readKeys, startTS, commitTS))
}

// ApplyMutationsRaft forwards to the local store's raft-apply variant. See
// store.MVCCStore for the durability contract.
func (s *LeaderRoutedStore) ApplyMutationsRaft(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error {
if s == nil || s.local == nil {
return errors.WithStack(store.ErrNotSupported)
}
return errors.WithStack(s.local.ApplyMutationsRaft(ctx, mutations, readKeys, startTS, commitTS))
}

func (s *LeaderRoutedStore) DeletePrefixAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error {
if s == nil || s.local == nil {
return errors.WithStack(store.ErrNotSupported)
}
return errors.WithStack(s.local.DeletePrefixAt(ctx, prefix, excludePrefix, commitTS))
}

// DeletePrefixAtRaft forwards to the local store's raft-apply variant.
func (s *LeaderRoutedStore) DeletePrefixAtRaft(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error {
if s == nil || s.local == nil {
return errors.WithStack(store.ErrNotSupported)
}
return errors.WithStack(s.local.DeletePrefixAtRaft(ctx, prefix, excludePrefix, commitTS))
}

func (s *LeaderRoutedStore) LastCommitTS() uint64 {
if s == nil || s.local == nil {
return 0
Expand Down
47 changes: 40 additions & 7 deletions kv/shard_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1127,25 +1127,45 @@ func cleanupTSWithNow(startTS, now uint64) uint64 {
// All mutations must belong to the same shard. Cross-shard mutation batches are
// not supported.
func (s *ShardStore) ApplyMutations(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error {
group, err := s.resolveSingleShardGroup(mutations)
if err != nil || group == nil {
return err
}
return errors.WithStack(group.Store.ApplyMutations(ctx, mutations, readKeys, startTS, commitTS))
}

// ApplyMutationsRaft is the raft-apply variant; see store.MVCCStore for the
// durability contract. Only the FSM may call this method.
func (s *ShardStore) ApplyMutationsRaft(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error {
group, err := s.resolveSingleShardGroup(mutations)
if err != nil || group == nil {
return err
}
return errors.WithStack(group.Store.ApplyMutationsRaft(ctx, mutations, readKeys, startTS, commitTS))
}

// resolveSingleShardGroup returns the shard group that owns every
// mutation in the batch, or an error if the batch is cross-shard or
// references an unknown group. A nil group with nil error means "empty
// batch — caller should no-op".
func (s *ShardStore) resolveSingleShardGroup(mutations []*store.KVPairMutation) (*ShardGroup, error) {
if len(mutations) == 0 {
return nil
return nil, nil
}
// Determine the shard group for the first mutation.
firstGroup, ok := s.groupForKey(mutations[0].Key)
if !ok || firstGroup == nil || firstGroup.Store == nil {
return store.ErrNotSupported
return nil, store.ErrNotSupported
}
// Ensure that all mutations in the batch belong to the same shard.
for i := 1; i < len(mutations); i++ {
g, ok := s.groupForKey(mutations[i].Key)
if !ok || g == nil || g.Store == nil {
return store.ErrNotSupported
return nil, store.ErrNotSupported
}
if g != firstGroup {
return errors.WithStack(ErrCrossShardMutationBatchNotSupported)
return nil, errors.WithStack(ErrCrossShardMutationBatchNotSupported)
}
}
return errors.WithStack(firstGroup.Store.ApplyMutations(ctx, mutations, readKeys, startTS, commitTS))
return firstGroup, nil
}

// DeletePrefixAt applies a prefix delete to every shard in the store.
Expand All @@ -1161,6 +1181,19 @@ func (s *ShardStore) DeletePrefixAt(ctx context.Context, prefix []byte, excludeP
return nil
}

// DeletePrefixAtRaft is the raft-apply variant of DeletePrefixAt.
func (s *ShardStore) DeletePrefixAtRaft(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error {
for _, g := range s.groups {
if g == nil || g.Store == nil {
continue
}
if err := g.Store.DeletePrefixAtRaft(ctx, prefix, excludePrefix, commitTS); err != nil {
return errors.WithStack(err)
}
}
return nil
}

func (s *ShardStore) LastCommitTS() uint64 {
var max uint64
for _, g := range s.groups {
Expand Down
83 changes: 68 additions & 15 deletions store/lsm_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@ const (
// avoid a magic-number lint violation on the shift amount.
mebibyteShift = 20

// fsmSyncModeEnv selects the Pebble WriteOptions used on the FSM
// commit path (ApplyMutations, DeletePrefixAt). Values:
// fsmSyncModeEnv selects the Pebble WriteOptions used on the
// raft-apply FSM commit path (ApplyMutationsRaft / DeletePrefixAtRaft).
// Direct (non-raft) callers of ApplyMutations / DeletePrefixAt are
// unaffected by this knob and always use pebble.Sync. Values:
//
// "sync" (default) — b.Commit(pebble.Sync); every committed raft
// entry triggers an fsync on the Pebble WAL.
Expand Down Expand Up @@ -171,11 +173,13 @@ type pebbleStore struct {
// WriteConflictCollector; not part of the authoritative OCC path.
writeConflicts *writeConflictCounter
// fsmApplyWriteOpts is the Pebble WriteOptions value applied on the
// FSM commit path (ApplyMutations, DeletePrefixAt). Resolved once
// from ELASTICKV_FSM_SYNC_MODE in NewPebbleStore and then treated
// as read-only for the store's lifetime. The default is pebble.Sync;
// operators may opt into pebble.NoSync when the raft WAL's
// durability is considered sufficient.
// raft-apply FSM commit path (ApplyMutationsRaft / DeletePrefixAtRaft).
// Resolved once from ELASTICKV_FSM_SYNC_MODE in NewPebbleStore and
// then treated as read-only for the store's lifetime. The default is
// pebble.Sync; operators may opt into pebble.NoSync when the raft
// WAL's durability is considered sufficient. Direct (non-raft)
// callers of ApplyMutations / DeletePrefixAt use pebble.Sync
// unconditionally and are never affected by this field.
fsmApplyWriteOpts *pebble.WriteOptions
// fsmApplySyncModeLabel is the human-readable label corresponding
// to fsmApplyWriteOpts ("sync" or "nosync"). Kept alongside the
Expand Down Expand Up @@ -1084,7 +1088,46 @@ func (s *pebbleStore) applyMutationsBatch(b *pebble.Batch, mutations []*KVPairMu
return nil
}

// directApplyWriteOpts returns the Pebble WriteOptions used by the
// direct (non-raft) commit path. Always pebble.Sync: direct callers do
// not have raft-log replay as a durability backstop, so they must not
// be affected by ELASTICKV_FSM_SYNC_MODE=nosync.
//
// Exposed so tests can assert the non-raft path is always Sync even
// when the FSM-apply path has been reconfigured to NoSync.
func (s *pebbleStore) directApplyWriteOpts() *pebble.WriteOptions {
return pebble.Sync
}

// raftApplyWriteOpts returns the Pebble WriteOptions used by the
// raft-apply commit path, as configured by ELASTICKV_FSM_SYNC_MODE.
func (s *pebbleStore) raftApplyWriteOpts() *pebble.WriteOptions {
return s.fsmApplyWriteOpts
}

// ApplyMutations is the direct (non-raft) commit path. It unconditionally
// uses pebble.Sync so that callers without raft-log replay as a durability
// backstop (catalog bootstrap, admin snapshots, migrations, tests) are never
// affected by ELASTICKV_FSM_SYNC_MODE=nosync.
func (s *pebbleStore) ApplyMutations(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error {
return s.applyMutationsWithOpts(ctx, mutations, readKeys, startTS, commitTS, s.directApplyWriteOpts())
}

// ApplyMutationsRaft is the raft-apply commit path. Durability is governed
// by ELASTICKV_FSM_SYNC_MODE (s.fsmApplyWriteOpts, default pebble.Sync):
// operators may opt into pebble.NoSync when the raft WAL's fsync is
// considered the authoritative durability boundary. On crash, raft-log
// replay from the last FSM snapshot re-applies any entries lost from
// Pebble's un-fsynced WAL tail.
//
// Must only be called from inside the FSM apply loop. All other call sites
// must use ApplyMutations so a nosync opt-in cannot silently drop
// acknowledged writes that have no raft backstop.
func (s *pebbleStore) ApplyMutationsRaft(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error {
return s.applyMutationsWithOpts(ctx, mutations, readKeys, startTS, commitTS, s.raftApplyWriteOpts())
}

func (s *pebbleStore) applyMutationsWithOpts(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte, startTS, commitTS uint64, writeOpts *pebble.WriteOptions) error {
s.dbMu.RLock()
defer s.dbMu.RUnlock()

Expand Down Expand Up @@ -1120,11 +1163,7 @@ func (s *pebbleStore) ApplyMutations(ctx context.Context, mutations []*KVPairMut
s.mtx.Unlock()
return err
}
// s.fsmApplyWriteOpts is Sync by default. Operators may opt in to NoSync
// via ELASTICKV_FSM_SYNC_MODE=nosync when the raft WAL's durability is
// considered sufficient (raft-log replay from the last FSM snapshot
// re-applies any entries lost from Pebble after a crash).
if err := b.Commit(s.fsmApplyWriteOpts); err != nil {
if err := b.Commit(writeOpts); err != nil {
s.mtx.Unlock()
return errors.WithStack(err)
}
Expand All @@ -1138,7 +1177,23 @@ func (s *pebbleStore) ApplyMutations(ctx context.Context, mutations []*KVPairMut
// tombstone versions at commitTS. An empty prefix deletes all keys. Keys
// matching excludePrefix are preserved. Uses an iterator-based approach that
// avoids loading values into caller memory.
//
// Like ApplyMutations, this direct-commit entry point unconditionally uses
// pebble.Sync so non-raft callers are never affected by
// ELASTICKV_FSM_SYNC_MODE=nosync. Raft-apply callers must use
// DeletePrefixAtRaft instead.
func (s *pebbleStore) DeletePrefixAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error {
return s.deletePrefixAtWithOpts(ctx, prefix, excludePrefix, commitTS, s.directApplyWriteOpts())
}

// DeletePrefixAtRaft is the raft-apply variant of DeletePrefixAt. Durability
// is governed by s.fsmApplyWriteOpts (ELASTICKV_FSM_SYNC_MODE). See
// ApplyMutationsRaft for the full durability argument.
func (s *pebbleStore) DeletePrefixAtRaft(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error {
return s.deletePrefixAtWithOpts(ctx, prefix, excludePrefix, commitTS, s.raftApplyWriteOpts())
}

func (s *pebbleStore) deletePrefixAtWithOpts(_ context.Context, prefix []byte, excludePrefix []byte, commitTS uint64, writeOpts *pebble.WriteOptions) error {
s.dbMu.RLock()
defer s.dbMu.RUnlock()

Expand Down Expand Up @@ -1176,9 +1231,7 @@ func (s *pebbleStore) DeletePrefixAt(ctx context.Context, prefix []byte, exclude
if err := setPebbleUint64InBatch(batch, metaLastCommitTSBytes, newLastTS); err != nil {
return err
}
// See ApplyMutations for the durability argument behind
// s.fsmApplyWriteOpts (ELASTICKV_FSM_SYNC_MODE).
if err := batch.Commit(s.fsmApplyWriteOpts); err != nil {
if err := batch.Commit(writeOpts); err != nil {
return errors.WithStack(err)
}
s.updateLastCommitTS(newLastTS)
Expand Down
14 changes: 7 additions & 7 deletions store/lsm_store_sync_mode_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,22 @@ import (
"github.com/cockroachdb/pebble/v2"
)

// BenchmarkApplyMutations_SyncMode measures per-op latency and
// throughput of the FSM commit path under each
// BenchmarkApplyMutationsRaft_SyncMode measures per-op latency and
// throughput of the raft-apply FSM commit path under each
// ELASTICKV_FSM_SYNC_MODE value. The benchmark is write-heavy and
// serial: each iteration issues one ApplyMutations on a fresh (key,
// serial: each iteration issues one ApplyMutationsRaft on a fresh (key,
// commitTS) pair with a single Put mutation, exercising the single-
// fsync hot path.
//
// Run with:
//
// go test ./store -run='^$' -bench='BenchmarkApplyMutations_SyncMode' -benchtime=2s -benchmem
// go test ./store -run='^$' -bench='BenchmarkApplyMutationsRaft_SyncMode' -benchtime=2s -benchmem
//
// The sync/nosync ratio (not absolute numbers, which are disk-
// dependent) is the signal of interest. On a laptop SSD, nosync
// typically runs 10-50x faster per op; the exact multiplier reflects
// how cheap the platform's fsync is on a freshly-created WAL file.
func BenchmarkApplyMutations_SyncMode(b *testing.B) {
func BenchmarkApplyMutationsRaft_SyncMode(b *testing.B) {
cases := []struct {
name string
opts *pebble.WriteOptions
Expand Down Expand Up @@ -67,8 +67,8 @@ func BenchmarkApplyMutations_SyncMode(b *testing.B) {
}
startTS := uint64(i) * 2
commitTS := startTS + 1
if err := s.ApplyMutations(ctx, muts, nil, startTS, commitTS); err != nil {
b.Fatalf("ApplyMutations: %v", err)
if err := s.ApplyMutationsRaft(ctx, muts, nil, startTS, commitTS); err != nil {
b.Fatalf("ApplyMutationsRaft: %v", err)
}
}
})
Expand Down
Loading
Loading