Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 37 additions & 17 deletions internal/raftengine/etcd/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,23 +61,24 @@ const (
)

var (
errNilEngine = errors.New("raft engine is not configured")
errClosed = errors.New("etcd raft engine is closed")
errNotLeader = errors.Mark(errors.New("etcd raft engine is not leader"), raftengine.ErrNotLeader)
errNodeIDRequired = errors.New("etcd raft node id is required")
errDataDirRequired = errors.New("etcd raft data dir is required")
errStateMachineUnset = errors.New("etcd raft state machine is not configured")
errSnapshotRequired = errors.New("etcd raft snapshot payload is required")
errStepQueueFull = errors.New("etcd raft inbound step queue is full")
errClusterMismatch = errors.New("etcd raft persisted cluster does not match configured peers")
errConfigIndexMismatch = errors.New("etcd raft configuration index does not match")
errConfChangeContextTooLarge = errors.New("etcd raft conf change context is too large")
errLeadershipTransferTarget = errors.New("etcd raft leadership transfer target is required")
errLeadershipTransferNotReady = errors.New("etcd raft leadership transfer target is not available")
errLeadershipTransferAborted = errors.New("etcd raft leadership transfer aborted")
errLeadershipTransferRejected = errors.New("etcd raft leadership transfer was rejected by raft (target is not a voter)")
errLeadershipTransferNotLeader = errors.Mark(errors.New("etcd raft leadership transfer requires the local node to be leader"), raftengine.ErrNotLeader)
errTooManyPendingConfigs = errors.New("etcd raft engine has too many pending config changes")
errNilEngine = errors.New("raft engine is not configured")
errClosed = errors.New("etcd raft engine is closed")
errNotLeader = errors.Mark(errors.New("etcd raft engine is not leader"), raftengine.ErrNotLeader)
errNodeIDRequired = errors.New("etcd raft node id is required")
errDataDirRequired = errors.New("etcd raft data dir is required")
errStateMachineUnset = errors.New("etcd raft state machine is not configured")
errSnapshotRequired = errors.New("etcd raft snapshot payload is required")
errStepQueueFull = errors.New("etcd raft inbound step queue is full")
errClusterMismatch = errors.New("etcd raft persisted cluster does not match configured peers")
errConfigIndexMismatch = errors.New("etcd raft configuration index does not match")
errConfChangeContextTooLarge = errors.New("etcd raft conf change context is too large")
errLeadershipTransferTarget = errors.New("etcd raft leadership transfer target is required")
errLeadershipTransferNotReady = errors.New("etcd raft leadership transfer target is not available")
errLeadershipTransferAborted = errors.New("etcd raft leadership transfer aborted")
errLeadershipTransferRejected = errors.New("etcd raft leadership transfer was rejected by raft (target is not a voter)")
errLeadershipTransferNotLeader = errors.Mark(errors.New("etcd raft leadership transfer requires the local node to be leader"), raftengine.ErrNotLeader)
errLeadershipTransferInProgress = errors.Mark(errors.New("etcd raft leadership transfer is in progress"), raftengine.ErrLeadershipTransferInProgress)
errTooManyPendingConfigs = errors.New("etcd raft engine has too many pending config changes")
)

// Snapshot is an alias for the shared raftengine.Snapshot interface.
Expand Down Expand Up @@ -1026,6 +1027,15 @@ func (e *Engine) handleProposal(req proposalRequest) {
req.done <- proposalResult{err: errors.WithStack(errNotLeader)}
return
}
// etcd/raft drops proposals while a leadership transfer is in flight
// (LeadTransferee != 0) and returns ErrProposalDropped. Map that to
// the shared ErrLeadershipTransferInProgress sentinel so callers
// (lease-read invalidation, proxy retry, etc.) can recognise it via
// errors.Is instead of getting a generic dropped-proposal error.
if e.rawNode.BasicStatus().LeadTransferee != 0 {
req.done <- proposalResult{err: errors.WithStack(errLeadershipTransferInProgress)}
return
}
Comment on lines +1035 to +1038
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This fast-fail check correctly maps proposals that etcd/raft drops during a leadership transfer to a recognisable sentinel (instead of a generic ErrProposalDropped), but a related and more severe issue affects ReadIndex requests in handleRead. In etcd/raft, MsgReadIndex is also silently dropped when leadTransferee != 0; because ReadIndex returns no error on drop, LinearizableRead (and consequently LeaseRead) will hang until the context deadline.

Since this PR specifically aims to improve the LeaseRead path, and LeaseRead relies on LinearizableRead (which calls handleRead), missing this check in handleRead (around line 1051) will leave the read path susceptible to hangs during transfers. Consider applying a similar check to handleRead and the configuration change paths (handleAddVoter, handleRemoveServer) for consistency. When modifying these, ensure that handleAddVoter synchronously publishes the configuration snapshot before advertising membership, and handleRemoveServer triggers a graceful shutdown if the node removes itself, as per repository standards.

