diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index c6be98d9..e334d4da 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -61,23 +61,24 @@ const ( ) var ( - errNilEngine = errors.New("raft engine is not configured") - errClosed = errors.New("etcd raft engine is closed") - errNotLeader = errors.Mark(errors.New("etcd raft engine is not leader"), raftengine.ErrNotLeader) - errNodeIDRequired = errors.New("etcd raft node id is required") - errDataDirRequired = errors.New("etcd raft data dir is required") - errStateMachineUnset = errors.New("etcd raft state machine is not configured") - errSnapshotRequired = errors.New("etcd raft snapshot payload is required") - errStepQueueFull = errors.New("etcd raft inbound step queue is full") - errClusterMismatch = errors.New("etcd raft persisted cluster does not match configured peers") - errConfigIndexMismatch = errors.New("etcd raft configuration index does not match") - errConfChangeContextTooLarge = errors.New("etcd raft conf change context is too large") - errLeadershipTransferTarget = errors.New("etcd raft leadership transfer target is required") - errLeadershipTransferNotReady = errors.New("etcd raft leadership transfer target is not available") - errLeadershipTransferAborted = errors.New("etcd raft leadership transfer aborted") - errLeadershipTransferRejected = errors.New("etcd raft leadership transfer was rejected by raft (target is not a voter)") - errLeadershipTransferNotLeader = errors.Mark(errors.New("etcd raft leadership transfer requires the local node to be leader"), raftengine.ErrNotLeader) - errTooManyPendingConfigs = errors.New("etcd raft engine has too many pending config changes") + errNilEngine = errors.New("raft engine is not configured") + errClosed = errors.New("etcd raft engine is closed") + errNotLeader = errors.Mark(errors.New("etcd raft engine is not leader"), raftengine.ErrNotLeader) + errNodeIDRequired = errors.New("etcd raft node id is required") + errDataDirRequired = errors.New("etcd raft data dir is required") + errStateMachineUnset = errors.New("etcd raft state machine is not configured") + errSnapshotRequired = errors.New("etcd raft snapshot payload is required") + errStepQueueFull = errors.New("etcd raft inbound step queue is full") + errClusterMismatch = errors.New("etcd raft persisted cluster does not match configured peers") + errConfigIndexMismatch = errors.New("etcd raft configuration index does not match") + errConfChangeContextTooLarge = errors.New("etcd raft conf change context is too large") + errLeadershipTransferTarget = errors.New("etcd raft leadership transfer target is required") + errLeadershipTransferNotReady = errors.New("etcd raft leadership transfer target is not available") + errLeadershipTransferAborted = errors.New("etcd raft leadership transfer aborted") + errLeadershipTransferRejected = errors.New("etcd raft leadership transfer was rejected by raft (target is not a voter)") + errLeadershipTransferNotLeader = errors.Mark(errors.New("etcd raft leadership transfer requires the local node to be leader"), raftengine.ErrNotLeader) + errLeadershipTransferInProgress = errors.Mark(errors.New("etcd raft leadership transfer is in progress"), raftengine.ErrLeadershipTransferInProgress) + errTooManyPendingConfigs = errors.New("etcd raft engine has too many pending config changes") ) // Snapshot is an alias for the shared raftengine.Snapshot interface. @@ -1026,6 +1027,15 @@ func (e *Engine) handleProposal(req proposalRequest) { req.done <- proposalResult{err: errors.WithStack(errNotLeader)} return } + // etcd/raft drops proposals while a leadership transfer is in flight + // (LeadTransferee != 0) and returns ErrProposalDropped. Map that to + // the shared ErrLeadershipTransferInProgress sentinel so callers + // (lease-read invalidation, proxy retry, etc.) can recognise it via + // errors.Is instead of getting a generic dropped-proposal error. + if e.rawNode.BasicStatus().LeadTransferee != 0 { + req.done <- proposalResult{err: errors.WithStack(errLeadershipTransferInProgress)} + return + } e.storePendingProposal(req) if err := e.rawNode.Propose(encodeProposalEnvelope(req.id, req.payload)); err != nil { e.cancelPendingProposal(req.id) @@ -1042,6 +1052,16 @@ func (e *Engine) handleRead(req readRequest) { req.done <- readResult{err: errors.WithStack(errNotLeader)} return } + // etcd/raft silently drops MsgReadIndex while a leadership transfer + // is in flight (LeadTransferee != 0) -- ReadIndex does not return an + // error on drop, so without this fast-fail the caller would block on + // req.done until ctx deadline. Surface the drop as + // ErrLeadershipTransferInProgress so LeaseRead falls through to the + // new leader instead of stalling ~electionTimeout. + if e.rawNode.BasicStatus().LeadTransferee != 0 { + req.done <- readResult{err: errors.WithStack(errLeadershipTransferInProgress)} + return + } e.storePendingRead(req) e.rawNode.ReadIndex(encodeReadContext(req.id)) } diff --git a/internal/raftengine/etcd/engine_test.go b/internal/raftengine/etcd/engine_test.go index 1f3b78e5..e5501a4c 100644 --- a/internal/raftengine/etcd/engine_test.go +++ b/internal/raftengine/etcd/engine_test.go @@ -1622,4 +1622,5 @@ func TestErrNotLeaderMatchesRaftEngineSentinel(t *testing.T) { t.Parallel() require.True(t, errors.Is(errors.WithStack(errNotLeader), raftengine.ErrNotLeader)) require.True(t, errors.Is(errors.WithStack(errLeadershipTransferNotLeader), raftengine.ErrNotLeader)) + require.True(t, errors.Is(errors.WithStack(errLeadershipTransferInProgress), raftengine.ErrLeadershipTransferInProgress)) } diff --git a/kv/coordinator.go b/kv/coordinator.go index e5f6f869..f6f404b3 100644 --- a/kv/coordinator.go +++ b/kv/coordinator.go @@ -220,10 +220,14 @@ func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*C // Abort), and refreshing would be unsound because no quorum // confirmation happened. // -// On err != nil the lease is invalidated: a Propose error commonly -// signals leadership loss (non-leader rejection, transfer in -// progress, quorum lost, etc.) and the design doc lists -// "any error from engine.Propose" as an invalidation trigger. +// On err != nil the lease is invalidated ONLY when isLeadershipLossError +// reports a real leadership-loss signal (non-leader rejection, +// ErrLeadershipLost, transfer-in-progress). Business-logic failures +// such as write conflicts or validation errors are NOT leadership +// signals and must not invalidate the lease -- doing so would force +// every subsequent read onto the slow LinearizableRead path and defeat +// the lease's purpose. RegisterLeaderLossCallback plus the +// State()==StateLeader fast-path guard cover real leader loss. func (c *Coordinate) refreshLeaseAfterDispatch(resp *CoordinateResponse, err error, dispatchStart time.Time, expectedGen uint64) { if err != nil { // Only invalidate on errors that actually signal leadership @@ -374,7 +378,16 @@ func (c *Coordinate) LeaseRead(ctx context.Context) (uint64, error) { } idx, err := c.LinearizableRead(ctx) if err != nil { - c.lease.invalidate() + // Only invalidate on real leadership-loss signals. A context + // deadline or transient transport error is NOT leadership loss; + // forcing invalidation for those would push every subsequent + // read onto the slow path for the remainder of the lease + // window, mirroring the production regression the write-path + // guard fixed. RegisterLeaderLossCallback plus the + // State()==StateLeader fast-path check cover real transitions. + if isLeadershipLossError(err) { + c.lease.invalidate() + } return 0, err } c.lease.extend(now.Add(leaseDur), expectedGen) diff --git a/kv/lease_state_test.go b/kv/lease_state_test.go index 3079092f..3ea37dd0 100644 --- a/kv/lease_state_test.go +++ b/kv/lease_state_test.go @@ -37,6 +37,11 @@ func TestIsLeadershipLossError(t *testing.T) { cockroachdberrors.WithStack(cockroachdberrors.Mark(hashicorpraft.ErrLeadershipLost, raftengine.ErrLeadershipLost)), true, }, + { + "hashicorp ErrLeadershipTransferInProgress marked with raftengine sentinel", + cockroachdberrors.WithStack(cockroachdberrors.Mark(hashicorpraft.ErrLeadershipTransferInProgress, raftengine.ErrLeadershipTransferInProgress)), + true, + }, { "bare hashicorp ErrNotLeader (no raftengine mark) is NOT detected", hashicorpraft.ErrNotLeader, diff --git a/kv/sharded_coordinator.go b/kv/sharded_coordinator.go index dffdde85..e5c7790a 100644 --- a/kv/sharded_coordinator.go +++ b/kv/sharded_coordinator.go @@ -767,7 +767,13 @@ func groupLeaseRead(ctx context.Context, g *ShardGroup) (uint64, error) { } idx, err := linearizableReadEngineCtx(ctx, engine) if err != nil { - g.lease.invalidate() + // See Coordinate.LeaseRead: only real leadership-loss signals + // invalidate the lease. Deadlines, transport blips, and other + // transient errors must NOT force the remainder of the lease + // window onto the slow path. + if isLeadershipLossError(err) { + g.lease.invalidate() + } return 0, err } g.lease.extend(now.Add(leaseDur), expectedGen)