Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 70 additions & 5 deletions kv/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,12 +457,28 @@ func (f *kvFSM) handleCommitRequest(ctx context.Context, r *pb.Request) error {
// commitApplyStartTS resolves the startTS to use for MVCC conflict detection
// during a COMMIT. If a commit record already exists for the primary key it
// returns commitTS (making the apply idempotent); otherwise it returns startTS.
//
// It is also the symmetric guard to appendRollbackRecord: if a rollback marker
// already exists for (primaryKey, startTS) the commit is rejected with
// ErrTxnAlreadyAborted. Together with the commit-record check in
// appendRollbackRecord, this enforces the invariant that at most one of
// {rollback marker, commit record} is present for any (primaryKey, startTS).
func (f *kvFSM) commitApplyStartTS(ctx context.Context, primaryKey []byte, startTS, commitTS uint64) (uint64, error) {
recordedCommitTS, committed, err := f.txnCommitTS(ctx, primaryKey, startTS)
if err != nil {
return 0, err
}
if !committed {
// No commit record yet: reject if a rollback marker is present.
// This catches out-of-order apply (COMMIT after ABORT), buggy
// clients, and replay races.
exists, rerr := f.store.ExistsAt(ctx, txnRollbackKey(primaryKey, startTS), ^uint64(0))
if rerr != nil {
return 0, errors.WithStack(rerr)
}
if exists {
return 0, errors.WithStack(ErrTxnAlreadyAborted)
}
return startTS, nil
}
if recordedCommitTS != commitTS {
Expand Down Expand Up @@ -525,6 +541,18 @@ func (f *kvFSM) handleAbortRequest(ctx context.Context, r *pb.Request, abortTS u
return errors.WithStack(ErrTxnCommitTSRequired)
}

// NOTE: do NOT short-circuit the whole request on rollback-marker
// presence. The marker only proves that SOME prior abort for this
// (primaryKey, startTS) ran; it does not prove cleanup ran for the
// specific keys in *this* request. In particular
// ShardStore.tryAbortExpiredPrimary issues an ABORT whose mutation
// list contains only the primary key, so a later lock-resolver
// abort for a secondary key (same primaryKey, same startTS) would
// see the marker already present and must still clean up that
// secondary's lock/intent. Idempotency is enforced per-key in
// shouldClearAbortKey (lock-missing ⇒ nothing to do) and for the
// rollback-marker Put in appendRollbackRecord.
Comment on lines +544 to +554
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR description says handleAbortRequest will probe the rollback marker and return early if it exists, but the implementation explicitly avoids that (and relies on per-key idempotency instead). Please update the PR description to match the actual behavior (or adjust the code if the early-return approach is still intended).

Copilot uses AI. Check for mistakes.

uniq, err := uniqueMutations(muts)
if err != nil {
return err
Expand Down Expand Up @@ -622,13 +650,42 @@ func (f *kvFSM) buildAbortCleanupStoreMutations(ctx context.Context, muts []*pb.
}

func (f *kvFSM) appendRollbackRecord(ctx context.Context, primaryKey []byte, startTS uint64, storeMuts *[]*store.KVPairMutation) error {
// Don't allow rollback to win after commit record exists.
if _, err := f.store.GetAt(ctx, txnCommitKey(primaryKey, startTS), ^uint64(0)); err == nil {
return errors.WithStack(ErrTxnAlreadyCommitted)
} else if err != nil && !errors.Is(err, store.ErrKeyNotFound) {
// Desired invariant: for any (primaryKey, startTS) pair, at most
// one of {rollback marker, commit record} is present. The invariant
// holds when aborts/commits flow through the symmetric guards in
// this function and handleCommitRequest, but we cannot *assume* it
// on entry (a buggy client, replay, or race may violate it), so we
// verify it in-line below on both the first-time and idempotent
// paths.
//
// Idempotent rollback: if the marker already exists for this
// (primaryKey, startTS), skip the Put. Rollback markers are
// deterministic ({txnRollbackVersion}) and are written as normal
// values via Put; a second Put with applyStartTS=startTS would be
// rejected by the MVCC store as a write conflict because the key's
// latestCommitTS is already greater than startTS.
markerPresent, err := f.store.ExistsAt(ctx, txnRollbackKey(primaryKey, startTS), ^uint64(0))
if err != nil {
return errors.WithStack(err)
}

// Verify the invariant regardless of marker presence: if a commit
// record is present for this (primaryKey, startTS), refuse to
// write (or confirm) a rollback marker. This catches out-of-order
// apply where a COMMIT somehow landed after a prior ABORT, as
// well as the normal "commit wins over rollback" race.
commitExists, err := f.store.ExistsAt(ctx, txnCommitKey(primaryKey, startTS), ^uint64(0))
if err != nil {
return errors.WithStack(err)
}
if commitExists {
return errors.WithStack(ErrTxnAlreadyCommitted)
}
Comment on lines +667 to +683
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The ExistsAt calls in appendRollbackRecord appear to be performing Optimistic Concurrency Control (OCC) conflict checks. According to repository rules, these checks are not required at the store layer because the leader is responsible for issuing consistent timestamps. Consider removing these checks to simplify the logic and improve performance.

References
  1. Optimistic Concurrency Control (OCC) conflict checks are not required at the store layer because the leader is responsible for issuing consistent timestamps.

Comment on lines +667 to +683
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

In appendRollbackRecord, the code performs two ExistsAt calls to enforce the invariant that a transaction cannot be both committed and aborted. To maintain consistency, ensure that the lock protecting these state variables is not released between the reads and the conditional check. Additionally, both ExistsAt calls must share the same start timestamp, and this timestamp should be used as the transaction's start timestamp for this read-modify-write operation to ensure a consistent snapshot view.

References
  1. In an MVCC system, all operations within a single transaction must share the same start timestamp to ensure atomicity and a consistent snapshot view.
  2. In transactional read-modify-write operations, set the transaction's start timestamp to the read timestamp to ensure a consistent snapshot and prevent race conditions.
  3. When performing atomic operations based on a version or state variable, do not release the lock between reading the variable and performing the conditional check to prevent race conditions and the use of stale data.


if markerPresent {
return nil
}
Comment on lines +667 to +687
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

In appendRollbackRecord, if markerPresent is true, the code still performs a check for commitExists. While this is defensive and ensures the invariant (at most one of rollback marker or commit record exists), it adds an extra ExistsAt call on the idempotent path. Given that this path is identified as a hot path in production due to log spam, consider reordering the checks or short-circuiting if markerPresent is true, provided you are confident in the invariant's enforcement elsewhere.

References
  1. Short-circuit and return early when a state is already determined to avoid redundant or expensive operations, especially in hot paths.


*storeMuts = append(*storeMuts, &store.KVPairMutation{
Op: store.OpTypePut,
Key: txnRollbackKey(primaryKey, startTS),
Expand Down Expand Up @@ -773,11 +830,19 @@ func (f *kvFSM) commitTxnKeyMutations(ctx context.Context, key, primaryKey []byt
return out, nil
}

// shouldClearAbortKey reports whether this abort request must emit
// cleanup (lock+intent Delete) mutations for key. It returns false
// when the lock is already missing: lock/intent are always written
// and deleted together in a single ApplyMutations batch
// (lock missing ⇔ intent missing), so missing lock means either
// cleanup already ran for this (startTS, primaryKey) or the key was
// never prepared. Emitting Deletes on already-tombstoned keys would
// trigger MVCC write conflicts and has no observable effect.
func (f *kvFSM) shouldClearAbortKey(ctx context.Context, key, primaryKey []byte, startTS uint64) (bool, error) {
lockBytes, err := f.store.GetAt(ctx, txnLockKey(key), ^uint64(0))
if err != nil {
if errors.Is(err, store.ErrKeyNotFound) {
return true, nil
return false, nil
}
return false, errors.WithStack(err)
}
Expand Down
212 changes: 208 additions & 4 deletions kv/fsm_abort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,19 @@ func TestFSMAbort_AbortTSMustBeGreaterThanStartTS(t *testing.T) {
require.ErrorIs(t, err, ErrTxnCommitTSRequired)
}

func TestFSMAbort_SecondAbortSameTimestampConflicts(t *testing.T) {
// TestFSMAbort_SecondAbortSameKeysIsIdempotent pins post-fix
// behaviour: once a (primaryKey, startTS) abort has cleaned up its
// keys and written the rollback marker, a subsequent abort over the
// same mutation set must return nil without performing additional
// writes or store mutations (reads are allowed — the idempotent path
// still probes for the rollback marker and commit record via ExistsAt).
// Idempotency is enforced per-key in shouldClearAbortKey (lock
// already gone ⇒ skip) and by appendRollbackRecord (marker already
// present ⇒ skip). The prior behaviour (write-conflict on the
// rollback-marker Put) surfaced in prod as "secondary write failed"
// log spam whenever dualwrite replay or the lock resolver raced a
// completed abort.
func TestFSMAbort_SecondAbortSameKeysIsIdempotent(t *testing.T) {
t.Parallel()

ctx := context.Background()
Expand All @@ -318,10 +330,155 @@ func TestFSMAbort_SecondAbortSameTimestampConflicts(t *testing.T) {
_, err = st.GetAt(ctx, txnRollbackKey(primary, startTS), ^uint64(0))
require.NoError(t, err)

// Second abort with the same abortTS triggers a write conflict from the
// MVCC store because the rollback key was already written at abortTS.
// Same-abortTS retry: used to conflict; now must be a no-op.
err = abortTxn(t, fsm, primary, startTS, abortTS, [][]byte{primary, key})
require.Error(t, err, "second abort at the same abortTS should conflict")
require.NoError(t, err, "same-abortTS retry must be idempotent")

// Later-abortTS retry (HLC-monotonic): also no-op. This is the
// prod path where a second lock resolver arrives seconds after
// the first one completed.
err = abortTxn(t, fsm, primary, startTS, abortTS+100, [][]byte{primary, key})
require.NoError(t, err, "later-abortTS retry must be idempotent")
}

// TestFSMAbort_SecondAbortDifferentKeysCleansRemainder is the
// regression test for the Copilot-flagged bug: the first abort's
// mutation list contains only the primary key (writes rollback
// marker + cleans the primary). A second abort carries a secondary
// key with the same primaryKey and startTS. The secondary's
// lock/intent MUST be cleaned up — a broad short-circuit on the
// rollback marker's presence would orphan them.
func TestFSMAbort_SecondAbortDifferentKeysCleansRemainder(t *testing.T) {
t.Parallel()

ctx := context.Background()
st := store.NewMVCCStore()
fsm, ok := NewKvFSMWithHLC(st, NewHLC()).(*kvFSM)
require.True(t, ok)

startTS := uint64(10)
abortTS := uint64(20)
primary := []byte("A")
secondaryB := []byte("B")
secondaryC := []byte("C")

// Prepare locks on A (primary), B, C.
prepareTxn(t, fsm, primary, startTS,
[][]byte{primary, secondaryB, secondaryC},
[][]byte{[]byte("va"), []byte("vb"), []byte("vc")})

// Sanity: all three locks exist.
for _, k := range [][]byte{primary, secondaryB, secondaryC} {
_, err := st.GetAt(ctx, txnLockKey(k), ^uint64(0))
require.NoError(t, err, "lock for %q must exist after prepare", string(k))
}

// First abort: mimics ShardStore.tryAbortExpiredPrimary — only
// the primary key is in the mutation list. This writes the
// rollback marker on the primary shard and cleans up A's lock.
err := abortTxn(t, fsm, primary, startTS, abortTS, [][]byte{primary})
require.NoError(t, err)

// Primary's lock/intent are gone.
_, err = st.GetAt(ctx, txnLockKey(primary), ^uint64(0))
require.ErrorIs(t, err, store.ErrKeyNotFound)
_, err = st.GetAt(ctx, txnIntentKey(primary), ^uint64(0))
require.ErrorIs(t, err, store.ErrKeyNotFound)

// Rollback marker is present.
_, err = st.GetAt(ctx, txnRollbackKey(primary, startTS), ^uint64(0))
require.NoError(t, err)

// Secondaries' locks/intents are still present (first abort did
// not touch them).
_, err = st.GetAt(ctx, txnLockKey(secondaryB), ^uint64(0))
require.NoError(t, err, "B's lock must still exist before second abort")
_, err = st.GetAt(ctx, txnLockKey(secondaryC), ^uint64(0))
require.NoError(t, err, "C's lock must still exist before second abort")

// Second abort: mimics the lock-resolver path — same primaryKey
// and startTS, but mutations carry a secondary key. This MUST
// clean up B's lock/intent despite the rollback marker already
// existing.
err = abortTxn(t, fsm, primary, startTS, abortTS+1, [][]byte{secondaryB})
require.NoError(t, err)

_, err = st.GetAt(ctx, txnLockKey(secondaryB), ^uint64(0))
require.ErrorIs(t, err, store.ErrKeyNotFound,
"B's lock must be cleaned by the second abort; "+
"short-circuiting on rollback-marker presence would orphan it")
_, err = st.GetAt(ctx, txnIntentKey(secondaryB), ^uint64(0))
require.ErrorIs(t, err, store.ErrKeyNotFound,
"B's intent must be cleaned by the second abort")

// Third abort for C: same story.
err = abortTxn(t, fsm, primary, startTS, abortTS+2, [][]byte{secondaryC})
require.NoError(t, err)
_, err = st.GetAt(ctx, txnLockKey(secondaryC), ^uint64(0))
require.ErrorIs(t, err, store.ErrKeyNotFound, "C's lock must be cleaned")
_, err = st.GetAt(ctx, txnIntentKey(secondaryC), ^uint64(0))
require.ErrorIs(t, err, store.ErrKeyNotFound, "C's intent must be cleaned")

// Rollback marker still present (idempotent; no re-Put).
_, err = st.GetAt(ctx, txnRollbackKey(primary, startTS), ^uint64(0))
require.NoError(t, err)
}

// TestFSMAbort_LockResolverRaceLeavesNoOrphan simulates the prod
// flow: tryAbortExpiredPrimary issues an abort with ONLY the primary
// key, and then — racing that — lock resolvers for each secondary
// arrive one at a time (each abort carries one secondary key, same
// primaryKey, same startTS). After all of them have run, no lock or
// intent for the transaction may remain.
func TestFSMAbort_LockResolverRaceLeavesNoOrphan(t *testing.T) {
t.Parallel()

ctx := context.Background()
st := store.NewMVCCStore()
fsm, ok := NewKvFSMWithHLC(st, NewHLC()).(*kvFSM)
require.True(t, ok)

startTS := uint64(50)
abortTS := uint64(60)
primary := []byte("pk")
secondaries := [][]byte{[]byte("s1"), []byte("s2"), []byte("s3"), []byte("s4")}

allKeys := append([][]byte{primary}, secondaries...)
allValues := make([][]byte, 0, len(allKeys))
for range allKeys {
allValues = append(allValues, []byte("v"))
}

prepareTxn(t, fsm, primary, startTS, allKeys, allValues)

// tryAbortExpiredPrimary path: abort with only the primary key.
require.NoError(t, abortTxn(t, fsm, primary, startTS, abortTS, [][]byte{primary}))

// Per-secondary lock-resolver aborts (same primaryKey+startTS,
// one key each, increasing abortTS).
for i, k := range secondaries {
//nolint:gosec // loop index fits easily into uint64
bump := uint64(i) + 1
require.NoError(t, abortTxn(t, fsm, primary, startTS, abortTS+bump, [][]byte{k}))
}

// A redundant abort over all keys (a late dualwrite replay, say)
// must still be a no-op.
require.NoError(t, abortTxn(t, fsm, primary, startTS, abortTS+100, allKeys))

// No lock or intent for any key of the txn may remain.
for _, k := range allKeys {
_, err := st.GetAt(ctx, txnLockKey(k), ^uint64(0))
require.ErrorIs(t, err, store.ErrKeyNotFound,
"no orphan lock permitted for key %q", string(k))
_, err = st.GetAt(ctx, txnIntentKey(k), ^uint64(0))
require.ErrorIs(t, err, store.ErrKeyNotFound,
"no orphan intent permitted for key %q", string(k))
}
// Rollback marker is present exactly once; reading it at MaxTS
// must succeed.
_, err := st.GetAt(ctx, txnRollbackKey(primary, startTS), ^uint64(0))
require.NoError(t, err, "rollback marker must be present")
}

func TestFSMAbort_MissingPrimaryKeyReturnsError(t *testing.T) {
Expand Down Expand Up @@ -349,6 +506,53 @@ func TestFSMAbort_MissingPrimaryKeyReturnsError(t *testing.T) {
require.ErrorIs(t, err, ErrTxnPrimaryKeyRequired)
}

// TestFSMAbort_CommitAfterAbortIsRejected pins the symmetric guard in
// commitApplyStartTS: if a rollback marker is already present for a
// (primaryKey, startTS), an incoming COMMIT must be rejected with
// ErrTxnAlreadyAborted instead of silently creating the inconsistent
// (marker + commit record both exist) state.
func TestFSMAbort_CommitAfterAbortIsRejected(t *testing.T) {
t.Parallel()

ctx := context.Background()
st := store.NewMVCCStore()
fsm, ok := NewKvFSMWithHLC(st, NewHLC()).(*kvFSM)
require.True(t, ok)

startTS := uint64(10)
abortTS := uint64(20)
commitTS := uint64(30)
primary := []byte("pk")

// Prepare, then abort — marker is now written.
prepareTxn(t, fsm, primary, startTS, [][]byte{primary}, [][]byte{[]byte("v")})
require.NoError(t, abortTxn(t, fsm, primary, startTS, abortTS, [][]byte{primary}))

// Confirm marker exists and no commit record exists.
_, err := st.GetAt(ctx, txnRollbackKey(primary, startTS), ^uint64(0))
require.NoError(t, err, "rollback marker must exist after abort")
_, err = st.GetAt(ctx, txnCommitKey(primary, startTS), ^uint64(0))
require.ErrorIs(t, err, store.ErrKeyNotFound, "commit record must NOT exist after abort")

// A COMMIT for the same (primaryKey, startTS) must be rejected.
commitReq := &pb.Request{
IsTxn: true,
Phase: pb.Phase_COMMIT,
Ts: startTS,
Mutations: []*pb.Mutation{
{Op: pb.Op_PUT, Key: []byte(txnMetaPrefix), Value: EncodeTxnMeta(TxnMeta{PrimaryKey: primary, CommitTS: commitTS})},
{Op: pb.Op_PUT, Key: primary},
},
}
err = applyFSMRequest(t, fsm, commitReq)
require.Error(t, err)
require.ErrorIs(t, err, ErrTxnAlreadyAborted)

// Invariant check: commit record must still be absent after the rejection.
_, err = st.GetAt(ctx, txnCommitKey(primary, startTS), ^uint64(0))
require.ErrorIs(t, err, store.ErrKeyNotFound, "commit record must not have been written after rejection")
}

func TestFSMAbort_EmptyMutationsReturnsError(t *testing.T) {
t.Parallel()

Expand Down
1 change: 1 addition & 0 deletions kv/txn_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ var (
ErrTxnLocked = errors.New("txn locked")
ErrTxnCommitTSRequired = errors.New("txn commit ts required")
ErrTxnAlreadyCommitted = errors.New("txn already committed")
ErrTxnAlreadyAborted = errors.New("txn already aborted")
ErrTxnPrimaryKeyRequired = errors.New("txn primary key required")
)

Expand Down
Loading