Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions internal/raftengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,27 @@ package raftengine

import (
"context"
"errors"
"io"
"time"
)

// Shared sentinel errors that both engine implementations should wrap
// (via cockroachdb/errors.Mark in the engine packages) so callers can
// test with errors.Is across engine backends without depending on a
// concrete backend's private error values.
var (
	// ErrNotLeader indicates the operation was rejected because the
	// local node is not the Raft leader for the target group.
	// Callers that care about leadership (e.g. lease invalidation
	// logic) should match via errors.Is.
	ErrNotLeader = errors.New("raft engine: not leader")
	// ErrLeadershipLost indicates the local node was leader when the
	// operation began but lost leadership before it could complete.
	ErrLeadershipLost = errors.New("raft engine: leadership lost")
	// ErrLeadershipTransferInProgress indicates a leadership transfer
	// is under way and proposals are being held back.
	ErrLeadershipTransferInProgress = errors.New("raft engine: leadership transfer in progress")
)

type State string

const (
Expand Down
4 changes: 2 additions & 2 deletions internal/raftengine/etcd/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ const (
var (
errNilEngine = errors.New("raft engine is not configured")
errClosed = errors.New("etcd raft engine is closed")
errNotLeader = errors.New("etcd raft engine is not leader")
errNotLeader = errors.Mark(errors.New("etcd raft engine is not leader"), raftengine.ErrNotLeader)
errNodeIDRequired = errors.New("etcd raft node id is required")
errDataDirRequired = errors.New("etcd raft data dir is required")
errStateMachineUnset = errors.New("etcd raft state machine is not configured")
Expand All @@ -76,7 +76,7 @@ var (
errLeadershipTransferNotReady = errors.New("etcd raft leadership transfer target is not available")
errLeadershipTransferAborted = errors.New("etcd raft leadership transfer aborted")
errLeadershipTransferRejected = errors.New("etcd raft leadership transfer was rejected by raft (target is not a voter)")
errLeadershipTransferNotLeader = errors.New("etcd raft leadership transfer requires the local node to be leader")
errLeadershipTransferNotLeader = errors.Mark(errors.New("etcd raft leadership transfer requires the local node to be leader"), raftengine.ErrNotLeader)
errTooManyPendingConfigs = errors.New("etcd raft engine has too many pending config changes")
)

Expand Down
12 changes: 12 additions & 0 deletions internal/raftengine/etcd/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1611,3 +1611,15 @@ func mustRawNode(t *testing.T, storage *etcdraft.MemoryStorage, nodeID uint64) *
require.NoError(t, err)
return rawNode
}

// TestErrNotLeaderMatchesRaftEngineSentinel pins the invariant that the
// etcd engine's internal leadership-loss errors are marked against the
// shared raftengine sentinels. The lease-read fast path in package kv
// relies on a single cross-backend errors.Is(err, raftengine.ErrNotLeader)
// check; a future refactor that forgets to mark these errors would
// silently force every read onto the slow LinearizableRead path.
func TestErrNotLeaderMatchesRaftEngineSentinel(t *testing.T) {
t.Parallel()
require.True(t, errors.Is(errors.WithStack(errNotLeader), raftengine.ErrNotLeader))
require.True(t, errors.Is(errors.WithStack(errLeadershipTransferNotLeader), raftengine.ErrNotLeader))
}
30 changes: 24 additions & 6 deletions internal/raftengine/hashicorp/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,24 @@ const unknownLastContact = time.Duration(-1)

var errNilEngine = errors.New("raft engine is not configured")

// translateLeadershipErr wraps hashicorp/raft leadership-related sentinels
// with the shared raftengine sentinels so callers can use a single
// errors.Is check across engine backends.
func translateLeadershipErr(err error) error {
if err == nil {
return nil
}
switch {
case errors.Is(err, raft.ErrNotLeader):
return errors.WithStack(errors.Mark(err, raftengine.ErrNotLeader))
case errors.Is(err, raft.ErrLeadershipLost):
return errors.WithStack(errors.Mark(err, raftengine.ErrLeadershipLost))
case errors.Is(err, raft.ErrLeadershipTransferInProgress):
return errors.WithStack(errors.Mark(err, raftengine.ErrLeadershipTransferInProgress))
}
return errors.WithStack(err)
}

type Engine struct {
raft *raft.Raft

Expand Down Expand Up @@ -66,7 +84,7 @@ func (e *Engine) Propose(ctx context.Context, data []byte) (*raftengine.Proposal
if ctxErr := contextErr(ctx); ctxErr != nil {
return nil, ctxErr
}
return nil, errors.WithStack(err)
return nil, translateLeadershipErr(err)
}

return &raftengine.ProposalResult{
Expand Down Expand Up @@ -118,7 +136,7 @@ func (e *Engine) VerifyLeader(ctx context.Context) error {
if ctxErr := contextErr(ctx); ctxErr != nil {
return ctxErr
}
return errors.WithStack(err)
return translateLeadershipErr(err)
}
return nil
}
Expand All @@ -136,7 +154,7 @@ func (e *Engine) CheckServing(ctx context.Context) error {
return errors.WithStack(errNilEngine)
}
if e.State() != raftengine.StateLeader {
return errors.WithStack(raft.ErrNotLeader)
return errors.WithStack(errors.Mark(raft.ErrNotLeader, raftengine.ErrNotLeader))
}
return nil
}
Expand All @@ -155,7 +173,7 @@ func (e *Engine) LinearizableRead(ctx context.Context) (uint64, error) {
return 0, errors.WithStack(errNilEngine)
}
if e.raft.State() != raft.Leader {
return 0, errors.WithStack(raft.ErrNotLeader)
return 0, errors.WithStack(errors.Mark(raft.ErrNotLeader, raftengine.ErrNotLeader))
}

// Raft §5.4.2: ensure at least one Barrier has been issued in the
Expand Down Expand Up @@ -238,7 +256,7 @@ func (e *Engine) executeBarrier(ctx context.Context) (bool, error) {
}

if e.raft.State() != raft.Leader {
return false, errors.WithStack(raft.ErrNotLeader)
return false, errors.WithStack(errors.Mark(raft.ErrNotLeader, raftengine.ErrNotLeader))
}

timeout, err := timeoutFromContext(ctx)
Expand All @@ -249,7 +267,7 @@ func (e *Engine) executeBarrier(ctx context.Context) (bool, error) {
if ctxErr := contextErr(ctx); ctxErr != nil {
return false, ctxErr
}
return false, errors.WithStack(err)
return false, translateLeadershipErr(err)
}

e.barrierTerm.Store(term)
Expand Down
55 changes: 55 additions & 0 deletions internal/raftengine/hashicorp/leadership_err_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package hashicorp

import (
"testing"

"github.com/bootjp/elastickv/internal/raftengine"
"github.com/cockroachdb/errors"
"github.com/hashicorp/raft"
"github.com/stretchr/testify/require"
)

// TestTranslateLeadershipErrMatchesRaftEngineSentinel pins the invariant
// that hashicorp/raft leadership-loss errors are marked against the
// shared raftengine sentinels. The lease-read fast path in package kv
// relies on a single cross-backend errors.Is(err, raftengine.ErrNotLeader)
// check; a future refactor that forgets to mark these errors would
// silently force every read onto the slow LinearizableRead path.
func TestTranslateLeadershipErrMatchesRaftEngineSentinel(t *testing.T) {
	t.Parallel()

	cases := []struct {
		name string
		in   error
		want error
	}{
		{"not leader", raft.ErrNotLeader, raftengine.ErrNotLeader},
		{"leadership lost", raft.ErrLeadershipLost, raftengine.ErrLeadershipLost},
		{"leadership transfer in progress", raft.ErrLeadershipTransferInProgress, raftengine.ErrLeadershipTransferInProgress},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()
			out := translateLeadershipErr(tc.in)
			require.True(t, errors.Is(out, tc.want),
				"translated error must errors.Is-match the raftengine sentinel")
			require.True(t, errors.Is(out, tc.in),
				"translated error must retain the original raft sentinel for debugging")
		})
	}

	t.Run("unrelated error is passed through", func(t *testing.T) {
		t.Parallel()
		orig := errors.New("write conflict")
		out := translateLeadershipErr(orig)
		// The subtest name claims pass-through, so assert it: the
		// returned error must still wrap the original, not just
		// avoid the leadership marks.
		require.True(t, errors.Is(out, orig),
			"pass-through must retain the original error identity")
		require.False(t, errors.Is(out, raftengine.ErrNotLeader))
		require.False(t, errors.Is(out, raftengine.ErrLeadershipLost))
		require.False(t, errors.Is(out, raftengine.ErrLeadershipTransferInProgress))
	})

	t.Run("nil stays nil", func(t *testing.T) {
		t.Parallel()
		require.NoError(t, translateLeadershipErr(nil))
	})
}
11 changes: 10 additions & 1 deletion kv/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,16 @@ func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*C
// "any error from engine.Propose" as an invalidation trigger.
func (c *Coordinate) refreshLeaseAfterDispatch(resp *CoordinateResponse, err error, dispatchStart time.Time, expectedGen uint64) {
if err != nil {
c.lease.invalidate()
// Only invalidate on errors that actually signal leadership
// loss. Write conflicts and validation errors are business-
// logic failures that do NOT mean this node stopped being
// leader; invalidating for them would force every subsequent
// read into the slow LinearizableRead path and defeat the
// lease. Engine.RegisterLeaderLossCallback and the fast-path
// State() == StateLeader guard cover real leader loss.
if isLeadershipLossError(err) {
c.lease.invalidate()
}
return
}
if resp == nil || resp.CommitIndex == 0 {
Expand Down
25 changes: 25 additions & 0 deletions kv/lease_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,33 @@ package kv
import (
"sync/atomic"
"time"

"github.com/bootjp/elastickv/internal/raftengine"
"github.com/cockroachdb/errors"
)

// isLeadershipLossError reports whether err signals that this node has
// lost leadership and a successor should be contacted. Propose/Commit
// errors that are NOT leadership-related (write conflict, validation,
// deadline on a non-ReadIndex path) must NOT trigger lease
// invalidation -- doing so forces every subsequent read into the slow
// LinearizableRead path and defeats the lease's purpose.
//
// Both engine backends mark their internal leadership errors with the
// shared raftengine sentinels via cockroachdb/errors.Mark, so a single
// errors.Is check (using cockroachdb's Is, which understands the
// mark-based equivalence) covers both engines without relying on
// error-message substrings. Note: stdlib errors.Is does NOT traverse
// cockroachdb marks; this file must use cockroachdb/errors.Is.
func isLeadershipLossError(err error) bool {
if err == nil {
return false
}
return errors.Is(err, raftengine.ErrNotLeader) ||
errors.Is(err, raftengine.ErrLeadershipLost) ||
errors.Is(err, raftengine.ErrLeadershipTransferInProgress)
}
Comment on lines +24 to +31
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

この isLeadershipLossError の導入により、DispatchCommit/Abort における不要な lease invalidation を防げるようになりますが、Coordinate.LeaseReadShardedCoordinator.groupLeaseRead においても同様の対応が必要ではないでしょうか。現状、それらのメソッド内での LinearizableRead 失敗時は無条件で invalidate() が呼ばれています。タイムアウト等のリーダー交代を伴わない一時的なエラーで lease が破棄されると、後続の読み取りが不必要に slow path へ誘導されるため、本 PR の目的である「擬似的な invalidation による悪循環」を完全に解消するためには、読み取りパスの修正も検討すべきと考えます。


// leaseState tracks the wall-clock expiry of a leader-local read lease.
// All operations are lock-free via atomic.Pointer plus a generation
// counter that prevents an in-flight extend from resurrecting a lease
Expand Down
44 changes: 44 additions & 0 deletions kv/lease_state_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,57 @@
package kv

import (
"context"
"errors"
"runtime"
"testing"
"time"

"github.com/bootjp/elastickv/internal/raftengine"
cockroachdberrors "github.com/cockroachdb/errors"
hashicorpraft "github.com/hashicorp/raft"
"github.com/stretchr/testify/require"
)

// TestIsLeadershipLossError covers the classification of errors that
// should (and should not) invalidate the read lease: the raftengine
// sentinels match directly, marked hashicorp errors match through the
// cockroachdb mark, and unrelated or unmarked errors do not match.
func TestIsLeadershipLossError(t *testing.T) {
	t.Parallel()

	tests := []struct {
		label  string
		input  error
		expect bool
	}{
		{"nil", nil, false},
		{"unrelated", errors.New("write conflict"), false},
		{"context canceled", context.Canceled, false},
		{"raftengine ErrNotLeader direct", raftengine.ErrNotLeader, true},
		{"raftengine ErrLeadershipLost direct", raftengine.ErrLeadershipLost, true},
		{"raftengine ErrLeadershipTransferInProgress direct", raftengine.ErrLeadershipTransferInProgress, true},
		{
			"hashicorp ErrNotLeader marked with raftengine sentinel",
			cockroachdberrors.WithStack(cockroachdberrors.Mark(hashicorpraft.ErrNotLeader, raftengine.ErrNotLeader)),
			true,
		},
		{
			"hashicorp ErrLeadershipLost marked with raftengine sentinel",
			cockroachdberrors.WithStack(cockroachdberrors.Mark(hashicorpraft.ErrLeadershipLost, raftengine.ErrLeadershipLost)),
			true,
		},
		{
			"bare hashicorp ErrNotLeader (no raftengine mark) is NOT detected",
			hashicorpraft.ErrNotLeader,
			false,
		},
	}

	for _, tt := range tests {
		t.Run(tt.label, func(t *testing.T) {
			t.Parallel()
			got := isLeadershipLossError(tt.input)
			require.Equal(t, tt.expect, got)
		})
	}
}

func TestLeaseState_NilReceiverIsAlwaysExpired(t *testing.T) {
t.Parallel()
var s *leaseState
Expand Down
20 changes: 15 additions & 5 deletions kv/sharded_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,18 @@ func (t *leaseRefreshingTxn) Commit(reqs []*pb.Request) (*TransactionResponse, e
expectedGen := t.g.lease.generation()
resp, err := t.inner.Commit(reqs)
if err != nil {
// Propose failures commonly signal leadership loss; follow
// the design doc and invalidate so the next read takes the
// slow path and re-verifies.
t.g.lease.invalidate()
// Only invalidate on errors that actually signal a leadership
// change. Write-conflicts, validation errors, and deadline
// exceeded on non-ReadIndex paths do NOT imply the leader is
// gone; invalidating the lease for them forces every read
// into the slow LinearizableRead path and defeats the whole
// point of the lease. The engine's own leader-loss callback
// already handles true leadership loss, plus
// Coordinate.LeaseRead guards the fast path on
// engine.State() == StateLeader.
if isLeadershipLossError(err) {
t.g.lease.invalidate()
}
return resp, errors.WithStack(err)
}
t.maybeRefresh(resp, start, expectedGen)
Expand All @@ -61,7 +69,9 @@ func (t *leaseRefreshingTxn) Abort(reqs []*pb.Request) (*TransactionResponse, er
expectedGen := t.g.lease.generation()
resp, err := t.inner.Abort(reqs)
if err != nil {
t.g.lease.invalidate()
if isLeadershipLossError(err) {
t.g.lease.invalidate()
}
return resp, errors.WithStack(err)
}
t.maybeRefresh(resp, start, expectedGen)
Expand Down
Loading