From 529876157020fd8fd018ddd29f043027841ca411 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Mon, 20 Apr 2026 22:53:56 +0900 Subject: [PATCH 1/2] fix(lease-read): apply leadership-loss filter to LeaseRead paths and add transfer-in-progress sentinel Addresses CodeRabbit and gemini feedback on PR #558. - kv/coordinator.go LeaseRead + kv/sharded_coordinator.go groupLeaseRead: mirror the dispatch-path fix by invalidating the lease only when isLeadershipLossError reports a real leadership signal. Previously a LinearizableRead deadline or transient transport error would force the remainder of the lease window onto the slow path, reproducing the same regression the write-path fix addressed. - internal/raftengine/etcd/engine.go: add errLeadershipTransferInProgress marked with raftengine.ErrLeadershipTransferInProgress, and fail Propose fast when BasicStatus().LeadTransferee != 0 so callers see an errors.Is-matchable error instead of hanging on a proposal etcd/raft silently drops during transfer. - Refresh the stale comment on refreshLeaseAfterDispatch to reflect the filtered invalidation contract. - Tests: add the hashicorp ErrLeadershipTransferInProgress marked case to kv.TestIsLeadershipLossError and pin errLeadershipTransferInProgress in the etcd sentinel test. 
--- internal/raftengine/etcd/engine.go | 10 ++++++++++ internal/raftengine/etcd/engine_test.go | 1 + kv/coordinator.go | 23 ++++++++++++++++++----- kv/lease_state_test.go | 5 +++++ kv/sharded_coordinator.go | 8 +++++++- 5 files changed, 41 insertions(+), 6 deletions(-) diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index c6be98d9..e511238f 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -77,6 +77,7 @@ var ( errLeadershipTransferAborted = errors.New("etcd raft leadership transfer aborted") errLeadershipTransferRejected = errors.New("etcd raft leadership transfer was rejected by raft (target is not a voter)") errLeadershipTransferNotLeader = errors.Mark(errors.New("etcd raft leadership transfer requires the local node to be leader"), raftengine.ErrNotLeader) + errLeadershipTransferInProgress = errors.Mark(errors.New("etcd raft leadership transfer is in progress"), raftengine.ErrLeadershipTransferInProgress) errTooManyPendingConfigs = errors.New("etcd raft engine has too many pending config changes") ) @@ -1026,6 +1027,15 @@ func (e *Engine) handleProposal(req proposalRequest) { req.done <- proposalResult{err: errors.WithStack(errNotLeader)} return } + // etcd/raft silently drops proposals while a leadership transfer is + // in flight (LeadTransferee != 0). Surface this as a distinct error + // so callers (lease-read invalidation, proxy retry, etc.) can + // recognise it via errors.Is(err, raftengine.ErrLeadershipTransferInProgress) + // instead of hanging on an ack that will never come. 
+ if e.rawNode.BasicStatus().LeadTransferee != 0 { + req.done <- proposalResult{err: errors.WithStack(errLeadershipTransferInProgress)} + return + } e.storePendingProposal(req) if err := e.rawNode.Propose(encodeProposalEnvelope(req.id, req.payload)); err != nil { e.cancelPendingProposal(req.id) diff --git a/internal/raftengine/etcd/engine_test.go b/internal/raftengine/etcd/engine_test.go index 1f3b78e5..e5501a4c 100644 --- a/internal/raftengine/etcd/engine_test.go +++ b/internal/raftengine/etcd/engine_test.go @@ -1622,4 +1622,5 @@ func TestErrNotLeaderMatchesRaftEngineSentinel(t *testing.T) { t.Parallel() require.True(t, errors.Is(errors.WithStack(errNotLeader), raftengine.ErrNotLeader)) require.True(t, errors.Is(errors.WithStack(errLeadershipTransferNotLeader), raftengine.ErrNotLeader)) + require.True(t, errors.Is(errors.WithStack(errLeadershipTransferInProgress), raftengine.ErrLeadershipTransferInProgress)) } diff --git a/kv/coordinator.go b/kv/coordinator.go index e5f6f869..f6f404b3 100644 --- a/kv/coordinator.go +++ b/kv/coordinator.go @@ -220,10 +220,14 @@ func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*C // Abort), and refreshing would be unsound because no quorum // confirmation happened. // -// On err != nil the lease is invalidated: a Propose error commonly -// signals leadership loss (non-leader rejection, transfer in -// progress, quorum lost, etc.) and the design doc lists -// "any error from engine.Propose" as an invalidation trigger. +// On err != nil the lease is invalidated ONLY when isLeadershipLossError +// reports a real leadership-loss signal (non-leader rejection, +// ErrLeadershipLost, transfer-in-progress). Business-logic failures +// such as write conflicts or validation errors are NOT leadership +// signals and must not invalidate the lease -- doing so would force +// every subsequent read onto the slow LinearizableRead path and defeat +// the lease's purpose. 
RegisterLeaderLossCallback plus the +// State()==StateLeader fast-path guard cover real leader loss. func (c *Coordinate) refreshLeaseAfterDispatch(resp *CoordinateResponse, err error, dispatchStart time.Time, expectedGen uint64) { if err != nil { // Only invalidate on errors that actually signal leadership @@ -374,7 +378,16 @@ func (c *Coordinate) LeaseRead(ctx context.Context) (uint64, error) { } idx, err := c.LinearizableRead(ctx) if err != nil { - c.lease.invalidate() + // Only invalidate on real leadership-loss signals. A context + // deadline or transient transport error is NOT leadership loss; + // forcing invalidation for those would push every subsequent + // read onto the slow path for the remainder of the lease + // window, mirroring the production regression the write-path + // guard fixed. RegisterLeaderLossCallback plus the + // State()==StateLeader fast-path check cover real transitions. + if isLeadershipLossError(err) { + c.lease.invalidate() + } return 0, err } c.lease.extend(now.Add(leaseDur), expectedGen) diff --git a/kv/lease_state_test.go b/kv/lease_state_test.go index 3079092f..3ea37dd0 100644 --- a/kv/lease_state_test.go +++ b/kv/lease_state_test.go @@ -37,6 +37,11 @@ func TestIsLeadershipLossError(t *testing.T) { cockroachdberrors.WithStack(cockroachdberrors.Mark(hashicorpraft.ErrLeadershipLost, raftengine.ErrLeadershipLost)), true, }, + { + "hashicorp ErrLeadershipTransferInProgress marked with raftengine sentinel", + cockroachdberrors.WithStack(cockroachdberrors.Mark(hashicorpraft.ErrLeadershipTransferInProgress, raftengine.ErrLeadershipTransferInProgress)), + true, + }, { "bare hashicorp ErrNotLeader (no raftengine mark) is NOT detected", hashicorpraft.ErrNotLeader, diff --git a/kv/sharded_coordinator.go b/kv/sharded_coordinator.go index dffdde85..e5c7790a 100644 --- a/kv/sharded_coordinator.go +++ b/kv/sharded_coordinator.go @@ -767,7 +767,13 @@ func groupLeaseRead(ctx context.Context, g *ShardGroup) (uint64, error) { } idx, err := 
linearizableReadEngineCtx(ctx, engine) if err != nil { - g.lease.invalidate() + // See Coordinate.LeaseRead: only real leadership-loss signals + // invalidate the lease. Deadlines, transport blips, and other + // transient errors must NOT force the remainder of the lease + // window onto the slow path. + if isLeadershipLossError(err) { + g.lease.invalidate() + } return 0, err } g.lease.extend(now.Add(leaseDur), expectedGen) From f93ae7c6acf47e2ed9e27492496222adab4b9e64 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Mon, 20 Apr 2026 23:03:57 +0900 Subject: [PATCH 2/2] fix(etcd): fail ReadIndex fast during leadership transfer Addresses gemini feedback on PR #559. etcd/raft silently drops MsgReadIndex while LeadTransferee != 0, and RawNode.ReadIndex does not return an error on drop. Without a fast-fail, LinearizableRead/LeaseRead would block on req.done until the ctx deadline (~read timeout), reintroducing the exact hang the Propose-side check was meant to prevent but on the read path. Also refresh the handleProposal comment: etcd/raft returns ErrProposalDropped (not a hang). The value of the fast-fail is the explicit mapping to the shared ErrLeadershipTransferInProgress sentinel, not avoiding a hang. 
--- internal/raftengine/etcd/engine.go | 54 ++++++++++++++++++------------ 1 file changed, 32 insertions(+), 22 deletions(-) diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index e511238f..e334d4da 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -61,24 +61,24 @@ const ( ) var ( - errNilEngine = errors.New("raft engine is not configured") - errClosed = errors.New("etcd raft engine is closed") - errNotLeader = errors.Mark(errors.New("etcd raft engine is not leader"), raftengine.ErrNotLeader) - errNodeIDRequired = errors.New("etcd raft node id is required") - errDataDirRequired = errors.New("etcd raft data dir is required") - errStateMachineUnset = errors.New("etcd raft state machine is not configured") - errSnapshotRequired = errors.New("etcd raft snapshot payload is required") - errStepQueueFull = errors.New("etcd raft inbound step queue is full") - errClusterMismatch = errors.New("etcd raft persisted cluster does not match configured peers") - errConfigIndexMismatch = errors.New("etcd raft configuration index does not match") - errConfChangeContextTooLarge = errors.New("etcd raft conf change context is too large") - errLeadershipTransferTarget = errors.New("etcd raft leadership transfer target is required") - errLeadershipTransferNotReady = errors.New("etcd raft leadership transfer target is not available") - errLeadershipTransferAborted = errors.New("etcd raft leadership transfer aborted") - errLeadershipTransferRejected = errors.New("etcd raft leadership transfer was rejected by raft (target is not a voter)") - errLeadershipTransferNotLeader = errors.Mark(errors.New("etcd raft leadership transfer requires the local node to be leader"), raftengine.ErrNotLeader) + errNilEngine = errors.New("raft engine is not configured") + errClosed = errors.New("etcd raft engine is closed") + errNotLeader = errors.Mark(errors.New("etcd raft engine is not leader"), raftengine.ErrNotLeader) + 
errNodeIDRequired = errors.New("etcd raft node id is required") + errDataDirRequired = errors.New("etcd raft data dir is required") + errStateMachineUnset = errors.New("etcd raft state machine is not configured") + errSnapshotRequired = errors.New("etcd raft snapshot payload is required") + errStepQueueFull = errors.New("etcd raft inbound step queue is full") + errClusterMismatch = errors.New("etcd raft persisted cluster does not match configured peers") + errConfigIndexMismatch = errors.New("etcd raft configuration index does not match") + errConfChangeContextTooLarge = errors.New("etcd raft conf change context is too large") + errLeadershipTransferTarget = errors.New("etcd raft leadership transfer target is required") + errLeadershipTransferNotReady = errors.New("etcd raft leadership transfer target is not available") + errLeadershipTransferAborted = errors.New("etcd raft leadership transfer aborted") + errLeadershipTransferRejected = errors.New("etcd raft leadership transfer was rejected by raft (target is not a voter)") + errLeadershipTransferNotLeader = errors.Mark(errors.New("etcd raft leadership transfer requires the local node to be leader"), raftengine.ErrNotLeader) errLeadershipTransferInProgress = errors.Mark(errors.New("etcd raft leadership transfer is in progress"), raftengine.ErrLeadershipTransferInProgress) - errTooManyPendingConfigs = errors.New("etcd raft engine has too many pending config changes") + errTooManyPendingConfigs = errors.New("etcd raft engine has too many pending config changes") ) // Snapshot is an alias for the shared raftengine.Snapshot interface. @@ -1027,11 +1027,11 @@ func (e *Engine) handleProposal(req proposalRequest) { req.done <- proposalResult{err: errors.WithStack(errNotLeader)} return } - // etcd/raft silently drops proposals while a leadership transfer is - // in flight (LeadTransferee != 0). Surface this as a distinct error - // so callers (lease-read invalidation, proxy retry, etc.) 
can - // recognise it via errors.Is(err, raftengine.ErrLeadershipTransferInProgress) - // instead of hanging on an ack that will never come. + // etcd/raft drops proposals while a leadership transfer is in flight + // (LeadTransferee != 0) and returns ErrProposalDropped. Map that to + // the shared ErrLeadershipTransferInProgress sentinel so callers + // (lease-read invalidation, proxy retry, etc.) can recognise it via + // errors.Is instead of getting a generic dropped-proposal error. if e.rawNode.BasicStatus().LeadTransferee != 0 { req.done <- proposalResult{err: errors.WithStack(errLeadershipTransferInProgress)} return @@ -1052,6 +1052,16 @@ func (e *Engine) handleRead(req readRequest) { req.done <- readResult{err: errors.WithStack(errNotLeader)} return } + // etcd/raft silently drops MsgReadIndex while a leadership transfer + // is in flight (LeadTransferee != 0) -- ReadIndex does not return an + // error on drop, so without this fast-fail the caller would block on + // req.done until ctx deadline. Surface the drop as + // ErrLeadershipTransferInProgress so LeaseRead falls through to the + // new leader instead of stalling ~electionTimeout. + if e.rawNode.BasicStatus().LeadTransferee != 0 { + req.done <- readResult{err: errors.WithStack(errLeadershipTransferInProgress)} + return + } e.storePendingRead(req) e.rawNode.ReadIndex(encodeReadContext(req.id)) }