Merged
25 changes: 25 additions & 0 deletions internal/raftengine/engine.go
@@ -94,6 +94,31 @@ type LeaseProvider interface {
LeaseDuration() time.Duration
// AppliedIndex returns the highest log index applied to the local FSM.
AppliedIndex() uint64
// LastQuorumAck returns the instant at which the engine most recently
// observed majority liveness on the leader -- i.e. the wall-clock time
// by which a quorum of follower Progress entries had responded. The
// engine maintains this in the background from MsgHeartbeatResp /
// MsgAppResp traffic on the leader, so a fast-path lease read does
// not need to issue its own ReadIndex to "warm" the lease.
//
// Safety: callers must verify the lease against a single
// `now := time.Now()` sample:
// state == raftengine.StateLeader &&
// !ack.IsZero() && !ack.After(now) && now.Sub(ack) < LeaseDuration()
//
// The !ack.After(now) guard matters because LastQuorumAck() may be
// reconstructed from UnixNano (no monotonic component): a backwards
// wall-clock adjustment would otherwise make now.Sub(ack) negative
// and pass the duration check against a stale ack. The LeaseDuration
// is bounded by electionTimeout - safety_margin, which guarantees
// that any new leader candidate cannot yet accept writes during
// that window.
//
// Returns the zero time when no quorum has been confirmed yet or
// when the local node is not the leader. Single-node LEADERS may
// return a recent time.Now() since self is the quorum; non-leader
// single-node replicas still return the zero time.
LastQuorumAck() time.Time
// RegisterLeaderLossCallback registers fn to be invoked whenever the
// local node leaves the leader role (graceful transfer, partition
// step-down, or shutdown). Callers use this to invalidate any
155 changes: 155 additions & 0 deletions internal/raftengine/etcd/engine.go
@@ -199,6 +199,28 @@ type Engine struct {
dispatchDropCount atomic.Uint64
dispatchErrorCount atomic.Uint64

// ackTracker records per-peer last-response times on the leader and
// publishes the majority-ack instant via quorumAckUnixNano. It is
// read lock-free from LastQuorumAck() on the hot lease-read path
// and updated inside the single event-loop goroutine from
// handleStep when a follower response arrives.
ackTracker quorumAckTracker
// singleNodeLeaderAckUnixNano short-circuits LastQuorumAck on the
// single-node leader path: self IS the quorum, so there are no
// follower responses to observe. refreshStatus keeps this value
// current (set to time.Now().UnixNano() each tick while leader and
// cluster size is 1; cleared otherwise) so the lease-read hot path
// never has to acquire e.mu to check peer count or leader state.
singleNodeLeaderAckUnixNano atomic.Int64
// isLeader mirrors status.State == StateLeader for lock-free reads
// on the hot path. refreshStatus writes it on every tick;
// recordQuorumAck reads it before admitting a follower response
// into ackTracker (so late MsgAppResp / MsgHeartbeatResp arriving
// after a step-down cannot repopulate the tracker), and
// LastQuorumAck reads it to honor the LeaseProvider contract
// ("zero time when the local node is not the leader").
isLeader atomic.Bool

// leaderLossCbsMu guards the slice of callbacks invoked when the node
// transitions out of the leader role (graceful transfer, partition
// step-down, shutdown). Callbacks fire synchronously from the
@@ -622,6 +644,33 @@ func (e *Engine) AppliedIndex() uint64 {
return e.appliedIndex.Load()
}

// LastQuorumAck returns the wall-clock instant by which a majority of
// followers most recently responded to the leader, or the zero time
// when no such observation exists (follower / candidate / startup).
//
// Lock-free: reads atomic.Int64 values published by recordQuorumAck
// (multi-node cluster) or refreshStatus (single-node cluster keeps
// singleNodeLeaderAckUnixNano alive with time.Now() while leader, so
// the hot lease-read path performs zero lock work). See
// raftengine.LeaseProvider for the lease-read correctness contract.
func (e *Engine) LastQuorumAck() time.Time {
if e == nil {
return time.Time{}
}
// Honor the LeaseProvider contract that non-leaders always return
// the zero time. Without this guard a late MsgAppResp that sneaks
// past recordQuorumAck (or a tracker entry that survived a brief
// step-down/step-up window) could leak stale liveness into the
// caller's fast-path validation.
if !e.isLeader.Load() {
return time.Time{}
}
if ns := e.singleNodeLeaderAckUnixNano.Load(); ns != 0 {
return time.Unix(0, ns)
}
return e.ackTracker.load()
}
Comment on lines +656 to +672
Copilot AI commented Apr 20, 2026:

LastQuorumAck() currently returns ackTracker.load() unconditionally for multi-node clusters. That can return a non-zero time even when the node is follower/candidate (e.g., late MsgAppResp/MsgHeartbeatResp arriving after a step-down can repopulate ackTracker), which violates the LeaseProvider contract (“zero time when ... not the leader”) and can also leak stale quorum acks across re-elections. Consider gating the return value on leader state (or maintaining an atomic leader flag) so non-leaders always observe time.Time{} for this API.

// RegisterLeaderLossCallback registers fn to fire every time the local
// node's Raft state transitions out of leader (CheckQuorum step-down,
// graceful transfer completion, partition-induced demotion) and also
@@ -1240,6 +1289,7 @@ func (e *Engine) handleStep(msg raftpb.Message) {
return
}
e.recordLeaderContact(msg)
e.recordQuorumAck(msg)
if err := e.rawNode.Step(msg); err != nil {
if errors.Is(err, etcdraft.ErrStepPeerNotFound) {
return
@@ -1248,6 +1298,75 @@
}
}

// recordQuorumAck updates the per-peer last-response time when msg is
// a follower -> leader response, so LastQuorumAck() reflects ongoing
// majority liveness without requiring a fresh ReadIndex.
//
// Called inside the event-loop goroutine (single writer to e.peers
// and to the raft state), so the e.peers read is race-free.
//
// Gated on the atomic isLeader mirror: a transport-level MsgAppResp /
// MsgHeartbeatResp can land shortly after a step-down (reset() has
// already cleared ackTracker); admitting it here would repopulate
// the tracker and leak a stale liveness instant into the next
// re-election as a non-zero LastQuorumAck(). isLeader is written by
// refreshStatus on every tick, which catches every role transition
// before the next handleStep runs.
func (e *Engine) recordQuorumAck(msg raftpb.Message) {
if !isFollowerResponse(msg.Type) {
return
}
if msg.From == 0 || msg.From == e.nodeID {
return
}
if !e.isLeader.Load() {
return
}
// Reject acks from peers not in the current membership. Without
// this filter, a late MsgAppResp from a just-removed peer (which
// rawNode.Step will immediately reject with ErrStepPeerNotFound)
// would still land an ack in the tracker -- resurrecting the
// "ghost" entry that removePeer just pruned. Since we run on the
// event-loop goroutine (the sole writer to e.peers), the map read
// here is race-free.
if _, ok := e.peers[msg.From]; !ok {
return
}
clusterSize := len(e.peers)
if clusterSize <= 1 {
return
}
Comment on lines +1316 to +1338
Copilot AI commented Apr 20, 2026:

recordQuorumAck updates the tracker for follower-response messages without checking that the local node is currently leader. After a leader-loss transition (where ackTracker.reset() runs), late follower responses can still arrive and repopulate ackTracker while we are follower/candidate, making LastQuorumAck() non-zero and potentially reusing stale liveness on a later re-election. Suggest adding a leader-state gate here (or otherwise ensuring the tracker cannot be updated outside leader role).
e.ackTracker.recordAck(msg.From, followerQuorumForClusterSize(clusterSize))
}

// followerQuorumForClusterSize returns the number of non-self peer
// acks required to form a Raft majority for a cluster of the given
// size. Centralising the formula keeps ackTracker callers (handleStep
// and removePeer) consistent and avoids scattered //nolint:mnd
// suppressions. clusterSize is the total voter count INCLUDING self;
// a strict majority is floor(clusterSize/2) + 1 voters, so the leader
// needs floor(clusterSize/2) acks beyond itself -- clusterSize / 2
// under integer division for every size: 3 → 1, 5 → 2, 7 → 3 for odd
// clusters and 4 → 2, 6 → 3 for even ones.
func followerQuorumForClusterSize(clusterSize int) int {
if clusterSize <= 1 {
return 0
}
// The Raft majority for a cluster of size N is floor(N/2)+1 voters
// INCLUDING self, which means the leader needs N/2 OTHER acks.
return clusterSize / 2 //nolint:mnd
}

// isFollowerResponse reports whether a Raft message type represents a
// follower acknowledging the leader. We use only the two response
// types that ALL committed replication traffic passes through:
// MsgAppResp (log append ack) and MsgHeartbeatResp (passive heartbeat
// ack). Either one is proof that the peer's election timer has been
// reset, which is what the lease relies on.
func isFollowerResponse(t raftpb.MessageType) bool {
return t == raftpb.MsgAppResp || t == raftpb.MsgHeartbeatResp
}

func (e *Engine) sendMessages(messages []raftpb.Message) error {
for _, msg := range messages {
if e.skipDispatchMessage(msg) {
Expand Down Expand Up @@ -1846,13 +1965,34 @@ func (e *Engine) refreshStatus() {
if e.closed {
e.status.State = raftengine.StateShutdown
}
clusterSize := len(e.peers)
e.mu.Unlock()

// Keep the lock-free single-node fast path in sync with the current
// role: populate while leader of a 1-node cluster, clear otherwise
// (including on leader loss, so LastQuorumAck transitions to the
// multi-node tracker or zero time atomically).
// Publish leader state atomically so recordQuorumAck / LastQuorumAck
// can gate on it without acquiring e.mu. MUST run before the
// single-node ack store below, otherwise a brand-new leader tick
// could publish an ack instant while isLeader is still false.
Copilot AI commented Apr 20, 2026:

Minor grammar nit in the comment: "a ack instant" should be "an ack instant".

Suggested change:
-	// could publish a ack instant while isLeader is still false.
+	// could publish an ack instant while isLeader is still false.
e.isLeader.Store(status.State == raftengine.StateLeader)

if status.State == raftengine.StateLeader && clusterSize <= 1 {
e.singleNodeLeaderAckUnixNano.Store(time.Now().UnixNano())
} else {
e.singleNodeLeaderAckUnixNano.Store(0)
}

if status.State == raftengine.StateLeader {
e.leaderOnce.Do(func() { close(e.leaderReady) })
}
if previous == raftengine.StateLeader && status.State != raftengine.StateLeader {
e.failPending(errors.WithStack(errNotLeader))
// Drop the per-peer ack map so a future re-election cannot
// surface a stale majority-ack instant before the new term's
// heartbeats have actually confirmed liveness.
e.ackTracker.reset()
// Notify lease holders so they invalidate any cached lease;
// without this hook, a former leader keeps serving fast-path
// reads from local state for up to LeaseDuration after a
Expand Down Expand Up @@ -2710,8 +2850,23 @@ func (e *Engine) removePeer(nodeID uint64) {
delete(e.peers, nodeID)
}
e.config.Servers = removeConfigServer(e.peers, e.config.Servers, nodeID, peer.ID)
postRemovalClusterSize := len(e.peers)
e.mu.Unlock()

// Drop the peer's recorded ack so a reconfiguration cannot leave a
// stale entry that falsely satisfies the new cluster's majority.
// followerQuorum is computed against the POST-removal cluster; a
// shrink to <=1 would otherwise pass 0 here, which
// quorumAckTracker.removePeer treats as "keep the current instant"
// and would surface stale liveness to LastQuorumAck if the cluster
// subsequently grew back. Clear the tracker explicitly in that
// case so any future multi-node membership starts fresh.
if postRemovalClusterSize <= 1 {
e.ackTracker.reset()
} else {
e.ackTracker.removePeer(nodeID, followerQuorumForClusterSize(postRemovalClusterSize))
}
Comment on lines +2864 to +2868
Copilot AI commented Apr 20, 2026:

New //nolint:mnd on the quorum calculation looks avoidable here as well. Consider reusing the same named helper/constant used in recordQuorumAck so the quorum math is centralized and the linter suppression isn't needed.

if e.transport != nil {
e.transport.RemovePeer(nodeID)
}
139 changes: 139 additions & 0 deletions internal/raftengine/etcd/quorum_ack.go
@@ -0,0 +1,139 @@
package etcd

import (
"sort"
"sync"
"sync/atomic"
"time"
)

// quorumAckTracker records the most recent response time from each
// follower and publishes the "majority-ack instant" -- the wall clock
// at which a majority of followers had all been confirmed live.
//
// LeaseRead callers pair the published instant with LeaseDuration to
// serve a leader-local read without issuing a fresh ReadIndex round.
// This replaces the prior caller-side lease scheme, which had to
// sample time.Now() before the slow path and therefore could not
// amortise reads whose own latency exceeded LeaseDuration (the bug
// that kept production GET at ~1 s under step-queue congestion).
//
// Safety: we record time.Now() when the leader OBSERVES the follower
// response, which is an UPPER bound on the follower's true ack time.
// Because lease = recorded_instant + lease_duration, that upper bound
// makes the lease extend slightly past the strictly-safe
// follower_ack_time + electionTimeout boundary by at most the one-way
// network delay plus scheduling slop. leaseSafetyMargin is sized to
// cover that overshoot, so leaseDuration = electionTimeout -
// leaseSafetyMargin keeps the lease strictly inside the no-new-leader
// window. See docs/lease_read_design.md for the full argument.
type quorumAckTracker struct {
Contributor commented (high):

The quorumAckTracker currently lacks a mechanism to handle peer removal. If a peer is removed from the cluster, its last recorded acknowledgment remains in the peerAcks map indefinitely. This could allow a leader to incorrectly satisfy the quorum requirement using a "ghost" acknowledgment from a non-existent peer, potentially violating lease safety after a configuration change. Please add a removePeer(peerID uint64) method to prune the map and ensure it is called from Engine.removePeer.

References:
  1. To maintain consistency between the application's peer list and the Raft ConfState, ensure the peer list remains complete and consistent across configuration changes.

mu sync.Mutex
peerAcks map[uint64]int64 // peer ID → last ack unix nano observed on leader
Contributor commented (medium):

The peerAcks map is never pruned of peers that have been removed from the cluster. While reset() clears it on leader loss, a long-lived leader in a cluster with frequent membership changes will accumulate stale entries. More importantly, if the cluster size shrinks, an old timestamp from a removed peer might still be among the top followerQuorum acks, potentially inflating the majority-ack instant incorrectly.

// ackBuf is reused by recomputeLocked to avoid allocating a fresh
// []int64 on every MsgAppResp / MsgHeartbeatResp. Sized to
// len(peerAcks) on first use and grown via append when the cluster
// expands. Caller must hold t.mu.
ackBuf []int64
// quorumAckUnixNano is the Nth-most-recent peer ack where N equals
// the number of follower acks required for majority (clusterSize/2).
// Updated under mu; read lock-free via atomic.Load.
quorumAckUnixNano atomic.Int64
}

// recordAck notes that peerID responded to us and recomputes the
// majority-ack instant. followerQuorum is the number of non-self
// peers whose ack is required for majority (clusterSize / 2 for
// integer division; 1 for a 3-node cluster, 2 for 5-node, etc).
//
// A followerQuorum of 0 means single-node cluster: caller should
// surface LastQuorumAck = now without calling this.
func (t *quorumAckTracker) recordAck(peerID uint64, followerQuorum int) {
if followerQuorum <= 0 {
return
}
now := time.Now().UnixNano()
Contributor commented (high):

Using time.Now().UnixNano() and then reconstructing the time via time.Unix(0, ns) strips the monotonic clock reading from the time.Time object. This makes subsequent time.Since(ack) calls in the lease-read path susceptible to wall-clock jumps (e.g., NTP adjustments). If the system clock is adjusted backwards, the lease could be held for longer than the safety window, which is unsafe.

Consider storing the time.Time object directly using an atomic.Pointer[time.Time] or atomic.Value to preserve monotonicity.

References:
  1. For frequently accessed fields on hot paths, prefer atomic operations over mutexes to improve performance and maintain consistency.

t.mu.Lock()
defer t.mu.Unlock()
if t.peerAcks == nil {
t.peerAcks = make(map[uint64]int64)
}
t.peerAcks[peerID] = now
t.recomputeLocked(followerQuorum)
}

// removePeer drops peerID's recorded ack. Call when a peer leaves the
// cluster so its pre-removal ack time can no longer satisfy the
// majority threshold after a configuration change: a shrink-then-grow
// that ends with fresh peers who have not yet acked would otherwise
// let the removed peer's last ack falsely advance the quorum instant,
// which is a lease-safety violation.
//
// followerQuorum is the POST-removal follower quorum so the published
// instant is recomputed against the current cluster. Passing 0 keeps
// the current instant; the next recordAck will refresh it.
func (t *quorumAckTracker) removePeer(peerID uint64, followerQuorum int) {
t.mu.Lock()
defer t.mu.Unlock()
// delete is safe on a missing key. We still recompute even when
// peerID had no recorded entry: a shrink that reduces
// followerQuorum may let the remaining peers now satisfy the
// smaller threshold, and without an explicit recompute the
// published instant would stay at its stale value (or zero) until
// the next recordAck arrives.
delete(t.peerAcks, peerID)
if followerQuorum <= 0 {
return
}
t.recomputeLocked(followerQuorum)
Comment on lines +75 to +88
Copilot AI commented Apr 20, 2026:

In quorumAckTracker.removePeer, returning early when peerID has no recorded entry skips recomputing quorumAckUnixNano. If membership changes reduce followerQuorum, the tracker can unnecessarily keep quorumAck at 0 (or stale) until the next recordAck, even though the remaining recorded peers may already satisfy the new quorum. Consider always recomputing when followerQuorum > 0 (and delete() on a nil/missing entry is safe), or providing an explicit recompute-on-config-change path.
}

// recomputeLocked publishes the followerQuorum-th most recent ack as
// the quorum instant, or clears it if we lack that many recorded
// peers. Caller must hold t.mu.
//
// Reuses t.ackBuf across calls so the hot path (one call per
// MsgAppResp / MsgHeartbeatResp) does not allocate on steady state.
// The buffer is re-sliced in place and the sort is done on that
// slice; a cluster growing past the previous capacity picks up a
// single growth step via append, not a fresh allocation per call.
func (t *quorumAckTracker) recomputeLocked(followerQuorum int) {
if len(t.peerAcks) < followerQuorum {
// Not enough peers have reported to form a majority yet.
t.quorumAckUnixNano.Store(0)
return
}
t.ackBuf = t.ackBuf[:0]
for _, a := range t.peerAcks {
t.ackBuf = append(t.ackBuf, a)
}
// Sort descending so ackBuf[0] is the most recent. The
// followerQuorum-th entry (1-indexed) is the oldest ack among the
// top quorum -- i.e. the boundary instant by which majority
// liveness was confirmed. Cluster size is small in practice (3-5
// peers), so sort.Slice is cheaper than a quickselect once the
// buffer is reused.
sort.Slice(t.ackBuf, func(i, j int) bool { return t.ackBuf[i] > t.ackBuf[j] })
t.quorumAckUnixNano.Store(t.ackBuf[followerQuorum-1])
}
Comment on lines +100 to +118 — coderabbitai[bot] marked this conversation as resolved.
Copilot AI commented Apr 20, 2026:

quorumAckTracker.recomputeLocked allocates a fresh slice and sorts all peer ack timestamps on every follower response. Since recordAck is called for each MsgAppResp / MsgHeartbeatResp, this can become a hot/allocating path as cluster size or heartbeat frequency grows. Consider reducing allocations (e.g., reuse a buffer) and computing the Nth-most-recent timestamp without a full sort.

// reset clears all recorded peer acks. Call when the local node
// leaves the leader role so a future re-election does not resurrect
// a stale majority-ack instant.
func (t *quorumAckTracker) reset() {
t.mu.Lock()
defer t.mu.Unlock()
t.peerAcks = nil
t.ackBuf = t.ackBuf[:0]
t.quorumAckUnixNano.Store(0)
}

// load returns the current majority-ack instant or the zero time if
// no quorum has been observed since the last reset.
func (t *quorumAckTracker) load() time.Time {
ns := t.quorumAckUnixNano.Load()
if ns == 0 {
return time.Time{}
}
return time.Unix(0, ns)
}