diff --git a/internal/raftengine/engine.go b/internal/raftengine/engine.go index e97afd6ab..95cc77b7a 100644 --- a/internal/raftengine/engine.go +++ b/internal/raftengine/engine.go @@ -2,10 +2,27 @@ package raftengine import ( "context" + "errors" "io" "time" ) +// Shared sentinel errors that both engine implementations should wrap +// so callers can test with errors.Is across engine backends. +var ( + // ErrNotLeader indicates the operation was rejected because the + // local node is not the Raft leader for the target group. + // Callers that care about leadership (e.g. lease invalidation + // logic) should match via errors.Is. + ErrNotLeader = errors.New("raft engine: not leader") + // ErrLeadershipLost indicates the local node was leader when the + // operation began but lost leadership before it could complete. + ErrLeadershipLost = errors.New("raft engine: leadership lost") + // ErrLeadershipTransferInProgress indicates a leadership transfer + // is under way and proposals are being held back. + ErrLeadershipTransferInProgress = errors.New("raft engine: leadership transfer in progress") +) + type State string const ( diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index fddfb9c4a..c6be98d9f 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -63,7 +63,7 @@ const ( var ( errNilEngine = errors.New("raft engine is not configured") errClosed = errors.New("etcd raft engine is closed") - errNotLeader = errors.New("etcd raft engine is not leader") + errNotLeader = errors.Mark(errors.New("etcd raft engine is not leader"), raftengine.ErrNotLeader) errNodeIDRequired = errors.New("etcd raft node id is required") errDataDirRequired = errors.New("etcd raft data dir is required") errStateMachineUnset = errors.New("etcd raft state machine is not configured") @@ -76,7 +76,7 @@ var ( errLeadershipTransferNotReady = errors.New("etcd raft leadership transfer target is not available") errLeadershipTransferAborted = errors.New("etcd raft leadership transfer aborted") errLeadershipTransferRejected = errors.New("etcd raft leadership transfer was rejected by raft (target is not a voter)") - errLeadershipTransferNotLeader = errors.New("etcd raft leadership transfer requires the local node to be leader") + errLeadershipTransferNotLeader = errors.Mark(errors.New("etcd raft leadership transfer requires the local node to be leader"), raftengine.ErrNotLeader) errTooManyPendingConfigs = errors.New("etcd raft engine has too many pending config changes") ) diff --git a/internal/raftengine/etcd/engine_test.go b/internal/raftengine/etcd/engine_test.go index 465657e6c..1f3b78e5d 100644 --- a/internal/raftengine/etcd/engine_test.go +++ b/internal/raftengine/etcd/engine_test.go @@ -1611,3 +1611,15 @@ func mustRawNode(t *testing.T, storage *etcdraft.MemoryStorage, nodeID uint64) * require.NoError(t, err) return rawNode } + +// TestErrNotLeaderMatchesRaftEngineSentinel pins the invariant that the +// etcd engine's internal leadership-loss errors are marked against the +// shared raftengine sentinels. The lease-read fast path in package kv +// relies on a single cross-backend errors.Is(err, raftengine.ErrNotLeader) +// check; a future refactor that forgets to mark these errors would +// silently force every read onto the slow LinearizableRead path. +func TestErrNotLeaderMatchesRaftEngineSentinel(t *testing.T) { + t.Parallel() + require.True(t, errors.Is(errors.WithStack(errNotLeader), raftengine.ErrNotLeader)) + require.True(t, errors.Is(errors.WithStack(errLeadershipTransferNotLeader), raftengine.ErrNotLeader)) +} diff --git a/internal/raftengine/hashicorp/engine.go b/internal/raftengine/hashicorp/engine.go index 71e8c145a..b025ba328 100644 --- a/internal/raftengine/hashicorp/engine.go +++ b/internal/raftengine/hashicorp/engine.go @@ -18,6 +18,24 @@ const unknownLastContact = time.Duration(-1) var errNilEngine = errors.New("raft engine is not configured") +// translateLeadershipErr wraps hashicorp/raft leadership-related sentinels +// with the shared raftengine sentinels so callers can use a single +// errors.Is check across engine backends. +func translateLeadershipErr(err error) error { + if err == nil { + return nil + } + switch { + case errors.Is(err, raft.ErrNotLeader): + return errors.WithStack(errors.Mark(err, raftengine.ErrNotLeader)) + case errors.Is(err, raft.ErrLeadershipLost): + return errors.WithStack(errors.Mark(err, raftengine.ErrLeadershipLost)) + case errors.Is(err, raft.ErrLeadershipTransferInProgress): + return errors.WithStack(errors.Mark(err, raftengine.ErrLeadershipTransferInProgress)) + } + return errors.WithStack(err) +} + type Engine struct { raft *raft.Raft @@ -66,7 +84,7 @@ func (e *Engine) Propose(ctx context.Context, data []byte) (*raftengine.Proposal if ctxErr := contextErr(ctx); ctxErr != nil { return nil, ctxErr } - return nil, errors.WithStack(err) + return nil, translateLeadershipErr(err) } return &raftengine.ProposalResult{ @@ -118,7 +136,7 @@ func (e *Engine) VerifyLeader(ctx context.Context) error { if ctxErr := contextErr(ctx); ctxErr != nil { return ctxErr } - return errors.WithStack(err) + return translateLeadershipErr(err) } return nil } @@ -136,7 +154,7 @@ func (e *Engine) CheckServing(ctx context.Context) error { return errors.WithStack(errNilEngine) } if e.State() != raftengine.StateLeader { - return errors.WithStack(raft.ErrNotLeader) + return errors.WithStack(errors.Mark(raft.ErrNotLeader, raftengine.ErrNotLeader)) } return nil } @@ -155,7 +173,7 @@ func (e *Engine) LinearizableRead(ctx context.Context) (uint64, error) { return 0, errors.WithStack(errNilEngine) } if e.raft.State() != raft.Leader { - return 0, errors.WithStack(raft.ErrNotLeader) + return 0, errors.WithStack(errors.Mark(raft.ErrNotLeader, raftengine.ErrNotLeader)) } // Raft ยง5.4.2: ensure at least one Barrier has been issued in the @@ -238,7 +256,7 @@ func (e *Engine) executeBarrier(ctx context.Context) (bool, error) { } if e.raft.State() != raft.Leader { - return false, errors.WithStack(raft.ErrNotLeader) + return false, errors.WithStack(errors.Mark(raft.ErrNotLeader, raftengine.ErrNotLeader)) } timeout, err := timeoutFromContext(ctx) @@ -249,7 +267,7 @@ func (e *Engine) executeBarrier(ctx context.Context) (bool, error) { if ctxErr := contextErr(ctx); ctxErr != nil { return false, ctxErr } - return false, errors.WithStack(err) + return false, translateLeadershipErr(err) } e.barrierTerm.Store(term) diff --git a/internal/raftengine/hashicorp/leadership_err_test.go b/internal/raftengine/hashicorp/leadership_err_test.go new file mode 100644 index 000000000..ada254fc7 --- /dev/null +++ b/internal/raftengine/hashicorp/leadership_err_test.go @@ -0,0 +1,55 @@ +package hashicorp + +import ( + "testing" + + "github.com/bootjp/elastickv/internal/raftengine" + "github.com/cockroachdb/errors" + "github.com/hashicorp/raft" + "github.com/stretchr/testify/require" +) + +// TestTranslateLeadershipErrMatchesRaftEngineSentinel pins the invariant +// that hashicorp/raft leadership-loss errors are marked against the +// shared raftengine sentinels. The lease-read fast path in package kv +// relies on a single cross-backend errors.Is(err, raftengine.ErrNotLeader) +// check; a future refactor that forgets to mark these errors would +// silently force every read onto the slow LinearizableRead path. +func TestTranslateLeadershipErrMatchesRaftEngineSentinel(t *testing.T) { + t.Parallel() + + cases := []struct { + name string + in error + want error + }{ + {"not leader", raft.ErrNotLeader, raftengine.ErrNotLeader}, + {"leadership lost", raft.ErrLeadershipLost, raftengine.ErrLeadershipLost}, + {"leadership transfer in progress", raft.ErrLeadershipTransferInProgress, raftengine.ErrLeadershipTransferInProgress}, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + out := translateLeadershipErr(tc.in) + require.True(t, errors.Is(out, tc.want), + "translated error must errors.Is-match the raftengine sentinel") + require.True(t, errors.Is(out, tc.in), + "translated error must retain the original raft sentinel for debugging") + }) + } + + t.Run("unrelated error is passed through", func(t *testing.T) { + t.Parallel() + orig := errors.New("write conflict") + out := translateLeadershipErr(orig) + require.False(t, errors.Is(out, raftengine.ErrNotLeader)) + require.False(t, errors.Is(out, raftengine.ErrLeadershipLost)) + require.False(t, errors.Is(out, raftengine.ErrLeadershipTransferInProgress)) + }) + + t.Run("nil stays nil", func(t *testing.T) { + t.Parallel() + require.NoError(t, translateLeadershipErr(nil)) + }) +} diff --git a/kv/coordinator.go b/kv/coordinator.go index fa4033a60..e5f6f8695 100644 --- a/kv/coordinator.go +++ b/kv/coordinator.go @@ -226,7 +226,16 @@ func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*C // "any error from engine.Propose" as an invalidation trigger. func (c *Coordinate) refreshLeaseAfterDispatch(resp *CoordinateResponse, err error, dispatchStart time.Time, expectedGen uint64) { if err != nil { - c.lease.invalidate() + // Only invalidate on errors that actually signal leadership + // loss. Write conflicts and validation errors are business- + // logic failures that do NOT mean this node stopped being + // leader; invalidating for them would force every subsequent + // read into the slow LinearizableRead path and defeat the + // lease. Engine.RegisterLeaderLossCallback and the fast-path + // State() == StateLeader guard cover real leader loss. + if isLeadershipLossError(err) { + c.lease.invalidate() + } return } if resp == nil || resp.CommitIndex == 0 { diff --git a/kv/lease_state.go b/kv/lease_state.go index c30cd3d40..ee46db604 100644 --- a/kv/lease_state.go +++ b/kv/lease_state.go @@ -3,8 +3,33 @@ package kv import ( "sync/atomic" "time" + + "github.com/bootjp/elastickv/internal/raftengine" + "github.com/cockroachdb/errors" ) +// isLeadershipLossError reports whether err signals that this node has +// lost leadership and a successor should be contacted. Propose/Commit +// errors that are NOT leadership-related (write conflict, validation, +// deadline on a non-ReadIndex path) must NOT trigger lease +// invalidation -- doing so forces every subsequent read into the slow +// LinearizableRead path and defeats the lease's purpose. +// +// Both engine backends mark their internal leadership errors with the +// shared raftengine sentinels via cockroachdb/errors.Mark, so a single +// errors.Is check (using cockroachdb's Is, which understands the +// mark-based equivalence) covers both engines without relying on +// error-message substrings. Note: stdlib errors.Is does NOT traverse +// cockroachdb marks; this file must use cockroachdb/errors.Is. +func isLeadershipLossError(err error) bool { + if err == nil { + return false + } + return errors.Is(err, raftengine.ErrNotLeader) || + errors.Is(err, raftengine.ErrLeadershipLost) || + errors.Is(err, raftengine.ErrLeadershipTransferInProgress) +} + // leaseState tracks the wall-clock expiry of a leader-local read lease. // All operations are lock-free via atomic.Pointer plus a generation // counter that prevents an in-flight extend from resurrecting a lease diff --git a/kv/lease_state_test.go b/kv/lease_state_test.go index b4b7c2688..3079092ff 100644 --- a/kv/lease_state_test.go +++ b/kv/lease_state_test.go @@ -1,13 +1,57 @@ package kv import ( + "context" + "errors" "runtime" "testing" "time" + "github.com/bootjp/elastickv/internal/raftengine" + cockroachdberrors "github.com/cockroachdb/errors" + hashicorpraft "github.com/hashicorp/raft" "github.com/stretchr/testify/require" ) +func TestIsLeadershipLossError(t *testing.T) { + t.Parallel() + + cases := []struct { + name string + err error + want bool + }{ + {"nil", nil, false}, + {"unrelated", errors.New("write conflict"), false}, + {"context canceled", context.Canceled, false}, + {"raftengine ErrNotLeader direct", raftengine.ErrNotLeader, true}, + {"raftengine ErrLeadershipLost direct", raftengine.ErrLeadershipLost, true}, + {"raftengine ErrLeadershipTransferInProgress direct", raftengine.ErrLeadershipTransferInProgress, true}, + { + "hashicorp ErrNotLeader marked with raftengine sentinel", + cockroachdberrors.WithStack(cockroachdberrors.Mark(hashicorpraft.ErrNotLeader, raftengine.ErrNotLeader)), + true, + }, + { + "hashicorp ErrLeadershipLost marked with raftengine sentinel", + cockroachdberrors.WithStack(cockroachdberrors.Mark(hashicorpraft.ErrLeadershipLost, raftengine.ErrLeadershipLost)), + true, + }, + { + "bare hashicorp ErrNotLeader (no raftengine mark) is NOT detected", + hashicorpraft.ErrNotLeader, + false, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + require.Equal(t, tc.want, isLeadershipLossError(tc.err)) + }) + } +} + func TestLeaseState_NilReceiverIsAlwaysExpired(t *testing.T) { t.Parallel() var s *leaseState diff --git a/kv/sharded_coordinator.go b/kv/sharded_coordinator.go index eafb6f792..dffdde85f 100644 --- a/kv/sharded_coordinator.go +++ b/kv/sharded_coordinator.go @@ -46,10 +46,18 @@ func (t *leaseRefreshingTxn) Commit(reqs []*pb.Request) (*TransactionResponse, e expectedGen := t.g.lease.generation() resp, err := t.inner.Commit(reqs) if err != nil { - // Propose failures commonly signal leadership loss; follow - // the design doc and invalidate so the next read takes the - // slow path and re-verifies. - t.g.lease.invalidate() + // Only invalidate on errors that actually signal a leadership + // change. Write-conflicts, validation errors, and deadline + // exceeded on non-ReadIndex paths do NOT imply the leader is + // gone; invalidating the lease for them forces every read + // into the slow LinearizableRead path and defeats the whole + // point of the lease. The engine's own leader-loss callback + // already handles true leadership loss, plus + // Coordinate.LeaseRead guards the fast path on + // engine.State() == StateLeader. + if isLeadershipLossError(err) { + t.g.lease.invalidate() + } return resp, errors.WithStack(err) } t.maybeRefresh(resp, start, expectedGen) @@ -61,7 +69,9 @@ func (t *leaseRefreshingTxn) Abort(reqs []*pb.Request) (*TransactionResponse, er expectedGen := t.g.lease.generation() resp, err := t.inner.Abort(reqs) if err != nil { - t.g.lease.invalidate() + if isLeadershipLossError(err) { + t.g.lease.invalidate() + } return resp, errors.WithStack(err) } t.maybeRefresh(resp, start, expectedGen)