From 665ddde78ad2f9a7f9d078b6f3d173195090e7a6 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Mon, 20 Apr 2026 22:02:41 +0900 Subject: [PATCH 1/3] fix(lease-read): only invalidate lease on leadership-loss errors Production observation (prod cluster on 192.168.0.210, all-reads slow path): EVALSHA avg 6.3 s/op, redis.call() avg 6.25 s/call, GET avg 1.11 s/op. The lease fast path is never taken because every Dispatch error -- write-conflicts in particular, which are frequent in the Lua retry-loop workload -- invalidates the per-shard lease. The next LeaseRead then falls through to LinearizableRead, which blocks on a heartbeat round with followers; after the read, a subsequent write error invalidates again, ad infinitum. The lease therefore never stays warm long enough for a fast-path read to hit. Root cause: refreshLeaseAfterDispatch (Coordinate.Dispatch) and leaseRefreshingTxn.Commit/Abort treat any err as a leadership-loss signal, but that is too aggressive. Write-conflict / validation / deadline-on-a-non-ReadIndex-path errors are business-logic failures that do NOT mean this node stopped being leader. Fix: introduce isLeadershipLossError(err) and invalidate ONLY when it returns true. Recognized signals are hashicorp raft.ErrNotLeader / raft.ErrLeadershipLost / raft.ErrLeadershipTransferInProgress, plus substring matches against the etcd engine's "not leader" / "leadership transfer" / "leadership lost" sentinels (cockroachdb/errors wraps errors in a way that errors.Is does not always traverse across package boundaries). Real leadership loss is still caught by the engine's RegisterLeaderLossCallback hook; the LeaseRead fast path also guards on engine.State() == StateLeader for defense in depth. Under this combination, a genuine step-down still invalidates the lease promptly, but a storm of write-conflicts no longer carpet-invalidates the lease, so the fast path can actually serve reads. 
--- kv/coordinator.go | 11 ++++++++++- kv/lease_state.go | 33 +++++++++++++++++++++++++++++++++ kv/sharded_coordinator.go | 20 +++++++++++++++----- 3 files changed, 58 insertions(+), 6 deletions(-) diff --git a/kv/coordinator.go b/kv/coordinator.go index fa4033a60..e5f6f8695 100644 --- a/kv/coordinator.go +++ b/kv/coordinator.go @@ -226,7 +226,16 @@ func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*C // "any error from engine.Propose" as an invalidation trigger. func (c *Coordinate) refreshLeaseAfterDispatch(resp *CoordinateResponse, err error, dispatchStart time.Time, expectedGen uint64) { if err != nil { - c.lease.invalidate() + // Only invalidate on errors that actually signal leadership + // loss. Write conflicts and validation errors are business- + // logic failures that do NOT mean this node stopped being + // leader; invalidating for them would force every subsequent + // read into the slow LinearizableRead path and defeat the + // lease. Engine.RegisterLeaderLossCallback and the fast-path + // State() == StateLeader guard cover real leader loss. + if isLeadershipLossError(err) { + c.lease.invalidate() + } return } if resp == nil || resp.CommitIndex == 0 { diff --git a/kv/lease_state.go b/kv/lease_state.go index c30cd3d40..e1a5fb3ec 100644 --- a/kv/lease_state.go +++ b/kv/lease_state.go @@ -1,10 +1,43 @@ package kv import ( + "errors" + "strings" "sync/atomic" "time" + + "github.com/hashicorp/raft" ) +// isLeadershipLossError reports whether err signals that this node has +// lost leadership and a successor should be contacted. Propose/Commit +// errors that are NOT leadership-related (write conflict, validation, +// deadline on a non-ReadIndex path) must NOT trigger lease +// invalidation -- doing so forces every subsequent read into the slow +// LinearizableRead path and defeats the lease's purpose. 
+// +// The underlying engines surface leadership loss via distinct +// sentinel errors; we recognize the hashicorp variant directly and +// match the etcd engine's "not leader" string via errors.Is / +// substring match so a future rename in etcd/raft does not silently +// reintroduce the over-invalidation bug. +func isLeadershipLossError(err error) bool { + if err == nil { + return false + } + if errors.Is(err, raft.ErrNotLeader) || errors.Is(err, raft.ErrLeadershipLost) || + errors.Is(err, raft.ErrLeadershipTransferInProgress) { + return true + } + msg := err.Error() + // etcd engine errors (cockroachdb/errors wraps them, so Is may + // not traverse reliably across package boundaries; fall back to + // substring match against the sentinel messages). + return strings.Contains(msg, "not leader") || + strings.Contains(msg, "leadership transfer") || + strings.Contains(msg, "leadership lost") +} + // leaseState tracks the wall-clock expiry of a leader-local read lease. // All operations are lock-free via atomic.Pointer plus a generation // counter that prevents an in-flight extend from resurrecting a lease diff --git a/kv/sharded_coordinator.go b/kv/sharded_coordinator.go index eafb6f792..dffdde85f 100644 --- a/kv/sharded_coordinator.go +++ b/kv/sharded_coordinator.go @@ -46,10 +46,18 @@ func (t *leaseRefreshingTxn) Commit(reqs []*pb.Request) (*TransactionResponse, e expectedGen := t.g.lease.generation() resp, err := t.inner.Commit(reqs) if err != nil { - // Propose failures commonly signal leadership loss; follow - // the design doc and invalidate so the next read takes the - // slow path and re-verifies. - t.g.lease.invalidate() + // Only invalidate on errors that actually signal a leadership + // change. Write-conflicts, validation errors, and deadline + // exceeded on non-ReadIndex paths do NOT imply the leader is + // gone; invalidating the lease for them forces every read + // into the slow LinearizableRead path and defeats the whole + // point of the lease. 
The engine's own leader-loss callback + // already handles true leadership loss, plus + // Coordinate.LeaseRead guards the fast path on + // engine.State() == StateLeader. + if isLeadershipLossError(err) { + t.g.lease.invalidate() + } return resp, errors.WithStack(err) } t.maybeRefresh(resp, start, expectedGen) @@ -61,7 +69,9 @@ func (t *leaseRefreshingTxn) Abort(reqs []*pb.Request) (*TransactionResponse, er expectedGen := t.g.lease.generation() resp, err := t.inner.Abort(reqs) if err != nil { - t.g.lease.invalidate() + if isLeadershipLossError(err) { + t.g.lease.invalidate() + } return resp, errors.WithStack(err) } t.maybeRefresh(resp, start, expectedGen) From edfa0fa2b7eaf7e4028d839145c00f3bf41c0d72 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Mon, 20 Apr 2026 22:44:49 +0900 Subject: [PATCH 2/3] refactor(lease-read): detect leadership loss via errors.Is on shared sentinels Replace the substring-matching fallback in isLeadershipLossError with a strict cockroachdb/errors.Is check against shared raftengine sentinels. - Add raftengine.Err{NotLeader,LeadershipLost,LeadershipTransferInProgress} as the single source of truth both engine backends mark their internal errors against. - etcd engine: mark errNotLeader and errLeadershipTransferNotLeader. - hashicorp engine: translate raft.Err{NotLeader,LeadershipLost, LeadershipTransferInProgress} via a shared helper on all return sites. - kv/lease_state.go: drop strings.Contains, drop the hashicorp/raft dependency, rely solely on the raftengine sentinels. Use cockroachdb/errors.Is since stdlib errors.Is does not traverse cockroachdb mark chains. - Add TestIsLeadershipLossError covering the mark-based detection path and the negative cases (write-conflict-style errors, context cancel). 
--- internal/raftengine/engine.go | 17 ++++++++++ internal/raftengine/etcd/engine.go | 4 +-- internal/raftengine/hashicorp/engine.go | 30 +++++++++++++---- kv/lease_state.go | 30 +++++++---------- kv/lease_state_test.go | 44 +++++++++++++++++++++++++ 5 files changed, 98 insertions(+), 27 deletions(-) diff --git a/internal/raftengine/engine.go b/internal/raftengine/engine.go index e97afd6ab..95cc77b7a 100644 --- a/internal/raftengine/engine.go +++ b/internal/raftengine/engine.go @@ -2,10 +2,27 @@ package raftengine import ( "context" + "errors" "io" "time" ) +// Shared sentinel errors that both engine implementations should wrap +// so callers can test with errors.Is across engine backends. +var ( + // ErrNotLeader indicates the operation was rejected because the + // local node is not the Raft leader for the target group. + // Callers that care about leadership (e.g. lease invalidation + // logic) should match via errors.Is. + ErrNotLeader = errors.New("raft engine: not leader") + // ErrLeadershipLost indicates the local node was leader when the + // operation began but lost leadership before it could complete. + ErrLeadershipLost = errors.New("raft engine: leadership lost") + // ErrLeadershipTransferInProgress indicates a leadership transfer + // is under way and proposals are being held back. 
+ ErrLeadershipTransferInProgress = errors.New("raft engine: leadership transfer in progress") +) + type State string const ( diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index fddfb9c4a..c6be98d9f 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -63,7 +63,7 @@ const ( var ( errNilEngine = errors.New("raft engine is not configured") errClosed = errors.New("etcd raft engine is closed") - errNotLeader = errors.New("etcd raft engine is not leader") + errNotLeader = errors.Mark(errors.New("etcd raft engine is not leader"), raftengine.ErrNotLeader) errNodeIDRequired = errors.New("etcd raft node id is required") errDataDirRequired = errors.New("etcd raft data dir is required") errStateMachineUnset = errors.New("etcd raft state machine is not configured") @@ -76,7 +76,7 @@ var ( errLeadershipTransferNotReady = errors.New("etcd raft leadership transfer target is not available") errLeadershipTransferAborted = errors.New("etcd raft leadership transfer aborted") errLeadershipTransferRejected = errors.New("etcd raft leadership transfer was rejected by raft (target is not a voter)") - errLeadershipTransferNotLeader = errors.New("etcd raft leadership transfer requires the local node to be leader") + errLeadershipTransferNotLeader = errors.Mark(errors.New("etcd raft leadership transfer requires the local node to be leader"), raftengine.ErrNotLeader) errTooManyPendingConfigs = errors.New("etcd raft engine has too many pending config changes") ) diff --git a/internal/raftengine/hashicorp/engine.go b/internal/raftengine/hashicorp/engine.go index 71e8c145a..b025ba328 100644 --- a/internal/raftengine/hashicorp/engine.go +++ b/internal/raftengine/hashicorp/engine.go @@ -18,6 +18,24 @@ const unknownLastContact = time.Duration(-1) var errNilEngine = errors.New("raft engine is not configured") +// translateLeadershipErr wraps hashicorp/raft leadership-related sentinels +// with the shared raftengine 
sentinels so callers can use a single +// errors.Is check across engine backends. +func translateLeadershipErr(err error) error { + if err == nil { + return nil + } + switch { + case errors.Is(err, raft.ErrNotLeader): + return errors.WithStack(errors.Mark(err, raftengine.ErrNotLeader)) + case errors.Is(err, raft.ErrLeadershipLost): + return errors.WithStack(errors.Mark(err, raftengine.ErrLeadershipLost)) + case errors.Is(err, raft.ErrLeadershipTransferInProgress): + return errors.WithStack(errors.Mark(err, raftengine.ErrLeadershipTransferInProgress)) + } + return errors.WithStack(err) +} + type Engine struct { raft *raft.Raft @@ -66,7 +84,7 @@ func (e *Engine) Propose(ctx context.Context, data []byte) (*raftengine.Proposal if ctxErr := contextErr(ctx); ctxErr != nil { return nil, ctxErr } - return nil, errors.WithStack(err) + return nil, translateLeadershipErr(err) } return &raftengine.ProposalResult{ @@ -118,7 +136,7 @@ func (e *Engine) VerifyLeader(ctx context.Context) error { if ctxErr := contextErr(ctx); ctxErr != nil { return ctxErr } - return errors.WithStack(err) + return translateLeadershipErr(err) } return nil } @@ -136,7 +154,7 @@ func (e *Engine) CheckServing(ctx context.Context) error { return errors.WithStack(errNilEngine) } if e.State() != raftengine.StateLeader { - return errors.WithStack(raft.ErrNotLeader) + return errors.WithStack(errors.Mark(raft.ErrNotLeader, raftengine.ErrNotLeader)) } return nil } @@ -155,7 +173,7 @@ func (e *Engine) LinearizableRead(ctx context.Context) (uint64, error) { return 0, errors.WithStack(errNilEngine) } if e.raft.State() != raft.Leader { - return 0, errors.WithStack(raft.ErrNotLeader) + return 0, errors.WithStack(errors.Mark(raft.ErrNotLeader, raftengine.ErrNotLeader)) } // Raft §5.4.2: ensure at least one Barrier has been issued in the @@ -238,7 +256,7 @@ func (e *Engine) executeBarrier(ctx context.Context) (bool, error) { } if e.raft.State() != raft.Leader { - return false, errors.WithStack(raft.ErrNotLeader) + 
return false, errors.WithStack(errors.Mark(raft.ErrNotLeader, raftengine.ErrNotLeader)) } timeout, err := timeoutFromContext(ctx) @@ -249,7 +267,7 @@ func (e *Engine) executeBarrier(ctx context.Context) (bool, error) { if ctxErr := contextErr(ctx); ctxErr != nil { return false, ctxErr } - return false, errors.WithStack(err) + return false, translateLeadershipErr(err) } e.barrierTerm.Store(term) diff --git a/kv/lease_state.go b/kv/lease_state.go index e1a5fb3ec..ee46db604 100644 --- a/kv/lease_state.go +++ b/kv/lease_state.go @@ -1,12 +1,11 @@ package kv import ( - "errors" - "strings" "sync/atomic" "time" - "github.com/hashicorp/raft" + "github.com/bootjp/elastickv/internal/raftengine" + "github.com/cockroachdb/errors" ) // isLeadershipLossError reports whether err signals that this node has @@ -16,26 +15,19 @@ import ( // invalidation -- doing so forces every subsequent read into the slow // LinearizableRead path and defeats the lease's purpose. // -// The underlying engines surface leadership loss via distinct -// sentinel errors; we recognize the hashicorp variant directly and -// match the etcd engine's "not leader" string via errors.Is / -// substring match so a future rename in etcd/raft does not silently -// reintroduce the over-invalidation bug. +// Both engine backends mark their internal leadership errors with the +// shared raftengine sentinels via cockroachdb/errors.Mark, so a single +// errors.Is check (using cockroachdb's Is, which understands the +// mark-based equivalence) covers both engines without relying on +// error-message substrings. Note: stdlib errors.Is does NOT traverse +// cockroachdb marks; this file must use cockroachdb/errors.Is. 
func isLeadershipLossError(err error) bool { if err == nil { return false } - if errors.Is(err, raft.ErrNotLeader) || errors.Is(err, raft.ErrLeadershipLost) || - errors.Is(err, raft.ErrLeadershipTransferInProgress) { - return true - } - msg := err.Error() - // etcd engine errors (cockroachdb/errors wraps them, so Is may - // not traverse reliably across package boundaries; fall back to - // substring match against the sentinel messages). - return strings.Contains(msg, "not leader") || - strings.Contains(msg, "leadership transfer") || - strings.Contains(msg, "leadership lost") + return errors.Is(err, raftengine.ErrNotLeader) || + errors.Is(err, raftengine.ErrLeadershipLost) || + errors.Is(err, raftengine.ErrLeadershipTransferInProgress) } // leaseState tracks the wall-clock expiry of a leader-local read lease. diff --git a/kv/lease_state_test.go b/kv/lease_state_test.go index b4b7c2688..3079092ff 100644 --- a/kv/lease_state_test.go +++ b/kv/lease_state_test.go @@ -1,13 +1,57 @@ package kv import ( + "context" + "errors" "runtime" "testing" "time" + "github.com/bootjp/elastickv/internal/raftengine" + cockroachdberrors "github.com/cockroachdb/errors" + hashicorpraft "github.com/hashicorp/raft" "github.com/stretchr/testify/require" ) +func TestIsLeadershipLossError(t *testing.T) { + t.Parallel() + + cases := []struct { + name string + err error + want bool + }{ + {"nil", nil, false}, + {"unrelated", errors.New("write conflict"), false}, + {"context canceled", context.Canceled, false}, + {"raftengine ErrNotLeader direct", raftengine.ErrNotLeader, true}, + {"raftengine ErrLeadershipLost direct", raftengine.ErrLeadershipLost, true}, + {"raftengine ErrLeadershipTransferInProgress direct", raftengine.ErrLeadershipTransferInProgress, true}, + { + "hashicorp ErrNotLeader marked with raftengine sentinel", + cockroachdberrors.WithStack(cockroachdberrors.Mark(hashicorpraft.ErrNotLeader, raftengine.ErrNotLeader)), + true, + }, + { + "hashicorp ErrLeadershipLost marked with 
raftengine sentinel", + cockroachdberrors.WithStack(cockroachdberrors.Mark(hashicorpraft.ErrLeadershipLost, raftengine.ErrLeadershipLost)), + true, + }, + { + "bare hashicorp ErrNotLeader (no raftengine mark) is NOT detected", + hashicorpraft.ErrNotLeader, + false, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + require.Equal(t, tc.want, isLeadershipLossError(tc.err)) + }) + } +} + func TestLeaseState_NilReceiverIsAlwaysExpired(t *testing.T) { t.Parallel() var s *leaseState From c41421804e64f35a5f256c4cdc7bd0a49973711b Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Mon, 20 Apr 2026 22:50:15 +0900 Subject: [PATCH 3/3] test(raftengine): pin errors.Is mapping to raftengine sentinels Add tests that lock in the cross-backend errors.Is contract introduced in edfa0fa2. Without these, a future refactor could drop the errors.Mark calls and silently force every lease-read onto the slow LinearizableRead path. - etcd: assert errors.Is(errors.WithStack(errNotLeader), raftengine.ErrNotLeader) and the same for errLeadershipTransferNotLeader. - hashicorp: cover translateLeadershipErr for all three raft sentinels, the pass-through case, and nil. --- internal/raftengine/etcd/engine_test.go | 12 ++++ .../hashicorp/leadership_err_test.go | 55 +++++++++++++++++++ 2 files changed, 67 insertions(+) create mode 100644 internal/raftengine/hashicorp/leadership_err_test.go diff --git a/internal/raftengine/etcd/engine_test.go b/internal/raftengine/etcd/engine_test.go index 465657e6c..1f3b78e5d 100644 --- a/internal/raftengine/etcd/engine_test.go +++ b/internal/raftengine/etcd/engine_test.go @@ -1611,3 +1611,15 @@ func mustRawNode(t *testing.T, storage *etcdraft.MemoryStorage, nodeID uint64) * require.NoError(t, err) return rawNode } + +// TestErrNotLeaderMatchesRaftEngineSentinel pins the invariant that the +// etcd engine's internal leadership-loss errors are marked against the +// shared raftengine sentinels. 
The lease-read fast path in package kv +// relies on a single cross-backend errors.Is(err, raftengine.ErrNotLeader) +// check; a future refactor that forgets to mark these errors would +// silently force every read onto the slow LinearizableRead path. +func TestErrNotLeaderMatchesRaftEngineSentinel(t *testing.T) { + t.Parallel() + require.True(t, errors.Is(errors.WithStack(errNotLeader), raftengine.ErrNotLeader)) + require.True(t, errors.Is(errors.WithStack(errLeadershipTransferNotLeader), raftengine.ErrNotLeader)) +} diff --git a/internal/raftengine/hashicorp/leadership_err_test.go b/internal/raftengine/hashicorp/leadership_err_test.go new file mode 100644 index 000000000..ada254fc7 --- /dev/null +++ b/internal/raftengine/hashicorp/leadership_err_test.go @@ -0,0 +1,55 @@ +package hashicorp + +import ( + "testing" + + "github.com/bootjp/elastickv/internal/raftengine" + "github.com/cockroachdb/errors" + "github.com/hashicorp/raft" + "github.com/stretchr/testify/require" +) + +// TestTranslateLeadershipErrMatchesRaftEngineSentinel pins the invariant +// that hashicorp/raft leadership-loss errors are marked against the +// shared raftengine sentinels. The lease-read fast path in package kv +// relies on a single cross-backend errors.Is(err, raftengine.ErrNotLeader) +// check; a future refactor that forgets to mark these errors would +// silently force every read onto the slow LinearizableRead path. 
+func TestTranslateLeadershipErrMatchesRaftEngineSentinel(t *testing.T) { + t.Parallel() + + cases := []struct { + name string + in error + want error + }{ + {"not leader", raft.ErrNotLeader, raftengine.ErrNotLeader}, + {"leadership lost", raft.ErrLeadershipLost, raftengine.ErrLeadershipLost}, + {"leadership transfer in progress", raft.ErrLeadershipTransferInProgress, raftengine.ErrLeadershipTransferInProgress}, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + out := translateLeadershipErr(tc.in) + require.True(t, errors.Is(out, tc.want), + "translated error must errors.Is-match the raftengine sentinel") + require.True(t, errors.Is(out, tc.in), + "translated error must retain the original raft sentinel for debugging") + }) + } + + t.Run("unrelated error is passed through", func(t *testing.T) { + t.Parallel() + orig := errors.New("write conflict") + out := translateLeadershipErr(orig) + require.False(t, errors.Is(out, raftengine.ErrNotLeader)) + require.False(t, errors.Is(out, raftengine.ErrLeadershipLost)) + require.False(t, errors.Is(out, raftengine.ErrLeadershipTransferInProgress)) + }) + + t.Run("nil stays nil", func(t *testing.T) { + t.Parallel() + require.NoError(t, translateLeadershipErr(nil)) + }) +}