diff --git a/internal/raftengine/engine.go b/internal/raftengine/engine.go index 95cc77b7..d110b398 100644 --- a/internal/raftengine/engine.go +++ b/internal/raftengine/engine.go @@ -94,6 +94,31 @@ type LeaseProvider interface { LeaseDuration() time.Duration // AppliedIndex returns the highest log index applied to the local FSM. AppliedIndex() uint64 + // LastQuorumAck returns the instant at which the engine most recently + // observed majority liveness on the leader -- i.e. the wall-clock time + // by which a quorum of follower Progress entries had responded. The + // engine maintains this in the background from MsgHeartbeatResp / + // MsgAppResp traffic on the leader, so a fast-path lease read does + // not need to issue its own ReadIndex to "warm" the lease. + // + // Safety: callers must verify the lease against a single + // `now := time.Now()` sample: + // state == raftengine.StateLeader && + // !ack.IsZero() && !ack.After(now) && now.Sub(ack) < LeaseDuration() + // + // The !ack.After(now) guard matters because LastQuorumAck() may be + // reconstructed from UnixNano (no monotonic component): a backwards + // wall-clock adjustment would otherwise make now.Sub(ack) negative + // and pass the duration check against a stale ack. The LeaseDuration + // is bounded by electionTimeout - safety_margin, which guarantees + // that any new leader candidate cannot yet accept writes during + // that window. + // + // Returns the zero time when no quorum has been confirmed yet or + // when the local node is not the leader. Single-node LEADERS may + // return a recent time.Now() since self is the quorum; non-leader + // single-node replicas still return the zero time. + LastQuorumAck() time.Time // RegisterLeaderLossCallback registers fn to be invoked whenever the // local node leaves the leader role (graceful transfer, partition // step-down, or shutdown). 
Callers use this to invalidate any diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index ed8d4a56..825c2b82 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -199,6 +199,28 @@ type Engine struct { dispatchDropCount atomic.Uint64 dispatchErrorCount atomic.Uint64 + // ackTracker records per-peer last-response times on the leader and + // publishes the majority-ack instant via quorumAckUnixNano. It is + // read lock-free from LastQuorumAck() on the hot lease-read path + // and updated inside the single event-loop goroutine from + // handleStep when a follower response arrives. + ackTracker quorumAckTracker + // singleNodeLeaderAckUnixNano short-circuits LastQuorumAck on the + // single-node leader path: self IS the quorum, so there are no + // follower responses to observe. refreshStatus keeps this value + // current (set to time.Now().UnixNano() each tick while leader and + // cluster size is 1; cleared otherwise) so the lease-read hot path + // never has to acquire e.mu to check peer count or leader state. + singleNodeLeaderAckUnixNano atomic.Int64 + // isLeader mirrors status.State == StateLeader for lock-free reads + // on the hot path. refreshStatus writes it on every tick; + // recordQuorumAck reads it before admitting a follower response + // into ackTracker (so late MsgAppResp / MsgHeartbeatResp arriving + // after a step-down cannot repopulate the tracker), and + // LastQuorumAck reads it to honor the LeaseProvider contract + // ("zero time when the local node is not the leader"). + isLeader atomic.Bool + // leaderLossCbsMu guards the slice of callbacks invoked when the node // transitions out of the leader role (graceful transfer, partition // step-down, shutdown). 
Callbacks fire synchronously from the @@ -622,6 +644,33 @@ func (e *Engine) AppliedIndex() uint64 { return e.appliedIndex.Load() } +// LastQuorumAck returns the wall-clock instant by which a majority of +// followers most recently responded to the leader, or the zero time +// when no such observation exists (follower / candidate / startup). +// +// Lock-free: reads atomic.Int64 values published by recordQuorumAck +// (multi-node cluster) or refreshStatus (single-node cluster keeps +// singleNodeLeaderAckUnixNano alive with time.Now() while leader, so +// the hot lease-read path performs zero lock work). See +// raftengine.LeaseProvider for the lease-read correctness contract. +func (e *Engine) LastQuorumAck() time.Time { + if e == nil { + return time.Time{} + } + // Honor the LeaseProvider contract that non-leaders always return + // the zero time. Without this guard a late MsgAppResp that sneaks + // past recordQuorumAck (or a tracker entry that survived a brief + // step-down/step-up window) could leak stale liveness into the + // caller's fast-path validation. + if !e.isLeader.Load() { + return time.Time{} + } + if ns := e.singleNodeLeaderAckUnixNano.Load(); ns != 0 { + return time.Unix(0, ns) + } + return e.ackTracker.load() +} + // RegisterLeaderLossCallback registers fn to fire every time the local // node's Raft state transitions out of leader (CheckQuorum step-down, // graceful transfer completion, partition-induced demotion) and also @@ -1240,6 +1289,7 @@ func (e *Engine) handleStep(msg raftpb.Message) { return } e.recordLeaderContact(msg) + e.recordQuorumAck(msg) if err := e.rawNode.Step(msg); err != nil { if errors.Is(err, etcdraft.ErrStepPeerNotFound) { return @@ -1248,6 +1298,75 @@ func (e *Engine) handleStep(msg raftpb.Message) { } } +// recordQuorumAck updates the per-peer last-response time when msg is +// a follower -> leader response, so LastQuorumAck() reflects ongoing +// majority liveness without requiring a fresh ReadIndex. 
+// +// Called inside the event-loop goroutine (single writer to e.peers +// and to the raft state), so the e.peers read is race-free. +// +// Gated on the atomic isLeader mirror: a transport-level MsgAppResp / +// MsgHeartbeatResp can land shortly after a step-down (reset() has +// already cleared ackTracker); admitting it here would repopulate +// the tracker and leak a stale liveness instant into the next +// re-election as a non-zero LastQuorumAck(). isLeader is written by +// refreshStatus on every tick, which catches every role transition +// before the next handleStep runs. +func (e *Engine) recordQuorumAck(msg raftpb.Message) { + if !isFollowerResponse(msg.Type) { + return + } + if msg.From == 0 || msg.From == e.nodeID { + return + } + if !e.isLeader.Load() { + return + } + // Reject acks from peers not in the current membership. Without + // this filter, a late MsgAppResp from a just-removed peer (which + // rawNode.Step will immediately reject with ErrStepPeerNotFound) + // would still land an ack in the tracker -- resurrecting the + // "ghost" entry that removePeer just pruned. Since we run on the + // event-loop goroutine (the sole writer to e.peers), the map read + // here is race-free. + if _, ok := e.peers[msg.From]; !ok { + return + } + clusterSize := len(e.peers) + if clusterSize <= 1 { + return + } + e.ackTracker.recordAck(msg.From, followerQuorumForClusterSize(clusterSize)) +} + +// followerQuorumForClusterSize returns the number of non-self peer +// acks required to form a Raft majority for a cluster of the given +// size. Centralising the formula keeps ackTracker callers (handleStep +// and removePeer) consistent and avoids scattered //nolint:mnd +// suppressions. 
clusterSize is the total voter count INCLUDING self; +// the result is floor(clusterSize / 2): a strict majority needs +// floor(clusterSize / 2) + 1 voters INCLUDING self, which leaves +// floor(clusterSize / 2) followers beyond self -- 3 → 1, 4 → 2, +// 5 → 2, 6 → 3, 7 → 3. +func followerQuorumForClusterSize(clusterSize int) int { + if clusterSize <= 1 { + return 0 + } + // The Raft majority for a cluster of size N is floor(N/2)+1 voters + // INCLUDING self, which means the leader needs N/2 OTHER acks. + return clusterSize / 2 //nolint:mnd +} + +// isFollowerResponse reports whether a Raft message type represents a +// follower acknowledging the leader. We use only the two response +// types that ALL committed replication traffic passes through: +// MsgAppResp (log append ack) and MsgHeartbeatResp (passive heartbeat +// ack). Either one is proof that the peer's election timer has been +// reset, which is what the lease relies on. +func isFollowerResponse(t raftpb.MessageType) bool { + return t == raftpb.MsgAppResp || t == raftpb.MsgHeartbeatResp +} + func (e *Engine) sendMessages(messages []raftpb.Message) error { for _, msg := range messages { if e.skipDispatchMessage(msg) { @@ -1846,13 +1965,34 @@ func (e *Engine) refreshStatus() { if e.closed { e.status.State = raftengine.StateShutdown } + clusterSize := len(e.peers) e.mu.Unlock() + // Keep the lock-free single-node fast path in sync with the current + // role: populate while leader of a 1-node cluster, clear otherwise + // (including on leader loss, so LastQuorumAck transitions to the + // multi-node tracker or zero time atomically). + // Publish leader state atomically so recordQuorumAck / LastQuorumAck + // can gate on it without acquiring e.mu. MUST run before the + // single-node ack store below, otherwise a brand-new leader tick + // could publish an ack instant while isLeader is still false. 
+ e.isLeader.Store(status.State == raftengine.StateLeader) + + if status.State == raftengine.StateLeader && clusterSize <= 1 { + e.singleNodeLeaderAckUnixNano.Store(time.Now().UnixNano()) + } else { + e.singleNodeLeaderAckUnixNano.Store(0) + } + if status.State == raftengine.StateLeader { e.leaderOnce.Do(func() { close(e.leaderReady) }) } if previous == raftengine.StateLeader && status.State != raftengine.StateLeader { e.failPending(errors.WithStack(errNotLeader)) + // Drop the per-peer ack map so a future re-election cannot + // surface a stale majority-ack instant before the new term's + // heartbeats have actually confirmed liveness. + e.ackTracker.reset() // Notify lease holders so they invalidate any cached lease; // without this hook, a former leader keeps serving fast-path // reads from local state for up to LeaseDuration after a @@ -2710,8 +2850,23 @@ func (e *Engine) removePeer(nodeID uint64) { delete(e.peers, nodeID) } e.config.Servers = removeConfigServer(e.peers, e.config.Servers, nodeID, peer.ID) + postRemovalClusterSize := len(e.peers) e.mu.Unlock() + // Drop the peer's recorded ack so a reconfiguration cannot leave a + // stale entry that falsely satisfies the new cluster's majority. + // followerQuorum is computed against the POST-removal cluster; a + // shrink to <=1 would otherwise pass 0 here, which + // quorumAckTracker.removePeer treats as "keep the current instant" + // and would surface stale liveness to LastQuorumAck if the cluster + // subsequently grew back. Clear the tracker explicitly in that + // case so any future multi-node membership starts fresh. 
+ if postRemovalClusterSize <= 1 { + e.ackTracker.reset() + } else { + e.ackTracker.removePeer(nodeID, followerQuorumForClusterSize(postRemovalClusterSize)) + } + if e.transport != nil { e.transport.RemovePeer(nodeID) } diff --git a/internal/raftengine/etcd/quorum_ack.go b/internal/raftengine/etcd/quorum_ack.go new file mode 100644 index 00000000..154d44ac --- /dev/null +++ b/internal/raftengine/etcd/quorum_ack.go @@ -0,0 +1,139 @@ +package etcd + +import ( + "sort" + "sync" + "sync/atomic" + "time" +) + +// quorumAckTracker records the most recent response time from each +// follower and publishes the "majority-ack instant" -- the wall clock +// at which a majority of followers had all been confirmed live. +// +// LeaseRead callers pair the published instant with LeaseDuration to +// serve a leader-local read without issuing a fresh ReadIndex round. +// This replaces the prior caller-side lease scheme, which had to +// sample time.Now() before the slow path and therefore could not +// amortise reads whose own latency exceeded LeaseDuration (the bug +// that kept production GET at ~1 s under step-queue congestion). +// +// Safety: we record time.Now() when the leader OBSERVES the follower +// response, which is an UPPER bound on the follower's true ack time. +// Because lease = recorded_instant + lease_duration, that upper bound +// makes the lease extend slightly past the strictly-safe +// follower_ack_time + electionTimeout boundary by at most the one-way +// network delay plus scheduling slop. leaseSafetyMargin is sized to +// cover that overshoot, so leaseDuration = electionTimeout - +// leaseSafetyMargin keeps the lease strictly inside the no-new-leader +// window. See docs/lease_read_design.md for the full argument. 
+type quorumAckTracker struct { + mu sync.Mutex + peerAcks map[uint64]int64 // peer ID → last ack unix nano observed on leader + // ackBuf is reused by recomputeLocked to avoid allocating a fresh + // []int64 on every MsgAppResp / MsgHeartbeatResp. Sized to + // len(peerAcks) on first use and grown via append when the cluster + // expands. Caller must hold t.mu. + ackBuf []int64 + // quorumAckUnixNano is the Nth-most-recent peer ack where N equals + // the number of follower acks required for majority (clusterSize/2). + // Updated under mu; read lock-free via atomic.Load. + quorumAckUnixNano atomic.Int64 +} + +// recordAck notes that peerID responded to us and recomputes the +// majority-ack instant. followerQuorum is the number of non-self +// peers whose ack is required for majority (clusterSize / 2 for +// integer division; 1 for a 3-node cluster, 2 for 5-node, etc). +// +// A followerQuorum of 0 means single-node cluster: caller should +// surface LastQuorumAck = now without calling this. +func (t *quorumAckTracker) recordAck(peerID uint64, followerQuorum int) { + if followerQuorum <= 0 { + return + } + now := time.Now().UnixNano() + t.mu.Lock() + defer t.mu.Unlock() + if t.peerAcks == nil { + t.peerAcks = make(map[uint64]int64) + } + t.peerAcks[peerID] = now + t.recomputeLocked(followerQuorum) +} + +// removePeer drops peerID's recorded ack. Call when a peer leaves the +// cluster so its pre-removal ack time can no longer satisfy the +// majority threshold after a configuration change: a shrink-then-grow +// that ends with fresh peers who have not yet acked would otherwise +// let the removed peer's last ack falsely advance the quorum instant, +// which is a lease-safety violation. +// +// followerQuorum is the POST-removal follower quorum so the published +// instant is recomputed against the current cluster. Passing 0 keeps +// the current instant; the next recordAck will refresh it. 
+func (t *quorumAckTracker) removePeer(peerID uint64, followerQuorum int) { + t.mu.Lock() + defer t.mu.Unlock() + // delete is safe on a missing key. We still recompute even when + // peerID had no recorded entry: a shrink that reduces + // followerQuorum may let the remaining peers now satisfy the + // smaller threshold, and without an explicit recompute the + // published instant would stay at its stale value (or zero) until + // the next recordAck arrives. + delete(t.peerAcks, peerID) + if followerQuorum <= 0 { + return + } + t.recomputeLocked(followerQuorum) +} + +// recomputeLocked publishes the followerQuorum-th most recent ack as +// the quorum instant, or clears it if we lack that many recorded +// peers. Caller must hold t.mu. +// +// Reuses t.ackBuf across calls so the hot path (one call per +// MsgAppResp / MsgHeartbeatResp) does not allocate on steady state. +// The buffer is re-sliced in place and the sort is done on that +// slice; a cluster growing past the previous capacity picks up a +// single growth step via append, not a fresh allocation per call. +func (t *quorumAckTracker) recomputeLocked(followerQuorum int) { + if len(t.peerAcks) < followerQuorum { + // Not enough peers have reported to form a majority yet. + t.quorumAckUnixNano.Store(0) + return + } + t.ackBuf = t.ackBuf[:0] + for _, a := range t.peerAcks { + t.ackBuf = append(t.ackBuf, a) + } + // Sort descending so ackBuf[0] is the most recent. The + // followerQuorum-th entry (1-indexed) is the oldest ack among the + // top quorum -- i.e. the boundary instant by which majority + // liveness was confirmed. Cluster size is small in practice (3-5 + // peers), so sort.Slice is cheaper than a quickselect once the + // buffer is reused. + sort.Slice(t.ackBuf, func(i, j int) bool { return t.ackBuf[i] > t.ackBuf[j] }) + t.quorumAckUnixNano.Store(t.ackBuf[followerQuorum-1]) +} + +// reset clears all recorded peer acks. 
Call when the local node +// leaves the leader role so a future re-election does not resurrect +// a stale majority-ack instant. +func (t *quorumAckTracker) reset() { + t.mu.Lock() + defer t.mu.Unlock() + t.peerAcks = nil + t.ackBuf = t.ackBuf[:0] + t.quorumAckUnixNano.Store(0) +} + +// load returns the current majority-ack instant or the zero time if +// no quorum has been observed since the last reset. +func (t *quorumAckTracker) load() time.Time { + ns := t.quorumAckUnixNano.Load() + if ns == 0 { + return time.Time{} + } + return time.Unix(0, ns) +} diff --git a/internal/raftengine/etcd/quorum_ack_test.go b/internal/raftengine/etcd/quorum_ack_test.go new file mode 100644 index 00000000..b028defe --- /dev/null +++ b/internal/raftengine/etcd/quorum_ack_test.go @@ -0,0 +1,158 @@ +package etcd + +import ( + "runtime" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestQuorumAckTracker_SingleNodeFollowerQuorumZeroIsNoop(t *testing.T) { + t.Parallel() + var tr quorumAckTracker + // followerQuorum == 0 means single-node cluster -- caller handles + // that case elsewhere. recordAck must not mutate state, otherwise + // a re-election into multi-node would surface a stale instant. + tr.recordAck(42, 0) + require.Equal(t, time.Time{}, tr.load()) +} + +func TestQuorumAckTracker_QuorumAckWaitsForMajority(t *testing.T) { + t.Parallel() + var tr quorumAckTracker + // 3-node cluster: followerQuorum = 1 (need 1 follower + self). + tr.recordAck(2, 1) + first := tr.load() + require.False(t, first.IsZero(), "single follower ack already satisfies 3-node quorum") + + // 5-node cluster: followerQuorum = 2. One follower ack alone is + // NOT enough -- tracker must wait until a second follower has + // reported before publishing. 
+ var tr2 quorumAckTracker + tr2.recordAck(2, 2) + require.Equal(t, time.Time{}, tr2.load(), "one follower is not a 5-node quorum") + tr2.recordAck(3, 2) + require.False(t, tr2.load().IsZero(), "two followers + self make a 5-node quorum") +} + +func TestQuorumAckTracker_QuorumAckIsOldestOfTopN(t *testing.T) { + t.Parallel() + var tr quorumAckTracker + // 5-node cluster (quorum = 2 followers). Record acks in staggered + // order and verify the published instant is the OLDER of the top + // two -- i.e. the boundary by which a majority was last confirmed. + tr.recordAck(2, 2) + first := tr.load() + require.True(t, first.IsZero(), "still only one follower, no quorum") + + tr.recordAck(3, 2) + second := tr.load() + require.False(t, second.IsZero()) + + // Now peer 4 acks. Even if time.Now() granularity places every + // sample at the same nanosecond, the quorum instant must NOT + // regress: the 5-node quorum requires 2 follower acks (self makes + // 3 = majority), and the OLDEST of the top two followers bounds + // the boundary. require.False(third.Before(second)) holds trivially + // when timestamps are equal, so this test does not rely on wall- + // clock granularity and is deterministic on fast CI. + tr.recordAck(4, 2) + third := tr.load() + require.False(t, third.Before(second), "quorum instant must not regress") +} + +// TestQuorumAckTracker_RemovedPeerCannotSatisfyQuorum exercises the +// safety invariant: a peer that leaves the cluster must have its +// recorded ack pruned, otherwise a shrink-then-grow that ends with +// fresh peers who have not yet acked could let the removed peer's +// pre-removal ack falsely satisfy the new cluster's majority. +func TestQuorumAckTracker_RemovedPeerCannotSatisfyQuorum(t *testing.T) { + t.Parallel() + var tr quorumAckTracker + // 5-node cluster, followerQuorum = 2. Peers 2 and 3 ack. 
+ tr.recordAck(2, 2) + tr.recordAck(3, 2) + require.False(t, tr.load().IsZero(), "baseline: 5-node quorum satisfied") + + // Cluster shrinks to 3 (followerQuorum = 1). After removing both + // acked peers we have zero recorded entries -- not enough to + // satisfy even the smaller quorum. + tr.removePeer(2, 1) + tr.removePeer(3, 1) + require.Equal(t, time.Time{}, tr.load(), + "after removing every acked peer the quorum instant must clear") +} + +func TestQuorumAckTracker_RemovePeerZeroQuorumKeepsCurrent(t *testing.T) { + t.Parallel() + var tr quorumAckTracker + tr.recordAck(2, 1) + before := tr.load() + require.False(t, before.IsZero()) + + // followerQuorum = 0 means the caller doesn't have the post- + // removal size yet. Entry is dropped but the published instant is + // retained; the next recordAck will refresh it. + tr.removePeer(2, 0) + require.Equal(t, before, tr.load(), + "removePeer with followerQuorum=0 must not clobber the current instant") +} + +func TestQuorumAckTracker_ResetClearsState(t *testing.T) { + t.Parallel() + var tr quorumAckTracker + tr.recordAck(2, 1) + require.False(t, tr.load().IsZero()) + + tr.reset() + require.Equal(t, time.Time{}, tr.load()) + + // After reset, a subsequent ack must still populate correctly. + tr.recordAck(2, 1) + require.False(t, tr.load().IsZero()) +} + +func TestQuorumAckTracker_ConcurrentRecordAndLoad(t *testing.T) { + t.Parallel() + var tr quorumAckTracker + var wg sync.WaitGroup + wg.Add(2) + stop := make(chan struct{}) + + // Recorder alternates between two peer IDs so a 3-node followerQuorum + // always has at least one entry and the sort path runs. + // runtime.Gosched between iterations keeps the loops from pegging a + // core under `-race` while still interleaving enough recordAck / + // load pairs to exercise the atomic-pointer invariants. 
+ go func() { + defer wg.Done() + for { + select { + case <-stop: + return + default: + tr.recordAck(2, 1) + tr.recordAck(3, 1) + runtime.Gosched() + } + } + }() + go func() { + defer wg.Done() + for { + select { + case <-stop: + return + default: + _ = tr.load() + runtime.Gosched() + } + } + }() + + time.Sleep(20 * time.Millisecond) + close(stop) + wg.Wait() +} diff --git a/kv/coordinator.go b/kv/coordinator.go index f6f404b3..72279931 100644 --- a/kv/coordinator.go +++ b/kv/coordinator.go @@ -340,12 +340,27 @@ func (c *Coordinate) LinearizableReadForKey(ctx context.Context, _ []byte) (uint } // LeaseRead returns a read fence backed by a leader-local lease when -// available, falling back to a full LinearizableRead when the lease has -// expired or the underlying engine does not implement LeaseProvider. +// available, falling back to a full LinearizableRead when no fast +// path is live or the engine does not implement LeaseProvider. // -// The returned index is the engine's current applied index (fast path) or -// the index returned by LinearizableRead (slow path). Callers that resolve -// timestamps via store.LastCommitTS may discard the value. +// The PRIMARY lease path is maintained inside the engine from ongoing +// MsgAppResp / MsgHeartbeatResp traffic, so that path does not rely +// on callers sampling time.Now() before the slow path to "extend" a +// lease afterwards. The earlier pre-read sampling was racy under +// congestion: if a LinearizableRead took longer than LeaseDuration, +// the extension would land already expired and the lease never +// warmed up. The engine-driven anchor is refreshed every heartbeat +// independent of read latency. +// +// The SECONDARY caller-side lease remains as a rollout fallback, +// still populated by the original pre-read sampling; it covers the +// narrow window between startup and the first quorum heartbeat round +// landing on the engine. 
+// +// The returned index is the engine's current applied index (fast +// path) or the index returned by LinearizableRead (slow path). +// Callers that resolve timestamps via store.LastCommitTS may discard +// the value. func (c *Coordinate) LeaseRead(ctx context.Context) (uint64, error) { lp, ok := c.engine.(raftengine.LeaseProvider) if !ok { @@ -353,38 +368,27 @@ func (c *Coordinate) LeaseRead(ctx context.Context) (uint64, error) { } leaseDur := lp.LeaseDuration() if leaseDur <= 0 { - // Misconfigured tick settings (Engine.Open warned about this): - // the lease can never be valid. Fall back without touching - // lease state so we do not waste extend/invalidate work. + // Misconfigured tick settings: lease is disabled. return c.LinearizableRead(ctx) } - // Capture time.Now() and the lease generation exactly once before - // any quorum work. `now` is reused for both the fast-path validity - // check and (on slow path) the extend base; `expectedGen` guards - // against a leader-loss invalidation that fires during - // LinearizableRead from being overwritten by this caller's extend. - // See Coordinate.Dispatch for the same rationale. + // Single time.Now() sample so the primary, secondary, and + // extension steps all reason about the same instant. now := time.Now() + state := c.engine.State() + if engineLeaseAckValid(state, lp.LastQuorumAck(), now, leaseDur) { + return lp.AppliedIndex(), nil + } + // Secondary: caller-side lease warmed by a previous successful + // slow-path read. Preserved so tests can prime the lease directly + // and so we still benefit on paths where LastQuorumAck is not yet + // populated (e.g. very first read after startup before the first + // quorum heartbeat round has landed). expectedGen := c.lease.generation() - // Defense-in-depth against the narrow race between an engine - // state transition out of leader and the async leader-loss - // callback flipping the lease: check the engine's current view - // too. 
State() is updated every Raft tick (~10 ms), which is - // tighter than the lease's time-bound. If the engine already - // knows it's not leader, force the slow path (which will fail - // fast via LinearizableRead and invalidate the lease). - if c.lease.valid(now) && c.engine.State() == raftengine.StateLeader { + if c.lease.valid(now) && state == raftengine.StateLeader { return lp.AppliedIndex(), nil } idx, err := c.LinearizableRead(ctx) if err != nil { - // Only invalidate on real leadership-loss signals. A context - // deadline or transient transport error is NOT leadership loss; - // forcing invalidation for those would push every subsequent - // read onto the slow path for the remainder of the lease - // window, mirroring the production regression the write-path - // guard fixed. RegisterLeaderLossCallback plus the - // State()==StateLeader fast-path check cover real transitions. if isLeadershipLossError(err) { c.lease.invalidate() } @@ -394,6 +398,22 @@ func (c *Coordinate) LeaseRead(ctx context.Context) (uint64, error) { return idx, nil } +// engineLeaseAckValid returns whether the engine-driven lease anchor +// published via LastQuorumAck is fresh enough to serve a leader-local +// read. 
Enforces the safety contract from raftengine.LeaseProvider: +// - local state must be Leader +// - ack must be non-zero (a quorum was ever observed) +// - ack must not be after now (clock-skew guard: LastQuorumAck is +// rebuilt from UnixNano with no monotonic component, so a +// backwards wall-clock step could otherwise let a stale ack pass) +// - now − ack must be strictly less than leaseDur +func engineLeaseAckValid(state raftengine.State, ack, now time.Time, leaseDur time.Duration) bool { + if state != raftengine.StateLeader || ack.IsZero() || ack.After(now) { + return false + } + return now.Sub(ack) < leaseDur +} + func (c *Coordinate) LeaseReadForKey(ctx context.Context, _ []byte) (uint64, error) { return c.LeaseRead(ctx) } diff --git a/kv/lease_read_test.go b/kv/lease_read_test.go index 6d545e9a..b64c84e5 100644 --- a/kv/lease_read_test.go +++ b/kv/lease_read_test.go @@ -21,6 +21,7 @@ type fakeLeaseEngine struct { linearizableErr error linearizableCalls atomic.Int32 state atomic.Value // stores raftengine.State; default Leader + lastQuorumAckUnixNano atomic.Int64 // 0 = no ack yet. Updated by ackNow(). leaderLossCallbacksMu sync.Mutex leaderLossCallbacks []fakeLeaseEngineCb registerLeaderLossCalled atomic.Int32 @@ -63,6 +64,22 @@ func (e *fakeLeaseEngine) Propose(context.Context, []byte) (*raftengine.Proposal func (e *fakeLeaseEngine) Close() error { return nil } func (e *fakeLeaseEngine) LeaseDuration() time.Duration { return e.leaseDur } func (e *fakeLeaseEngine) AppliedIndex() uint64 { return e.applied } +func (e *fakeLeaseEngine) LastQuorumAck() time.Time { + // Honor the raftengine.LeaseProvider contract that non-leaders + // return the zero time, mirroring the production etcd engine. A + // test that sets a fresh ack and a non-leader state MUST still + // see the slow path taken; a divergent fake would hide regressions + // where production code stops gating on engine.State() before + // consulting LastQuorumAck. 
+ if e.State() != raftengine.StateLeader { + return time.Time{} + } + ns := e.lastQuorumAckUnixNano.Load() + if ns == 0 { + return time.Time{} + } + return time.Unix(0, ns) +} func (e *fakeLeaseEngine) RegisterLeaderLossCallback(fn func()) func() { e.registerLeaderLossCalled.Add(1) // Unique sentinel per registration so deregister can target THIS @@ -137,8 +154,78 @@ func (e *nonLeaseEngine) Propose(context.Context, []byte) (*raftengine.ProposalR } func (e *nonLeaseEngine) Close() error { return nil } +// setQuorumAck is a test helper that drives the engine-driven lease +// anchor on the fake engine so tests can exercise the new PRIMARY +// fast path (LastQuorumAck + State==Leader) independently of the +// caller-side lease state. +func (e *fakeLeaseEngine) setQuorumAck(t time.Time) { + if t.IsZero() { + e.lastQuorumAckUnixNano.Store(0) + return + } + e.lastQuorumAckUnixNano.Store(t.UnixNano()) +} + // --- Coordinate.LeaseRead ----------------------------------------------- +// TestCoordinate_LeaseRead_EngineAckFastPath covers the engine-driven +// primary path introduced in feat/engine-driven-lease: a fresh +// LastQuorumAck alone (cold caller-side lease, no prior +// LinearizableRead) must satisfy LeaseRead without consulting the +// engine's slow-path read API. 
+func TestCoordinate_LeaseRead_EngineAckFastPath(t *testing.T) { + t.Parallel() + eng := &fakeLeaseEngine{applied: 123, leaseDur: time.Hour} + eng.setQuorumAck(time.Now()) + c := NewCoordinatorWithEngine(nil, eng) + + require.False(t, c.lease.valid(time.Now()), + "caller-side lease must start cold so the fast-path hit is attributable to the engine ack") + + idx, err := c.LeaseRead(context.Background()) + require.NoError(t, err) + require.Equal(t, uint64(123), idx) + require.Equal(t, int32(0), eng.linearizableCalls.Load(), + "engine-driven ack alone must skip LinearizableRead") + require.False(t, c.lease.valid(time.Now()), + "engine-driven fast path must not warm the caller-side lease") +} + +// TestCoordinate_LeaseRead_EngineAckStaleFallsThrough covers the +// stale-ack case: if the engine's ack has aged past LeaseDuration we +// must NOT serve from AppliedIndex alone, and instead take the slow +// path through LinearizableRead. +func TestCoordinate_LeaseRead_EngineAckStaleFallsThrough(t *testing.T) { + t.Parallel() + eng := &fakeLeaseEngine{applied: 7, leaseDur: 50 * time.Millisecond} + // Set the ack far enough in the past that time.Since(ack) > leaseDur. + eng.setQuorumAck(time.Now().Add(-time.Hour)) + c := NewCoordinatorWithEngine(nil, eng) + + _, err := c.LeaseRead(context.Background()) + require.NoError(t, err) + require.Equal(t, int32(1), eng.linearizableCalls.Load(), + "stale engine ack must fall through to LinearizableRead") +} + +// TestCoordinate_LeaseRead_EngineAckIgnoredWhenNotLeader covers the +// engine-state guard: even with a fresh ack, if the engine reports a +// non-leader role the fast path must NOT fire -- the ack could be +// inherited state from a just-lost leader term. 
+func TestCoordinate_LeaseRead_EngineAckIgnoredWhenNotLeader(t *testing.T) { + t.Parallel() + sentinel := errors.New("not leader") + eng := &fakeLeaseEngine{applied: 7, leaseDur: time.Hour, linearizableErr: sentinel} + eng.setQuorumAck(time.Now()) + eng.state.Store(raftengine.StateFollower) + c := NewCoordinatorWithEngine(nil, eng) + + _, err := c.LeaseRead(context.Background()) + require.ErrorIs(t, err, sentinel) + require.Equal(t, int32(1), eng.linearizableCalls.Load(), + "non-leader state must bypass the engine ack fast path") +} + func TestCoordinate_LeaseRead_FastPathSkipsEngine(t *testing.T) { t.Parallel() eng := &fakeLeaseEngine{applied: 100, leaseDur: time.Hour} diff --git a/kv/sharded_coordinator.go b/kv/sharded_coordinator.go index e5c7790a..4d305617 100644 --- a/kv/sharded_coordinator.go +++ b/kv/sharded_coordinator.go @@ -749,28 +749,22 @@ func groupLeaseRead(ctx context.Context, g *ShardGroup) (uint64, error) { } leaseDur := lp.LeaseDuration() if leaseDur <= 0 { - // Lease disabled by tick configuration. Always take the slow - // path without mutating g.lease. return linearizableReadEngineCtx(ctx, engine) } - // Single time.Now() and generation sample before any quorum work, - // mirroring Coordinate.LeaseRead. expectedGen guards against a - // leader-loss invalidation that fires during LinearizableRead. + // Single time.Now() sample so primary/secondary/extension all see + // the same instant. Clock-skew safety delegated to + // engineLeaseAckValid (see Coordinate.LeaseRead). now := time.Now() + state := engine.State() + if engineLeaseAckValid(state, lp.LastQuorumAck(), now, leaseDur) { + return lp.AppliedIndex(), nil + } expectedGen := g.lease.generation() - // Defense-in-depth: also check the shard engine's current state. - // Async callbacks may not have flipped the lease yet, but - // State() is refreshed every tick and catches transitions - // sooner. See Coordinate.LeaseRead for details. 
- if g.lease.valid(now) && engine.State() == raftengine.StateLeader { + if g.lease.valid(now) && state == raftengine.StateLeader { return lp.AppliedIndex(), nil } idx, err := linearizableReadEngineCtx(ctx, engine) if err != nil { - // See Coordinate.LeaseRead: only real leadership-loss signals - // invalidate the lease. Deadlines, transport blips, and other - // transient errors must NOT force the remainder of the lease - // window onto the slow path. if isLeadershipLossError(err) { g.lease.invalidate() }