References
  1. When adding a new voter to a raft cluster, synchronously publish the configuration snapshot before advertising the new membership to prevent the voter from receiving a stale snapshot.
  2. When a raft node processes a committed configuration change that removes itself from the cluster, it should trigger a graceful shutdown of its own raft engine.

e.storePendingProposal(req)
if err := e.rawNode.Propose(encodeProposalEnvelope(req.id, req.payload)); err != nil {
e.cancelPendingProposal(req.id)
Expand All @@ -1042,6 +1052,16 @@ func (e *Engine) handleRead(req readRequest) {
req.done <- readResult{err: errors.WithStack(errNotLeader)}
return
}
// etcd/raft silently drops MsgReadIndex while a leadership transfer
// is in flight (LeadTransferee != 0) -- ReadIndex does not return an
// error on drop, so without this fast-fail the caller would block on
// req.done until ctx deadline. Surface the drop as
// ErrLeadershipTransferInProgress so LeaseRead falls through to the
// new leader instead of stalling ~electionTimeout.
if e.rawNode.BasicStatus().LeadTransferee != 0 {
req.done <- readResult{err: errors.WithStack(errLeadershipTransferInProgress)}
return
}
e.storePendingRead(req)
e.rawNode.ReadIndex(encodeReadContext(req.id))
}
Expand Down
1 change: 1 addition & 0 deletions internal/raftengine/etcd/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1622,4 +1622,5 @@ func TestErrNotLeaderMatchesRaftEngineSentinel(t *testing.T) {
t.Parallel()
require.True(t, errors.Is(errors.WithStack(errNotLeader), raftengine.ErrNotLeader))
require.True(t, errors.Is(errors.WithStack(errLeadershipTransferNotLeader), raftengine.ErrNotLeader))
require.True(t, errors.Is(errors.WithStack(errLeadershipTransferInProgress), raftengine.ErrLeadershipTransferInProgress))
}
23 changes: 18 additions & 5 deletions kv/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,14 @@ func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*C
// Abort), and refreshing would be unsound because no quorum
// confirmation happened.
//
// On err != nil the lease is invalidated: a Propose error commonly
// signals leadership loss (non-leader rejection, transfer in
// progress, quorum lost, etc.) and the design doc lists
// "any error from engine.Propose" as an invalidation trigger.
// On err != nil the lease is invalidated ONLY when isLeadershipLossError
// reports a real leadership-loss signal (non-leader rejection,
// ErrLeadershipLost, transfer-in-progress). Business-logic failures
// such as write conflicts or validation errors are NOT leadership
// signals and must not invalidate the lease -- doing so would force
// every subsequent read onto the slow LinearizableRead path and defeat
// the lease's purpose. RegisterLeaderLossCallback plus the
// State()==StateLeader fast-path guard cover real leader loss.
func (c *Coordinate) refreshLeaseAfterDispatch(resp *CoordinateResponse, err error, dispatchStart time.Time, expectedGen uint64) {
if err != nil {
// Only invalidate on errors that actually signal leadership
Expand Down Expand Up @@ -374,7 +378,16 @@ func (c *Coordinate) LeaseRead(ctx context.Context) (uint64, error) {
}
idx, err := c.LinearizableRead(ctx)
if err != nil {
c.lease.invalidate()
// Only invalidate on real leadership-loss signals. A context
// deadline or transient transport error is NOT leadership loss;
// forcing invalidation for those would push every subsequent
// read onto the slow path for the remainder of the lease
// window, mirroring the production regression the write-path
// guard fixed. RegisterLeaderLossCallback plus the
// State()==StateLeader fast-path check cover real transitions.
if isLeadershipLossError(err) {
c.lease.invalidate()
}
return 0, err
}
c.lease.extend(now.Add(leaseDur), expectedGen)
Expand Down
5 changes: 5 additions & 0 deletions kv/lease_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ func TestIsLeadershipLossError(t *testing.T) {
cockroachdberrors.WithStack(cockroachdberrors.Mark(hashicorpraft.ErrLeadershipLost, raftengine.ErrLeadershipLost)),
true,
},
{
"hashicorp ErrLeadershipTransferInProgress marked with raftengine sentinel",
cockroachdberrors.WithStack(cockroachdberrors.Mark(hashicorpraft.ErrLeadershipTransferInProgress, raftengine.ErrLeadershipTransferInProgress)),
true,
},
{
"bare hashicorp ErrNotLeader (no raftengine mark) is NOT detected",
hashicorpraft.ErrNotLeader,
Expand Down
8 changes: 7 additions & 1 deletion kv/sharded_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,13 @@ func groupLeaseRead(ctx context.Context, g *ShardGroup) (uint64, error) {
}
idx, err := linearizableReadEngineCtx(ctx, engine)
if err != nil {
g.lease.invalidate()
// See Coordinate.LeaseRead: only real leadership-loss signals
// invalidate the lease. Deadlines, transport blips, and other
// transient errors must NOT force the remainder of the lease
// window onto the slow path.
if isLeadershipLossError(err) {
g.lease.invalidate()
}
return 0, err
}
g.lease.extend(now.Add(leaseDur), expectedGen)
Expand Down
Loading