diff --git a/kv/fsm.go b/kv/fsm.go index 8e2a5853..53239622 100644 --- a/kv/fsm.go +++ b/kv/fsm.go @@ -457,12 +457,28 @@ func (f *kvFSM) handleCommitRequest(ctx context.Context, r *pb.Request) error { // commitApplyStartTS resolves the startTS to use for MVCC conflict detection // during a COMMIT. If a commit record already exists for the primary key it // returns commitTS (making the apply idempotent); otherwise it returns startTS. +// +// It is also the symmetric guard to appendRollbackRecord: if a rollback marker +// already exists for (primaryKey, startTS) the commit is rejected with +// ErrTxnAlreadyAborted. Together with the commit-record check in +// appendRollbackRecord, this enforces the invariant that at most one of +// {rollback marker, commit record} is present for any (primaryKey, startTS). func (f *kvFSM) commitApplyStartTS(ctx context.Context, primaryKey []byte, startTS, commitTS uint64) (uint64, error) { recordedCommitTS, committed, err := f.txnCommitTS(ctx, primaryKey, startTS) if err != nil { return 0, err } if !committed { + // No commit record yet: reject if a rollback marker is present. + // This catches out-of-order apply (COMMIT after ABORT), buggy + // clients, and replay races. + exists, rerr := f.store.ExistsAt(ctx, txnRollbackKey(primaryKey, startTS), ^uint64(0)) + if rerr != nil { + return 0, errors.WithStack(rerr) + } + if exists { + return 0, errors.WithStack(ErrTxnAlreadyAborted) + } return startTS, nil } if recordedCommitTS != commitTS { @@ -525,6 +541,18 @@ func (f *kvFSM) handleAbortRequest(ctx context.Context, r *pb.Request, abortTS u return errors.WithStack(ErrTxnCommitTSRequired) } + // NOTE: do NOT short-circuit the whole request on rollback-marker + // presence. The marker only proves that SOME prior abort for this + // (primaryKey, startTS) ran; it does not prove cleanup ran for the + // specific keys in *this* request. In particular + // ShardStore.tryAbortExpiredPrimary issues an ABORT whose mutation + // list contains only the primary key, so a later lock-resolver + // abort for a secondary key (same primaryKey, same startTS) would + // see the marker already present and must still clean up that + // secondary's lock/intent. Idempotency is enforced per-key in + // shouldClearAbortKey (lock-missing ⇒ nothing to do) and for the + // rollback-marker Put in appendRollbackRecord. + uniq, err := uniqueMutations(muts) if err != nil { return err @@ -622,13 +650,42 @@ func (f *kvFSM) buildAbortCleanupStoreMutations(ctx context.Context, muts []*pb. } func (f *kvFSM) appendRollbackRecord(ctx context.Context, primaryKey []byte, startTS uint64, storeMuts *[]*store.KVPairMutation) error { - // Don't allow rollback to win after commit record exists. - if _, err := f.store.GetAt(ctx, txnCommitKey(primaryKey, startTS), ^uint64(0)); err == nil { - return errors.WithStack(ErrTxnAlreadyCommitted) - } else if err != nil && !errors.Is(err, store.ErrKeyNotFound) { + // Desired invariant: for any (primaryKey, startTS) pair, at most + // one of {rollback marker, commit record} is present. The invariant + // holds when aborts/commits flow through the symmetric guards in + // this function and handleCommitRequest, but we cannot *assume* it + // on entry (a buggy client, replay, or race may violate it), so we + // verify it in-line below on both the first-time and idempotent + // paths. + // + // Idempotent rollback: if the marker already exists for this + // (primaryKey, startTS), skip the Put. Rollback markers are + // deterministic ({txnRollbackVersion}) and are written as normal + // values via Put; a second Put with applyStartTS=startTS would be + // rejected by the MVCC store as a write conflict because the key's + // latestCommitTS is already greater than startTS. + markerPresent, err := f.store.ExistsAt(ctx, txnRollbackKey(primaryKey, startTS), ^uint64(0)) + if err != nil { return errors.WithStack(err) } + // Verify the invariant regardless of marker presence: if a commit + // record is present for this (primaryKey, startTS), refuse to + // write (or confirm) a rollback marker. This catches out-of-order + // apply where a COMMIT somehow landed after a prior ABORT, as + // well as the normal "commit wins over rollback" race. + commitExists, err := f.store.ExistsAt(ctx, txnCommitKey(primaryKey, startTS), ^uint64(0)) + if err != nil { + return errors.WithStack(err) + } + if commitExists { + return errors.WithStack(ErrTxnAlreadyCommitted) + } + + if markerPresent { + return nil + } + *storeMuts = append(*storeMuts, &store.KVPairMutation{ Op: store.OpTypePut, Key: txnRollbackKey(primaryKey, startTS), @@ -773,11 +830,19 @@ func (f *kvFSM) commitTxnKeyMutations(ctx context.Context, key, primaryKey []byt return out, nil } +// shouldClearAbortKey reports whether this abort request must emit +// cleanup (lock+intent Delete) mutations for key. It returns false +// when the lock is already missing: lock/intent are always written +// and deleted together in a single ApplyMutations batch +// (lock missing ⇔ intent missing), so missing lock means either +// cleanup already ran for this (startTS, primaryKey) or the key was +// never prepared. Emitting Deletes on already-tombstoned keys would +// trigger MVCC write conflicts and has no observable effect. func (f *kvFSM) shouldClearAbortKey(ctx context.Context, key, primaryKey []byte, startTS uint64) (bool, error) { lockBytes, err := f.store.GetAt(ctx, txnLockKey(key), ^uint64(0)) if err != nil { if errors.Is(err, store.ErrKeyNotFound) { - return true, nil + return false, nil } return false, errors.WithStack(err) } diff --git a/kv/fsm_abort_test.go b/kv/fsm_abort_test.go index 87cae98e..8498c783 100644 --- a/kv/fsm_abort_test.go +++ b/kv/fsm_abort_test.go @@ -291,7 +291,19 @@ func TestFSMAbort_AbortTSMustBeGreaterThanStartTS(t *testing.T) { require.ErrorIs(t, err, ErrTxnCommitTSRequired) } -func TestFSMAbort_SecondAbortSameTimestampConflicts(t *testing.T) { +// TestFSMAbort_SecondAbortSameKeysIsIdempotent pins post-fix +// behaviour: once a (primaryKey, startTS) abort has cleaned up its +// keys and written the rollback marker, a subsequent abort over the +// same mutation set must return nil without performing additional +// writes or store mutations (reads are allowed — the idempotent path +// still probes for the rollback marker and commit record via ExistsAt). +// Idempotency is enforced per-key in shouldClearAbortKey (lock +// already gone ⇒ skip) and by appendRollbackRecord (marker already +// present ⇒ skip). The prior behaviour (write-conflict on the +// rollback-marker Put) surfaced in prod as "secondary write failed" +// log spam whenever dualwrite replay or the lock resolver raced a +// completed abort. +func TestFSMAbort_SecondAbortSameKeysIsIdempotent(t *testing.T) { t.Parallel() ctx := context.Background() @@ -318,10 +330,155 @@ func TestFSMAbort_SecondAbortSameTimestampConflicts(t *testing.T) { _, err = st.GetAt(ctx, txnRollbackKey(primary, startTS), ^uint64(0)) require.NoError(t, err) - // Second abort with the same abortTS triggers a write conflict from the - // MVCC store because the rollback key was already written at abortTS. + // Same-abortTS retry: used to conflict; now must be a no-op. err = abortTxn(t, fsm, primary, startTS, abortTS, [][]byte{primary, key}) - require.Error(t, err, "second abort at the same abortTS should conflict") + require.NoError(t, err, "same-abortTS retry must be idempotent") + + // Later-abortTS retry (HLC-monotonic): also no-op. This is the + // prod path where a second lock resolver arrives seconds after + // the first one completed. + err = abortTxn(t, fsm, primary, startTS, abortTS+100, [][]byte{primary, key}) + require.NoError(t, err, "later-abortTS retry must be idempotent") +} + +// TestFSMAbort_SecondAbortDifferentKeysCleansRemainder is the +// regression test for the Copilot-flagged bug: the first abort's +// mutation list contains only the primary key (writes rollback +// marker + cleans the primary). A second abort carries a secondary +// key with the same primaryKey and startTS. The secondary's +// lock/intent MUST be cleaned up — a broad short-circuit on the +// rollback marker's presence would orphan them. +func TestFSMAbort_SecondAbortDifferentKeysCleansRemainder(t *testing.T) { + t.Parallel() + + ctx := context.Background() + st := store.NewMVCCStore() + fsm, ok := NewKvFSMWithHLC(st, NewHLC()).(*kvFSM) + require.True(t, ok) + + startTS := uint64(10) + abortTS := uint64(20) + primary := []byte("A") + secondaryB := []byte("B") + secondaryC := []byte("C") + + // Prepare locks on A (primary), B, C. + prepareTxn(t, fsm, primary, startTS, + [][]byte{primary, secondaryB, secondaryC}, + [][]byte{[]byte("va"), []byte("vb"), []byte("vc")}) + + // Sanity: all three locks exist. + for _, k := range [][]byte{primary, secondaryB, secondaryC} { + _, err := st.GetAt(ctx, txnLockKey(k), ^uint64(0)) + require.NoError(t, err, "lock for %q must exist after prepare", string(k)) + } + + // First abort: mimics ShardStore.tryAbortExpiredPrimary — only + // the primary key is in the mutation list. This writes the + // rollback marker on the primary shard and cleans up A's lock. + err := abortTxn(t, fsm, primary, startTS, abortTS, [][]byte{primary}) + require.NoError(t, err) + + // Primary's lock/intent are gone. + _, err = st.GetAt(ctx, txnLockKey(primary), ^uint64(0)) + require.ErrorIs(t, err, store.ErrKeyNotFound) + _, err = st.GetAt(ctx, txnIntentKey(primary), ^uint64(0)) + require.ErrorIs(t, err, store.ErrKeyNotFound) + + // Rollback marker is present. + _, err = st.GetAt(ctx, txnRollbackKey(primary, startTS), ^uint64(0)) + require.NoError(t, err) + + // Secondaries' locks/intents are still present (first abort did + // not touch them). + _, err = st.GetAt(ctx, txnLockKey(secondaryB), ^uint64(0)) + require.NoError(t, err, "B's lock must still exist before second abort") + _, err = st.GetAt(ctx, txnLockKey(secondaryC), ^uint64(0)) + require.NoError(t, err, "C's lock must still exist before second abort") + + // Second abort: mimics the lock-resolver path — same primaryKey + // and startTS, but mutations carry a secondary key. This MUST + // clean up B's lock/intent despite the rollback marker already + // existing. + err = abortTxn(t, fsm, primary, startTS, abortTS+1, [][]byte{secondaryB}) + require.NoError(t, err) + + _, err = st.GetAt(ctx, txnLockKey(secondaryB), ^uint64(0)) + require.ErrorIs(t, err, store.ErrKeyNotFound, + "B's lock must be cleaned by the second abort; "+ + "short-circuiting on rollback-marker presence would orphan it") + _, err = st.GetAt(ctx, txnIntentKey(secondaryB), ^uint64(0)) + require.ErrorIs(t, err, store.ErrKeyNotFound, + "B's intent must be cleaned by the second abort") + + // Third abort for C: same story. + err = abortTxn(t, fsm, primary, startTS, abortTS+2, [][]byte{secondaryC}) + require.NoError(t, err) + _, err = st.GetAt(ctx, txnLockKey(secondaryC), ^uint64(0)) + require.ErrorIs(t, err, store.ErrKeyNotFound, "C's lock must be cleaned") + _, err = st.GetAt(ctx, txnIntentKey(secondaryC), ^uint64(0)) + require.ErrorIs(t, err, store.ErrKeyNotFound, "C's intent must be cleaned") + + // Rollback marker still present (idempotent; no re-Put). + _, err = st.GetAt(ctx, txnRollbackKey(primary, startTS), ^uint64(0)) + require.NoError(t, err) +} + +// TestFSMAbort_LockResolverRaceLeavesNoOrphan simulates the prod +// flow: tryAbortExpiredPrimary issues an abort with ONLY the primary +// key, and then — racing that — lock resolvers for each secondary +// arrive one at a time (each abort carries one secondary key, same +// primaryKey, same startTS). After all of them have run, no lock or +// intent for the transaction may remain. +func TestFSMAbort_LockResolverRaceLeavesNoOrphan(t *testing.T) { + t.Parallel() + + ctx := context.Background() + st := store.NewMVCCStore() + fsm, ok := NewKvFSMWithHLC(st, NewHLC()).(*kvFSM) + require.True(t, ok) + + startTS := uint64(50) + abortTS := uint64(60) + primary := []byte("pk") + secondaries := [][]byte{[]byte("s1"), []byte("s2"), []byte("s3"), []byte("s4")} + + allKeys := append([][]byte{primary}, secondaries...) + allValues := make([][]byte, 0, len(allKeys)) + for range allKeys { + allValues = append(allValues, []byte("v")) + } + + prepareTxn(t, fsm, primary, startTS, allKeys, allValues) + + // tryAbortExpiredPrimary path: abort with only the primary key. + require.NoError(t, abortTxn(t, fsm, primary, startTS, abortTS, [][]byte{primary})) + + // Per-secondary lock-resolver aborts (same primaryKey+startTS, + // one key each, increasing abortTS). + for i, k := range secondaries { + //nolint:gosec // loop index fits easily into uint64 + bump := uint64(i) + 1 + require.NoError(t, abortTxn(t, fsm, primary, startTS, abortTS+bump, [][]byte{k})) + } + + // A redundant abort over all keys (a late dualwrite replay, say) + // must still be a no-op. + require.NoError(t, abortTxn(t, fsm, primary, startTS, abortTS+100, allKeys)) + + // No lock or intent for any key of the txn may remain. + for _, k := range allKeys { + _, err := st.GetAt(ctx, txnLockKey(k), ^uint64(0)) + require.ErrorIs(t, err, store.ErrKeyNotFound, + "no orphan lock permitted for key %q", string(k)) + _, err = st.GetAt(ctx, txnIntentKey(k), ^uint64(0)) + require.ErrorIs(t, err, store.ErrKeyNotFound, + "no orphan intent permitted for key %q", string(k)) + } + // Rollback marker is present exactly once; reading it at MaxTS + // must succeed. + _, err := st.GetAt(ctx, txnRollbackKey(primary, startTS), ^uint64(0)) + require.NoError(t, err, "rollback marker must be present") } func TestFSMAbort_MissingPrimaryKeyReturnsError(t *testing.T) { @@ -349,6 +506,53 @@ func TestFSMAbort_MissingPrimaryKeyReturnsError(t *testing.T) { require.ErrorIs(t, err, ErrTxnPrimaryKeyRequired) } +// TestFSMAbort_CommitAfterAbortIsRejected pins the symmetric guard in +// commitApplyStartTS: if a rollback marker is already present for a +// (primaryKey, startTS), an incoming COMMIT must be rejected with +// ErrTxnAlreadyAborted instead of silently creating the inconsistent +// (marker + commit record both exist) state. +func TestFSMAbort_CommitAfterAbortIsRejected(t *testing.T) { + t.Parallel() + + ctx := context.Background() + st := store.NewMVCCStore() + fsm, ok := NewKvFSMWithHLC(st, NewHLC()).(*kvFSM) + require.True(t, ok) + + startTS := uint64(10) + abortTS := uint64(20) + commitTS := uint64(30) + primary := []byte("pk") + + // Prepare, then abort — marker is now written. + prepareTxn(t, fsm, primary, startTS, [][]byte{primary}, [][]byte{[]byte("v")}) + require.NoError(t, abortTxn(t, fsm, primary, startTS, abortTS, [][]byte{primary})) + + // Confirm marker exists and no commit record exists. + _, err := st.GetAt(ctx, txnRollbackKey(primary, startTS), ^uint64(0)) + require.NoError(t, err, "rollback marker must exist after abort") + _, err = st.GetAt(ctx, txnCommitKey(primary, startTS), ^uint64(0)) + require.ErrorIs(t, err, store.ErrKeyNotFound, "commit record must NOT exist after abort") + + // A COMMIT for the same (primaryKey, startTS) must be rejected. + commitReq := &pb.Request{ + IsTxn: true, + Phase: pb.Phase_COMMIT, + Ts: startTS, + Mutations: []*pb.Mutation{ + {Op: pb.Op_PUT, Key: []byte(txnMetaPrefix), Value: EncodeTxnMeta(TxnMeta{PrimaryKey: primary, CommitTS: commitTS})}, + {Op: pb.Op_PUT, Key: primary}, + }, + } + err = applyFSMRequest(t, fsm, commitReq) + require.Error(t, err) + require.ErrorIs(t, err, ErrTxnAlreadyAborted) + + // Invariant check: commit record must still be absent after the rejection. + _, err = st.GetAt(ctx, txnCommitKey(primary, startTS), ^uint64(0)) + require.ErrorIs(t, err, store.ErrKeyNotFound, "commit record must not have been written after rejection") +} + func TestFSMAbort_EmptyMutationsReturnsError(t *testing.T) { t.Parallel() diff --git a/kv/txn_errors.go b/kv/txn_errors.go index c347cb09..0121c92d 100644 --- a/kv/txn_errors.go +++ b/kv/txn_errors.go @@ -13,6 +13,7 @@ var ( ErrTxnLocked = errors.New("txn locked") ErrTxnCommitTSRequired = errors.New("txn commit ts required") ErrTxnAlreadyCommitted = errors.New("txn already committed") + ErrTxnAlreadyAborted = errors.New("txn already aborted") ErrTxnPrimaryKeyRequired = errors.New("txn primary key required") )