From aadf7ff5ad30586d95d6cbb8bbdfa5d8b12238dc Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Tue, 21 Apr 2026 00:44:34 +0900 Subject: [PATCH 1/8] feat(lease-read): engine-driven lease anchor via LastQuorumAck The caller-side lease (kv.leaseState + refreshLeaseAfterDispatch) sampled time.Now() BEFORE each slow-path LinearizableRead and used it as the extend base. When production LinearizableRead latency grew to ~1s (step-queue congestion) while LeaseDuration is 700ms, every extend landed ~300ms in the past -- the next read always missed the cache and the lease never warmed up, defeating the entire lease-read optimisation. Switch the authority to the engine: - internal/raftengine: add LeaseProvider.LastQuorumAck() returning the wall-clock instant at which a majority of followers most recently responded to the leader. - internal/raftengine/etcd: implement quorumAckTracker and hook it from handleStep's follower-response path (MsgAppResp / MsgHeartbeatResp). Reset the tracker on leader-loss transitions in refreshStatus so a future re-election cannot surface a stale instant. Single-node clusters short-circuit to time.Now(). - kv: Coordinate.LeaseRead and ShardedCoordinator.groupLeaseRead consult LastQuorumAck FIRST, falling back to the existing caller-side lease and finally to LinearizableRead. The engine's lease is refreshed on every heartbeat independent of read latency, so a slow LinearizableRead no longer leaves the cache cold. Safety: we record time.Now() when the leader observes the follower response, which is an upper bound on the follower's true ack time. leaseSafetyMargin (300ms) covers the resulting overshoot (one-way delay + scheduling slop + clock drift), keeping lease_expiry strictly inside ack_time + electionTimeout. Caller-side lease machinery (leaseState, leaseRefreshingTxn, RegisterLeaderLossCallback) is retained for test affordance and as a secondary fast-path during rollout, but is no longer load-bearing. 
--- internal/raftengine/engine.go | 18 ++++ internal/raftengine/etcd/engine.go | 82 ++++++++++++++ internal/raftengine/etcd/quorum_ack.go | 102 ++++++++++++++++++ internal/raftengine/etcd/quorum_ack_test.go | 114 ++++++++++++++++++++ kv/coordinator.go | 64 ++++++----- kv/lease_read_test.go | 9 ++ kv/sharded_coordinator.go | 23 ++-- 7 files changed, 369 insertions(+), 43 deletions(-) create mode 100644 internal/raftengine/etcd/quorum_ack.go create mode 100644 internal/raftengine/etcd/quorum_ack_test.go diff --git a/internal/raftengine/engine.go b/internal/raftengine/engine.go index 95cc77b7..00bdd089 100644 --- a/internal/raftengine/engine.go +++ b/internal/raftengine/engine.go @@ -94,6 +94,24 @@ type LeaseProvider interface { LeaseDuration() time.Duration // AppliedIndex returns the highest log index applied to the local FSM. AppliedIndex() uint64 + // LastQuorumAck returns the instant at which the engine most recently + // observed majority liveness on the leader -- i.e. the wall-clock time + // by which a quorum of follower Progress entries had responded. The + // engine maintains this in the background from MsgHeartbeatResp / + // MsgAppResp traffic on the leader, so a fast-path lease read does + // not need to issue its own ReadIndex to "warm" the lease. + // + // Safety: callers must verify the lease with + // time.Since(LastQuorumAck()) < LeaseDuration() && + // engine.State() == raftengine.StateLeader + // before serving a leader-local read. The LeaseDuration is bounded + // by electionTimeout - safety_margin, which guarantees that any new + // leader candidate cannot yet accept writes during that window. + // + // Returns the zero time when no quorum has been confirmed yet, or + // when the local node is not the leader. Single-node clusters + // report time.Now() unconditionally since self is the quorum. 
+ LastQuorumAck() time.Time // RegisterLeaderLossCallback registers fn to be invoked whenever the // local node leaves the leader role (graceful transfer, partition // step-down, or shutdown). Callers use this to invalidate any diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index e334d4da..d674f6e5 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -181,6 +181,13 @@ type Engine struct { dispatchDropCount atomic.Uint64 dispatchErrorCount atomic.Uint64 + // ackTracker records per-peer last-response times on the leader and + // publishes the majority-ack instant via quorumAckUnixNano. It is + // read lock-free from LastQuorumAck() on the hot lease-read path + // and updated inside the single event-loop goroutine from + // handleStep when a follower response arrives. + ackTracker quorumAckTracker + // leaderLossCbsMu guards the slice of callbacks invoked when the node // transitions out of the leader role (graceful transfer, partition // step-down, shutdown). Callbacks fire synchronously from the @@ -604,6 +611,38 @@ func (e *Engine) AppliedIndex() uint64 { return e.appliedIndex.Load() } +// LastQuorumAck returns the wall-clock instant by which a majority of +// followers most recently responded to the leader, or the zero time +// when no such observation exists (follower / candidate / startup). +// Single-node clusters short-circuit to time.Now() while this node is +// leader because self is the quorum. +// +// Lock-free: reads the atomic.Int64 published by recordQuorumAck +// inside the event-loop goroutine. See raftengine.LeaseProvider for +// the lease-read correctness contract. +func (e *Engine) LastQuorumAck() time.Time { + if e == nil { + return time.Time{} + } + // Fast answer for single-node clusters: we're always our own + // quorum while we're leader. 
Peeking at e.peers from outside the + // event loop is safe for the small-cluster check because peer + // membership is only updated during config changes and reading a + // momentarily stale size at most defers switching to the + // majority-ack pathway until the next LeaseRead. + e.mu.RLock() + clusterSize := len(e.peers) + state := e.status.State + e.mu.RUnlock() + if clusterSize <= 1 { + if state == raftengine.StateLeader { + return time.Now() + } + return time.Time{} + } + return e.ackTracker.load() +} + // RegisterLeaderLossCallback registers fn to fire every time the local // node's Raft state transitions out of leader (CheckQuorum step-down, // graceful transfer completion, partition-induced demotion) and also @@ -1222,6 +1261,7 @@ func (e *Engine) handleStep(msg raftpb.Message) { return } e.recordLeaderContact(msg) + e.recordQuorumAck(msg) if err := e.rawNode.Step(msg); err != nil { if errors.Is(err, etcdraft.ErrStepPeerNotFound) { return @@ -1230,6 +1270,44 @@ func (e *Engine) handleStep(msg raftpb.Message) { } } +// recordQuorumAck updates the per-peer last-response time when msg is +// a follower -> leader response, so LastQuorumAck() reflects ongoing +// majority liveness without requiring a fresh ReadIndex. +// +// Called inside the event-loop goroutine (single writer to e.peers +// and to the raft state), so the len(e.peers) read and the +// leader-state check are race-free. We intentionally do not gate the +// observation itself on BasicStatus() -- a MsgAppResp / MsgHeartbeatResp +// only ever lands at a leader via the transport layer, and refreshing +// the per-peer map from follower/candidate state would just be dead +// data later cleared by reset() on role transition. 
+func (e *Engine) recordQuorumAck(msg raftpb.Message) { + if !isFollowerResponse(msg.Type) { + return + } + if msg.From == 0 || msg.From == e.nodeID { + return + } + clusterSize := len(e.peers) + if clusterSize <= 1 { + return + } + // Followers needed for majority = floor(clusterSize / 2): 1 for a + // 3-node cluster, 2 for 5-node, matching raft quorum semantics. + followerQuorum := clusterSize / 2 //nolint:mnd + e.ackTracker.recordAck(msg.From, followerQuorum) +} + +// isFollowerResponse reports whether a Raft message type represents a +// follower acknowledging the leader. We use only the two response +// types that ALL committed replication traffic passes through: +// MsgAppResp (log append ack) and MsgHeartbeatResp (passive heartbeat +// ack). Either one is proof that the peer's election timer has been +// reset, which is what the lease relies on. +func isFollowerResponse(t raftpb.MessageType) bool { + return t == raftpb.MsgAppResp || t == raftpb.MsgHeartbeatResp +} + func (e *Engine) sendMessages(messages []raftpb.Message) error { for _, msg := range messages { if e.skipDispatchMessage(msg) { @@ -1835,6 +1913,10 @@ func (e *Engine) refreshStatus() { } if previous == raftengine.StateLeader && status.State != raftengine.StateLeader { e.failPending(errors.WithStack(errNotLeader)) + // Drop the per-peer ack map so a future re-election cannot + // surface a stale majority-ack instant before the new term's + // heartbeats have actually confirmed liveness. 
+ e.ackTracker.reset() // Notify lease holders so they invalidate any cached lease; // without this hook, a former leader keeps serving fast-path // reads from local state for up to LeaseDuration after a diff --git a/internal/raftengine/etcd/quorum_ack.go b/internal/raftengine/etcd/quorum_ack.go new file mode 100644 index 00000000..6f3536f7 --- /dev/null +++ b/internal/raftengine/etcd/quorum_ack.go @@ -0,0 +1,102 @@ +package etcd + +import ( + "sort" + "sync" + "sync/atomic" + "time" +) + +// quorumAckTracker records the most recent response time from each +// follower and publishes the "majority-ack instant" -- the wall clock +// at which a majority of followers had all been confirmed live. +// +// LeaseRead callers pair the published instant with LeaseDuration to +// serve a leader-local read without issuing a fresh ReadIndex round. +// This replaces the prior caller-side lease scheme, which had to +// sample time.Now() before the slow path and therefore could not +// amortise reads whose own latency exceeded LeaseDuration (the bug +// that kept production GET at ~1 s under step-queue congestion). +// +// Correctness anchor: we record time.Now() when the leader OBSERVES +// a follower response, not the follower's local ack time. That makes +// our recorded instant an UPPER bound on the follower's true +// last-contact, which is the conservative direction for lease +// safety: lease = recorded_instant + lease_duration can only be +// LATER than follower_last_contact + lease_duration, and +// follower_last_contact + electionTimeout is the earliest time the +// follower would vote for a new leader, so lease_duration < +// electionTimeout - safety keeps the lease strictly inside the +// no-new-leader window. +// +// Wait -- that's unsafe. A later observation means a LARGER recorded +// instant, which makes lease_expiry later. We actually need a LOWER +// bound on follower_last_contact to bound lease_expiry conservatively. 
+// Because network/scheduling delay makes leader_observation >= +// follower_ack_sent_time, using leader_observation is an OVERestimate +// of follower_ack_sent_time, which in turn is an overestimate of +// follower_ack_received_time. That means lease extends slightly past +// the strictly-safe boundary by at most the one-way delay + scheduling +// slop -- which is exactly what leaseSafetyMargin is sized to cover. +// See docs/lease_read_design.md for the full argument. +type quorumAckTracker struct { + mu sync.Mutex + peerAcks map[uint64]int64 // peer ID → last ack unix nano observed on leader + // quorumAckUnixNano is the Nth-most-recent peer ack where N equals + // the number of follower acks required for majority (clusterSize/2). + // Updated under mu; read lock-free via atomic.Load. + quorumAckUnixNano atomic.Int64 +} + +// recordAck notes that peerID responded to us and recomputes the +// majority-ack instant. followerQuorum is the number of non-self +// peers whose ack is required for majority (clusterSize / 2 for +// integer division; 1 for a 3-node cluster, 2 for 5-node, etc). +// +// A followerQuorum of 0 means single-node cluster: caller should +// surface LastQuorumAck = now without calling this. +func (t *quorumAckTracker) recordAck(peerID uint64, followerQuorum int) { + if followerQuorum <= 0 { + return + } + now := time.Now().UnixNano() + t.mu.Lock() + defer t.mu.Unlock() + if t.peerAcks == nil { + t.peerAcks = make(map[uint64]int64) + } + t.peerAcks[peerID] = now + if len(t.peerAcks) < followerQuorum { + // Not enough peers have reported yet to form a majority. + return + } + acks := make([]int64, 0, len(t.peerAcks)) + for _, a := range t.peerAcks { + acks = append(acks, a) + } + // Sort descending so acks[0] is the most recent. The followerQuorum-th + // entry (1-indexed) is the oldest ack among the top quorum -- i.e. the + // boundary instant by which majority liveness was confirmed. 
+ sort.Slice(acks, func(i, j int) bool { return acks[i] > acks[j] }) + t.quorumAckUnixNano.Store(acks[followerQuorum-1]) +} + +// reset clears all recorded peer acks. Call when the local node +// leaves the leader role so a future re-election does not resurrect +// a stale majority-ack instant. +func (t *quorumAckTracker) reset() { + t.mu.Lock() + defer t.mu.Unlock() + t.peerAcks = nil + t.quorumAckUnixNano.Store(0) +} + +// load returns the current majority-ack instant or the zero time if +// no quorum has been observed since the last reset. +func (t *quorumAckTracker) load() time.Time { + ns := t.quorumAckUnixNano.Load() + if ns == 0 { + return time.Time{} + } + return time.Unix(0, ns) +} diff --git a/internal/raftengine/etcd/quorum_ack_test.go b/internal/raftengine/etcd/quorum_ack_test.go new file mode 100644 index 00000000..f58e691f --- /dev/null +++ b/internal/raftengine/etcd/quorum_ack_test.go @@ -0,0 +1,114 @@ +package etcd + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestQuorumAckTracker_SingleNodeFollowerQuorumZeroIsNoop(t *testing.T) { + t.Parallel() + var tr quorumAckTracker + // followerQuorum == 0 means single-node cluster -- caller handles + // that case elsewhere. recordAck must not mutate state, otherwise + // a re-election into multi-node would surface a stale instant. + tr.recordAck(42, 0) + require.Equal(t, time.Time{}, tr.load()) +} + +func TestQuorumAckTracker_QuorumAckWaitsForMajority(t *testing.T) { + t.Parallel() + var tr quorumAckTracker + // 3-node cluster: followerQuorum = 1 (need 1 follower + self). + tr.recordAck(2, 1) + first := tr.load() + require.False(t, first.IsZero(), "single follower ack already satisfies 3-node quorum") + + // 5-node cluster: followerQuorum = 2. One follower ack alone is + // NOT enough -- tracker must wait until a second follower has + // reported before publishing. 
+ var tr2 quorumAckTracker + tr2.recordAck(2, 2) + require.Equal(t, time.Time{}, tr2.load(), "one follower is not a 5-node quorum") + tr2.recordAck(3, 2) + require.False(t, tr2.load().IsZero(), "two followers + self make a 5-node quorum") +} + +func TestQuorumAckTracker_QuorumAckIsOldestOfTopN(t *testing.T) { + t.Parallel() + var tr quorumAckTracker + // 5-node cluster (quorum = 2 followers). Record acks in staggered + // order and verify the published instant is the OLDER of the top + // two -- i.e. the boundary by which a majority was last confirmed. + tr.recordAck(2, 2) + first := tr.load() + require.True(t, first.IsZero(), "still only one follower, no quorum") + + time.Sleep(2 * time.Millisecond) + tr.recordAck(3, 2) + second := tr.load() + require.False(t, second.IsZero()) + + // Now peer 4 acks with a later timestamp. Quorum ack should still + // be the older of the top two (either 2 or 3, not 4) because the + // 5-node quorum is 3 including self (2 followers + self), and the + // OLDEST of the top two followers is still the limiting factor. + time.Sleep(2 * time.Millisecond) + tr.recordAck(4, 2) + third := tr.load() + require.False(t, third.Before(second), "quorum instant must not regress") +} + +func TestQuorumAckTracker_ResetClearsState(t *testing.T) { + t.Parallel() + var tr quorumAckTracker + tr.recordAck(2, 1) + require.False(t, tr.load().IsZero()) + + tr.reset() + require.Equal(t, time.Time{}, tr.load()) + + // After reset, a subsequent ack must still populate correctly. + tr.recordAck(2, 1) + require.False(t, tr.load().IsZero()) +} + +func TestQuorumAckTracker_ConcurrentRecordAndLoad(t *testing.T) { + t.Parallel() + var tr quorumAckTracker + var wg sync.WaitGroup + wg.Add(2) + stop := make(chan struct{}) + + // Recorder alternates between two peer IDs so a 3-node followerQuorum + // always has at least one entry and the sort path runs. 
+ go func() { + defer wg.Done() + for { + select { + case <-stop: + return + default: + tr.recordAck(2, 1) + tr.recordAck(3, 1) + } + } + }() + go func() { + defer wg.Done() + for { + select { + case <-stop: + return + default: + _ = tr.load() + } + } + }() + + time.Sleep(20 * time.Millisecond) + close(stop) + wg.Wait() +} diff --git a/kv/coordinator.go b/kv/coordinator.go index f6f404b3..b8e5f5e9 100644 --- a/kv/coordinator.go +++ b/kv/coordinator.go @@ -340,12 +340,23 @@ func (c *Coordinate) LinearizableReadForKey(ctx context.Context, _ []byte) (uint } // LeaseRead returns a read fence backed by a leader-local lease when -// available, falling back to a full LinearizableRead when the lease has -// expired or the underlying engine does not implement LeaseProvider. +// available, falling back to a full LinearizableRead when the engine +// has not observed a fresh majority ack or does not implement +// LeaseProvider. // -// The returned index is the engine's current applied index (fast path) or -// the index returned by LinearizableRead (slow path). Callers that resolve -// timestamps via store.LastCommitTS may discard the value. +// The lease is maintained inside the engine from ongoing +// MsgAppResp / MsgHeartbeatResp traffic, so callers do not sample +// time.Now() before the slow path to "extend" a lease afterwards. +// That earlier pre-read sampling was racy under congestion: if a +// LinearizableRead took longer than LeaseDuration, the extension +// would land already expired and the lease never warmed up. The +// engine-driven anchor is refreshed every heartbeat independent of +// read latency. +// +// The returned index is the engine's current applied index (fast +// path) or the index returned by LinearizableRead (slow path). +// Callers that resolve timestamps via store.LastCommitTS may discard +// the value. 
func (c *Coordinate) LeaseRead(ctx context.Context) (uint64, error) { lp, ok := c.engine.(raftengine.LeaseProvider) if !ok { @@ -353,38 +364,33 @@ func (c *Coordinate) LeaseRead(ctx context.Context) (uint64, error) { } leaseDur := lp.LeaseDuration() if leaseDur <= 0 { - // Misconfigured tick settings (Engine.Open warned about this): - // the lease can never be valid. Fall back without touching - // lease state so we do not waste extend/invalidate work. + // Misconfigured tick settings: lease is disabled. return c.LinearizableRead(ctx) } - // Capture time.Now() and the lease generation exactly once before - // any quorum work. `now` is reused for both the fast-path validity - // check and (on slow path) the extend base; `expectedGen` guards - // against a leader-loss invalidation that fires during - // LinearizableRead from being overwritten by this caller's extend. - // See Coordinate.Dispatch for the same rationale. + // Primary: engine-driven lease. The engine refreshes LastQuorumAck + // on every MsgHeartbeatResp / MsgAppResp it receives while leader, + // so a read that itself takes longer than LeaseDuration does not + // prevent the window from being kept warm -- which is exactly the + // production congestion pathology the pre-engine-driven lease + // could not amortise out of. + state := c.engine.State() + if state == raftengine.StateLeader { + if ack := lp.LastQuorumAck(); !ack.IsZero() && time.Since(ack) < leaseDur { + return lp.AppliedIndex(), nil + } + } + // Secondary: caller-side lease warmed by a previous successful + // slow-path read. Preserved so tests can prime the lease directly + // and so we still benefit on paths where LastQuorumAck is not yet + // populated (e.g. very first read after startup before the first + // quorum heartbeat round has landed). 
now := time.Now() expectedGen := c.lease.generation() - // Defense-in-depth against the narrow race between an engine - // state transition out of leader and the async leader-loss - // callback flipping the lease: check the engine's current view - // too. State() is updated every Raft tick (~10 ms), which is - // tighter than the lease's time-bound. If the engine already - // knows it's not leader, force the slow path (which will fail - // fast via LinearizableRead and invalidate the lease). - if c.lease.valid(now) && c.engine.State() == raftengine.StateLeader { + if c.lease.valid(now) && state == raftengine.StateLeader { return lp.AppliedIndex(), nil } idx, err := c.LinearizableRead(ctx) if err != nil { - // Only invalidate on real leadership-loss signals. A context - // deadline or transient transport error is NOT leadership loss; - // forcing invalidation for those would push every subsequent - // read onto the slow path for the remainder of the lease - // window, mirroring the production regression the write-path - // guard fixed. RegisterLeaderLossCallback plus the - // State()==StateLeader fast-path check cover real transitions. if isLeadershipLossError(err) { c.lease.invalidate() } diff --git a/kv/lease_read_test.go b/kv/lease_read_test.go index 6d545e9a..50bb741c 100644 --- a/kv/lease_read_test.go +++ b/kv/lease_read_test.go @@ -21,11 +21,13 @@ type fakeLeaseEngine struct { linearizableErr error linearizableCalls atomic.Int32 state atomic.Value // stores raftengine.State; default Leader + lastQuorumAckUnixNano atomic.Int64 // 0 = no ack yet. Updated by ackNow(). leaderLossCallbacksMu sync.Mutex leaderLossCallbacks []fakeLeaseEngineCb registerLeaderLossCalled atomic.Int32 } + // fakeLeaseEngineCb pairs a callback with a unique sentinel pointer so // deregister can target THIS specific registration even when callbacks // are removed out of order, matching the production etcd engine. 
@@ -63,6 +65,13 @@ func (e *fakeLeaseEngine) Propose(context.Context, []byte) (*raftengine.Proposal func (e *fakeLeaseEngine) Close() error { return nil } func (e *fakeLeaseEngine) LeaseDuration() time.Duration { return e.leaseDur } func (e *fakeLeaseEngine) AppliedIndex() uint64 { return e.applied } +func (e *fakeLeaseEngine) LastQuorumAck() time.Time { + ns := e.lastQuorumAckUnixNano.Load() + if ns == 0 { + return time.Time{} + } + return time.Unix(0, ns) +} func (e *fakeLeaseEngine) RegisterLeaderLossCallback(fn func()) func() { e.registerLeaderLossCalled.Add(1) // Unique sentinel per registration so deregister can target THIS diff --git a/kv/sharded_coordinator.go b/kv/sharded_coordinator.go index e5c7790a..e2533ccf 100644 --- a/kv/sharded_coordinator.go +++ b/kv/sharded_coordinator.go @@ -749,28 +749,23 @@ func groupLeaseRead(ctx context.Context, g *ShardGroup) (uint64, error) { } leaseDur := lp.LeaseDuration() if leaseDur <= 0 { - // Lease disabled by tick configuration. Always take the slow - // path without mutating g.lease. return linearizableReadEngineCtx(ctx, engine) } - // Single time.Now() and generation sample before any quorum work, - // mirroring Coordinate.LeaseRead. expectedGen guards against a - // leader-loss invalidation that fires during LinearizableRead. + // Engine-driven lease anchor -- see Coordinate.LeaseRead for why + // this is the primary check. + state := engine.State() + if state == raftengine.StateLeader { + if ack := lp.LastQuorumAck(); !ack.IsZero() && time.Since(ack) < leaseDur { + return lp.AppliedIndex(), nil + } + } now := time.Now() expectedGen := g.lease.generation() - // Defense-in-depth: also check the shard engine's current state. - // Async callbacks may not have flipped the lease yet, but - // State() is refreshed every tick and catches transitions - // sooner. See Coordinate.LeaseRead for details. 
- if g.lease.valid(now) && engine.State() == raftengine.StateLeader { + if g.lease.valid(now) && state == raftengine.StateLeader { return lp.AppliedIndex(), nil } idx, err := linearizableReadEngineCtx(ctx, engine) if err != nil { - // See Coordinate.LeaseRead: only real leadership-loss signals - // invalidate the lease. Deadlines, transport blips, and other - // transient errors must NOT force the remainder of the lease - // window onto the slow path. if isLeadershipLossError(err) { g.lease.invalidate() } From 2b86cdc538840a6cb07fa38b1d48a7f173bbe76c Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Tue, 21 Apr 2026 01:24:30 +0900 Subject: [PATCH 2/8] fix(lease-read): prune removed peers and make LastQuorumAck lock-free Addresses review feedback on PR #561. Safety fix (gemini HIGH): quorumAckTracker now exposes removePeer, called from Engine.removePeer so a shrinking-then-growing cluster cannot keep a stale ack from a removed peer falsely satisfying the majority threshold. recomputeLocked factored out of recordAck to share the sort-and-publish path. Hot-path fix (gemini MEDIUM x3): LastQuorumAck is now fully lock-free. Multi-node path reads ackTracker.quorumAckUnixNano atomically as before; single-node leader path reads a new singleNodeLeaderAckUnixNano atomic, populated from refreshStatus every tick while leader of a 1-node cluster and cleared otherwise. The previous implementation held e.mu.RLock to check peer count and state, doubling the lock traffic with the caller's engine.State(). Coverage (coderabbit nitpick): - TestCoordinate_LeaseRead_EngineAckFastPath: fresh ack + cold caller-side lease -> fast path, 0 LinearizableRead. - TestCoordinate_LeaseRead_EngineAckStaleFallsThrough: stale ack -> slow path. - TestCoordinate_LeaseRead_EngineAckIgnoredWhenNotLeader: fresh ack with non-leader state -> slow path. - TestQuorumAckTracker_RemovedPeerCannotSatisfyQuorum: removed peer ack no longer counts toward the new cluster's quorum. 
- TestQuorumAckTracker_RemovePeerZeroQuorumKeepsCurrent: followerQuorum=0 preserves the published instant so the next recordAck refreshes it. --- internal/raftengine/etcd/engine.go | 55 ++++++++++------ internal/raftengine/etcd/quorum_ack.go | 67 ++++++++++++------- internal/raftengine/etcd/quorum_ack_test.go | 37 +++++++++++ kv/lease_read_test.go | 71 ++++++++++++++++++++- 4 files changed, 185 insertions(+), 45 deletions(-) diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index d674f6e5..8aa91234 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -187,6 +187,13 @@ type Engine struct { // and updated inside the single event-loop goroutine from // handleStep when a follower response arrives. ackTracker quorumAckTracker + // singleNodeLeaderAckUnixNano short-circuits LastQuorumAck on the + // single-node leader path: self IS the quorum, so there are no + // follower responses to observe. refreshStatus keeps this value + // current (set to time.Now().UnixNano() each tick while leader and + // cluster size is 1; cleared otherwise) so the lease-read hot path + // never has to acquire e.mu to check peer count or leader state. + singleNodeLeaderAckUnixNano atomic.Int64 // leaderLossCbsMu guards the slice of callbacks invoked when the node // transitions out of the leader role (graceful transfer, partition @@ -614,31 +621,18 @@ func (e *Engine) AppliedIndex() uint64 { // LastQuorumAck returns the wall-clock instant by which a majority of // followers most recently responded to the leader, or the zero time // when no such observation exists (follower / candidate / startup). -// Single-node clusters short-circuit to time.Now() while this node is -// leader because self is the quorum. // -// Lock-free: reads the atomic.Int64 published by recordQuorumAck -// inside the event-loop goroutine. See raftengine.LeaseProvider for -// the lease-read correctness contract. 
+// Lock-free: reads atomic.Int64 values published by recordQuorumAck +// (multi-node cluster) or refreshStatus (single-node cluster keeps +// singleNodeLeaderAckUnixNano alive with time.Now() while leader, so +// the hot lease-read path performs zero lock work). See +// raftengine.LeaseProvider for the lease-read correctness contract. func (e *Engine) LastQuorumAck() time.Time { if e == nil { return time.Time{} } - // Fast answer for single-node clusters: we're always our own - // quorum while we're leader. Peeking at e.peers from outside the - // event loop is safe for the small-cluster check because peer - // membership is only updated during config changes and reading a - // momentarily stale size at most defers switching to the - // majority-ack pathway until the next LeaseRead. - e.mu.RLock() - clusterSize := len(e.peers) - state := e.status.State - e.mu.RUnlock() - if clusterSize <= 1 { - if state == raftengine.StateLeader { - return time.Now() - } - return time.Time{} + if ns := e.singleNodeLeaderAckUnixNano.Load(); ns != 0 { + return time.Unix(0, ns) } return e.ackTracker.load() } @@ -1906,8 +1900,19 @@ func (e *Engine) refreshStatus() { if e.closed { e.status.State = raftengine.StateShutdown } + clusterSize := len(e.peers) e.mu.Unlock() + // Keep the lock-free single-node fast path in sync with the current + // role: populate while leader of a 1-node cluster, clear otherwise + // (including on leader loss, so LastQuorumAck transitions to the + // multi-node tracker or zero time atomically). 
+ if status.State == raftengine.StateLeader && clusterSize <= 1 { + e.singleNodeLeaderAckUnixNano.Store(time.Now().UnixNano()) + } else { + e.singleNodeLeaderAckUnixNano.Store(0) + } + if status.State == raftengine.StateLeader { e.leaderOnce.Do(func() { close(e.leaderReady) }) } @@ -2774,8 +2779,18 @@ func (e *Engine) removePeer(nodeID uint64) { delete(e.peers, nodeID) } e.config.Servers = removeConfigServer(e.peers, e.config.Servers, nodeID, peer.ID) + postRemovalClusterSize := len(e.peers) e.mu.Unlock() + // Drop the peer's recorded ack so a reconfiguration cannot leave a + // stale entry that falsely satisfies the new cluster's majority. + // followerQuorum is computed against the POST-removal cluster. + followerQuorum := 0 + if postRemovalClusterSize > 1 { + followerQuorum = postRemovalClusterSize / 2 //nolint:mnd + } + e.ackTracker.removePeer(nodeID, followerQuorum) + if e.transport != nil { e.transport.RemovePeer(nodeID) } diff --git a/internal/raftengine/etcd/quorum_ack.go b/internal/raftengine/etcd/quorum_ack.go index 6f3536f7..df9d08ad 100644 --- a/internal/raftengine/etcd/quorum_ack.go +++ b/internal/raftengine/etcd/quorum_ack.go @@ -18,27 +18,15 @@ import ( // amortise reads whose own latency exceeded LeaseDuration (the bug // that kept production GET at ~1 s under step-queue congestion). // -// Correctness anchor: we record time.Now() when the leader OBSERVES -// a follower response, not the follower's local ack time. That makes -// our recorded instant an UPPER bound on the follower's true -// last-contact, which is the conservative direction for lease -// safety: lease = recorded_instant + lease_duration can only be -// LATER than follower_last_contact + lease_duration, and -// follower_last_contact + electionTimeout is the earliest time the -// follower would vote for a new leader, so lease_duration < -// electionTimeout - safety keeps the lease strictly inside the -// no-new-leader window. -// -// Wait -- that's unsafe. 
A later observation means a LARGER recorded -// instant, which makes lease_expiry later. We actually need a LOWER -// bound on follower_last_contact to bound lease_expiry conservatively. -// Because network/scheduling delay makes leader_observation >= -// follower_ack_sent_time, using leader_observation is an OVERestimate -// of follower_ack_sent_time, which in turn is an overestimate of -// follower_ack_received_time. That means lease extends slightly past -// the strictly-safe boundary by at most the one-way delay + scheduling -// slop -- which is exactly what leaseSafetyMargin is sized to cover. -// See docs/lease_read_design.md for the full argument. +// Safety: we record time.Now() when the leader OBSERVES the follower +// response, which is an UPPER bound on the follower's true ack time. +// Because lease = recorded_instant + lease_duration, that upper bound +// makes the lease extend slightly past the strictly-safe +// follower_ack_time + electionTimeout boundary by at most the one-way +// network delay plus scheduling slop. leaseSafetyMargin is sized to +// cover that overshoot, so leaseDuration = electionTimeout - +// leaseSafetyMargin keeps the lease strictly inside the no-new-leader +// window. See docs/lease_read_design.md for the full argument. type quorumAckTracker struct { mu sync.Mutex peerAcks map[uint64]int64 // peer ID → last ack unix nano observed on leader @@ -66,8 +54,39 @@ func (t *quorumAckTracker) recordAck(peerID uint64, followerQuorum int) { t.peerAcks = make(map[uint64]int64) } t.peerAcks[peerID] = now + t.recomputeLocked(followerQuorum) +} + +// removePeer drops peerID's recorded ack. Call when a peer leaves the +// cluster so its pre-removal ack time can no longer satisfy the +// majority threshold after a configuration change: a shrink-then-grow +// that ends with fresh peers who have not yet acked would otherwise +// let the removed peer's last ack falsely advance the quorum instant, +// which is a lease-safety violation. 
+// +// followerQuorum is the POST-removal follower quorum so the published +// instant is recomputed against the current cluster. Passing 0 keeps +// the current instant; the next recordAck will refresh it. +func (t *quorumAckTracker) removePeer(peerID uint64, followerQuorum int) { + t.mu.Lock() + defer t.mu.Unlock() + if _, ok := t.peerAcks[peerID]; !ok { + return + } + delete(t.peerAcks, peerID) + if followerQuorum <= 0 { + return + } + t.recomputeLocked(followerQuorum) +} + +// recomputeLocked publishes the followerQuorum-th most recent ack as +// the quorum instant, or clears it if we lack that many recorded +// peers. Caller must hold t.mu. +func (t *quorumAckTracker) recomputeLocked(followerQuorum int) { if len(t.peerAcks) < followerQuorum { - // Not enough peers have reported yet to form a majority. + // Not enough peers have reported to form a majority yet. + t.quorumAckUnixNano.Store(0) return } acks := make([]int64, 0, len(t.peerAcks)) @@ -75,8 +94,8 @@ func (t *quorumAckTracker) recordAck(peerID uint64, followerQuorum int) { acks = append(acks, a) } // Sort descending so acks[0] is the most recent. The followerQuorum-th - // entry (1-indexed) is the oldest ack among the top quorum -- i.e. the - // boundary instant by which majority liveness was confirmed. + // entry (1-indexed) is the oldest ack among the top quorum -- i.e. + // the boundary instant by which majority liveness was confirmed. 
sort.Slice(acks, func(i, j int) bool { return acks[i] > acks[j] }) t.quorumAckUnixNano.Store(acks[followerQuorum-1]) } diff --git a/internal/raftengine/etcd/quorum_ack_test.go b/internal/raftengine/etcd/quorum_ack_test.go index f58e691f..05398858 100644 --- a/internal/raftengine/etcd/quorum_ack_test.go +++ b/internal/raftengine/etcd/quorum_ack_test.go @@ -61,6 +61,43 @@ func TestQuorumAckTracker_QuorumAckIsOldestOfTopN(t *testing.T) { require.False(t, third.Before(second), "quorum instant must not regress") } +// TestQuorumAckTracker_RemovedPeerCannotSatisfyQuorum exercises the +// safety invariant: a peer that leaves the cluster must have its +// recorded ack pruned, otherwise a shrink-then-grow that ends with +// fresh peers who have not yet acked could let the removed peer's +// pre-removal ack falsely satisfy the new cluster's majority. +func TestQuorumAckTracker_RemovedPeerCannotSatisfyQuorum(t *testing.T) { + t.Parallel() + var tr quorumAckTracker + // 5-node cluster, followerQuorum = 2. Peers 2 and 3 ack. + tr.recordAck(2, 2) + tr.recordAck(3, 2) + require.False(t, tr.load().IsZero(), "baseline: 5-node quorum satisfied") + + // Cluster shrinks to 3 (followerQuorum = 1). After removing both + // acked peers we have zero recorded entries -- not enough to + // satisfy even the smaller quorum. + tr.removePeer(2, 1) + tr.removePeer(3, 1) + require.Equal(t, time.Time{}, tr.load(), + "after removing every acked peer the quorum instant must clear") +} + +func TestQuorumAckTracker_RemovePeerZeroQuorumKeepsCurrent(t *testing.T) { + t.Parallel() + var tr quorumAckTracker + tr.recordAck(2, 1) + before := tr.load() + require.False(t, before.IsZero()) + + // followerQuorum = 0 means the caller doesn't have the post- + // removal size yet. Entry is dropped but the published instant is + // retained; the next recordAck will refresh it. 
+ tr.removePeer(2, 0) + require.Equal(t, before, tr.load(), + "removePeer with followerQuorum=0 must not clobber the current instant") +} + func TestQuorumAckTracker_ResetClearsState(t *testing.T) { t.Parallel() var tr quorumAckTracker diff --git a/kv/lease_read_test.go b/kv/lease_read_test.go index 50bb741c..3ec07576 100644 --- a/kv/lease_read_test.go +++ b/kv/lease_read_test.go @@ -27,7 +27,6 @@ type fakeLeaseEngine struct { registerLeaderLossCalled atomic.Int32 } - // fakeLeaseEngineCb pairs a callback with a unique sentinel pointer so // deregister can target THIS specific registration even when callbacks // are removed out of order, matching the production etcd engine. @@ -146,8 +145,78 @@ func (e *nonLeaseEngine) Propose(context.Context, []byte) (*raftengine.ProposalR } func (e *nonLeaseEngine) Close() error { return nil } +// setQuorumAck is a test helper that drives the engine-driven lease +// anchor on the fake engine so tests can exercise the new PRIMARY +// fast path (LastQuorumAck + State==Leader) independently of the +// caller-side lease state. +func (e *fakeLeaseEngine) setQuorumAck(t time.Time) { + if t.IsZero() { + e.lastQuorumAckUnixNano.Store(0) + return + } + e.lastQuorumAckUnixNano.Store(t.UnixNano()) +} + // --- Coordinate.LeaseRead ----------------------------------------------- +// TestCoordinate_LeaseRead_EngineAckFastPath covers the engine-driven +// primary path introduced in feat/engine-driven-lease: a fresh +// LastQuorumAck alone (cold caller-side lease, no prior +// LinearizableRead) must satisfy LeaseRead without consulting the +// engine's slow-path read API. 
+func TestCoordinate_LeaseRead_EngineAckFastPath(t *testing.T) { + t.Parallel() + eng := &fakeLeaseEngine{applied: 123, leaseDur: time.Hour} + eng.setQuorumAck(time.Now()) + c := NewCoordinatorWithEngine(nil, eng) + + require.False(t, c.lease.valid(time.Now()), + "caller-side lease must start cold so the fast-path hit is attributable to the engine ack") + + idx, err := c.LeaseRead(context.Background()) + require.NoError(t, err) + require.Equal(t, uint64(123), idx) + require.Equal(t, int32(0), eng.linearizableCalls.Load(), + "engine-driven ack alone must skip LinearizableRead") + require.False(t, c.lease.valid(time.Now()), + "engine-driven fast path must not warm the caller-side lease") +} + +// TestCoordinate_LeaseRead_EngineAckStaleFallsThrough covers the +// stale-ack case: if the engine's ack has aged past LeaseDuration we +// must NOT serve from AppliedIndex alone, and instead take the slow +// path through LinearizableRead. +func TestCoordinate_LeaseRead_EngineAckStaleFallsThrough(t *testing.T) { + t.Parallel() + eng := &fakeLeaseEngine{applied: 7, leaseDur: 50 * time.Millisecond} + // Set the ack far enough in the past that time.Since(ack) > leaseDur. + eng.setQuorumAck(time.Now().Add(-time.Hour)) + c := NewCoordinatorWithEngine(nil, eng) + + _, err := c.LeaseRead(context.Background()) + require.NoError(t, err) + require.Equal(t, int32(1), eng.linearizableCalls.Load(), + "stale engine ack must fall through to LinearizableRead") +} + +// TestCoordinate_LeaseRead_EngineAckIgnoredWhenNotLeader covers the +// engine-state guard: even with a fresh ack, if the engine reports a +// non-leader role the fast path must NOT fire -- the ack could be +// inherited state from a just-lost leader term. 
+func TestCoordinate_LeaseRead_EngineAckIgnoredWhenNotLeader(t *testing.T) { + t.Parallel() + sentinel := errors.New("not leader") + eng := &fakeLeaseEngine{applied: 7, leaseDur: time.Hour, linearizableErr: sentinel} + eng.setQuorumAck(time.Now()) + eng.state.Store(raftengine.StateFollower) + c := NewCoordinatorWithEngine(nil, eng) + + _, err := c.LeaseRead(context.Background()) + require.ErrorIs(t, err, sentinel) + require.Equal(t, int32(1), eng.linearizableCalls.Load(), + "non-leader state must bypass the engine ack fast path") +} + func TestCoordinate_LeaseRead_FastPathSkipsEngine(t *testing.T) { t.Parallel() eng := &fakeLeaseEngine{applied: 100, leaseDur: time.Hour} From 9adc6b461bd179f0c47e9af7053185dd46348447 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Tue, 21 Apr 2026 01:36:31 +0900 Subject: [PATCH 3/8] fix(lease-read): ignore acks from peers not in current membership Addresses copilot review on PR #561. recordQuorumAck ran before rawNode.Step, so a late MsgAppResp / MsgHeartbeatResp from a just-removed peer (which rawNode.Step would reject with ErrStepPeerNotFound) still landed an ack in the tracker -- resurrecting the "ghost" entry that removePeer just pruned. Filter on e.peers membership before recording; the event loop is the sole writer to e.peers so the map read is race-free. --- internal/raftengine/etcd/engine.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index 8aa91234..21e4f602 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -1282,6 +1282,16 @@ func (e *Engine) recordQuorumAck(msg raftpb.Message) { if msg.From == 0 || msg.From == e.nodeID { return } + // Reject acks from peers not in the current membership. 
Without + // this filter, a late MsgAppResp from a just-removed peer (which + // rawNode.Step will immediately reject with ErrStepPeerNotFound) + // would still land an ack in the tracker -- resurrecting the + // "ghost" entry that removePeer just pruned. Since we run on the + // event-loop goroutine (the sole writer to e.peers), the map read + // here is race-free. + if _, ok := e.peers[msg.From]; !ok { + return + } clusterSize := len(e.peers) if clusterSize <= 1 { return From 899c0a0324815068c4651176101dc1a8111c0042 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Tue, 21 Apr 2026 02:04:40 +0900 Subject: [PATCH 4/8] fix(lease-read): guard LeaseRead fast-path against wall-clock skew Addresses copilot review on PR #561. LastQuorumAck() returns a time reconstructed from UnixNano, which has no monotonic component. A backwards wall-clock step can leave now.Sub(ack) negative and let a stale ack pass the `<` duration comparison -- a lease-safety violation. Three related changes: 1. Coordinate.LeaseRead / groupLeaseRead: sample now := time.Now() once, then validate the engine-driven ack through a new engineLeaseAckValid helper that requires state==Leader, ack non-zero, !ack.After(now), and now.Sub(ack) < leaseDur. Treating future-dated acks as invalid forces the slow path instead of trusting a negative elapsed duration. Factoring the check into a helper keeps both call sites under the cyclomatic-complexity cap. 2. raftengine.LeaseProvider.LastQuorumAck doc: specify the skew-safe check (single-now sample + !ack.After(now)) so every implementation and every caller follows the same contract. 3. Coordinate.LeaseRead doc refreshed to clarify that only the PRIMARY engine-driven path skips pre-read sampling; the SECONDARY caller-side lease fallback still uses now.Add(leaseDur) during rollout. 
--- internal/raftengine/engine.go | 18 +++++++---- kv/coordinator.go | 58 ++++++++++++++++++++++------------- kv/sharded_coordinator.go | 13 ++++---- 3 files changed, 54 insertions(+), 35 deletions(-) diff --git a/internal/raftengine/engine.go b/internal/raftengine/engine.go index 00bdd089..8cc01fb1 100644 --- a/internal/raftengine/engine.go +++ b/internal/raftengine/engine.go @@ -101,12 +101,18 @@ type LeaseProvider interface { // MsgAppResp traffic on the leader, so a fast-path lease read does // not need to issue its own ReadIndex to "warm" the lease. // - // Safety: callers must verify the lease with - // time.Since(LastQuorumAck()) < LeaseDuration() && - // engine.State() == raftengine.StateLeader - // before serving a leader-local read. The LeaseDuration is bounded - // by electionTimeout - safety_margin, which guarantees that any new - // leader candidate cannot yet accept writes during that window. + // Safety: callers must verify the lease against a single + // `now := time.Now()` sample: + // state == raftengine.StateLeader && + // !ack.IsZero() && !ack.After(now) && now.Sub(ack) < LeaseDuration() + // + // The !ack.After(now) guard matters because LastQuorumAck() may be + // reconstructed from UnixNano (no monotonic component): a backwards + // wall-clock adjustment would otherwise make now.Sub(ack) negative + // and pass the duration check against a stale ack. The LeaseDuration + // is bounded by electionTimeout - safety_margin, which guarantees + // that any new leader candidate cannot yet accept writes during + // that window. // // Returns the zero time when no quorum has been confirmed yet, or // when the local node is not the leader. 
Single-node clusters diff --git a/kv/coordinator.go b/kv/coordinator.go index b8e5f5e9..72279931 100644 --- a/kv/coordinator.go +++ b/kv/coordinator.go @@ -340,18 +340,22 @@ func (c *Coordinate) LinearizableReadForKey(ctx context.Context, _ []byte) (uint } // LeaseRead returns a read fence backed by a leader-local lease when -// available, falling back to a full LinearizableRead when the engine -// has not observed a fresh majority ack or does not implement -// LeaseProvider. +// available, falling back to a full LinearizableRead when no fast +// path is live or the engine does not implement LeaseProvider. // -// The lease is maintained inside the engine from ongoing -// MsgAppResp / MsgHeartbeatResp traffic, so callers do not sample -// time.Now() before the slow path to "extend" a lease afterwards. -// That earlier pre-read sampling was racy under congestion: if a -// LinearizableRead took longer than LeaseDuration, the extension -// would land already expired and the lease never warmed up. The -// engine-driven anchor is refreshed every heartbeat independent of -// read latency. +// The PRIMARY lease path is maintained inside the engine from ongoing +// MsgAppResp / MsgHeartbeatResp traffic, so that path does not rely +// on callers sampling time.Now() before the slow path to "extend" a +// lease afterwards. The earlier pre-read sampling was racy under +// congestion: if a LinearizableRead took longer than LeaseDuration, +// the extension would land already expired and the lease never +// warmed up. The engine-driven anchor is refreshed every heartbeat +// independent of read latency. +// +// The SECONDARY caller-side lease remains as a rollout fallback, +// still populated by the original pre-read sampling; it covers the +// narrow window between startup and the first quorum heartbeat round +// landing on the engine. // // The returned index is the engine's current applied index (fast // path) or the index returned by LinearizableRead (slow path). 
@@ -367,24 +371,18 @@ func (c *Coordinate) LeaseRead(ctx context.Context) (uint64, error) { // Misconfigured tick settings: lease is disabled. return c.LinearizableRead(ctx) } - // Primary: engine-driven lease. The engine refreshes LastQuorumAck - // on every MsgHeartbeatResp / MsgAppResp it receives while leader, - // so a read that itself takes longer than LeaseDuration does not - // prevent the window from being kept warm -- which is exactly the - // production congestion pathology the pre-engine-driven lease - // could not amortise out of. + // Single time.Now() sample so the primary, secondary, and + // extension steps all reason about the same instant. + now := time.Now() state := c.engine.State() - if state == raftengine.StateLeader { - if ack := lp.LastQuorumAck(); !ack.IsZero() && time.Since(ack) < leaseDur { - return lp.AppliedIndex(), nil - } + if engineLeaseAckValid(state, lp.LastQuorumAck(), now, leaseDur) { + return lp.AppliedIndex(), nil } // Secondary: caller-side lease warmed by a previous successful // slow-path read. Preserved so tests can prime the lease directly // and so we still benefit on paths where LastQuorumAck is not yet // populated (e.g. very first read after startup before the first // quorum heartbeat round has landed). - now := time.Now() expectedGen := c.lease.generation() if c.lease.valid(now) && state == raftengine.StateLeader { return lp.AppliedIndex(), nil @@ -400,6 +398,22 @@ func (c *Coordinate) LeaseRead(ctx context.Context) (uint64, error) { return idx, nil } +// engineLeaseAckValid returns whether the engine-driven lease anchor +// published via LastQuorumAck is fresh enough to serve a leader-local +// read. 
Enforces the safety contract from raftengine.LeaseProvider: +// - local state must be Leader +// - ack must be non-zero (a quorum was ever observed) +// - ack must not be after now (clock-skew guard: LastQuorumAck is +// rebuilt from UnixNano with no monotonic component, so a +// backwards wall-clock step could otherwise let a stale ack pass) +// - now − ack must be strictly less than leaseDur +func engineLeaseAckValid(state raftengine.State, ack, now time.Time, leaseDur time.Duration) bool { + if state != raftengine.StateLeader || ack.IsZero() || ack.After(now) { + return false + } + return now.Sub(ack) < leaseDur +} + func (c *Coordinate) LeaseReadForKey(ctx context.Context, _ []byte) (uint64, error) { return c.LeaseRead(ctx) } diff --git a/kv/sharded_coordinator.go b/kv/sharded_coordinator.go index e2533ccf..4d305617 100644 --- a/kv/sharded_coordinator.go +++ b/kv/sharded_coordinator.go @@ -751,15 +751,14 @@ func groupLeaseRead(ctx context.Context, g *ShardGroup) (uint64, error) { if leaseDur <= 0 { return linearizableReadEngineCtx(ctx, engine) } - // Engine-driven lease anchor -- see Coordinate.LeaseRead for why - // this is the primary check. + // Single time.Now() sample so primary/secondary/extension all see + // the same instant. Clock-skew safety delegated to + // engineLeaseAckValid (see Coordinate.LeaseRead). 
+ now := time.Now() state := engine.State() - if state == raftengine.StateLeader { - if ack := lp.LastQuorumAck(); !ack.IsZero() && time.Since(ack) < leaseDur { - return lp.AppliedIndex(), nil - } + if engineLeaseAckValid(state, lp.LastQuorumAck(), now, leaseDur) { + return lp.AppliedIndex(), nil } - now := time.Now() expectedGen := g.lease.generation() if g.lease.valid(now) && state == raftengine.StateLeader { return lp.AppliedIndex(), nil From 9346bbfe06c128f1965c14a910171dc8cddb0602 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Tue, 21 Apr 2026 02:30:49 +0900 Subject: [PATCH 5/8] fix(lease-read): recompute on removePeer and tighten LastQuorumAck doc Addresses copilot review on PR #561. - quorumAckTracker.removePeer no longer early-returns when peerID has no recorded entry. A shrink that reduces followerQuorum may let the remaining peers now satisfy the smaller threshold, and without an explicit recompute the published instant stays stale (or zero) until the next recordAck. delete() is safe on a missing key, so the unconditional path is harmless otherwise. - LeaseProvider.LastQuorumAck doc: the previous wording said "single- node clusters report time.Now() unconditionally" which contradicted the "zero when not leader" clause. Tighten to "single-node leaders may return a recent time.Now()" so callers and alternate engines don't interpret it as returning non-zero on a non-leader. --- internal/raftengine/engine.go | 7 ++++--- internal/raftengine/etcd/quorum_ack.go | 9 ++++++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/internal/raftengine/engine.go b/internal/raftengine/engine.go index 8cc01fb1..d110b398 100644 --- a/internal/raftengine/engine.go +++ b/internal/raftengine/engine.go @@ -114,9 +114,10 @@ type LeaseProvider interface { // that any new leader candidate cannot yet accept writes during // that window. // - // Returns the zero time when no quorum has been confirmed yet, or - // when the local node is not the leader. 
Single-node clusters - // report time.Now() unconditionally since self is the quorum. + // Returns the zero time when no quorum has been confirmed yet or + // when the local node is not the leader. Single-node LEADERS may + // return a recent time.Now() since self is the quorum; non-leader + // single-node replicas still return the zero time. LastQuorumAck() time.Time // RegisterLeaderLossCallback registers fn to be invoked whenever the // local node leaves the leader role (graceful transfer, partition diff --git a/internal/raftengine/etcd/quorum_ack.go b/internal/raftengine/etcd/quorum_ack.go index df9d08ad..f8dfd3db 100644 --- a/internal/raftengine/etcd/quorum_ack.go +++ b/internal/raftengine/etcd/quorum_ack.go @@ -70,9 +70,12 @@ func (t *quorumAckTracker) recordAck(peerID uint64, followerQuorum int) { func (t *quorumAckTracker) removePeer(peerID uint64, followerQuorum int) { t.mu.Lock() defer t.mu.Unlock() - if _, ok := t.peerAcks[peerID]; !ok { - return - } + // delete is safe on a missing key. We still recompute even when + // peerID had no recorded entry: a shrink that reduces + // followerQuorum may let the remaining peers now satisfy the + // smaller threshold, and without an explicit recompute the + // published instant would stay at its stale value (or zero) until + // the next recordAck arrives. delete(t.peerAcks, peerID) if followerQuorum <= 0 { return From a1e1f58207a7be78808ab33ad19de784a58bf504 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Tue, 21 Apr 2026 03:21:39 +0900 Subject: [PATCH 6/8] fix(lease-read): gate quorum-ack observation/publication on leader state Addresses copilot review on PR #561. A transport-level MsgAppResp / MsgHeartbeatResp can land at handleStep shortly after a step-down -- ackTracker.reset() has already fired, but the response is already in flight. Previously recordQuorumAck would admit the late ack and repopulate the tracker, and LastQuorumAck would surface the resulting non-zero instant. 
A subsequent re-election would then observe stale liveness from the prior term, violating the LeaseProvider contract ("zero time when the local node is not the leader") and potentially widening an already-expired lease window. Fix: atomic isLeader mirror on the Engine. - refreshStatus writes it every tick before the other lease-related atomics, so every role transition is observed before the next handleStep runs. - recordQuorumAck loads it and drops the response on the floor when not leader. Prevents the tracker from being repopulated after reset() and before the next refreshStatus pass. - LastQuorumAck returns time.Time{} when not leader, honoring the interface contract and covering the narrow window between recordQuorumAck's check and ack-tracker publication. Also drop the flaky 2ms time.Sleep calls from TestQuorumAckTracker_QuorumAckIsOldestOfTopN: the require.False( third.Before(second)) assertion holds trivially when timestamps are equal, so the sleeps added latency without exercising new behaviour and could flake on slow CI. --- internal/raftengine/etcd/engine.go | 40 +++++++++++++++++---- internal/raftengine/etcd/quorum_ack_test.go | 13 +++---- 2 files changed, 41 insertions(+), 12 deletions(-) diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index 21e4f602..b98ec8e2 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -194,6 +194,14 @@ type Engine struct { // cluster size is 1; cleared otherwise) so the lease-read hot path // never has to acquire e.mu to check peer count or leader state. singleNodeLeaderAckUnixNano atomic.Int64 + // isLeader mirrors status.State == StateLeader for lock-free reads + // on the hot path. 
refreshStatus writes it on every tick; + // recordQuorumAck reads it before admitting a follower response + // into ackTracker (so late MsgAppResp / MsgHeartbeatResp arriving + // after a step-down cannot repopulate the tracker), and + // LastQuorumAck reads it to honor the LeaseProvider contract + // ("zero time when the local node is not the leader"). + isLeader atomic.Bool // leaderLossCbsMu guards the slice of callbacks invoked when the node // transitions out of the leader role (graceful transfer, partition @@ -631,6 +639,14 @@ func (e *Engine) LastQuorumAck() time.Time { if e == nil { return time.Time{} } + // Honor the LeaseProvider contract that non-leaders always return + // the zero time. Without this guard a late MsgAppResp that sneaks + // past recordQuorumAck (or a tracker entry that survived a brief + // step-down/step-up window) could leak stale liveness into the + // caller's fast-path validation. + if !e.isLeader.Load() { + return time.Time{} + } if ns := e.singleNodeLeaderAckUnixNano.Load(); ns != 0 { return time.Unix(0, ns) } @@ -1269,12 +1285,15 @@ func (e *Engine) handleStep(msg raftpb.Message) { // majority liveness without requiring a fresh ReadIndex. // // Called inside the event-loop goroutine (single writer to e.peers -// and to the raft state), so the len(e.peers) read and the -// leader-state check are race-free. We intentionally do not gate the -// observation itself on BasicStatus() -- a MsgAppResp / MsgHeartbeatResp -// only ever lands at a leader via the transport layer, and refreshing -// the per-peer map from follower/candidate state would just be dead -// data later cleared by reset() on role transition. +// and to the raft state), so the e.peers read is race-free. 
+//
+// Gated on the atomic isLeader mirror: a transport-level MsgAppResp /
+// MsgHeartbeatResp can land shortly after a step-down (reset() has
+// already cleared ackTracker); admitting it here would repopulate
+// the tracker and leak a stale liveness instant into the next
+// re-election as a non-zero LastQuorumAck(). isLeader is written by
+// refreshStatus on every tick, which catches every role transition
+// before the next handleStep runs.
 func (e *Engine) recordQuorumAck(msg raftpb.Message) {
 	if !isFollowerResponse(msg.Type) {
 		return
@@ -1282,6 +1301,9 @@ func (e *Engine) recordQuorumAck(msg raftpb.Message) {
 	if msg.From == 0 || msg.From == e.nodeID {
 		return
 	}
+	if !e.isLeader.Load() {
+		return
+	}
 	// Reject acks from peers not in the current membership. Without
 	// this filter, a late MsgAppResp from a just-removed peer (which
 	// rawNode.Step will immediately reject with ErrStepPeerNotFound)
@@ -1917,6 +1939,12 @@ func (e *Engine) refreshStatus() {
 	// role: populate while leader of a 1-node cluster, clear otherwise
 	// (including on leader loss, so LastQuorumAck transitions to the
 	// multi-node tracker or zero time atomically).
+	// Publish leader state atomically so recordQuorumAck / LastQuorumAck
+	// can gate on it without acquiring e.mu. MUST run before the
+	// single-node ack store below, otherwise a brand-new leader tick
+	// could publish an ack instant while isLeader is still false.
+ e.isLeader.Store(status.State == raftengine.StateLeader) + if status.State == raftengine.StateLeader && clusterSize <= 1 { e.singleNodeLeaderAckUnixNano.Store(time.Now().UnixNano()) } else { diff --git a/internal/raftengine/etcd/quorum_ack_test.go b/internal/raftengine/etcd/quorum_ack_test.go index 05398858..b60f461f 100644 --- a/internal/raftengine/etcd/quorum_ack_test.go +++ b/internal/raftengine/etcd/quorum_ack_test.go @@ -46,16 +46,17 @@ func TestQuorumAckTracker_QuorumAckIsOldestOfTopN(t *testing.T) { first := tr.load() require.True(t, first.IsZero(), "still only one follower, no quorum") - time.Sleep(2 * time.Millisecond) tr.recordAck(3, 2) second := tr.load() require.False(t, second.IsZero()) - // Now peer 4 acks with a later timestamp. Quorum ack should still - // be the older of the top two (either 2 or 3, not 4) because the - // 5-node quorum is 3 including self (2 followers + self), and the - // OLDEST of the top two followers is still the limiting factor. - time.Sleep(2 * time.Millisecond) + // Now peer 4 acks. Even if time.Now() granularity places every + // sample at the same nanosecond, the quorum instant must NOT + // regress: the 5-node quorum requires 2 follower acks (self makes + // 3 = majority), and the OLDEST of the top two followers bounds + // the boundary. require.False(third.Before(second)) holds trivially + // when timestamps are equal, so this test does not rely on wall- + // clock granularity and is deterministic on fast CI. tr.recordAck(4, 2) third := tr.load() require.False(t, third.Before(second), "quorum instant must not regress") From 50d3aa45e493972a363ce59e3a443d0a3ad902d4 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Tue, 21 Apr 2026 03:51:41 +0900 Subject: [PATCH 7/8] fix(lease-read): clear ack tracker on shrink-to-single-node and reuse sort buffer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses copilot review on PR #561. 
- engine.removePeer: when the cluster shrinks to a single node the prior code passed followerQuorum=0 to quorumAckTracker.removePeer, which by design preserves the currently published instant so the next recordAck can refresh it. But if the cluster subsequently grew back without a recordAck arriving first, LastQuorumAck's multi-node fallback would surface the stale instant from the previous configuration. Route shrink-to-<=1 through ackTracker.reset() so any future multi-node membership starts fresh. - quorumAckTracker.recomputeLocked: reuse a per-tracker ackBuf across calls instead of allocating a fresh []int64 on every recordAck. Heartbeat frequency × cluster size keeps the allocation rate non-trivial otherwise; re-slicing in place + append-on-growth brings the steady-state alloc to zero. - quorum_ack_test concurrency loops now Gosched() between iterations so the test exercises interleavings without pegging a CI core. --- internal/raftengine/etcd/engine.go | 15 +++++++---- internal/raftengine/etcd/quorum_ack.go | 29 ++++++++++++++++----- internal/raftengine/etcd/quorum_ack_test.go | 6 +++++ 3 files changed, 38 insertions(+), 12 deletions(-) diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index b98ec8e2..95e5631a 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -2822,12 +2822,17 @@ func (e *Engine) removePeer(nodeID uint64) { // Drop the peer's recorded ack so a reconfiguration cannot leave a // stale entry that falsely satisfies the new cluster's majority. - // followerQuorum is computed against the POST-removal cluster. 
- followerQuorum := 0 - if postRemovalClusterSize > 1 { - followerQuorum = postRemovalClusterSize / 2 //nolint:mnd + // followerQuorum is computed against the POST-removal cluster; a + // shrink to <=1 would otherwise pass 0 here, which + // quorumAckTracker.removePeer treats as "keep the current instant" + // and would surface stale liveness to LastQuorumAck if the cluster + // subsequently grew back. Clear the tracker explicitly in that + // case so any future multi-node membership starts fresh. + if postRemovalClusterSize <= 1 { + e.ackTracker.reset() + } else { + e.ackTracker.removePeer(nodeID, postRemovalClusterSize/2) //nolint:mnd } - e.ackTracker.removePeer(nodeID, followerQuorum) if e.transport != nil { e.transport.RemovePeer(nodeID) diff --git a/internal/raftengine/etcd/quorum_ack.go b/internal/raftengine/etcd/quorum_ack.go index f8dfd3db..154d44ac 100644 --- a/internal/raftengine/etcd/quorum_ack.go +++ b/internal/raftengine/etcd/quorum_ack.go @@ -30,6 +30,11 @@ import ( type quorumAckTracker struct { mu sync.Mutex peerAcks map[uint64]int64 // peer ID → last ack unix nano observed on leader + // ackBuf is reused by recomputeLocked to avoid allocating a fresh + // []int64 on every MsgAppResp / MsgHeartbeatResp. Sized to + // len(peerAcks) on first use and grown via append when the cluster + // expands. Caller must hold t.mu. + ackBuf []int64 // quorumAckUnixNano is the Nth-most-recent peer ack where N equals // the number of follower acks required for majority (clusterSize/2). // Updated under mu; read lock-free via atomic.Load. @@ -86,21 +91,30 @@ func (t *quorumAckTracker) removePeer(peerID uint64, followerQuorum int) { // recomputeLocked publishes the followerQuorum-th most recent ack as // the quorum instant, or clears it if we lack that many recorded // peers. Caller must hold t.mu. +// +// Reuses t.ackBuf across calls so the hot path (one call per +// MsgAppResp / MsgHeartbeatResp) does not allocate on steady state. 
+// The buffer is re-sliced in place and the sort is done on that +// slice; a cluster growing past the previous capacity picks up a +// single growth step via append, not a fresh allocation per call. func (t *quorumAckTracker) recomputeLocked(followerQuorum int) { if len(t.peerAcks) < followerQuorum { // Not enough peers have reported to form a majority yet. t.quorumAckUnixNano.Store(0) return } - acks := make([]int64, 0, len(t.peerAcks)) + t.ackBuf = t.ackBuf[:0] for _, a := range t.peerAcks { - acks = append(acks, a) + t.ackBuf = append(t.ackBuf, a) } - // Sort descending so acks[0] is the most recent. The followerQuorum-th - // entry (1-indexed) is the oldest ack among the top quorum -- i.e. - // the boundary instant by which majority liveness was confirmed. - sort.Slice(acks, func(i, j int) bool { return acks[i] > acks[j] }) - t.quorumAckUnixNano.Store(acks[followerQuorum-1]) + // Sort descending so ackBuf[0] is the most recent. The + // followerQuorum-th entry (1-indexed) is the oldest ack among the + // top quorum -- i.e. the boundary instant by which majority + // liveness was confirmed. Cluster size is small in practice (3-5 + // peers), so sort.Slice is cheaper than a quickselect once the + // buffer is reused. + sort.Slice(t.ackBuf, func(i, j int) bool { return t.ackBuf[i] > t.ackBuf[j] }) + t.quorumAckUnixNano.Store(t.ackBuf[followerQuorum-1]) } // reset clears all recorded peer acks. 
Call when the local node @@ -110,6 +124,7 @@ func (t *quorumAckTracker) reset() { t.mu.Lock() defer t.mu.Unlock() t.peerAcks = nil + t.ackBuf = t.ackBuf[:0] t.quorumAckUnixNano.Store(0) } diff --git a/internal/raftengine/etcd/quorum_ack_test.go b/internal/raftengine/etcd/quorum_ack_test.go index b60f461f..b028defe 100644 --- a/internal/raftengine/etcd/quorum_ack_test.go +++ b/internal/raftengine/etcd/quorum_ack_test.go @@ -1,6 +1,7 @@ package etcd import ( + "runtime" "sync" "testing" "time" @@ -122,6 +123,9 @@ func TestQuorumAckTracker_ConcurrentRecordAndLoad(t *testing.T) { // Recorder alternates between two peer IDs so a 3-node followerQuorum // always has at least one entry and the sort path runs. + // runtime.Gosched between iterations keeps the loops from pegging a + // core under `-race` while still interleaving enough recordAck / + // load pairs to exercise the atomic-pointer invariants. go func() { defer wg.Done() for { @@ -131,6 +135,7 @@ func TestQuorumAckTracker_ConcurrentRecordAndLoad(t *testing.T) { default: tr.recordAck(2, 1) tr.recordAck(3, 1) + runtime.Gosched() } } }() @@ -142,6 +147,7 @@ func TestQuorumAckTracker_ConcurrentRecordAndLoad(t *testing.T) { return default: _ = tr.load() + runtime.Gosched() } } }() From dfa2ac9f4fa0cce82bd491d94e9e8771e1de17c4 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Tue, 21 Apr 2026 04:23:55 +0900 Subject: [PATCH 8/8] fix(lease-read): centralise follower-quorum math and gate fake on leader state Addresses copilot review on PR #561. - followerQuorumForClusterSize helper centralises the cluster/2 formula so recordQuorumAck and removePeer share a single point of truth. The //nolint:mnd suppression now lives only in the helper where the formula is documented in context, rather than sprinkled at each call site. - fakeLeaseEngine.LastQuorumAck now gates on State() == StateLeader, matching the production etcd engine and the LeaseProvider contract ("zero time when the local node is not the leader"). 
Without this mirror, a test could set a fresh ack plus a non-leader
state and still take the fast path, masking a regression if production
code ever stopped gating on engine.State() before consulting
LastQuorumAck.
---
 internal/raftengine/etcd/engine.go | 25 ++++++++++++++++++++-----
 kv/lease_read_test.go              |  9 +++++++++
 2 files changed, 29 insertions(+), 5 deletions(-)

diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go
index 95e5631a..64f0162d 100644
--- a/internal/raftengine/etcd/engine.go
+++ b/internal/raftengine/etcd/engine.go
@@ -1318,10 +1318,25 @@ func (e *Engine) recordQuorumAck(msg raftpb.Message) {
 	if clusterSize <= 1 {
 		return
 	}
-	// Followers needed for majority = floor(clusterSize / 2): 1 for a
-	// 3-node cluster, 2 for 5-node, matching raft quorum semantics.
-	followerQuorum := clusterSize / 2 //nolint:mnd
-	e.ackTracker.recordAck(msg.From, followerQuorum)
+	e.ackTracker.recordAck(msg.From, followerQuorumForClusterSize(clusterSize))
+}
+
+// followerQuorumForClusterSize returns the number of non-self peer
+// acks required to form a Raft majority for a cluster of the given
+// size. Centralising the formula keeps ackTracker callers (handleStep
+// and removePeer) consistent and avoids scattered //nolint:mnd
+// suppressions. clusterSize is the total voter count INCLUDING self;
+// a Raft majority is floor(clusterSize / 2) + 1 voters including the
+// leader, so subtracting self the result is floor(clusterSize / 2)
+// follower acks for odd sizes (3 → 1, 5 → 2, 7 → 3) and even sizes
+// (4 → 2, 6 → 3) alike.
+func followerQuorumForClusterSize(clusterSize int) int {
+	if clusterSize <= 1 {
+		return 0
+	}
+	// The Raft majority for a cluster of size N is floor(N/2)+1 voters
+	// INCLUDING self, which means the leader needs N/2 OTHER acks.
+ return clusterSize / 2 //nolint:mnd } // isFollowerResponse reports whether a Raft message type represents a @@ -2831,7 +2846,7 @@ func (e *Engine) removePeer(nodeID uint64) { if postRemovalClusterSize <= 1 { e.ackTracker.reset() } else { - e.ackTracker.removePeer(nodeID, postRemovalClusterSize/2) //nolint:mnd + e.ackTracker.removePeer(nodeID, followerQuorumForClusterSize(postRemovalClusterSize)) } if e.transport != nil { diff --git a/kv/lease_read_test.go b/kv/lease_read_test.go index 3ec07576..b64c84e5 100644 --- a/kv/lease_read_test.go +++ b/kv/lease_read_test.go @@ -65,6 +65,15 @@ func (e *fakeLeaseEngine) Close() error { return nil } func (e *fakeLeaseEngine) LeaseDuration() time.Duration { return e.leaseDur } func (e *fakeLeaseEngine) AppliedIndex() uint64 { return e.applied } func (e *fakeLeaseEngine) LastQuorumAck() time.Time { + // Honor the raftengine.LeaseProvider contract that non-leaders + // return the zero time, mirroring the production etcd engine. A + // test that sets a fresh ack and a non-leader state MUST still + // see the slow path taken; a divergent fake would hide regressions + // where production code stops gating on engine.State() before + // consulting LastQuorumAck. + if e.State() != raftengine.StateLeader { + return time.Time{} + } ns := e.lastQuorumAckUnixNano.Load() if ns == 0 { return time.Time{}