From 5b00411fb58346851bd155ad8deef18200528f87 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Fri, 24 Apr 2026 03:10:20 +0900 Subject: [PATCH 1/5] kv/lease: adopt CLOCK_MONOTONIC_RAW for lease-read path (#551) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Go's time.Now() is backed by CLOCK_MONOTONIC, which POSIX lets NTP slew at up to 500 ppm. At the current 300 ms leaseSafetyMargin the worst-case slew error over a 700 ms lease window is ~0.35 ms — three orders of magnitude inside the margin — but lease safety should not rest on NTP being well-behaved. TiKV uses CLOCK_MONOTONIC_RAW for the same reason; this change adopts it in elastickv. New internal/monoclock package wraps clock_gettime(CLOCK_MONOTONIC_RAW) via x/sys/unix on Linux/Darwin/FreeBSD, with a runtime-monotonic fallback on other platforms. leaseState, quorumAckTracker, the etcd engine's single-node ack, LeaseProvider.LastQuorumAck, and all coordinator LeaseRead / Dispatch / refreshLeaseAfterDispatch sites sample monoclock.Now() instead of time.Now(). Tests updated to match; quorum-ack tracker pins the raw-monotonic frame with a regression test. docs/lease_read_design.md §3.1-3.2 expanded with the rationale. 
--- docs/lease_read_design.md | 103 ++++++++++++++------ internal/monoclock/monoclock.go | 61 ++++++++++++ internal/monoclock/monoclock_fallback.go | 18 ++++ internal/monoclock/monoclock_test.go | 59 +++++++++++ internal/monoclock/monoclock_unix.go | 24 +++++ internal/raftengine/engine.go | 42 ++++---- internal/raftengine/etcd/engine.go | 52 +++++----- internal/raftengine/etcd/quorum_ack.go | 52 +++++----- internal/raftengine/etcd/quorum_ack_test.go | 48 ++++++--- kv/coordinator.go | 33 +++++-- kv/lease_read_test.go | 67 ++++++------- kv/lease_state.go | 58 ++++++----- kv/lease_state_test.go | 41 ++++---- kv/raft_engine.go | 6 +- kv/sharded_coordinator.go | 15 +-- kv/sharded_lease_test.go | 27 ++--- 16 files changed, 494 insertions(+), 212 deletions(-) create mode 100644 internal/monoclock/monoclock.go create mode 100644 internal/monoclock/monoclock_fallback.go create mode 100644 internal/monoclock/monoclock_test.go create mode 100644 internal/monoclock/monoclock_unix.go diff --git a/docs/lease_read_design.md b/docs/lease_read_design.md index 85e582db..d69787fa 100644 --- a/docs/lease_read_design.md +++ b/docs/lease_read_design.md @@ -91,26 +91,37 @@ holds: ```go type leaseState struct { - gen atomic.Uint64 // bumped by invalidate() - expiry atomic.Pointer[time.Time] // nil = expired / invalidated + gen atomic.Uint64 // bumped by invalidate() + expiryNanos atomic.Int64 // 0 = expired / invalidated; else monoclock.Instant nanos } ``` -- `expiry == nil` or `time.Now() >= *expiry`: lease is expired. The next - `LeaseRead` falls back to `LinearizableRead` and refreshes the lease on - success. -- `time.Now() < *expiry`: lease is valid. `LeaseRead` returns immediately - without contacting the Raft layer. -- `invalidate()` increments `gen` before clearing `expiry`. `extend()` - captures `gen` at entry and, after its CAS lands, undoes its own - write (via CAS on the pointer it stored) iff `gen` has moved. 
This - prevents a Dispatch that succeeded just before a leader-loss - invalidate from resurrecting the lease milliseconds after it was - cleared. A fresh `extend()` that captured the post-invalidate - generation is left intact because it stored a different pointer. - -The lock-free form lets readers do one atomic load + one wall-clock compare -on the fast path. +All timestamps on the lease path come from `internal/monoclock`, which +reads `CLOCK_MONOTONIC_RAW` via `clock_gettime(3)` on Linux / Darwin / +FreeBSD (falling back to Go's runtime monotonic on other platforms). +The raw monotonic clock is immune to NTP rate adjustment and wall-clock +step events — TiKV's lease path makes the same choice. Go's +`time.Now()` is not sufficient: its embedded monotonic component is +still NTP-slewed at up to 500 ppm under POSIX, and a misconfigured or +abused time daemon can exceed that cap. See §3.2 on why the safety +argument should not rest on NTP behaving. + +- `expiryNanos == 0` or `monoclock.Now() >= expiry`: lease is expired. + The next `LeaseRead` falls back to `LinearizableRead` and refreshes + the lease on success. +- `monoclock.Now() < expiry`: lease is valid. `LeaseRead` returns + immediately without contacting the Raft layer. +- `invalidate()` increments `gen` before clearing `expiryNanos`. + `extend()` captures `gen` at entry and, after its CAS lands, undoes + its own write (via CAS on the exact value it wrote) iff `gen` has + moved. This prevents a Dispatch that succeeded just before a + leader-loss invalidate from resurrecting the lease milliseconds + after it was cleared. A fresh `extend()` that captured the + post-invalidate generation is left intact because its CAS already + replaced the earlier target. + +The lock-free form lets readers do one atomic load + one monotonic-raw +compare on the fast path. ### 3.2 Lease duration @@ -134,13 +145,42 @@ config: `10ms * 100 - 300ms = 700 ms`. 
`leaseSafetyMargin` (proposed: 300 ms) absorbs: - Goroutine scheduling delay between heartbeat ack and lease refresh. -- Wall-clock skew between leader and the partition's new leader candidate. +- Clock skew between leader and the partition's new leader candidate. + (Both read `CLOCK_MONOTONIC_RAW` on their own hosts — the skew here + is between two independent monotonic oscillators, not NTP-adjusted + wall clocks. Per-host drift of quartz oscillators is ≤50 ppm, so + the two clocks cannot drift apart at more than ~100 ppm combined, + well under 1 ms over any electionTimeout window.) - GC pauses on the leader. The margin is conservative; reducing it shortens the post-write quiet window during which lease reads still hit local state, at the cost of a smaller safety buffer. +#### Why CLOCK_MONOTONIC_RAW, not time.Now() + +Go's `time.Now()` embeds the kernel's NTP-adjusted monotonic clock +(`CLOCK_MONOTONIC` on Linux), which is rate-slewed at up to 500 ppm +under POSIX. Inside a 700 ms lease window that amounts to ~0.35 ms — +comfortably smaller than the 300 ms safety margin on paper. But the +safety case for the lease should not depend on NTP being well-behaved: + +1. POSIX caps slew rate at 500 ppm, but a misconfigured or malicious + `adjtimex` call can exceed that cap. +2. A lease-read regression is observable only when the lease boundary + overshoots `electionTimeout`, i.e. under exactly the conditions + (partition + clock drift) where NTP is most likely to be wrong. +3. TiKV, whose lease-read design we otherwise track, already uses + `CLOCK_MONOTONIC_RAW` for this reason. Matching their choice keeps + the door open for tightening `leaseSafetyMargin` below ~5 ms, at + which point NTP slew alone becomes comparable to the margin. + +The `internal/monoclock` package wraps `clock_gettime(CLOCK_MONOTONIC_RAW, &ts)` +(`unix.ClockGettime` from `golang.org/x/sys/unix`) on Linux, Darwin, +and FreeBSD. 
Other platforms fall back to Go's runtime monotonic +clock; on those platforms lease safety reverts to the NTP-slewed +baseline, which is still sufficient at the current margin. + ### 3.3 Refresh triggers The lease is refreshed on: @@ -152,8 +192,9 @@ The lease is refreshed on: confirmation than ReadIndex. Both refresh base the new expiry on `preOpInstant + LeaseDuration()`, -where `preOpInstant` is captured BEFORE the quorum operation starts, not -after it returns. This is strictly conservative: any real quorum +where `preOpInstant` is a `monoclock.Now()` reading +(`CLOCK_MONOTONIC_RAW`) captured BEFORE the quorum operation starts, +not after it returns. This is strictly conservative: any real quorum confirmation must happen at or after `preOpInstant`, so the lease window can only be shorter than the true safety window, never longer. Post-operation sampling would let apply-queue depth / scheduling jitter @@ -234,11 +275,13 @@ func (c *Coordinate) LeaseRead(ctx context.Context) (uint64, error) { // Misconfigured tick settings disable the lease entirely. return c.LinearizableRead(ctx) } - // Capture time.Now() AND lease.generation() exactly once before - // any quorum work. The generation guard prevents a leader-loss - // callback that fires during LinearizableRead from being - // silently overwritten by the post-op extend. - now := time.Now() + // Capture monoclock.Now() AND lease.generation() exactly once + // before any quorum work. The monotonic-raw sample keeps the + // window safe against NTP rate adjustment / wall-clock steps; + // the generation guard prevents a leader-loss callback that + // fires during LinearizableRead from being silently overwritten + // by the post-op extend. + now := monoclock.Now() expectedGen := c.lease.generation() if c.lease.valid(now) { return lp.AppliedIndex(), nil @@ -357,9 +400,13 @@ write to commit. However: out of leader and invalidates the lease. 
- Clock skew exceeding `leaseSafetyMargin`: lease may extend beyond `electionTimeout`, allowing a stale read after a successor leader has - accepted writes. Mitigation: keep `leaseSafetyMargin` larger than the - documented clock-skew SLO of the deployment. Default 300 ms is consistent - with the HLC physical window of 3 s used elsewhere. + accepted writes. Because the lease path uses `CLOCK_MONOTONIC_RAW` + (see §3.2), this hazard is bounded by inter-host oscillator drift + (~50 ppm quartz-spec ceiling), not by NTP's 500 ppm slew or + operator-driven `settimeofday` jumps — a misconfigured time daemon + can no longer push the lease past its safety window. The 300 ms + default margin remains consistent with the HLC physical window of + 3 s used elsewhere. --- diff --git a/internal/monoclock/monoclock.go b/internal/monoclock/monoclock.go new file mode 100644 index 00000000..3f12175c --- /dev/null +++ b/internal/monoclock/monoclock.go @@ -0,0 +1,61 @@ +// Package monoclock exposes a monotonic-raw clock for the lease-read +// path. +// +// Go's time.Now() returns a wall-clock value backed internally by the +// kernel's CLOCK_MONOTONIC (Linux) or its equivalent — which is +// rate-adjusted ("slewed") by NTP at up to 500 ppm. That slew is small +// in steady state (~0.35 ms over a 700 ms lease window), but the safety +// case for leader-local lease reads should not depend on NTP being +// well-behaved: a misconfigured or abused time daemon can push the +// slew rate far past the 500 ppm POSIX cap, and other monotonic time +// sources (e.g. CLOCK_MONOTONIC_COARSE) can compound the error. +// CLOCK_MONOTONIC_RAW is immune to NTP rate adjustment and step events +// and is what TiKV's lease path uses. +// +// Instant values are opaque int64 nanosecond counters. They are only +// comparable within the same process lifetime and MUST NOT be +// persisted, serialized, or sent over the wire — the zero point is +// arbitrary and changes across processes. 
Callers that need an +// externally-meaningful timestamp should sample time.Now() separately; +// Instant is only for intra-process lease-safety reasoning. +package monoclock + +import "time" + +// Instant is a reading from the monotonic-raw clock. The zero value +// represents "no reading" and compares equal to Zero. +type Instant struct { + ns int64 +} + +// Zero is the unset Instant. +var Zero = Instant{} + +// Now returns the current monotonic-raw instant. +func Now() Instant { return Instant{ns: nowNanos()} } + +// IsZero reports whether i is the zero Instant. +func (i Instant) IsZero() bool { return i.ns == 0 } + +// After reports whether i is strictly after j. +func (i Instant) After(j Instant) bool { return i.ns > j.ns } + +// Before reports whether i is strictly before j. +func (i Instant) Before(j Instant) bool { return i.ns < j.ns } + +// Sub returns i - j as a Duration. Meaningful only when neither i nor +// j is the zero Instant; callers must guard with IsZero first. +func (i Instant) Sub(j Instant) time.Duration { return time.Duration(i.ns - j.ns) } + +// Add returns i advanced by d. +func (i Instant) Add(d time.Duration) Instant { return Instant{ns: i.ns + int64(d)} } + +// Nanos returns the raw int64 counter. Intended for atomic.Int64 +// storage where a whole Instant struct cannot be stored atomically +// (see internal/raftengine/etcd/quorum_ack.go). +func (i Instant) Nanos() int64 { return i.ns } + +// FromNanos reconstructs an Instant from a raw counter previously +// obtained via Nanos(). Counterpart to Nanos; the same intra-process +// caveats apply. 
+func FromNanos(ns int64) Instant { return Instant{ns: ns} } diff --git a/internal/monoclock/monoclock_fallback.go b/internal/monoclock/monoclock_fallback.go new file mode 100644 index 00000000..21b0e467 --- /dev/null +++ b/internal/monoclock/monoclock_fallback.go @@ -0,0 +1,18 @@ +//go:build !(linux || darwin || freebsd) + +package monoclock + +import "time" + +// epoch anchors the fallback monotonic counter. time.Since uses Go's +// runtime monotonic component and is step-immune, though unlike +// CLOCK_MONOTONIC_RAW it is still subject to NTP rate adjustment. On +// platforms without CLOCK_MONOTONIC_RAW this is the closest portable +// substitute; lease safety on those platforms therefore matches the +// pre-#551 behaviour. Linux / Darwin / FreeBSD use the raw clock +// (monoclock_unix.go). +var epoch = time.Now().Add(-time.Millisecond) // offset so the first reading is never exactly zero ("no reading") + +func nowNanos() int64 { + return int64(time.Since(epoch)) +} diff --git a/internal/monoclock/monoclock_test.go b/internal/monoclock/monoclock_test.go new file mode 100644 index 00000000..5dde9db4 --- /dev/null +++ b/internal/monoclock/monoclock_test.go @@ -0,0 +1,59 @@ +package monoclock + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestInstant_ZeroIsZero(t *testing.T) { + t.Parallel() + require.True(t, Zero.IsZero()) + var i Instant + require.True(t, i.IsZero()) + require.True(t, FromNanos(0).IsZero()) +} + +func TestNow_IsNonZeroAndMonotonic(t *testing.T) { + t.Parallel() + // CLOCK_MONOTONIC_RAW must advance across two Now() calls (modulo + // nanosecond-granularity ties; use a sleep to ensure monotonic + // progress). A regression that returns 0 or runs the clock + // backwards would break every lease-read safety guard. 
+ a := Now() + require.False(t, a.IsZero(), "Now must return non-zero instant on supported platforms") + time.Sleep(100 * time.Microsecond) + b := Now() + require.False(t, b.Before(a), "monotonic-raw clock must not regress across calls") + require.True(t, b.After(a) || b == a) +} + +func TestInstant_AddAndSub(t *testing.T) { + t.Parallel() + base := FromNanos(1_000_000) + later := base.Add(250 * time.Millisecond) + require.True(t, later.After(base)) + require.Equal(t, 250*time.Millisecond, later.Sub(base)) + require.Equal(t, -250*time.Millisecond, base.Sub(later)) +} + +func TestInstant_NanosRoundtrip(t *testing.T) { + t.Parallel() + i := FromNanos(42) + require.Equal(t, int64(42), i.Nanos()) +} + +func TestInstant_BeforeAfterOrdering(t *testing.T) { + t.Parallel() + a := FromNanos(100) + b := FromNanos(200) + require.True(t, a.Before(b)) + require.True(t, b.After(a)) + require.False(t, a.After(b)) + require.False(t, b.Before(a)) + // Equal instants: neither Before nor After. + c := FromNanos(100) + require.False(t, a.Before(c)) + require.False(t, a.After(c)) +} diff --git a/internal/monoclock/monoclock_unix.go b/internal/monoclock/monoclock_unix.go new file mode 100644 index 00000000..97df454e --- /dev/null +++ b/internal/monoclock/monoclock_unix.go @@ -0,0 +1,24 @@ +//go:build linux || darwin || freebsd + +package monoclock + +import "golang.org/x/sys/unix" + +// nowNanos reads CLOCK_MONOTONIC_RAW via clock_gettime(3). Linux and +// Darwin both expose this clock; FreeBSD labels the equivalent clock +// CLOCK_MONOTONIC_RAW as well. Windows and Plan 9 use the portable +// fallback (monoclock_fallback.go). 
+// +// A non-nil error from ClockGettime should be essentially impossible +// on supported platforms (the syscall fails only on invalid clock IDs +// or EFAULT on the timespec pointer, neither of which applies here), +// but returning zero instead of panicking keeps the lease path live +// under bizarre sandboxes; the existing engineLeaseAckValid guards +// (ack.IsZero, ack.After(now)) still hold. +func nowNanos() int64 { + var ts unix.Timespec + if err := unix.ClockGettime(unix.CLOCK_MONOTONIC_RAW, &ts); err != nil { + return 0 + } + return ts.Nano() +} diff --git a/internal/raftengine/engine.go b/internal/raftengine/engine.go index d110b398..65ad0850 100644 --- a/internal/raftengine/engine.go +++ b/internal/raftengine/engine.go @@ -5,6 +5,8 @@ import ( "errors" "io" "time" + + "github.com/bootjp/elastickv/internal/monoclock" ) // Shared sentinel errors that both engine implementations should wrap @@ -94,31 +96,33 @@ type LeaseProvider interface { LeaseDuration() time.Duration // AppliedIndex returns the highest log index applied to the local FSM. AppliedIndex() uint64 - // LastQuorumAck returns the instant at which the engine most recently - // observed majority liveness on the leader -- i.e. the wall-clock time - // by which a quorum of follower Progress entries had responded. The - // engine maintains this in the background from MsgHeartbeatResp / - // MsgAppResp traffic on the leader, so a fast-path lease read does - // not need to issue its own ReadIndex to "warm" the lease. + // LastQuorumAck returns the monotonic-raw instant at which the + // engine most recently observed majority liveness on the leader + // -- i.e. the CLOCK_MONOTONIC_RAW reading at which a quorum of + // follower Progress entries had responded. The engine maintains + // this in the background from MsgHeartbeatResp / MsgAppResp traffic + // on the leader, so a fast-path lease read does not need to issue + // its own ReadIndex to "warm" the lease. 
// // Safety: callers must verify the lease against a single - // `now := time.Now()` sample: + // `now := monoclock.Now()` sample: // state == raftengine.StateLeader && // !ack.IsZero() && !ack.After(now) && now.Sub(ack) < LeaseDuration() // - // The !ack.After(now) guard matters because LastQuorumAck() may be - // reconstructed from UnixNano (no monotonic component): a backwards - // wall-clock adjustment would otherwise make now.Sub(ack) negative - // and pass the duration check against a stale ack. The LeaseDuration - // is bounded by electionTimeout - safety_margin, which guarantees - // that any new leader candidate cannot yet accept writes during - // that window. + // The monotonic-raw clock (CLOCK_MONOTONIC_RAW on Linux / Darwin / + // FreeBSD; see internal/monoclock) is immune to NTP rate adjustment + // and wall-clock step events, so this comparison stays safe even if + // the system's time daemon slews or steps the wall clock. The + // !ack.After(now) guard remains as a defensive fail-closed for a + // zero / bogus ack reading. LeaseDuration is bounded by + // electionTimeout - safety_margin, guaranteeing no successor leader + // has accepted writes within that window. // - // Returns the zero time when no quorum has been confirmed yet or - // when the local node is not the leader. Single-node LEADERS may - // return a recent time.Now() since self is the quorum; non-leader - // single-node replicas still return the zero time. - LastQuorumAck() time.Time + // Returns the zero Instant when no quorum has been confirmed yet + // or when the local node is not the leader. Single-node LEADERS + // may return a recent monoclock.Now() since self is the quorum; + // non-leader single-node replicas still return the zero Instant. + LastQuorumAck() monoclock.Instant // RegisterLeaderLossCallback registers fn to be invoked whenever the // local node leaves the leader role (graceful transfer, partition // step-down, or shutdown). 
Callers use this to invalidate any diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index 3e4a10f1..ffef7e41 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -16,6 +16,7 @@ import ( "sync/atomic" "time" + "github.com/bootjp/elastickv/internal/monoclock" "github.com/bootjp/elastickv/internal/raftengine" "github.com/cockroachdb/errors" etcdstorage "go.etcd.io/etcd/server/v3/storage" @@ -254,18 +255,21 @@ type Engine struct { stepQueueFullCount atomic.Uint64 // ackTracker records per-peer last-response times on the leader and - // publishes the majority-ack instant via quorumAckUnixNano. It is + // publishes the majority-ack instant via quorumAckMonoNs. It is // read lock-free from LastQuorumAck() on the hot lease-read path // and updated inside the single event-loop goroutine from // handleStep when a follower response arrives. ackTracker quorumAckTracker - // singleNodeLeaderAckUnixNano short-circuits LastQuorumAck on the + // singleNodeLeaderAckMonoNs short-circuits LastQuorumAck on the // single-node leader path: self IS the quorum, so there are no // follower responses to observe. refreshStatus keeps this value - // current (set to time.Now().UnixNano() each tick while leader and - // cluster size is 1; cleared otherwise) so the lease-read hot path - // never has to acquire e.mu to check peer count or leader state. - singleNodeLeaderAckUnixNano atomic.Int64 + // current (set to monoclock.Now().Nanos() each tick while leader + // and cluster size is 1; cleared otherwise) so the lease-read hot + // path never has to acquire e.mu to check peer count or leader + // state. Stored as int64 nanoseconds on the CLOCK_MONOTONIC_RAW + // scale so the lease comparison is immune to NTP rate adjustment + // and wall-clock steps (see internal/monoclock). + singleNodeLeaderAckMonoNs atomic.Int64 // isLeader mirrors status.State == StateLeader for lock-free reads // on the hot path. 
refreshStatus writes it on every tick; // recordQuorumAck reads it before admitting a follower response @@ -718,29 +722,31 @@ func (e *Engine) AppliedIndex() uint64 { return e.appliedIndex.Load() } -// LastQuorumAck returns the wall-clock instant by which a majority of -// followers most recently responded to the leader, or the zero time -// when no such observation exists (follower / candidate / startup). +// LastQuorumAck returns the monotonic-raw instant by which a majority +// of followers most recently responded to the leader, or the zero +// Instant when no such observation exists (follower / candidate / +// startup). // // Lock-free: reads atomic.Int64 values published by recordQuorumAck // (multi-node cluster) or refreshStatus (single-node cluster keeps -// singleNodeLeaderAckUnixNano alive with time.Now() while leader, so -// the hot lease-read path performs zero lock work). See -// raftengine.LeaseProvider for the lease-read correctness contract. -func (e *Engine) LastQuorumAck() time.Time { +// singleNodeLeaderAckMonoNs alive with monoclock.Now() while leader, +// so the hot lease-read path performs zero lock work). The monotonic- +// raw source keeps the lease safe against NTP-slew / wall-clock-step +// events; see raftengine.LeaseProvider for the correctness contract. +func (e *Engine) LastQuorumAck() monoclock.Instant { if e == nil { - return time.Time{} + return monoclock.Zero } // Honor the LeaseProvider contract that non-leaders always return - // the zero time. Without this guard a late MsgAppResp that sneaks - // past recordQuorumAck (or a tracker entry that survived a brief - // step-down/step-up window) could leak stale liveness into the - // caller's fast-path validation. + // the zero Instant. Without this guard a late MsgAppResp that + // sneaks past recordQuorumAck (or a tracker entry that survived a + // brief step-down/step-up window) could leak stale liveness into + // the caller's fast-path validation. 
if !e.isLeader.Load() { - return time.Time{} + return monoclock.Zero } - if ns := e.singleNodeLeaderAckUnixNano.Load(); ns != 0 { - return time.Unix(0, ns) + if ns := e.singleNodeLeaderAckMonoNs.Load(); ns != 0 { + return monoclock.FromNanos(ns) } return e.ackTracker.load() } @@ -2168,9 +2174,9 @@ func (e *Engine) refreshStatus() { e.isLeader.Store(status.State == raftengine.StateLeader) if status.State == raftengine.StateLeader && clusterSize <= 1 { - e.singleNodeLeaderAckUnixNano.Store(time.Now().UnixNano()) + e.singleNodeLeaderAckMonoNs.Store(monoclock.Now().Nanos()) } else { - e.singleNodeLeaderAckUnixNano.Store(0) + e.singleNodeLeaderAckMonoNs.Store(0) } if status.State == raftengine.StateLeader { diff --git a/internal/raftengine/etcd/quorum_ack.go b/internal/raftengine/etcd/quorum_ack.go index 154d44ac..56618784 100644 --- a/internal/raftengine/etcd/quorum_ack.go +++ b/internal/raftengine/etcd/quorum_ack.go @@ -4,12 +4,14 @@ import ( "sort" "sync" "sync/atomic" - "time" + + "github.com/bootjp/elastickv/internal/monoclock" ) // quorumAckTracker records the most recent response time from each -// follower and publishes the "majority-ack instant" -- the wall clock -// at which a majority of followers had all been confirmed live. +// follower and publishes the "majority-ack instant" -- the monotonic- +// raw reading at which a majority of followers had all been confirmed +// live. // // LeaseRead callers pair the published instant with LeaseDuration to // serve a leader-local read without issuing a fresh ReadIndex round. @@ -18,10 +20,15 @@ import ( // amortise reads whose own latency exceeded LeaseDuration (the bug // that kept production GET at ~1 s under step-queue congestion). // -// Safety: we record time.Now() when the leader OBSERVES the follower -// response, which is an UPPER bound on the follower's true ack time. 
-// Because lease = recorded_instant + lease_duration, that upper bound -// makes the lease extend slightly past the strictly-safe +// Timestamps are CLOCK_MONOTONIC_RAW readings (see internal/monoclock). +// Using the raw monotonic source instead of Go's time.Now() keeps the +// lease-vs-safety-window comparison immune to NTP rate adjustment and +// wall-clock step events -- TiKV's choice, adopted here per #551. +// +// Safety: we record monoclock.Now() when the leader OBSERVES the +// follower response, which is an UPPER bound on the follower's true +// ack time. Because lease = recorded_instant + lease_duration, that +// upper bound makes the lease extend slightly past the strictly-safe // follower_ack_time + electionTimeout boundary by at most the one-way // network delay plus scheduling slop. leaseSafetyMargin is sized to // cover that overshoot, so leaseDuration = electionTimeout - @@ -29,16 +36,17 @@ import ( // window. See docs/lease_read_design.md for the full argument. type quorumAckTracker struct { mu sync.Mutex - peerAcks map[uint64]int64 // peer ID → last ack unix nano observed on leader + peerAcks map[uint64]int64 // peer ID → last ack monoclock ns observed on leader // ackBuf is reused by recomputeLocked to avoid allocating a fresh // []int64 on every MsgAppResp / MsgHeartbeatResp. Sized to // len(peerAcks) on first use and grown via append when the cluster // expands. Caller must hold t.mu. ackBuf []int64 - // quorumAckUnixNano is the Nth-most-recent peer ack where N equals - // the number of follower acks required for majority (clusterSize/2). - // Updated under mu; read lock-free via atomic.Load. - quorumAckUnixNano atomic.Int64 + // quorumAckMonoNs is the Nth-most-recent peer ack where N equals + // the number of follower acks required for majority (clusterSize/2), + // stored as a monoclock.Instant nanosecond counter. Updated under + // mu; read lock-free via atomic.Load. 
+ quorumAckMonoNs atomic.Int64 } // recordAck notes that peerID responded to us and recomputes the @@ -52,7 +60,7 @@ func (t *quorumAckTracker) recordAck(peerID uint64, followerQuorum int) { if followerQuorum <= 0 { return } - now := time.Now().UnixNano() + now := monoclock.Now().Nanos() t.mu.Lock() defer t.mu.Unlock() if t.peerAcks == nil { @@ -100,7 +108,7 @@ func (t *quorumAckTracker) removePeer(peerID uint64, followerQuorum int) { func (t *quorumAckTracker) recomputeLocked(followerQuorum int) { if len(t.peerAcks) < followerQuorum { // Not enough peers have reported to form a majority yet. - t.quorumAckUnixNano.Store(0) + t.quorumAckMonoNs.Store(0) return } t.ackBuf = t.ackBuf[:0] @@ -114,7 +122,7 @@ func (t *quorumAckTracker) recomputeLocked(followerQuorum int) { // peers), so sort.Slice is cheaper than a quickselect once the // buffer is reused. sort.Slice(t.ackBuf, func(i, j int) bool { return t.ackBuf[i] > t.ackBuf[j] }) - t.quorumAckUnixNano.Store(t.ackBuf[followerQuorum-1]) + t.quorumAckMonoNs.Store(t.ackBuf[followerQuorum-1]) } // reset clears all recorded peer acks. Call when the local node @@ -125,15 +133,15 @@ func (t *quorumAckTracker) reset() { defer t.mu.Unlock() t.peerAcks = nil t.ackBuf = t.ackBuf[:0] - t.quorumAckUnixNano.Store(0) + t.quorumAckMonoNs.Store(0) } -// load returns the current majority-ack instant or the zero time if -// no quorum has been observed since the last reset. -func (t *quorumAckTracker) load() time.Time { - ns := t.quorumAckUnixNano.Load() +// load returns the current majority-ack instant or the zero Instant +// if no quorum has been observed since the last reset. 
+func (t *quorumAckTracker) load() monoclock.Instant { + ns := t.quorumAckMonoNs.Load() if ns == 0 { - return time.Time{} + return monoclock.Zero } - return time.Unix(0, ns) + return monoclock.FromNanos(ns) } diff --git a/internal/raftengine/etcd/quorum_ack_test.go b/internal/raftengine/etcd/quorum_ack_test.go index b028defe..e48acc07 100644 --- a/internal/raftengine/etcd/quorum_ack_test.go +++ b/internal/raftengine/etcd/quorum_ack_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/bootjp/elastickv/internal/monoclock" "github.com/stretchr/testify/require" ) @@ -16,7 +17,7 @@ func TestQuorumAckTracker_SingleNodeFollowerQuorumZeroIsNoop(t *testing.T) { // that case elsewhere. recordAck must not mutate state, otherwise // a re-election into multi-node would surface a stale instant. tr.recordAck(42, 0) - require.Equal(t, time.Time{}, tr.load()) + require.True(t, tr.load().IsZero()) } func TestQuorumAckTracker_QuorumAckWaitsForMajority(t *testing.T) { @@ -32,7 +33,7 @@ func TestQuorumAckTracker_QuorumAckWaitsForMajority(t *testing.T) { // reported before publishing. var tr2 quorumAckTracker tr2.recordAck(2, 2) - require.Equal(t, time.Time{}, tr2.load(), "one follower is not a 5-node quorum") + require.True(t, tr2.load().IsZero(), "one follower is not a 5-node quorum") tr2.recordAck(3, 2) require.False(t, tr2.load().IsZero(), "two followers + self make a 5-node quorum") } @@ -51,13 +52,13 @@ func TestQuorumAckTracker_QuorumAckIsOldestOfTopN(t *testing.T) { second := tr.load() require.False(t, second.IsZero()) - // Now peer 4 acks. Even if time.Now() granularity places every - // sample at the same nanosecond, the quorum instant must NOT - // regress: the 5-node quorum requires 2 follower acks (self makes - // 3 = majority), and the OLDEST of the top two followers bounds - // the boundary. 
require.False(third.Before(second)) holds trivially - // when timestamps are equal, so this test does not rely on wall- - // clock granularity and is deterministic on fast CI. + // Now peer 4 acks. Even if monoclock.Now() granularity places + // every sample at the same nanosecond, the quorum instant must + // NOT regress: the 5-node quorum requires 2 follower acks (self + // makes 3 = majority), and the OLDEST of the top two followers + // bounds the boundary. require.False(third.Before(second)) holds + // trivially when timestamps are equal, so this test does not rely + // on clock granularity and is deterministic on fast CI. tr.recordAck(4, 2) third := tr.load() require.False(t, third.Before(second), "quorum instant must not regress") @@ -81,7 +82,7 @@ func TestQuorumAckTracker_RemovedPeerCannotSatisfyQuorum(t *testing.T) { // satisfy even the smaller quorum. tr.removePeer(2, 1) tr.removePeer(3, 1) - require.Equal(t, time.Time{}, tr.load(), + require.True(t, tr.load().IsZero(), "after removing every acked peer the quorum instant must clear") } @@ -107,13 +108,38 @@ func TestQuorumAckTracker_ResetClearsState(t *testing.T) { require.False(t, tr.load().IsZero()) tr.reset() - require.Equal(t, time.Time{}, tr.load()) + require.True(t, tr.load().IsZero()) // After reset, a subsequent ack must still populate correctly. tr.recordAck(2, 1) require.False(t, tr.load().IsZero()) } +// TestQuorumAckTracker_LoadReturnsMonotonicRaw pins the clock source: +// quorum acks must be CLOCK_MONOTONIC_RAW readings, not time.Now(). +// A regression that stored wall-clock unix nanos would put the tracker +// back on the NTP-slew-sensitive path that #551 removed. 
+func TestQuorumAckTracker_LoadReturnsMonotonicRaw(t *testing.T) { + t.Parallel() + var tr quorumAckTracker + before := monoclock.Now() + tr.recordAck(2, 1) + after := monoclock.Now() + + got := tr.load() + require.False(t, got.IsZero()) + require.False(t, got.Before(before), + "recorded ack must not predate the monotonic-raw sample taken before recordAck") + require.False(t, got.After(after), + "recorded ack must not postdate the monotonic-raw sample taken after recordAck") + // Sanity: the gap between before and after bounds the recorded + // value; if recordAck stored wall-clock time, the comparison + // arithmetic below would be against a different epoch entirely + // and this subtraction would produce an out-of-range duration. + require.Less(t, got.Sub(before), time.Second, + "recorded ack must sit inside the [before, after] monotonic window") +} + func TestQuorumAckTracker_ConcurrentRecordAndLoad(t *testing.T) { t.Parallel() var tr quorumAckTracker diff --git a/kv/coordinator.go b/kv/coordinator.go index 466ecf1a..28b3f6f7 100644 --- a/kv/coordinator.go +++ b/kv/coordinator.go @@ -8,6 +8,7 @@ import ( "reflect" "time" + "github.com/bootjp/elastickv/internal/monoclock" "github.com/bootjp/elastickv/internal/raftengine" pb "github.com/bootjp/elastickv/proto" "github.com/cockroachdb/errors" @@ -246,13 +247,15 @@ func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*C // * dispatchStart: any real quorum confirmation happens at or // after this instant, so using it as the lease-extension base // is strictly conservative (window can only be SHORTER than - // the actual safety window, never longer). + // the actual safety window, never longer). Sampled from the + // monotonic-raw clock so NTP slew/step cannot push the lease + // past its true safety window (see internal/monoclock). 
// * expectedGen: if a leader-loss callback fires between this // sample and the post-dispatch extend, the generation will // have advanced; extend(expectedGen) will see the mismatch // and refuse to resurrect the lease. Capturing gen INSIDE // extend would observe the post-invalidate value as current. - dispatchStart := time.Now() + dispatchStart := monoclock.Now() expectedGen := c.lease.generation() var resp *CoordinateResponse var err error @@ -279,7 +282,7 @@ func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*C // every subsequent read onto the slow LinearizableRead path and defeat // the lease's purpose. RegisterLeaderLossCallback plus the // State()==StateLeader fast-path guard cover real leader loss. -func (c *Coordinate) refreshLeaseAfterDispatch(resp *CoordinateResponse, err error, dispatchStart time.Time, expectedGen uint64) { +func (c *Coordinate) refreshLeaseAfterDispatch(resp *CoordinateResponse, err error, dispatchStart monoclock.Instant, expectedGen uint64) { if err != nil { // Only invalidate on errors that actually signal leadership // loss. Write conflicts and validation errors are business- @@ -422,9 +425,13 @@ func (c *Coordinate) LeaseRead(ctx context.Context) (uint64, error) { // Misconfigured tick settings: lease is disabled. return c.LinearizableRead(ctx) } - // Single time.Now() sample so the primary, secondary, and - // extension steps all reason about the same instant. - now := time.Now() + // Single monoclock.Now() sample so the primary, secondary, and + // extension steps all reason about the same monotonic-raw instant. + // Using CLOCK_MONOTONIC_RAW (via internal/monoclock) keeps the + // lease-vs-safety-window comparison immune to NTP rate adjustment + // and wall-clock steps; a misconfigured time daemon cannot slip + // the lease past electionTimeout - leaseSafetyMargin. 
+ now := monoclock.Now() state := c.engine.State() if engineLeaseAckValid(state, lp.LastQuorumAck(), now, leaseDur) { c.observeLeaseRead(true) @@ -467,11 +474,17 @@ func (c *Coordinate) observeLeaseRead(hit bool) { // read. Enforces the safety contract from raftengine.LeaseProvider: // - local state must be Leader // - ack must be non-zero (a quorum was ever observed) -// - ack must not be after now (clock-skew guard: LastQuorumAck is -// rebuilt from UnixNano with no monotonic component, so a -// backwards wall-clock step could otherwise let a stale ack pass) +// - ack must not be after now (defensive guard: the monotonic-raw +// clock cannot go backwards, but a zero / bogus ack reading should +// still fail closed) // - now − ack must be strictly less than leaseDur -func engineLeaseAckValid(state raftengine.State, ack, now time.Time, leaseDur time.Duration) bool { +// +// Both ack and now are monoclock.Instant readings from +// CLOCK_MONOTONIC_RAW, so the comparison is immune to NTP rate +// adjustment and wall-clock steps. See docs/lease_read_design.md §3.2 +// for why the raw monotonic source matters once leaseSafetyMargin is +// tightened below ~5 ms. +func engineLeaseAckValid(state raftengine.State, ack, now monoclock.Instant, leaseDur time.Duration) bool { if state != raftengine.StateLeader || ack.IsZero() || ack.After(now) { return false } diff --git a/kv/lease_read_test.go b/kv/lease_read_test.go index ddbfa85e..59917c45 100644 --- a/kv/lease_read_test.go +++ b/kv/lease_read_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/bootjp/elastickv/internal/monoclock" "github.com/bootjp/elastickv/internal/raftengine" "github.com/stretchr/testify/require" ) @@ -21,7 +22,7 @@ type fakeLeaseEngine struct { linearizableErr error linearizableCalls atomic.Int32 state atomic.Value // stores raftengine.State; default Leader - lastQuorumAckUnixNano atomic.Int64 // 0 = no ack yet. Updated by ackNow(). + lastQuorumAckMonoNs atomic.Int64 // 0 = no ack yet. 
Updated by setQuorumAck(). leaderLossCallbacksMu sync.Mutex leaderLossCallbacks []fakeLeaseEngineCb registerLeaderLossCalled atomic.Int32 @@ -64,21 +65,21 @@ func (e *fakeLeaseEngine) Propose(context.Context, []byte) (*raftengine.Proposal func (e *fakeLeaseEngine) Close() error { return nil } func (e *fakeLeaseEngine) LeaseDuration() time.Duration { return e.leaseDur } func (e *fakeLeaseEngine) AppliedIndex() uint64 { return e.applied } -func (e *fakeLeaseEngine) LastQuorumAck() time.Time { +func (e *fakeLeaseEngine) LastQuorumAck() monoclock.Instant { // Honor the raftengine.LeaseProvider contract that non-leaders - // return the zero time, mirroring the production etcd engine. A - // test that sets a fresh ack and a non-leader state MUST still + // return the zero Instant, mirroring the production etcd engine. + // A test that sets a fresh ack and a non-leader state MUST still // see the slow path taken; a divergent fake would hide regressions // where production code stops gating on engine.State() before // consulting LastQuorumAck. if e.State() != raftengine.StateLeader { - return time.Time{} + return monoclock.Zero } - ns := e.lastQuorumAckUnixNano.Load() + ns := e.lastQuorumAckMonoNs.Load() if ns == 0 { - return time.Time{} + return monoclock.Zero } - return time.Unix(0, ns) + return monoclock.FromNanos(ns) } func (e *fakeLeaseEngine) RegisterLeaderLossCallback(fn func()) func() { e.registerLeaderLossCalled.Add(1) @@ -158,12 +159,12 @@ func (e *nonLeaseEngine) Close() error { return nil } // anchor on the fake engine so tests can exercise the new PRIMARY // fast path (LastQuorumAck + State==Leader) independently of the // caller-side lease state. 
-func (e *fakeLeaseEngine) setQuorumAck(t time.Time) { - if t.IsZero() { - e.lastQuorumAckUnixNano.Store(0) +func (e *fakeLeaseEngine) setQuorumAck(i monoclock.Instant) { + if i.IsZero() { + e.lastQuorumAckMonoNs.Store(0) return } - e.lastQuorumAckUnixNano.Store(t.UnixNano()) + e.lastQuorumAckMonoNs.Store(i.Nanos()) } // --- Coordinate.LeaseRead ----------------------------------------------- @@ -176,10 +177,10 @@ func (e *fakeLeaseEngine) setQuorumAck(t time.Time) { func TestCoordinate_LeaseRead_EngineAckFastPath(t *testing.T) { t.Parallel() eng := &fakeLeaseEngine{applied: 123, leaseDur: time.Hour} - eng.setQuorumAck(time.Now()) + eng.setQuorumAck(monoclock.Now()) c := NewCoordinatorWithEngine(nil, eng) - require.False(t, c.lease.valid(time.Now()), + require.False(t, c.lease.valid(monoclock.Now()), "caller-side lease must start cold so the fast-path hit is attributable to the engine ack") idx, err := c.LeaseRead(context.Background()) @@ -187,7 +188,7 @@ func TestCoordinate_LeaseRead_EngineAckFastPath(t *testing.T) { require.Equal(t, uint64(123), idx) require.Equal(t, int32(0), eng.linearizableCalls.Load(), "engine-driven ack alone must skip LinearizableRead") - require.False(t, c.lease.valid(time.Now()), + require.False(t, c.lease.valid(monoclock.Now()), "engine-driven fast path must not warm the caller-side lease") } @@ -199,7 +200,7 @@ func TestCoordinate_LeaseRead_EngineAckStaleFallsThrough(t *testing.T) { t.Parallel() eng := &fakeLeaseEngine{applied: 7, leaseDur: 50 * time.Millisecond} // Set the ack far enough in the past that time.Since(ack) > leaseDur. 
- eng.setQuorumAck(time.Now().Add(-time.Hour)) + eng.setQuorumAck(monoclock.Now().Add(-time.Hour)) c := NewCoordinatorWithEngine(nil, eng) _, err := c.LeaseRead(context.Background()) @@ -216,7 +217,7 @@ func TestCoordinate_LeaseRead_EngineAckIgnoredWhenNotLeader(t *testing.T) { t.Parallel() sentinel := errors.New("not leader") eng := &fakeLeaseEngine{applied: 7, leaseDur: time.Hour, linearizableErr: sentinel} - eng.setQuorumAck(time.Now()) + eng.setQuorumAck(monoclock.Now()) eng.state.Store(raftengine.StateFollower) c := NewCoordinatorWithEngine(nil, eng) @@ -231,7 +232,7 @@ func TestCoordinate_LeaseRead_FastPathSkipsEngine(t *testing.T) { eng := &fakeLeaseEngine{applied: 100, leaseDur: time.Hour} c := NewCoordinatorWithEngine(nil, eng) - c.lease.extend(time.Now().Add(time.Hour), c.lease.generation()) + c.lease.extend(monoclock.Now().Add(time.Hour), c.lease.generation()) idx, err := c.LeaseRead(context.Background()) require.NoError(t, err) @@ -249,7 +250,7 @@ func TestCoordinate_LeaseRead_SlowPathRefreshesLease(t *testing.T) { require.Equal(t, uint64(50), idx) require.Equal(t, int32(1), eng.linearizableCalls.Load()) - require.True(t, c.lease.valid(time.Now())) + require.True(t, c.lease.valid(monoclock.Now())) idx2, err := c.LeaseRead(context.Background()) require.NoError(t, err) @@ -263,12 +264,12 @@ func TestCoordinate_LeaseRead_ErrorInvalidatesLease(t *testing.T) { eng := &fakeLeaseEngine{applied: 7, leaseDur: time.Hour, linearizableErr: sentinel} c := NewCoordinatorWithEngine(nil, eng) - c.lease.extend(time.Now().Add(time.Hour), c.lease.generation()) + c.lease.extend(monoclock.Now().Add(time.Hour), c.lease.generation()) c.lease.invalidate() // force slow path _, err := c.LeaseRead(context.Background()) require.ErrorIs(t, err, sentinel) - require.False(t, c.lease.valid(time.Now())) + require.False(t, c.lease.valid(monoclock.Now())) require.Equal(t, int32(1), eng.linearizableCalls.Load()) // Subsequent call also takes slow path because lease is invalidated. 
@@ -289,8 +290,8 @@ func TestCoordinate_LeaseRead_FallbackWhenEngineNotLeader(t *testing.T) { c := NewCoordinatorWithEngine(nil, eng) // Warm the lease so valid() returns true. - c.lease.extend(time.Now().Add(time.Hour), c.lease.generation()) - require.True(t, c.lease.valid(time.Now())) + c.lease.extend(monoclock.Now().Add(time.Hour), c.lease.generation()) + require.True(t, c.lease.valid(monoclock.Now())) // Engine transitioned to follower (or unknown); async invalidate // hasn't run yet. @@ -316,7 +317,7 @@ func TestCoordinate_LeaseRead_FallbackWhenLeaseDurationZero(t *testing.T) { require.NoError(t, err) require.Equal(t, uint64(3), idx) require.Equal(t, int32(1), eng.linearizableCalls.Load()) - require.False(t, c.lease.valid(time.Now()), + require.False(t, c.lease.valid(monoclock.Now()), "lease must not have been extended when LeaseDuration <= 0") // Every subsequent call must still take the slow path. @@ -354,10 +355,10 @@ func TestCoordinate_CloseDeregistersLeaderLossCallback(t *testing.T) { // After Close, firing leader-loss must NOT invoke this Coordinate's // invalidate (it must have been removed from the engine's slice). - c.lease.extend(time.Now().Add(time.Hour), c.lease.generation()) - require.True(t, c.lease.valid(time.Now())) + c.lease.extend(monoclock.Now().Add(time.Hour), c.lease.generation()) + require.True(t, c.lease.valid(monoclock.Now())) eng.fireLeaderLoss() - require.True(t, c.lease.valid(time.Now()), + require.True(t, c.lease.valid(monoclock.Now()), "Close must remove the callback so subsequent leader-loss firings do NOT touch this Coordinate's lease") // Close is idempotent. 
@@ -370,11 +371,11 @@ func TestCoordinate_RegistersLeaderLossCallback(t *testing.T) { c := NewCoordinatorWithEngine(nil, eng) require.Equal(t, int32(1), eng.registerLeaderLossCalled.Load()) - c.lease.extend(time.Now().Add(time.Hour), c.lease.generation()) - require.True(t, c.lease.valid(time.Now())) + c.lease.extend(monoclock.Now().Add(time.Hour), c.lease.generation()) + require.True(t, c.lease.valid(monoclock.Now())) eng.fireLeaderLoss() - require.False(t, c.lease.valid(time.Now()), + require.False(t, c.lease.valid(monoclock.Now()), "leader-loss callback must invalidate the lease") } @@ -446,7 +447,7 @@ func TestCoordinate_LeaseRead_ObserverSeparatesHitsFromMisses(t *testing.T) { func TestLeaseReadEngineCtx_FastPath_SkipsLinearizableRead(t *testing.T) { t.Parallel() eng := &fakeLeaseEngine{applied: 42, leaseDur: time.Hour} - eng.setQuorumAck(time.Now()) + eng.setQuorumAck(monoclock.Now()) idx, err := leaseReadEngineCtx(context.Background(), eng) require.NoError(t, err) @@ -461,7 +462,7 @@ func TestLeaseReadEngineCtx_FastPath_SkipsLinearizableRead(t *testing.T) { func TestLeaseReadEngineCtx_StaleAck_FallsThroughToLinearizable(t *testing.T) { t.Parallel() eng := &fakeLeaseEngine{applied: 7, leaseDur: 50 * time.Millisecond} - eng.setQuorumAck(time.Now().Add(-time.Hour)) + eng.setQuorumAck(monoclock.Now().Add(-time.Hour)) idx, err := leaseReadEngineCtx(context.Background(), eng) require.NoError(t, err) @@ -476,7 +477,7 @@ func TestLeaseReadEngineCtx_StaleAck_FallsThroughToLinearizable(t *testing.T) { func TestLeaseReadEngineCtx_NotLeader_FallsThrough(t *testing.T) { t.Parallel() eng := &fakeLeaseEngine{applied: 99, leaseDur: time.Hour} - eng.setQuorumAck(time.Now()) + eng.setQuorumAck(monoclock.Now()) eng.state.Store(raftengine.StateFollower) // On a non-leader the fake honours the LeaseProvider contract and // returns zero ack, so the engineLeaseAckValid state guard also diff --git a/kv/lease_state.go b/kv/lease_state.go index ee46db60..32c4bbc1 100644 --- 
a/kv/lease_state.go +++ b/kv/lease_state.go @@ -2,8 +2,8 @@ package kv import ( "sync/atomic" - "time" + "github.com/bootjp/elastickv/internal/monoclock" "github.com/bootjp/elastickv/internal/raftengine" "github.com/cockroachdb/errors" ) @@ -30,30 +30,36 @@ func isLeadershipLossError(err error) bool { errors.Is(err, raftengine.ErrLeadershipTransferInProgress) } -// leaseState tracks the wall-clock expiry of a leader-local read lease. -// All operations are lock-free via atomic.Pointer plus a generation -// counter that prevents an in-flight extend from resurrecting a lease -// that a concurrent invalidate has cleared. +// leaseState tracks the monotonic-raw expiry of a leader-local read +// lease. All operations are lock-free via atomic.Int64 (the raw +// monoclock.Instant nanoseconds) plus a generation counter that +// prevents an in-flight extend from resurrecting a lease that a +// concurrent invalidate has cleared. // -// A nil expiry means the lease has never been issued or has been -// invalidated. A non-nil expiry is the wall-clock instant after which -// the lease is considered expired; a caller comparing time.Now() against -// the loaded value can decide whether to skip a quorum confirmation. +// expiry == 0 means the lease has never been issued or has been +// invalidated. A non-zero value is the monotonic-raw instant after +// which the lease is considered expired; a caller comparing +// monoclock.Now() against the loaded value can decide whether to skip +// a quorum confirmation. The monotonic-raw clock is immune to NTP +// rate adjustment and wall-clock steps (see internal/monoclock). type leaseState struct { - gen atomic.Uint64 - expiry atomic.Pointer[time.Time] + gen atomic.Uint64 + // expiryNanos stores monoclock.Instant.Nanos(); 0 means "no lease". + // Stored as int64 so CAS can be expressed without an extra pointer + // indirection, preserving the lock-free fast-path performance. 
+	expiryNanos atomic.Int64 } // valid reports whether the lease is unexpired at now. -func (s *leaseState) valid(now time.Time) bool { +func (s *leaseState) valid(now monoclock.Instant) bool { if s == nil { return false } - exp := s.expiry.Load() - if exp == nil { + ns := s.expiryNanos.Load() + if ns == 0 { return false } - return now.Before(*exp) + return now.Before(monoclock.FromNanos(ns)) } // generation returns the current invalidation counter. Callers MUST @@ -76,30 +82,36 @@ func (s *leaseState) generation() uint64 { // generation guard prevents a Dispatch that returned successfully // *just before* a leader-loss invalidate from resurrecting the // lease milliseconds after invalidation. -func (s *leaseState) extend(until time.Time, expectedGen uint64) { +func (s *leaseState) extend(until monoclock.Instant, expectedGen uint64) { if s == nil { return } + target := until.Nanos() + if target == 0 { + // Refuse to store 0 — that value is the sentinel for "no + // lease" and would race with invalidate's zero-store. + return + } for { // Pre-CAS gate: if invalidate already advanced the generation // past expectedGen, skip the CAS entirely. if s.gen.Load() != expectedGen { return } - current := s.expiry.Load() - if current != nil && !until.After(*current) { + current := s.expiryNanos.Load() + if current != 0 && target <= current { return } - if !s.expiry.CompareAndSwap(current, &until) { + if !s.expiryNanos.CompareAndSwap(current, target) { continue } // CAS landed. If invalidate raced in between the pre-CAS gate // and the CAS itself, undo our write iff no later writer has - // replaced it. Using CAS with our own target means a fresh + // replaced it. Using CAS with our own target means a fresh // extend that captured the post-invalidate generation is left - // intact. + // intact whenever it stored a different instant. (Unlike the + // old pointer CAS this is value-based: a fresh extend that + // stored the identical nanosecond is cleared too. That case is + // rare and fails closed; the next read takes the slow path.)
if s.gen.Load() != expectedGen { - s.expiry.CompareAndSwap(&until, nil) + s.expiryNanos.CompareAndSwap(target, 0) } return } @@ -114,5 +126,5 @@ func (s *leaseState) invalidate() { return } s.gen.Add(1) - s.expiry.Store(nil) + s.expiryNanos.Store(0) } diff --git a/kv/lease_state_test.go b/kv/lease_state_test.go index 2602ce83..7424ab05 100644 --- a/kv/lease_state_test.go +++ b/kv/lease_state_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/bootjp/elastickv/internal/monoclock" "github.com/bootjp/elastickv/internal/raftengine" "github.com/stretchr/testify/require" ) @@ -38,22 +39,22 @@ func TestIsLeadershipLossError(t *testing.T) { func TestLeaseState_NilReceiverIsAlwaysExpired(t *testing.T) { t.Parallel() var s *leaseState - require.False(t, s.valid(time.Now())) - s.extend(time.Now().Add(time.Hour), s.generation()) // must not panic - s.invalidate() // must not panic - require.False(t, s.valid(time.Now())) + require.False(t, s.valid(monoclock.Now())) + s.extend(monoclock.Now().Add(time.Hour), s.generation()) // must not panic + s.invalidate() // must not panic + require.False(t, s.valid(monoclock.Now())) } func TestLeaseState_ZeroValueIsExpired(t *testing.T) { t.Parallel() var s leaseState - require.False(t, s.valid(time.Now())) + require.False(t, s.valid(monoclock.Now())) } func TestLeaseState_ExtendAndExpire(t *testing.T) { t.Parallel() var s leaseState - now := time.Now() + now := monoclock.Now() s.extend(now.Add(50*time.Millisecond), s.generation()) require.True(t, s.valid(now)) @@ -65,7 +66,7 @@ func TestLeaseState_ExtendAndExpire(t *testing.T) { func TestLeaseState_InvalidateClears(t *testing.T) { t.Parallel() var s leaseState - now := time.Now() + now := monoclock.Now() s.extend(now.Add(time.Hour), s.generation()) require.True(t, s.valid(now)) @@ -76,15 +77,15 @@ func TestLeaseState_InvalidateClears(t *testing.T) { func TestLeaseState_ExtendIsMonotonic(t *testing.T) { t.Parallel() var s leaseState - now := time.Now() + now := monoclock.Now() 
s.extend(now.Add(time.Hour), s.generation()) require.True(t, s.valid(now.Add(30*time.Minute))) // A shorter extension must NOT regress the lease: an out-of-order - // writer that sampled time.Now() earlier could otherwise prematurely - // expire a freshly extended lease and force callers into the slow - // path while the leader is still confirmed. + // writer that sampled monoclock.Now() earlier could otherwise + // prematurely expire a freshly extended lease and force callers + // into the slow path while the leader is still confirmed. s.extend(now.Add(time.Minute), s.generation()) require.True(t, s.valid(now.Add(30*time.Minute))) @@ -96,12 +97,12 @@ func TestLeaseState_ExtendIsMonotonic(t *testing.T) { func TestLeaseState_InvalidateBeatsConcurrentExtend(t *testing.T) { t.Parallel() var s leaseState - now := time.Now() + now := monoclock.Now() s.extend(now.Add(time.Hour), s.generation()) - // invalidate stores nil unconditionally, even when the current expiry - // is in the future. Otherwise leadership-loss callbacks would be - // powerless once a lease is in place. + // invalidate stores the zero sentinel unconditionally, even when + // the current expiry is in the future. Otherwise leadership-loss + // callbacks would be powerless once a lease is in place. s.invalidate() require.False(t, s.valid(now)) } @@ -113,7 +114,7 @@ func TestLeaseState_InvalidateBeatsConcurrentExtend(t *testing.T) { func TestLeaseState_ExtendCannotResurrectAfterInvalidate(t *testing.T) { t.Parallel() var s leaseState - now := time.Now() + now := monoclock.Now() // Caller pattern: sample generation BEFORE the quorum operation. 
expectedGen := s.generation() @@ -137,7 +138,7 @@ func TestLeaseState_ExtendCannotResurrectAfterInvalidate(t *testing.T) { func TestLeaseState_ExtendWithFreshGenSucceedsAfterInvalidate(t *testing.T) { t.Parallel() var s leaseState - now := time.Now() + now := monoclock.Now() s.invalidate() freshGen := s.generation() @@ -153,7 +154,7 @@ func TestLeaseState_ConcurrentExtendAndRead(t *testing.T) { // Cooperative scheduling: runtime.Gosched() between iterations keeps // the workers from pegging a core while still interleaving enough - // extend/valid pairs under `-race` to exercise the atomic-pointer + // extend/valid pairs under `-race` to exercise the atomic-int64 // invariants. go func() { defer func() { done <- struct{}{} }() @@ -163,7 +164,7 @@ func TestLeaseState_ConcurrentExtendAndRead(t *testing.T) { return default: gen := s.generation() - s.extend(time.Now().Add(time.Second), gen) + s.extend(monoclock.Now().Add(time.Second), gen) runtime.Gosched() } } @@ -175,7 +176,7 @@ func TestLeaseState_ConcurrentExtendAndRead(t *testing.T) { case <-stop: return default: - _ = s.valid(time.Now()) + _ = s.valid(monoclock.Now()) runtime.Gosched() } } diff --git a/kv/raft_engine.go b/kv/raft_engine.go index 75cf5785..b5f68b8c 100644 --- a/kv/raft_engine.go +++ b/kv/raft_engine.go @@ -2,8 +2,8 @@ package kv import ( "context" - "time" + "github.com/bootjp/elastickv/internal/monoclock" "github.com/bootjp/elastickv/internal/raftengine" "github.com/cockroachdb/errors" ) @@ -71,14 +71,14 @@ func linearizableReadEngineCtx(ctx context.Context, engine raftengine.LeaderView // Safety mirrors Coordinator.LeaseRead (see engineLeaseAckValid): // the returned AppliedIndex is only served when the local node is // Leader AND LastQuorumAck is within LeaseDuration of a single -// time.Now() sample. +// monoclock.Now() sample (CLOCK_MONOTONIC_RAW). 
func leaseReadEngineCtx(ctx context.Context, engine raftengine.LeaderView) (uint64, error) { if engine == nil { return 0, errors.WithStack(ErrLeaderNotFound) } if lp, ok := engine.(raftengine.LeaseProvider); ok { if leaseDur := lp.LeaseDuration(); leaseDur > 0 { - now := time.Now() + now := monoclock.Now() if engineLeaseAckValid(engine.State(), lp.LastQuorumAck(), now, leaseDur) { return lp.AppliedIndex(), nil } diff --git a/kv/sharded_coordinator.go b/kv/sharded_coordinator.go index b6fc036c..de8c02fd 100644 --- a/kv/sharded_coordinator.go +++ b/kv/sharded_coordinator.go @@ -11,6 +11,7 @@ import ( "time" "github.com/bootjp/elastickv/distribution" + "github.com/bootjp/elastickv/internal/monoclock" "github.com/bootjp/elastickv/internal/raftengine" pb "github.com/bootjp/elastickv/proto" "github.com/bootjp/elastickv/store" @@ -41,7 +42,7 @@ type leaseRefreshingTxn struct { } func (t *leaseRefreshingTxn) Commit(reqs []*pb.Request) (*TransactionResponse, error) { - start := time.Now() + start := monoclock.Now() expectedGen := t.g.lease.generation() resp, err := t.inner.Commit(reqs) if err != nil { @@ -64,7 +65,7 @@ func (t *leaseRefreshingTxn) Commit(reqs []*pb.Request) (*TransactionResponse, e } func (t *leaseRefreshingTxn) Abort(reqs []*pb.Request) (*TransactionResponse, error) { - start := time.Now() + start := monoclock.Now() expectedGen := t.g.lease.generation() resp, err := t.inner.Abort(reqs) if err != nil { @@ -82,7 +83,7 @@ func (t *leaseRefreshingTxn) Abort(reqs []*pb.Request) (*TransactionResponse, er // underlying Commit/Abort so an invalidation that fires during that // call observes a generation mismatch inside extend and the refresh // is rejected. See the struct doc comment for why. 
-func (t *leaseRefreshingTxn) maybeRefresh(resp *TransactionResponse, start time.Time, expectedGen uint64) { +func (t *leaseRefreshingTxn) maybeRefresh(resp *TransactionResponse, start monoclock.Instant, expectedGen uint64) { if resp == nil || resp.CommitIndex == 0 { return } @@ -773,10 +774,10 @@ func groupLeaseRead(ctx context.Context, g *ShardGroup, observer LeaseReadObserv if leaseDur <= 0 { return linearizableReadEngineCtx(ctx, engine) } - // Single time.Now() sample so primary/secondary/extension all see - // the same instant. Clock-skew safety delegated to - // engineLeaseAckValid (see Coordinate.LeaseRead). - now := time.Now() + // Single monoclock.Now() sample so primary/secondary/extension + // all see the same monotonic-raw instant. Clock-skew safety + // delegated to engineLeaseAckValid (see Coordinate.LeaseRead). + now := monoclock.Now() state := engine.State() if engineLeaseAckValid(state, lp.LastQuorumAck(), now, leaseDur) { observeLeaseRead(observer, true) diff --git a/kv/sharded_lease_test.go b/kv/sharded_lease_test.go index bdec9223..b06feb3b 100644 --- a/kv/sharded_lease_test.go +++ b/kv/sharded_lease_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/bootjp/elastickv/distribution" + "github.com/bootjp/elastickv/internal/monoclock" pb "github.com/bootjp/elastickv/proto" "github.com/stretchr/testify/require" ) @@ -58,7 +59,7 @@ func TestShardedCoordinator_LeaseReadForKey_PerShardIsolation(t *testing.T) { // Pre-extend shard 1's lease only. 
g1 := coord.groups[1] - g1.lease.extend(time.Now().Add(time.Hour), g1.lease.generation()) + g1.lease.extend(monoclock.Now().Add(time.Hour), g1.lease.generation()) idx, err := coord.LeaseReadForKey(context.Background(), []byte("apple")) require.NoError(t, err) @@ -88,15 +89,15 @@ func TestShardedCoordinator_LeaseReadForKey_ErrorOnlyInvalidatesShard(t *testing g1 := coord.groups[1] g2 := coord.groups[2] - g1.lease.extend(time.Now().Add(time.Hour), g1.lease.generation()) - g2.lease.extend(time.Now().Add(time.Hour), g2.lease.generation()) + g1.lease.extend(monoclock.Now().Add(time.Hour), g1.lease.generation()) + g2.lease.extend(monoclock.Now().Add(time.Hour), g2.lease.generation()) g2.lease.invalidate() // force shard 2 onto slow path _, err := coord.LeaseReadForKey(context.Background(), []byte("zebra")) require.ErrorIs(t, err, sentinel) - require.False(t, g2.lease.valid(time.Now()), + require.False(t, g2.lease.valid(monoclock.Now()), "shard 2 lease must be invalidated after error") - require.True(t, g1.lease.valid(time.Now()), + require.True(t, g1.lease.valid(monoclock.Now()), "shard 1 lease must NOT be touched by shard 2's failure") } @@ -115,25 +116,25 @@ func TestShardedCoordinator_LeaseRefreshingTxn_SkipsWhenCommitIndexZero(t *testi require.True(t, ok, "NewShardedCoordinator wraps Txn in leaseRefreshingTxn") txn.inner = &fixedTransactional{response: noRaftResp} - require.False(t, g1.lease.valid(time.Now())) + require.False(t, g1.lease.valid(monoclock.Now())) // Commit with empty input returns success with CommitIndex=0. _, err := g1.Txn.Commit(nil) require.NoError(t, err) - require.False(t, g1.lease.valid(time.Now()), + require.False(t, g1.lease.valid(monoclock.Now()), "lease must NOT be refreshed when no Raft commit happened") // Same for Abort. _, err = g1.Txn.Abort(nil) require.NoError(t, err) - require.False(t, g1.lease.valid(time.Now())) + require.False(t, g1.lease.valid(monoclock.Now())) // A response with CommitIndex > 0 refreshes the lease. 
realResp := &TransactionResponse{CommitIndex: 42} txn.inner = &fixedTransactional{response: realResp} _, err = g1.Txn.Commit(nil) require.NoError(t, err) - require.True(t, g1.lease.valid(time.Now()), + require.True(t, g1.lease.valid(monoclock.Now()), "lease must be refreshed after a real Raft commit") } @@ -205,12 +206,12 @@ func TestShardedCoordinator_RegistersPerShardLeaderLossCallback(t *testing.T) { g1 := coord.groups[1] g2 := coord.groups[2] - g1.lease.extend(time.Now().Add(time.Hour), g1.lease.generation()) - g2.lease.extend(time.Now().Add(time.Hour), g2.lease.generation()) + g1.lease.extend(monoclock.Now().Add(time.Hour), g1.lease.generation()) + g2.lease.extend(monoclock.Now().Add(time.Hour), g2.lease.generation()) eng1.fireLeaderLoss() - require.False(t, g1.lease.valid(time.Now()), + require.False(t, g1.lease.valid(monoclock.Now()), "shard 1 leader-loss callback must invalidate shard 1's lease") - require.True(t, g2.lease.valid(time.Now()), + require.True(t, g2.lease.valid(monoclock.Now()), "shard 2 lease must remain valid; only its own engine's callback affects it") } From 54a6f98b31f6b56089066e9eb5f2dfcb2d3f0b6d Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Fri, 24 Apr 2026 03:21:39 +0900 Subject: [PATCH 2/5] monoclock: drop FreeBSD from raw-clock build target golang.org/x/sys/unix does not export CLOCK_MONOTONIC_RAW on FreeBSD (its kernel exposes CLOCK_MONOTONIC_PRECISE, a different clock), so the previous `linux || darwin || freebsd` tag broke GOOS=freebsd cross-compiles with "undefined: unix.CLOCK_MONOTONIC_RAW". FreeBSD now falls into the runtime-monotonic fallback alongside Windows / Plan 9; lease safety on FreeBSD reverts to the NTP-slewed baseline, still well inside the 300 ms safety margin. 
--- docs/lease_read_design.md | 16 ++++++++++------ internal/monoclock/monoclock_fallback.go | 11 ++++++----- internal/monoclock/monoclock_unix.go | 11 ++++++----- 3 files changed, 22 insertions(+), 16 deletions(-) diff --git a/docs/lease_read_design.md b/docs/lease_read_design.md index d69787fa..efc4ce48 100644 --- a/docs/lease_read_design.md +++ b/docs/lease_read_design.md @@ -97,8 +97,10 @@ type leaseState struct { ``` All timestamps on the lease path come from `internal/monoclock`, which -reads `CLOCK_MONOTONIC_RAW` via `clock_gettime(3)` on Linux / Darwin / -FreeBSD (falling back to Go's runtime monotonic on other platforms). +reads `CLOCK_MONOTONIC_RAW` via `clock_gettime(3)` on Linux and Darwin +(falling back to Go's runtime monotonic on other platforms — FreeBSD +included, since `golang.org/x/sys/unix` does not export +`CLOCK_MONOTONIC_RAW` on FreeBSD). The raw monotonic clock is immune to NTP rate adjustment and wall-clock step events — TiKV's lease path makes the same choice. Go's `time.Now()` is not sufficient: its embedded monotonic component is @@ -176,10 +178,12 @@ safety case for the lease should not depend on NTP being well-behaved: which point NTP slew alone becomes comparable to the margin. The `internal/monoclock` package wraps `clock_gettime(CLOCK_MONOTONIC_RAW, &ts)` -(`unix.ClockGettime` from `golang.org/x/sys/unix`) on Linux, Darwin, -and FreeBSD. Other platforms fall back to Go's runtime monotonic -clock; on those platforms lease safety reverts to the NTP-slewed -baseline, which is still sufficient at the current margin. +(`unix.ClockGettime` from `golang.org/x/sys/unix`) on Linux and +Darwin. Other platforms — including FreeBSD, where `x/sys/unix` does +not export the `CLOCK_MONOTONIC_RAW` constant — fall back to Go's +runtime monotonic clock; on those platforms lease safety reverts to +the NTP-slewed baseline, which is still sufficient at the current +margin. 
### 3.3 Refresh triggers diff --git a/internal/monoclock/monoclock_fallback.go b/internal/monoclock/monoclock_fallback.go index 21b0e467..3ecfe95b 100644 --- a/internal/monoclock/monoclock_fallback.go +++ b/internal/monoclock/monoclock_fallback.go @@ -1,4 +1,4 @@ -//go:build !(linux || darwin || freebsd) +//go:build !(linux || darwin) package monoclock @@ -7,10 +7,11 @@ import "time" // epoch anchors the fallback monotonic counter. time.Since uses Go's // runtime monotonic component and is step-immune, though unlike // CLOCK_MONOTONIC_RAW it is still subject to NTP rate adjustment. On -// platforms without CLOCK_MONOTONIC_RAW this is the closest portable -// substitute; lease safety on those platforms therefore matches the -// pre-#551 behaviour. Linux / Darwin / FreeBSD use the raw clock -// (monoclock_unix.go). +// platforms where golang.org/x/sys/unix does not export +// CLOCK_MONOTONIC_RAW (FreeBSD, Windows, Plan 9, ...) this is the +// closest portable substitute; lease safety on those platforms +// therefore matches the pre-#551 behaviour. Linux and Darwin use +// the raw clock (monoclock_unix.go). var epoch = time.Now() func nowNanos() int64 { diff --git a/internal/monoclock/monoclock_unix.go b/internal/monoclock/monoclock_unix.go index 97df454e..8381ea35 100644 --- a/internal/monoclock/monoclock_unix.go +++ b/internal/monoclock/monoclock_unix.go @@ -1,13 +1,14 @@ -//go:build linux || darwin || freebsd +//go:build linux || darwin package monoclock import "golang.org/x/sys/unix" -// nowNanos reads CLOCK_MONOTONIC_RAW via clock_gettime(3). Linux and -// Darwin both expose this clock; FreeBSD labels the equivalent clock -// CLOCK_MONOTONIC_RAW as well. Windows and Plan 9 use the portable -// fallback (monoclock_fallback.go). +// nowNanos reads CLOCK_MONOTONIC_RAW via clock_gettime(3). 
Only Linux +// and Darwin export this constant in golang.org/x/sys/unix; FreeBSD +// lacks the binding (its kernel exposes CLOCK_MONOTONIC_PRECISE, a +// different clock) and all other platforms use the portable fallback +// in monoclock_fallback.go. // // A non-nil error from ClockGettime should be essentially impossible // on supported platforms (the syscall fails only on invalid clock IDs From 03edaa1626d783217f41d6f12377240d68e4fa5f Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Fri, 24 Apr 2026 03:55:13 +0900 Subject: [PATCH 3/5] monoclock: fail closed when clock_gettime fails When unix.ClockGettime errors (realistically: seccomp / sandbox denies it), monoclock.Now() returns the zero Instant. Without additional guards, leaseState.valid(zero) compared the zero against any warmed expiry and returned true (zero.Before(positive) holds), so a node whose clock read persistently failed could keep serving stale reads off a once-warmed lease -- bypassing LinearizableRead even during leader isolation. Teach leaseState.valid and engineLeaseAckValid to reject a zero now explicitly; the lease fast paths fall through to LinearizableRead, which fails closed on a truly broken clock and succeeds as soon as the clock starts returning real readings again. --- internal/monoclock/monoclock_unix.go | 12 +++++++----- kv/coordinator.go | 5 ++++- kv/lease_read_test.go | 16 ++++++++++++++++ kv/lease_state.go | 9 ++++++++- kv/lease_state_test.go | 15 +++++++++++++++ 5 files changed, 50 insertions(+), 7 deletions(-) diff --git a/internal/monoclock/monoclock_unix.go b/internal/monoclock/monoclock_unix.go index 8381ea35..f592df0e 100644 --- a/internal/monoclock/monoclock_unix.go +++ b/internal/monoclock/monoclock_unix.go @@ -11,11 +11,13 @@ import "golang.org/x/sys/unix" // in monoclock_fallback.go. 
// // A non-nil error from ClockGettime should be essentially impossible -// on supported platforms (the syscall fails only on invalid clock IDs -// or EFAULT on the timespec pointer, neither of which applies here), -// but returning zero instead of panicking keeps the lease path live -// under bizarre sandboxes; the existing engineLeaseAckValid guards -// (ack.IsZero, ack.After(now)) still hold. +// on supported platforms — the syscall fails only on invalid clock +// IDs (compile-time constant here) or EFAULT on the timespec pointer +// (stack-allocated here). The realistic failure mode is a +// seccomp/sandbox profile that denies clock_gettime. We return 0 in +// that case: callers (leaseState.valid, engineLeaseAckValid) treat a +// zero Instant as "clock unavailable" and force the slow path, so a +// persistent syscall failure cannot leave a warmed lease valid. func nowNanos() int64 { var ts unix.Timespec if err := unix.ClockGettime(unix.CLOCK_MONOTONIC_RAW, &ts); err != nil { diff --git a/kv/coordinator.go b/kv/coordinator.go index 28b3f6f7..1340eab6 100644 --- a/kv/coordinator.go +++ b/kv/coordinator.go @@ -473,6 +473,9 @@ func (c *Coordinate) observeLeaseRead(hit bool) { // published via LastQuorumAck is fresh enough to serve a leader-local // read. Enforces the safety contract from raftengine.LeaseProvider: // - local state must be Leader +// - now must be a real reading; a zero now signals that the +// caller's monoclock.Now() read failed (e.g. clock_gettime denied +// under seccomp) and the lease fast path must fail closed // - ack must be non-zero (a quorum was ever observed) // - ack must not be after now (defensive guard: the monotonic-raw // clock cannot go backwards, but a zero / bogus ack reading should @@ -485,7 +488,7 @@ func (c *Coordinate) observeLeaseRead(hit bool) { // for why the raw monotonic source matters once leaseSafetyMargin is // tightened below ~5 ms. 
func engineLeaseAckValid(state raftengine.State, ack, now monoclock.Instant, leaseDur time.Duration) bool { - if state != raftengine.StateLeader || ack.IsZero() || ack.After(now) { + if state != raftengine.StateLeader || now.IsZero() || ack.IsZero() || ack.After(now) { return false } return now.Sub(ack) < leaseDur diff --git a/kv/lease_read_test.go b/kv/lease_read_test.go index 59917c45..3e200490 100644 --- a/kv/lease_read_test.go +++ b/kv/lease_read_test.go @@ -511,3 +511,19 @@ func TestLeaseReadEngineCtx_NilEngine(t *testing.T) { require.Error(t, err) require.ErrorIs(t, err, ErrLeaderNotFound) } + +// TestEngineLeaseAckValid_ZeroNowFailsClosed pins the clock-failure +// fail-closed behaviour on the engine-ack path: when monoclock.Now() +// returns the zero Instant (clock_gettime denied under seccomp or +// similar), the fast path must NOT certify the lease even against a +// fresh ack. Without the now.IsZero guard, a persistent clock +// failure would let ack.After(zero) still hold only accidentally +// (depends on ack.Nanos > 0); an explicit zero-check keeps the +// invariant independent of ack-value internals. +func TestEngineLeaseAckValid_ZeroNowFailsClosed(t *testing.T) { + t.Parallel() + ack := monoclock.Now() // fresh ack + require.False(t, + engineLeaseAckValid(raftengine.StateLeader, ack, monoclock.Zero, time.Hour), + "zero now must force the slow path") +} diff --git a/kv/lease_state.go b/kv/lease_state.go index 32c4bbc1..f7c42c76 100644 --- a/kv/lease_state.go +++ b/kv/lease_state.go @@ -51,8 +51,15 @@ type leaseState struct { } // valid reports whether the lease is unexpired at now. +// +// A zero-valued now indicates that the caller's monotonic-raw clock +// read failed (e.g. clock_gettime denied under seccomp) and is treated +// as "no lease evidence available" -- fail closed onto the slow path. 
+// Without this guard, a warmed lease (non-zero expiry) would stay +// forever valid for any caller sampling monoclock.Zero, since +// zero.Before(positive) holds. func (s *leaseState) valid(now monoclock.Instant) bool { - if s == nil { + if s == nil || now.IsZero() { return false } ns := s.expiryNanos.Load() diff --git a/kv/lease_state_test.go b/kv/lease_state_test.go index 7424ab05..70a6d56c 100644 --- a/kv/lease_state_test.go +++ b/kv/lease_state_test.go @@ -51,6 +51,21 @@ func TestLeaseState_ZeroValueIsExpired(t *testing.T) { require.False(t, s.valid(monoclock.Now())) } +// TestLeaseState_ZeroNowFailsClosed pins the "clock unavailable" +// invariant: if the caller's monoclock.Now() read failed (e.g. +// clock_gettime denied under seccomp) and surfaced monoclock.Zero, +// valid() MUST return false even for a freshly warmed lease. +// Otherwise a persistent clock failure would keep the node serving +// fast-path reads off a stale lease indefinitely. +func TestLeaseState_ZeroNowFailsClosed(t *testing.T) { + t.Parallel() + var s leaseState + s.extend(monoclock.Now().Add(time.Hour), s.generation()) + require.True(t, s.valid(monoclock.Now()), "sanity: warmed lease is valid for a real now") + require.False(t, s.valid(monoclock.Zero), + "a zero now signals clock failure; lease must fail closed") +} + func TestLeaseState_ExtendAndExpire(t *testing.T) { t.Parallel() var s leaseState From 587717eb2c3e90da290d557c963fbd1e28c15d47 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Fri, 24 Apr 2026 04:14:20 +0900 Subject: [PATCH 4/5] kv/lease: gate lease-rollback CAS on pointer identity The previous leaseState stored expiry as atomic.Int64 and rolled back a raced extend via expiryNanos.CompareAndSwap(target, 0). When two extenders computed the same target (clock-granularity tie across a leader-loss invalidate), the stale extender's value-gated rollback could clobber the fresh extender's still-valid lease, breaking the generation guard's intent. 
Replace the int64+gen pair with atomic.Pointer[leaseSlot]. Each successful extend installs a freshly-allocated *leaseSlot, so pointer identity alone disambiguates co-targeted extenders: a rollback CAS against the extender's own *leaseSlot cannot match a pointer a concurrent winner has already installed, even when the expiry values are identical. Same monotonic generation-counter semantics preserved inside the slot. Add a regression test (TestLeaseState_RollbackCASUsesPointerIdentity) that deterministically simulates the clock-tie race by manipulating internal state: a pointer-gated CAS fails while a value-gated CAS would have erased the fresh lease. Positive-control test pins the in-place clearing path so we do not silently stop rolling back when we should. --- kv/lease_state.go | 142 ++++++++++++++++++++++++++++------------- kv/lease_state_test.go | 98 ++++++++++++++++++++++++++++ 2 files changed, 195 insertions(+), 45 deletions(-) diff --git a/kv/lease_state.go b/kv/lease_state.go index f7c42c76..a06f493a 100644 --- a/kv/lease_state.go +++ b/kv/lease_state.go @@ -30,43 +30,74 @@ func isLeadershipLossError(err error) bool { errors.Is(err, raftengine.ErrLeadershipTransferInProgress) } +// leaseSlot is the immutable payload stored behind leaseState.current. +// Each successful extend installs a freshly-allocated *leaseSlot, so +// pointer identity alone disambiguates two extenders that happened to +// compute the same expiry value (clock-granularity tie). A rollback +// CAS that compares against the extender's own *leaseSlot therefore +// cannot clobber a newer lease installed by a concurrent winner, even +// if the newer lease carries an equal expiryNanos. +// +// expiryNanos == 0 is the sentinel for "no lease"; the zero-valued +// *leaseSlot (returned on startup before any extend) carries gen 0. +type leaseSlot struct { + // expiryNanos is monoclock.Instant.Nanos(); 0 means "no lease". + expiryNanos int64 + // gen is the monotonic invalidation counter. 
Each invalidate + // installs a new slot with gen+1; each extend observes it via + // generation() BEFORE the quorum operation and refuses to install + // a slot whose gen no longer matches. This preserves the + // leader-loss guard: a Dispatch that returned just before an + // invalidate fires must not resurrect the lease. + gen uint64 +} + // leaseState tracks the monotonic-raw expiry of a leader-local read -// lease. All operations are lock-free via atomic.Int64 (the raw -// monoclock.Instant nanoseconds) plus a generation counter that -// prevents an in-flight extend from resurrecting a lease that a -// concurrent invalidate has cleared. +// lease. The hot-path read (valid) remains lock-free via a single +// atomic pointer load; writes go through CAS on the pointer so +// pointer identity gates the extender-vs-rollback race. // -// expiry == 0 means the lease has never been issued or has been -// invalidated. A non-zero value is the monotonic-raw instant after -// which the lease is considered expired; a caller comparing -// monoclock.Now() against the loaded value can decide whether to skip -// a quorum confirmation. The monotonic-raw clock is immune to NTP -// rate adjustment and wall-clock steps (see internal/monoclock). +// expiryNanos == 0 in the current slot means the lease has never been +// issued or has been invalidated. A non-zero value is the +// monotonic-raw instant after which the lease is considered expired; +// a caller comparing monoclock.Now() against the loaded value can +// decide whether to skip a quorum confirmation. The monotonic-raw +// clock is immune to NTP rate adjustment and wall-clock steps (see +// internal/monoclock). type leaseState struct { - gen atomic.Uint64 - // expiryNanos stores monoclock.Instant.Nanos(); 0 means "no lease". - // Stored as int64 so CAS can be expressed without an extra pointer - // indirection, preserving the lock-free fast-path performance. - expiryNanos atomic.Int64 + // current is the live *leaseSlot. 
A nil pointer means "never + // extended"; valid() treats it identically to an explicit + // zero-expiry slot. + current atomic.Pointer[leaseSlot] +} + +// slotOrZero returns the current slot, substituting a synthetic zero +// slot when the pointer has never been stored. Keeps callers free of +// nil checks. +func slotOrZero(p *leaseSlot) *leaseSlot { + if p == nil { + return &leaseSlot{} + } + return p } // valid reports whether the lease is unexpired at now. // // A zero-valued now indicates that the caller's monotonic-raw clock -// read failed (e.g. clock_gettime denied under seccomp) and is treated -// as "no lease evidence available" -- fail closed onto the slow path. -// Without this guard, a warmed lease (non-zero expiry) would stay -// forever valid for any caller sampling monoclock.Zero, since -// zero.Before(positive) holds. +// read failed (e.g. clock_gettime denied under seccomp) and is +// treated as "no lease evidence available" -- fail closed onto the +// slow path. Without this guard, a warmed lease (non-zero expiry) +// would stay forever valid for any caller sampling monoclock.Zero, +// since zero.Before(positive) holds. func (s *leaseState) valid(now monoclock.Instant) bool { if s == nil || now.IsZero() { return false } - ns := s.expiryNanos.Load() - if ns == 0 { + slot := s.current.Load() + if slot == nil || slot.expiryNanos == 0 { return false } - return now.Before(monoclock.FromNanos(ns)) + return now.Before(monoclock.FromNanos(slot.expiryNanos)) } // generation returns the current invalidation counter. Callers MUST @@ -79,59 +110,80 @@ func (s *leaseState) generation() uint64 { if s == nil { return 0 } - return s.gen.Load() + return slotOrZero(s.current.Load()).gen } // extend sets the lease expiry to until iff (a) until is strictly // after the currently stored expiry (or no expiry is stored) and // (b) no invalidate has happened since the caller captured -// expectedGen via generation() BEFORE the quorum operation. 
The -// generation guard prevents a Dispatch that returned successfully -// *just before* a leader-loss invalidate from resurrecting the -// lease milliseconds after invalidation. +// expectedGen via generation() BEFORE the quorum operation. +// +// Pointer-identity CAS guards the rollback: after a racing +// invalidate fires, the rollback clears ONLY the exact *leaseSlot +// this extender installed, so a concurrent extender that captured +// the post-invalidate generation and computed the same expiry value +// (clock-granularity tie) cannot be clobbered -- its slot is a +// distinct allocation with a distinct pointer. func (s *leaseState) extend(until monoclock.Instant, expectedGen uint64) { if s == nil { return } target := until.Nanos() if target == 0 { - // Refuse to store 0 — that value is the sentinel for "no + // Refuse to store 0 -- that value is the sentinel for "no // lease" and would race with invalidate's zero-store. return } for { + // Load the live pointer once per iteration; all decisions + // below are based on THIS observation. The CAS old-value + // must match this exact pointer. + stored := s.current.Load() + old := slotOrZero(stored) // Pre-CAS gate: if invalidate already advanced the generation // past expectedGen, skip the CAS entirely. - if s.gen.Load() != expectedGen { + if old.gen != expectedGen { return } - current := s.expiryNanos.Load() - if current != 0 && target <= current { + if old.expiryNanos != 0 && target <= old.expiryNanos { return } - if !s.expiryNanos.CompareAndSwap(current, target) { + next := &leaseSlot{expiryNanos: target, gen: expectedGen} + if !s.current.CompareAndSwap(stored, next) { continue } - // CAS landed. If invalidate raced in between the pre-CAS gate - // and the CAS itself, undo our write iff no later writer has - // replaced it. Using CAS with our own target means a fresh - // extend that captured the post-invalidate generation is left - // intact (its CAS already replaced our target with its own). 
- if s.gen.Load() != expectedGen { - s.expiryNanos.CompareAndSwap(target, 0) + // CAS landed. If invalidate raced in between the pre-CAS + // gate and the CAS itself, the current gen now exceeds + // expectedGen; undo our write via a pointer-identity CAS on + // `next`. A later writer (concurrent extender, concurrent + // invalidate) that already replaced `next` with its own + // allocation owns the state; its pointer differs from + // `next`, our CAS fails, and we leave its value intact -- + // EVEN IF its expiryNanos equals our target. + if observed := slotOrZero(s.current.Load()); observed.gen != expectedGen { + rb := &leaseSlot{expiryNanos: 0, gen: observed.gen} + s.current.CompareAndSwap(next, rb) } return } } // invalidate clears the lease so the next read takes the slow path. -// Bumping the generation first ensures any concurrent extend that -// captured the previous generation will undo its own CAS rather than -// resurrect the lease. +// Each call installs a fresh zero-expiry slot whose gen is one more +// than the current slot's, via CAS retry. A concurrent extender that +// captured the pre-invalidate generation will see the advanced gen +// on its post-CAS recheck and roll back its own slot via pointer- +// identity CAS. 
func (s *leaseState) invalidate() { if s == nil { return } - s.gen.Add(1) - s.expiryNanos.Store(0) + for { + stored := s.current.Load() + old := slotOrZero(stored) + next := &leaseSlot{expiryNanos: 0, gen: old.gen + 1} + if s.current.CompareAndSwap(stored, next) { + return + } + } } diff --git a/kv/lease_state_test.go b/kv/lease_state_test.go index 70a6d56c..870610d9 100644 --- a/kv/lease_state_test.go +++ b/kv/lease_state_test.go @@ -161,6 +161,104 @@ func TestLeaseState_ExtendWithFreshGenSucceedsAfterInvalidate(t *testing.T) { require.True(t, s.valid(now)) } +// TestLeaseState_RollbackCASUsesPointerIdentity pins the codex-P3 +// invariant: when a stale extender rolls back after a racing +// invalidate, its rollback CAS must be gated on pointer identity, NOT +// on the expiry-nanos value. A value-gated CAS could clobber a fresh +// lease installed by a concurrent extender that (due to clock- +// granularity ties) computed the same expiry. +// +// Scenario: +// 1. Extender A captures genA = 0 and installs slot{target, gen=0}. +// 2. invalidate fires, bumping the live slot to gen=1 with zero +// expiry. A's post-CAS gen check now MUST see the mismatch and +// attempt a rollback. +// 3. A concurrent extender B captures genB = 1 and installs a FRESH +// *leaseSlot{target, gen=1} -- same target value, distinct +// allocation. +// 4. A's rollback CAS runs LAST. A value-gated CAS (old impl +// expiryNanos.CompareAndSwap(target, 0)) would erase B's +// still-valid lease. A pointer-gated CAS (new impl +// current.CompareAndSwap(aSlot, rb)) fails because the live +// pointer is B's allocation, preserving B's lease. +// +// Simulated step-by-step (no goroutines) against internal state so +// the ordering is deterministic. Without the fix this test fails: +// s.valid(now) observes the rolled-back zero-expiry slot. 
+func TestLeaseState_RollbackCASUsesPointerIdentity(t *testing.T) { + t.Parallel() + var s leaseState + now := monoclock.Now() + target := now.Add(time.Hour) + targetNanos := target.Nanos() + + // Step 1: simulate A's CAS having already landed. + aSlot := &leaseSlot{expiryNanos: targetNanos, gen: 0} + s.current.Store(aSlot) + + // Step 2: invalidate races in. It CAS-replaces aSlot with a + // zero-expiry slot at gen=1. + s.invalidate() + require.Equal(t, uint64(1), s.generation()) + + // Step 3: concurrent extender B captures genB=1 (post-invalidate) + // and installs a distinct *leaseSlot with the same target value. + // This exercises the clock-granularity tie. + bSlot := &leaseSlot{expiryNanos: targetNanos, gen: 1} + bOld := s.current.Load() + require.True(t, s.current.CompareAndSwap(bOld, bSlot), + "B's CAS install must succeed in this simulated ordering") + require.True(t, s.valid(now), "B's fresh lease must be valid") + + // Step 4: A's delayed rollback runs. The fix is that rollback is + // pointer-identity CAS on aSlot, which fails here because the + // live pointer is bSlot (even though bSlot.expiryNanos equals + // aSlot.expiryNanos). + rb := &leaseSlot{expiryNanos: 0, gen: 1} + swapped := s.current.CompareAndSwap(aSlot, rb) + require.False(t, swapped, + "pointer-identity rollback must NOT clobber a fresh lease that happened to compute the same expiry value") + + // Invariant: B's lease is still live. + require.True(t, s.valid(now), + "fresh lease with matching expiry must survive a stale extender's rollback") + require.Same(t, bSlot, s.current.Load(), + "live slot must still be B's allocation, unchanged") +} + +// TestLeaseState_RollbackCASClearsOwnSlot is the positive control: +// when NO concurrent extender is involved, the lease must still end +// up cleared: a stale-gen extend is refused outright by the pre-CAS +// gate, and an invalidate after a successful extend clears the live +// slot, so the pointer-identity fix cannot silently disable +// legitimate clearing.
+func TestLeaseState_RollbackCASClearsOwnSlot(t *testing.T) { + t.Parallel() + var s leaseState + now := monoclock.Now() + + // Caller pattern: sample generation, then quorum op races with + // invalidate, then extend. extend's pre-CAS gate sees the gen + // advance and refuses to install a slot at the stale generation. + expectedGen := s.generation() + s.invalidate() // concurrent leader-loss during the quorum op + + // extend with the pre-invalidate generation. Internal flow: + // pre-CAS gate observes gen mismatch and returns WITHOUT + // installing any slot. The invalidate's zero-slot remains. + s.extend(now.Add(time.Hour), expectedGen) + require.False(t, s.valid(now), + "pre-invalidate extend must not resurrect the lease") + + // Separately, run the actual extend race: sample a FRESH gen + // (simulating a caller that re-entered after invalidate), install + // a lease, then invalidate, then verify clearing. + freshGen := s.generation() + s.extend(now.Add(time.Hour), freshGen) + require.True(t, s.valid(now)) + s.invalidate() + require.False(t, s.valid(now), + "invalidate after a successful extend must clear the lease") +} + func TestLeaseState_ConcurrentExtendAndRead(t *testing.T) { t.Parallel() var s leaseState From c5c27b7b19e2b43a4d340025f77d2de0c49bf6eb Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Fri, 24 Apr 2026 04:26:36 +0900 Subject: [PATCH 5/5] raftengine: align LeaseProvider.LastQuorumAck contract with fail-closed guards Two staleness bugs in the interface docstring: 1. The safety formula omitted !now.IsZero(), which engineLeaseAckValid enforces after the fail-closed fix. Implementers copying the contract comment would miss the guard that prevents a seccomp / clock_gettime failure from resurrecting a warmed lease. 2. The platform note listed FreeBSD as a CLOCK_MONOTONIC_RAW target, but golang.org/x/sys/unix does not export the constant there and internal/monoclock routes FreeBSD through the runtime-monotonic fallback.
Update both so the interface docs match the actual behaviour. --- internal/raftengine/engine.go | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/internal/raftengine/engine.go b/internal/raftengine/engine.go index 65ad0850..f94cb9f5 100644 --- a/internal/raftengine/engine.go +++ b/internal/raftengine/engine.go @@ -107,16 +107,25 @@ type LeaseProvider interface { // Safety: callers must verify the lease against a single // `now := monoclock.Now()` sample: // state == raftengine.StateLeader && - // !ack.IsZero() && !ack.After(now) && now.Sub(ack) < LeaseDuration() + // !now.IsZero() && !ack.IsZero() && !ack.After(now) && + // now.Sub(ack) < LeaseDuration() // - // The monotonic-raw clock (CLOCK_MONOTONIC_RAW on Linux / Darwin / - // FreeBSD; see internal/monoclock) is immune to NTP rate adjustment - // and wall-clock step events, so this comparison stays safe even if - // the system's time daemon slews or steps the wall clock. The - // !ack.After(now) guard remains as a defensive fail-closed for a - // zero / bogus ack reading. LeaseDuration is bounded by - // electionTimeout - safety_margin, guaranteeing no successor leader - // has accepted writes within that window. + // The !now.IsZero() guard fails closed when the caller's + // clock_gettime read errored (e.g. seccomp denies it) and + // monoclock.Now() returned the zero Instant; without it, a + // persistent clock failure could keep a once-warmed lease valid + // forever. See kv.engineLeaseAckValid. + // + // The monotonic-raw clock (CLOCK_MONOTONIC_RAW on Linux / Darwin; + // runtime-monotonic fallback on FreeBSD / Windows / others, see + // internal/monoclock) is immune to NTP rate adjustment and + // wall-clock step events on the raw-clock platforms, so the + // comparison stays safe even if the system's time daemon slews + // or steps the wall clock. The !ack.After(now) guard remains as + // a defensive fail-closed for a zero / bogus ack reading. 
+ // LeaseDuration is bounded by electionTimeout - safety_margin, + // guaranteeing no successor leader has accepted writes within + // that window. // // Returns the zero Instant when no quorum has been confirmed yet // or when the local node is not the leader. Single-node LEADERS