diff --git a/adapter/distribution_server_test.go b/adapter/distribution_server_test.go index 7427f7f2..945d8171 100644 --- a/adapter/distribution_server_test.go +++ b/adapter/distribution_server_test.go @@ -747,3 +747,11 @@ func (s *distributionCoordinatorStub) Clock() *kv.HLC { func (s *distributionCoordinatorStub) LinearizableRead(_ context.Context) (uint64, error) { return 0, nil } + +func (s *distributionCoordinatorStub) LeaseRead(ctx context.Context) (uint64, error) { + return s.LinearizableRead(ctx) +} + +func (s *distributionCoordinatorStub) LeaseReadForKey(ctx context.Context, _ []byte) (uint64, error) { + return s.LinearizableRead(ctx) +} diff --git a/adapter/dynamodb.go b/adapter/dynamodb.go index 167b59e2..59a97359 100644 --- a/adapter/dynamodb.go +++ b/adapter/dynamodb.go @@ -65,11 +65,17 @@ const ( transactRetryMaxBackoff = 10 * time.Millisecond transactRetryBackoffFactor = 2 tableCleanupAsyncTimeout = 5 * time.Minute - itemUpdateLockStripeCount = 256 - tableLockStripeCount = 128 - batchWriteItemMaxItems = 25 - transactGetItemsMaxItems = 100 - dynamoMaxRequestBodyBytes = 1 << 20 + // dynamoLeaseReadTimeout bounds how long LeaseReadForKey's slow + // path (LinearizableRead) may block before returning an error to + // the HTTP client. Matches the order of magnitude of Redis's + // redisDispatchTimeout so both adapters give up at similar + // wall-clock budgets on quorum loss. + dynamoLeaseReadTimeout = 5 * time.Second + itemUpdateLockStripeCount = 256 + tableLockStripeCount = 128 + batchWriteItemMaxItems = 25 + transactGetItemsMaxItems = 100 + dynamoMaxRequestBodyBytes = 1 << 20 dynamoTableMetaPrefix = kv.DynamoTableMetaPrefix dynamoTableGenerationPrefix = kv.DynamoTableGenerationPrefix @@ -1340,38 +1346,85 @@ func (d *DynamoDBServer) commitItemWrite(ctx context.Context, req *kv.OperationG return nil } -func (d *DynamoDBServer) getItem(w http.ResponseWriter, r *http.Request) { +func (d *DynamoDBServer) parseGetItemInput(w http.ResponseWriter, r *http.Request) (getItemInput, bool) { body, err := io.ReadAll(maxDynamoBodyReader(w, r)) if err != nil { writeDynamoError(w, http.StatusBadRequest, dynamoErrValidation, err.Error()) - return + return getItemInput{}, false } var in getItemInput if err := json.Unmarshal(body, &in); err != nil { writeDynamoError(w, http.StatusBadRequest, dynamoErrValidation, err.Error()) - return + return getItemInput{}, false } if strings.TrimSpace(in.TableName) == "" { writeDynamoError(w, http.StatusBadRequest, dynamoErrValidation, "missing table name") - return + return getItemInput{}, false } if err := d.ensureLegacyTableMigration(r.Context(), in.TableName); err != nil { writeDynamoErrorFromErr(w, err) - return + return getItemInput{}, false } + return in, true +} - readTS := d.resolveDynamoReadTS(in.ConsistentRead) - schema, exists, err := d.loadTableSchemaAt(r.Context(), in.TableName, readTS) - if err != nil { +func (d *DynamoDBServer) getItem(w http.ResponseWriter, r *http.Request) { + in, ok := d.parseGetItemInput(w, r) + if !ok { + return + } + // Tentative TS for schema resolution only; schemas change rarely + // so a slight pre-lease stale is acceptable. The item read below + // is sampled AFTER the lease check. + tentativeTS := d.resolveDynamoReadTS(in.ConsistentRead) + _, itemKey, ok := d.resolveGetItemTarget(w, r, in, tentativeTS) + if !ok { + return + } + // Lease-check the shard that actually owns the ITEM key with a + // bounded timeout so a stalled Raft cannot hang this handler + // indefinitely if the client never cancels. 
Use defer so the + // cancel runs even if LeaseReadForKey panics or a future + // refactor inserts an early return; the cost of keeping ctx + // alive until handler exit is negligible because the next + // in-handler calls are local store reads. + leaseCtx, leaseCancel := context.WithTimeout(r.Context(), dynamoLeaseReadTimeout) + defer leaseCancel() + if _, err := kv.LeaseReadForKeyThrough(d.coordinator, leaseCtx, itemKey); err != nil { writeDynamoError(w, http.StatusInternalServerError, dynamoErrInternal, err.Error()) return } - if !exists { - writeDynamoError(w, http.StatusBadRequest, dynamoErrResourceNotFound, "table not found") + // Re-sample readTS AFTER the lease confirmation so that any write + // that completed on the same shard BEFORE the confirmation is + // visible. Sampling earlier would violate linearizability for + // ConsistentRead=false reads by returning a snapshot from before + // the most recent confirmed commit. + readTS := d.resolveDynamoReadTS(in.ConsistentRead) + // Pin readTS so concurrent MVCC GC cannot reclaim versions + // between the schema revalidation and the item read below; + // matches the pattern already used by queryItems / scanItems / + // transactGetItems. + readPin := d.pinReadTS(readTS) + defer readPin.Release() + + // Re-resolve schema + itemKey at readTS and verify that the key + // we lease-checked is STILL the key that will be read. A table + // migration that commits between the tentative schema load and + // the lease confirmation may shift the item to a different shard + // even if the request parameters are unchanged, so comparing the + // computed item keys (not just generation) catches any future + // schema change that alters item routing. + finalSchema, freshItemKey, ok := d.resolveGetItemTarget(w, r, in, readTS) + if !ok { + return + } + if !bytes.Equal(freshItemKey, itemKey) { + writeDynamoError(w, http.StatusServiceUnavailable, dynamoErrInternal, + "table routing changed during read; please retry") return } - current, found, err := d.readLogicalItemAt(r.Context(), schema, in.Key, readTS) + current, found, err := d.readLogicalItemAt(r.Context(), finalSchema, in.Key, readTS) if err != nil { writeDynamoError(w, http.StatusBadRequest, dynamoErrValidation, err.Error()) return @@ -1389,6 +1442,27 @@ func (d *DynamoDBServer) getItem(w http.ResponseWriter, r *http.Request) { writeDynamoJSON(w, map[string]any{"Item": projected}) } +// resolveGetItemTarget loads the schema and computes the item key whose +// shard must be lease-checked before the read. Returns false after +// writing an error response; the caller should simply return. 
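+//
+// Call pattern (as used twice by getItem above):
+//
+//	schema, itemKey, ok := d.resolveGetItemTarget(w, r, in, readTS)
+//	if !ok {
+//		return // error response already written
+//	}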
+func (d *DynamoDBServer) resolveGetItemTarget(w http.ResponseWriter, r *http.Request, in getItemInput, readTS uint64) (*dynamoTableSchema, []byte, bool) { + schema, exists, err := d.loadTableSchemaAt(r.Context(), in.TableName, readTS) + if err != nil { + writeDynamoError(w, http.StatusInternalServerError, dynamoErrInternal, err.Error()) + return nil, nil, false + } + if !exists { + writeDynamoError(w, http.StatusBadRequest, dynamoErrResourceNotFound, "table not found") + return nil, nil, false + } + itemKey, err := schema.itemKeyFromAttributes(in.Key) + if err != nil { + writeDynamoError(w, http.StatusBadRequest, dynamoErrValidation, err.Error()) + return nil, nil, false + } + return schema, itemKey, true +} + func (d *DynamoDBServer) deleteItem(w http.ResponseWriter, r *http.Request) { in, shouldReturnOld, err := decodeDeleteItemInput(maxDynamoBodyReader(w, r)) if err != nil { diff --git a/adapter/dynamodb_test.go b/adapter/dynamodb_test.go index 07c872ec..de9e6b97 100644 --- a/adapter/dynamodb_test.go +++ b/adapter/dynamodb_test.go @@ -1859,3 +1859,11 @@ func (w *testCoordinatorWrapper) Clock() *kv.HLC { func (w *testCoordinatorWrapper) LinearizableRead(ctx context.Context) (uint64, error) { return w.inner.LinearizableRead(ctx) } + +func (w *testCoordinatorWrapper) LeaseRead(ctx context.Context) (uint64, error) { + return kv.LeaseReadThrough(w.inner, ctx) +} + +func (w *testCoordinatorWrapper) LeaseReadForKey(ctx context.Context, key []byte) (uint64, error) { + return kv.LeaseReadForKeyThrough(w.inner, ctx, key) +} diff --git a/adapter/redis.go b/adapter/redis.go index a2c5cc7c..c32fd4d2 100644 --- a/adapter/redis.go +++ b/adapter/redis.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "log" + "log/slog" "maps" "math" "net" @@ -264,6 +265,15 @@ type RedisServer struct { relayConnCache kv.GRPCConnCache requestObserver monitoring.RedisRequestObserver luaObserver monitoring.LuaScriptObserver + // baseCtx is the parent context for per-request handlers. + // NewRedisServer creates a cancelable context here; Stop() cancels + // it so in-flight handlers abort promptly instead of running + // unbounded on context.Background(). Test stubs that construct + // RedisServer literals directly (bypassing NewRedisServer) may + // leave baseCtx nil; handlerContext() falls back to + // context.Background() in that case. + baseCtx context.Context + baseCancel context.CancelFunc // TODO manage membership from raft log leaderRedis map[raft.ServerAddress]string @@ -359,6 +369,7 @@ func NewRedisServer(listen net.Listener, redisAddr string, store store.MVCCStore if relay == nil { relay = NewRedisPubSubRelay() } + baseCtx, baseCancel := context.WithCancel(context.Background()) r := &RedisServer{ listen: listen, store: store, @@ -371,6 +382,8 @@ func NewRedisServer(listen net.Listener, redisAddr string, store store.MVCCStore pubsub: newRedisPubSub(), scriptCache: map[string]string{}, traceCommands: os.Getenv("ELASTICKV_REDIS_TRACE") == "1", + baseCtx: baseCtx, + baseCancel: baseCancel, } r.relay.Bind(r.publishLocal) @@ -527,6 +540,31 @@ func (r *RedisServer) dispatchCommand(conn redcon.Conn, name string, handler fun } } +// handlerContext returns the base context for a request handler. +// Falls back to context.Background() when the server was constructed +// by a test stub that bypassed NewRedisServer. Handlers that need a +// deadline should wrap this via context.WithTimeout. 
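+//
+// Typical use (the pattern the set/get handlers below follow):
+//
+//	ctx, cancel := context.WithTimeout(r.handlerContext(), redisDispatchTimeout)
+//	defer cancel()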
+func (r *RedisServer) handlerContext() context.Context { + if r == nil || r.baseCtx == nil { + return context.Background() + } + return r.baseCtx +} + +// Close cancels the base context, signalling all in-flight handlers to +// abort. Idempotent. The underlying redcon listener is still owned by +// the caller; Close does NOT touch it so shutdown orchestration can +// remain with the server owner. +func (r *RedisServer) Close() error { + if r == nil { + return nil + } + if r.baseCancel != nil { + r.baseCancel() + } + return nil +} + func (r *RedisServer) Run() error { err := redcon.Serve(r.listen, func(conn redcon.Conn, cmd redcon.Command) { @@ -809,8 +847,23 @@ func (r *RedisServer) observeRedisSuccess(command string, dur time.Duration) { } func (r *RedisServer) Stop() { - _ = r.relayConnCache.Close() - _ = r.listen.Close() + // Cancel baseCtx first so in-flight handlers observe a cancelled + // context before their network connections are torn down. + _ = r.Close() + if err := r.relayConnCache.Close(); err != nil { + slog.Warn("redis server: relay conn cache close", + slog.String("addr", r.redisAddr), + slog.Any("err", err), + ) + } + if r.listen != nil { + if err := r.listen.Close(); err != nil && !errors.Is(err, net.ErrClosed) { + slog.Warn("redis server: listener close", + slog.String("addr", r.redisAddr), + slog.Any("err", err), + ) + } + } } func (r *RedisServer) publishLocal(channel, message []byte) int64 { @@ -948,7 +1001,7 @@ func (r *RedisServer) set(conn redcon.Conn, cmd redcon.Command) { return } - ctx, cancel := context.WithTimeout(context.Background(), redisDispatchTimeout) + ctx, cancel := context.WithTimeout(r.handlerContext(), redisDispatchTimeout) defer cancel() if opts.isFastPath() && r.trySetFastPath(conn, ctx, cmd.Args[1], cmd.Args[2], opts.ttl) { @@ -981,8 +1034,21 @@ func (r *RedisServer) get(conn redcon.Conn, cmd redcon.Command) { return } + // Single bounded context for the slow paths in this handler, + // derived from the server's base context so Close() cancels any + // in-flight handler instead of leaving it running on a detached + // context.Background(). Only LeaseReadForKey and keyTypeAt accept + // a context; readRedisStringAt is a local-store read that does + // not take one. The shared deadline bounds the only branches + // that can actually block on quorum / I/O. + ctx, cancel := context.WithTimeout(r.handlerContext(), redisDispatchTimeout) + defer cancel() + if _, err := kv.LeaseReadForKeyThrough(r.coordinator, ctx, key); err != nil { + conn.WriteError(err.Error()) + return + } readTS := r.readTS() - typ, err := r.keyTypeAt(context.Background(), key, readTS) + typ, err := r.keyTypeAt(ctx, key, readTS) if err != nil { conn.WriteError(err.Error()) return @@ -1080,7 +1146,7 @@ func (r *RedisServer) tryLeaderLogicalExists(key []byte) bool { // If this path is unavailable we fall back to raw-KV probing, which is // best-effort and may lag unflushed buffer-only TTL updates. 
if cli, err := r.leaderClientForKey(key); err == nil { - ctx, cancel := context.WithTimeout(context.Background(), redisDispatchTimeout) + ctx, cancel := context.WithTimeout(r.handlerContext(), redisDispatchTimeout) defer cancel() if count, existsErr := cli.Exists(ctx, string(key)).Result(); existsErr == nil { return count > 0 @@ -1138,7 +1204,7 @@ func (r *RedisServer) del(conn redcon.Conn, cmd redcon.Command) { } func (r *RedisServer) delLocal(keys [][]byte) (int, error) { - ctx, cancel := context.WithTimeout(context.Background(), redisDispatchTimeout) + ctx, cancel := context.WithTimeout(r.handlerContext(), redisDispatchTimeout) defer cancel() var removed int err := r.retryRedisWrite(ctx, func() error { @@ -1454,7 +1520,7 @@ func (r *RedisServer) proxyKeys(pattern []byte) ([]string, error) { cli := r.getOrCreateLeaderClient(leaderAddr) - ctx, cancel := context.WithTimeout(context.Background(), redisDispatchTimeout) + ctx, cancel := context.WithTimeout(r.handlerContext(), redisDispatchTimeout) defer cancel() keys, err := cli.Keys(ctx, string(pattern)).Result() @@ -1523,7 +1589,7 @@ func (r *RedisServer) proxyTransactionToLeader(conn redcon.Conn, queue []redcon. } cli := r.getOrCreateLeaderClient(leaderAddr) - ctx, cancel := context.WithTimeout(context.Background(), redisDispatchTimeout) + ctx, cancel := context.WithTimeout(r.handlerContext(), redisDispatchTimeout) defer cancel() cmds, err := r.execTxPipeline(ctx, cli, queue) @@ -2325,7 +2391,7 @@ func (t *txnContext) commit() error { CommitTS: commitTS, ReadKeys: readKeys, } - ctx, cancel := context.WithTimeout(context.Background(), redisDispatchTimeout) + ctx, cancel := context.WithTimeout(t.server.handlerContext(), redisDispatchTimeout) defer cancel() if _, err := t.server.coordinator.Dispatch(ctx, group); err != nil { return errors.WithStack(err) @@ -2571,7 +2637,7 @@ func (t *txnContext) buildTTLElems() []*kv.Elem[kv.OP] { } func (r *RedisServer) runTransaction(queue []redcon.Command) ([]redisResult, error) { - dispatchCtx, cancel := context.WithTimeout(context.Background(), redisDispatchTimeout) + dispatchCtx, cancel := context.WithTimeout(r.handlerContext(), redisDispatchTimeout) defer cancel() var results []redisResult @@ -3086,7 +3152,7 @@ func (r *RedisServer) proxyLRange(key []byte, startRaw, endRaw []byte) ([]string return nil, err } - ctx, cancel := context.WithTimeout(context.Background(), redisDispatchTimeout) + ctx, cancel := context.WithTimeout(r.handlerContext(), redisDispatchTimeout) defer cancel() res, err := cli.LRange(ctx, string(key), int64(start), int64(end)).Result() @@ -3110,7 +3176,7 @@ func (r *RedisServer) proxyRPush(key []byte, values [][]byte) (int64, error) { args = append(args, string(v)) } - ctx, cancel := context.WithTimeout(context.Background(), redisDispatchTimeout) + ctx, cancel := context.WithTimeout(r.handlerContext(), redisDispatchTimeout) defer cancel() res, err := cli.RPush(ctx, string(key), args...).Result() @@ -3134,7 +3200,7 @@ func (r *RedisServer) proxyLPush(key []byte, values [][]byte) (int64, error) { args = append(args, string(v)) } - ctx, cancel := context.WithTimeout(context.Background(), redisDispatchTimeout) + ctx, cancel := context.WithTimeout(r.handlerContext(), redisDispatchTimeout) defer cancel() res, err := cli.LPush(ctx, string(key), args...).Result() @@ -3189,7 +3255,7 @@ func (r *RedisServer) proxyToLeader(conn redcon.Conn, cmd redcon.Command, key [] return true } - ctx, cancel := context.WithTimeout(context.Background(), redisDispatchTimeout) + ctx, cancel := 
context.WithTimeout(r.handlerContext(), redisDispatchTimeout) defer cancel() args := make([]interface{}, len(cmd.Args)) @@ -3266,7 +3332,7 @@ func (r *RedisServer) tryLeaderGetAt(key []byte, ts uint64) ([]byte, error) { return nil, errors.WithStack(err) } - ctx, cancel := context.WithTimeout(context.Background(), redisRelayPublishTimeout) + ctx, cancel := context.WithTimeout(r.handlerContext(), redisRelayPublishTimeout) defer cancel() cli := pb.NewRawKVClient(conn) diff --git a/adapter/redis_info_test.go b/adapter/redis_info_test.go index 73fae884..f815f6ae 100644 --- a/adapter/redis_info_test.go +++ b/adapter/redis_info_test.go @@ -35,6 +35,12 @@ func (c *infoTestCoordinator) Clock() *kv.HLC { } func (c *infoTestCoordinator) LinearizableRead(_ context.Context) (uint64, error) { return 0, nil } +func (c *infoTestCoordinator) LeaseRead(ctx context.Context) (uint64, error) { + return c.LinearizableRead(ctx) +} +func (c *infoTestCoordinator) LeaseReadForKey(ctx context.Context, _ []byte) (uint64, error) { + return c.LinearizableRead(ctx) +} func TestRedisServer_Info_LeaderRole(t *testing.T) { r := &RedisServer{ diff --git a/adapter/redis_keys_pattern_test.go b/adapter/redis_keys_pattern_test.go index 8692c261..3d4b7a8a 100644 --- a/adapter/redis_keys_pattern_test.go +++ b/adapter/redis_keys_pattern_test.go @@ -65,6 +65,14 @@ func (s *stubAdapterCoordinator) LinearizableRead(_ context.Context) (uint64, er return 0, s.verifyLeaderErr } +func (s *stubAdapterCoordinator) LeaseRead(ctx context.Context) (uint64, error) { + return s.LinearizableRead(ctx) +} + +func (s *stubAdapterCoordinator) LeaseReadForKey(ctx context.Context, _ []byte) (uint64, error) { + return s.LinearizableRead(ctx) +} + func (s *stubAdapterCoordinator) VerifyLeaderCalls() int32 { if s == nil { return 0 diff --git a/adapter/redis_lua_context.go b/adapter/redis_lua_context.go index a9995d52..12caa589 100644 --- a/adapter/redis_lua_context.go +++ b/adapter/redis_lua_context.go @@ -213,11 +213,12 @@ var luaRenameHandlers = map[redisValueType]luaRenameHandler{ } func newLuaScriptContext(ctx context.Context, server *RedisServer) (*luaScriptContext, error) { - // LinearizableRead confirms leadership via quorum AND waits for the local - // FSM to apply all committed entries, so startTS reflects the latest - // committed state. All subsequent reads within the script use snapshotGetAt - // (no per-call VerifyLeader), making VerifyLeader O(1) per script. - if _, err := server.coordinator.LinearizableRead(ctx); err != nil { + // LeaseRead confirms leadership at most once per LeaseDuration window; + // inside the window it returns immediately without a Raft round-trip. + // All subsequent reads within the script use snapshotGetAt at startTS, + // so leadership is verified at most once per script and amortised across + // scripts via the lease. 
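+	// Note: kv.LeaseReadThrough calls LeaseRead when the coordinator
+	// implements kv.LeaseReadableCoordinator and falls back to plain
+	// LinearizableRead otherwise, so pre-lease Coordinator
+	// implementations keep today's behaviour.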
+ if _, err := kv.LeaseReadThrough(server.coordinator, ctx); err != nil { return nil, errors.WithStack(err) } startTS := server.readTS() diff --git a/adapter/redis_retry_test.go b/adapter/redis_retry_test.go index a765851a..1720418c 100644 --- a/adapter/redis_retry_test.go +++ b/adapter/redis_retry_test.go @@ -85,6 +85,14 @@ func (c *retryOnceCoordinator) LinearizableRead(_ context.Context) (uint64, erro return 0, nil } +func (c *retryOnceCoordinator) LeaseRead(ctx context.Context) (uint64, error) { + return c.LinearizableRead(ctx) +} + +func (c *retryOnceCoordinator) LeaseReadForKey(ctx context.Context, _ []byte) (uint64, error) { + return c.LinearizableRead(ctx) +} + type recordingConn struct { ctx any err string diff --git a/adapter/s3_test.go b/adapter/s3_test.go index 985a5431..cb816fa0 100644 --- a/adapter/s3_test.go +++ b/adapter/s3_test.go @@ -704,6 +704,14 @@ func (c *followerS3Coordinator) LinearizableRead(_ context.Context) (uint64, err return 0, kv.ErrLeaderNotFound } +func (c *followerS3Coordinator) LeaseRead(ctx context.Context) (uint64, error) { + return c.LinearizableRead(ctx) +} + +func (c *followerS3Coordinator) LeaseReadForKey(ctx context.Context, _ []byte) (uint64, error) { + return c.LinearizableRead(ctx) +} + func (c *followerS3Coordinator) RaftLeader() raft.ServerAddress { return raft.ServerAddress("leader") } diff --git a/cmd/server/demo.go b/cmd/server/demo.go index 148aba0c..d58909c8 100644 --- a/cmd/server/demo.go +++ b/cmd/server/demo.go @@ -557,6 +557,13 @@ func run(ctx context.Context, eg *errgroup.Group, cfg config) error { engine := hashicorpraftengine.New(r) trx := kv.NewTransactionWithProposer(engine, kv.WithProposalObserver(proposalObserver)) coordinator := kv.NewCoordinatorWithEngine(trx, engine, kv.WithHLC(hlc)) + defer func() { + // Release the leader-loss callback slot on the engine before + // the process exits. The engine itself is closed elsewhere in + // the shutdown path; both orderings are safe, but releasing + // the closure here matches the symmetric construction order. + _ = coordinator.Close() + }() distEngine := distribution.NewEngineWithDefaultRoute() distCatalog := distribution.NewCatalogStore(st) if _, err := distribution.EnsureCatalogSnapshot(ctx, distCatalog, distEngine); err != nil { diff --git a/docs/lease_read_design.md b/docs/lease_read_design.md new file mode 100644 index 00000000..85e582db --- /dev/null +++ b/docs/lease_read_design.md @@ -0,0 +1,488 @@ +# Lease Read Design + +Status: Proposed +Author: bootjp +Date: 2026-04-20 + +--- + +## 1. Background + +### 1.1 Current read paths + +elastickv has three read paths with different consistency mechanisms: + +| Path | Read fence | Quorum cost per read | +|---|---|---| +| DynamoDB getItem / query / scan | `snapshotTS()` only | 0 | +| Redis SET / GET / non-Lua commands | `snapshotTS()` only | 0 | +| Redis Lua EVAL / EVALSHA | `coordinator.LinearizableRead(ctx)` once at script start (PR #546) | 1 ReadIndex per script | + +`snapshotTS()` returns `store.LastCommitTS()` from the local FSM. It does not +verify leadership at read time. + +### 1.2 Observed problem + +After deploying the redis.call() optimization (PR #547 + #548), per-script +latency did not improve. Investigation showed: + +- `redis.call()` time accounts for ~100% of Lua VM time +- Average time per `redis.call()` invocation is 800 ms - 2.2 s +- `Raft Commit Time` is ~500 us (not the bottleneck) +- Single-key `SET x` and `GET x` from redis-cli take ~0.96 s and ~0.92 s + +Two distinct issues are hidden in the metric: + +1. 
The per-script `LinearizableRead` in `newLuaScriptContext` triggers a full + etcd/raft `ReadOnlySafe` ReadIndex (heartbeat broadcast + quorum + `MsgHeartbeatResp` wait) on every Lua script invocation. There is no term + cache or fast path in `submitRead` / `handleRead` + (`internal/raftengine/etcd/engine.go:524, :841`). +2. Independently, the recent change to `defaultHeartbeatTick = 10` and + `defaultElectionTick = 100` (PR #529) widened the worst-case Raft tick gap + from 10 ms to 100 ms, slowing operations that wait on Raft progress. + +This document addresses (1). Issue (2) is out of scope. + +### 1.3 Asymmetry to remove + +The current state is asymmetric: + +- DynamoDB and Redis non-Lua paths trust local state with no quorum check. + This already accepts a partition window of up to `electionTimeout` during + which a stale leader can serve stale reads (until `CheckQuorum` steps it + down). +- Lua paths pay a full quorum round-trip on every script. + +A unified lease-read API can give all three paths the same trade-off: +serve from local state when leadership is recently confirmed, fall back to +ReadIndex when it is not. + +--- + +## 2. Goals and Non-Goals + +### 2.1 Goals + +- Eliminate the per-script ReadIndex from the Lua path under steady load. +- Provide a single API used by all read paths, so the consistency trade-off + is documented in one place. +- Improve safety of DynamoDB / Redis non-Lua reads by attaching them to a + bounded lease, instead of unconditional trust of local state. +- Keep the change confined to elastickv layers; no fork of etcd/raft. + +### 2.2 Non-Goals + +- Strict linearizability under arbitrary network partitions. The design + retains a partition window of at most `electionTimeout - margin`, the same + trade-off TiKV's lease read accepts. +- Changes to `ReadOnlyOption` in etcd/raft. The fast path lives in elastickv; + the slow path still uses `ReadOnlySafe`. +- Multi-shard read transactions. Lease check is per-shard. + +--- + +## 3. Design + +### 3.1 Lease state + +Each `Coordinate` (single-shard) and each shard inside `ShardedCoordinator` +holds: + +```go +type leaseState struct { + gen atomic.Uint64 // bumped by invalidate() + expiry atomic.Pointer[time.Time] // nil = expired / invalidated +} +``` + +- `expiry == nil` or `time.Now() >= *expiry`: lease is expired. The next + `LeaseRead` falls back to `LinearizableRead` and refreshes the lease on + success. +- `time.Now() < *expiry`: lease is valid. `LeaseRead` returns immediately + without contacting the Raft layer. +- `invalidate()` increments `gen` before clearing `expiry`. `extend()` + captures `gen` at entry and, after its CAS lands, undoes its own + write (via CAS on the pointer it stored) iff `gen` has moved. This + prevents a Dispatch that succeeded just before a leader-loss + invalidate from resurrecting the lease milliseconds after it was + cleared. A fresh `extend()` that captured the post-invalidate + generation is left intact because it stored a different pointer. + +The lock-free form lets readers do one atomic load + one wall-clock compare +on the fast path. + +### 3.2 Lease duration + +The lease duration must be strictly less than `electionTimeout`. The bound +comes from etcd/raft: with `CheckQuorum: true`, a leader that loses contact +with majority steps down within at most `electionTimeout`. Until then, it can +still serve reads from local state. As long as our lease expires before the +leader could realistically be replaced and accept new writes elsewhere, local +reads are safe. 
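To make the §3.1 race rules concrete, here is a minimal sketch of the four
lease operations used by the §3.5 pseudocode (`valid`, `generation`,
`invalidate`, `extend`). It assumes the `leaseState` struct shown in §3.1
and Go's generic `sync/atomic` pointer; the shipped `kv` implementation may
differ in detail:

```go
// Sketch only. Assumes the §3.1 struct:
//
//     type leaseState struct {
//         gen    atomic.Uint64
//         expiry atomic.Pointer[time.Time]
//     }

func (l *leaseState) valid(now time.Time) bool {
    exp := l.expiry.Load()
    return exp != nil && now.Before(*exp)
}

func (l *leaseState) generation() uint64 { return l.gen.Load() }

func (l *leaseState) invalidate() {
    l.gen.Add(1) // bump first so a racing extend observes the move
    l.expiry.Store(nil)
}

// extend publishes a new expiry unless that would shorten the current
// window, then undoes its own write iff an invalidate landed during
// the caller's quorum operation.
func (l *leaseState) extend(until time.Time, expectedGen uint64) {
    p := &until
    for {
        cur := l.expiry.Load()
        if cur != nil && !cur.Before(until) {
            return // monotonic: never move the expiry backwards
        }
        if l.expiry.CompareAndSwap(cur, p) {
            break
        }
    }
    if l.gen.Load() != expectedGen {
        // CAS on the exact pointer we stored: a fresh post-invalidate
        // extend stored a different pointer, so it is left intact.
        l.expiry.CompareAndSwap(p, nil)
    }
}
```
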
The engine exposes:

```go
func (e *Engine) LeaseDuration() time.Duration
```

Implementation: `electionTimeout - leaseSafetyMargin`, where
`electionTimeout = defaultTickInterval * defaultElectionTick`. With current
config: `10ms * 100 - 300ms = 700 ms`.

`leaseSafetyMargin` (proposed: 300 ms) absorbs:

- Goroutine scheduling delay between heartbeat ack and lease refresh.
- Wall-clock skew between the leader and the partition's new leader candidate.
- GC pauses on the leader.

The margin is conservative; reducing it shortens the post-write quiet window
during which lease reads still hit local state, at the cost of a smaller
safety buffer.

### 3.3 Refresh triggers

The lease is refreshed on:

1. Any successful `engine.LinearizableRead(ctx)` returning without error.
   The ReadIndex protocol confirmed quorum at that moment.
2. Any successful `engine.Propose(ctx, data)` whose result indicates commit.
   A committed entry implies majority append + ack, which is a stronger
   confirmation than ReadIndex.

Both refresh triggers base the new expiry on `preOpInstant + LeaseDuration()`,
where `preOpInstant` is captured BEFORE the quorum operation starts, not
after it returns. This is strictly conservative: any real quorum
confirmation must happen at or after `preOpInstant`, so the lease window
can only be shorter than the true safety window, never longer.
Post-operation sampling would let apply-queue depth / scheduling jitter
push the window past `electionTimeout`.

Alongside the instant, the caller also captures the lease generation via
`leaseState.generation()` and passes both to `extend(until, expectedGen)`.
The generation guard prevents a leader-loss invalidation that fires
during the quorum operation from being silently overwritten by the
caller's post-op extend.

Heartbeat ack tracking is intentionally not used. It would require deep
hooks into etcd/raft's internals and gives only a small marginal benefit
over (1) and (2).

### 3.4 Invalidation triggers

The lease is invalidated (set to nil) on:

1. State transition out of leader. `refreshStatus` fires registered
   `RegisterLeaderLossCallback` hooks on the `Leader -> non-Leader` edge,
   and `fail()` / `shutdown()` fire the same hooks when tearing down a
   node that was still leader, so the error-shutdown path does not leave
   lease holders serving stale state.
2. Any error returned by `engine.Propose` or `engine.LinearizableRead`
   from inside `LeaseRead` / `groupLeaseRead`. Implemented via
   `c.lease.invalidate()` on the slow-path error branch.
3. A no-op Raft commit (`resp.CommitIndex == 0`): the underlying
   `TransactionManager.{Commit,Abort}` can short-circuit on empty input
   or no-op abort without going through Raft. `leaseRefreshingTxn` and
   `Coordinate.Dispatch` only refresh the lease when `CommitIndex > 0`
   to avoid extending based on an operation that never reached quorum.

Note: the previous draft of this doc listed "term-change detection" as
a separate defensive trigger. That is not implemented; (1) covers the
only case where term changes matter (an old leader being demoted), and
adding an explicit term check would be redundant.

### 3.5 API

```go
// internal/raftengine/engine.go — optional capability
type LeaseProvider interface {
    LeaseDuration() time.Duration
    AppliedIndex() uint64
    // RegisterLeaderLossCallback returns a deregister closure so
    // short-lived holders can release their slot. Long-lived holders
    // whose lifetime matches the engine's may ignore the return.
+ RegisterLeaderLossCallback(fn func()) (deregister func()) +} + +// kv/coordinator.go +type Coordinator interface { + // ...existing... + LeaseRead(ctx context.Context) (uint64, error) + LeaseReadForKey(ctx context.Context, key []byte) (uint64, error) +} + +// Concrete *Coordinate / *ShardedCoordinator additionally expose +// Close() which calls the stored deregister. Close is NOT on the +// Coordinator interface to keep adapter test stubs unchanged. +``` + +Returned index is the engine's applied index at the moment of return. Callers +that use `store.LastCommitTS()` can ignore the index; callers that need an +explicit fence can use it. + +Pseudocode (matches `Coordinate.LeaseRead`; the sharded variant is the same +with per-shard `g.lease` and `g.Engine`): + +```go +func (c *Coordinate) LeaseRead(ctx context.Context) (uint64, error) { + lp, ok := c.engine.(raftengine.LeaseProvider) + if !ok { + return c.LinearizableRead(ctx) // hashicorp engine, test stubs + } + if d := lp.LeaseDuration(); d <= 0 { + // Misconfigured tick settings disable the lease entirely. + return c.LinearizableRead(ctx) + } + // Capture time.Now() AND lease.generation() exactly once before + // any quorum work. The generation guard prevents a leader-loss + // callback that fires during LinearizableRead from being + // silently overwritten by the post-op extend. + now := time.Now() + expectedGen := c.lease.generation() + if c.lease.valid(now) { + return lp.AppliedIndex(), nil + } + idx, err := c.LinearizableRead(ctx) + if err != nil { + c.lease.invalidate() + return 0, err + } + // `now` was sampled strictly before LinearizableRead ran, so the + // resulting lease window is strictly conservative. `expectedGen` + // is the pre-op generation; extend rejects the CAS if invalidate + // advanced it during the quorum operation. + c.lease.extend(now.Add(lp.LeaseDuration()), expectedGen) + return idx, nil +} +``` + +### 3.6 Application sites + +| File | Current | After | +|---|---|---| +| `adapter/redis_lua_context.go:215` | `LinearizableRead` once per script | `LeaseRead` once per script | +| `adapter/redis.go` `get`, `keyTypeAt` callers | no fence | `LeaseRead` once per command | +| `adapter/redis.go` `set` and other write commands | no fence | implicit via `Propose` refresh | +| `adapter/dynamodb.go` `getItem`, `query`, `scan` | no fence | `LeaseRead` once per request | +| `adapter/dynamodb.go` write paths | no fence | implicit via `Propose` refresh | + +For write paths, calling `LeaseRead` separately is not required: `Propose` +already confirms quorum at commit time and refreshes the lease. + +For read paths, `LeaseRead` is added at the entry of the handler. The +existing `snapshotTS()` call is unchanged. + +--- + +## 4. Safety + +### 4.1 Correctness invariant + +A lease read returning index `i` to a caller is safe iff, at the moment of +return, no other node holds a strictly higher commit index in a later term +that is visible to clients. Equivalently: this leader is still the unique +leader, and no replacement leader has accepted writes that this node has +not seen. + +### 4.2 Why the lease bound is sufficient + +etcd/raft with `CheckQuorum: true` enforces: + +- A leader that fails to receive `MsgHeartbeatResp` from majority within + `electionTimeout` steps down (becomes follower). +- A new leader cannot be elected until the previous leader's followers time + out their election ticks, which is at least `electionTimeout`. 
+ +Combined: between losing quorum and a successor leader accepting writes, +at least `electionTimeout` of wall-clock time elapses on the followers' +clocks. + +If the lease is refreshed at time `t0` (heartbeat ack received at `t0` is the +implicit refresh signal, modulo the margin discussion in 4.3), and the lease +duration is `electionTimeout - margin`, then the lease expires at +`t0 + electionTimeout - margin`. Any client read before that time runs on a +leader that, modulo clock skew bounded by `margin`, has not yet been +replaced. + +### 4.3 Refresh-vs-ack gap + +The design refreshes the lease on `LinearizableRead` and `Propose` +completion, not on individual heartbeat acks. The ambiguity of exactly +when quorum was confirmed during the operation is bounded by sampling +the lease base BEFORE the operation starts (see 3.3): any real quorum +confirmation happens at or after that sample, so the computed window +can only be shorter than the true safety window, never longer. The +`leaseSafetyMargin` still absorbs bounded wall-clock skew between the +leader's local clock and a partition's successor leader's clock; it no +longer has to absorb the round-trip latency of the quorum operation. + +### 4.4 Comparison to current state + +| Path | Current safety window | After lease | Notes | +|---|---|---|---| +| DynamoDB / Redis non-Lua read | up to `electionTimeout` (until CheckQuorum step-down) | up to `LeaseDuration()` | strictly improved | +| Lua read | 0 (full ReadIndex per script) | up to `LeaseDuration()` | strictly weaker, matches the others | + +The Lua change accepts the same trade-off the other paths already accept. + +### 4.5 Read-then-write inside Lua + +Lua scripts often read state, compute, then write. The write goes through +`Propose`, which requires quorum. A stale leader's `Propose` cannot commit +because it cannot reach majority (the quorum is on the other side of the +partition). + +So a stale lease read inside a Lua script cannot directly cause a stale +write to commit. However: + +- The script may decide to write based on a stale-but-not-divergent value + (e.g. lost-update on a counter). This is the same hazard DynamoDB + conditional writes face today. +- The client may receive a success response from the stale leader for the + read portion before the write fails. With the current code, the entire + script is wrapped in a single Raft proposal, so the script either commits + atomically or returns an error. The lease change does not alter this. + +### 4.6 Failure modes considered + +- Leader losing quorum but lease still valid: stale read possible. Window + is bounded by `LeaseDuration() < electionTimeout`. Any write attempt + within the window fails because `Propose` cannot reach quorum. Same + trade-off DynamoDB / Redis non-Lua already accept today. +- Leader losing quorum and lease expired: next `LeaseRead` calls + `LinearizableRead`, which fails (no quorum), error propagated to caller. + Lease invalidated. +- Leader transferring leadership: `refreshStatus` detects state transition + out of leader and invalidates the lease. +- Clock skew exceeding `leaseSafetyMargin`: lease may extend beyond + `electionTimeout`, allowing a stale read after a successor leader has + accepted writes. Mitigation: keep `leaseSafetyMargin` larger than the + documented clock-skew SLO of the deployment. Default 300 ms is consistent + with the HLC physical window of 3 s used elsewhere. + +--- + +## 5. Implementation Plan + +### Phase 1: engine surface — DONE +1. 
Added `LeaseProvider` as a separate optional interface in + `internal/raftengine/engine.go` (not on `LeaderView`) so non-etcd + engines and test stubs can omit lease methods. Etcd engine implements + `LeaseDuration() time.Duration` and `AppliedIndex() uint64`. +2. `RegisterLeaderLossCallback(fn func())` was added to `LeaseProvider` + in the follow-up review pass; the etcd engine fires registered + callbacks from `refreshStatus` whenever the local node leaves the + leader role. + +### Phase 2: coordinator lease — DONE +1. `leaseState` (lock-free `atomic.Pointer[time.Time]` with monotonic + CAS extend) added to `Coordinate`; `ShardGroup` gets a per-shard + `leaseState`. +2. `Coordinate.LeaseRead` / `Coordinate.LeaseReadForKey` and + `ShardedCoordinator.LeaseRead` / `ShardedCoordinator.LeaseReadForKey` + implemented. Time is sampled BEFORE the underlying + `LinearizableRead` so the lease window starts at quorum + confirmation. +3. `Coordinate.Dispatch` refreshes the lease on successful commit using + the pre-dispatch timestamp. `ShardedCoordinator` wraps each + `g.Txn` in `leaseRefreshingTxn` so all dispatch paths (raw via + `router.Commit`, `dispatchSingleShardTxn`, `dispatchTxn` 2PC, and + `dispatchDelPrefixBroadcast`) refresh the per-shard lease on + `Commit` / `Abort` success. +4. `NewCoordinatorWithEngine` and `NewShardedCoordinator` register + `lease.invalidate` via the `LeaseProvider.RegisterLeaderLossCallback` + hook, so the engine's `refreshStatus` invalidates the lease the + instant it observes a non-leader transition. + +### Phase 3: callers — PARTIAL +1. DONE: `adapter/redis_lua_context.go:newLuaScriptContext` uses + `LeaseRead` instead of `LinearizableRead`. +2. DONE for the highest-traffic single-key handlers; rest tracked as #557: + - DONE: `adapter/redis.go` `get` (with bounded + `redisDispatchTimeout` context). + - DONE: `adapter/dynamodb.go` `getItem`. + - DEFERRED (#557): `adapter/redis.go` `keys`, `exists`-family, + ZSet/Hash/List/Set readers; `adapter/dynamodb.go` `query`, `scan`, + `transactGetItems`, `batchGetItem`. Rely on the lease being kept + warm by Lua scripts and successful Dispatch calls; safety identical + to pre-PR (no quorum check). +3. No change to write paths beyond the implicit refresh via the + `Coordinate.Dispatch` / `leaseRefreshingTxn` hooks. + +### Phase 4: tests — PARTIAL +1. DONE: `kv/lease_state_test.go` covers `leaseState` extend, expire, + invalidate, monotonic CAS, invalidate-vs-extend race. +2. DONE: `kv/lease_read_test.go` covers `Coordinate.LeaseRead` fast / + slow / error / fallback paths and the leader-loss callback wiring. + `kv/sharded_lease_test.go` covers `ShardedCoordinator` per-shard + isolation and per-shard leader-loss wiring. +3. DONE: `TestCoordinate_LeaseRead_AmortizesLinearizableRead` proves + 100 LeaseRead calls inside one lease window trigger exactly 1 + underlying LinearizableRead. Stronger end-to-end Lua amortization + under the adapter is implicit — `newLuaScriptContext` is the single + call site and is exercised by every Lua test. +4. DEFERRED: Jepsen partition workload asserting no stale-read + linearizability violation outside the lease window. Substantial + scope; tracked separately. Existing Jepsen `redis-workload` already + exercises the lease path under partition + kill faults, just without + a lease-specific assertion. + +### Phase 5: rollout +- Land Phases 1-3 behind no flag. The semantics are strictly equivalent or + stronger than today for non-Lua paths and weaker (but documented) for Lua. 
+- Monitor `LinearizableRead` call rate and Lua per-script latency before + and after deploy. + +--- + +## 6. Alternatives Considered + +### 6.1 Switch etcd/raft to ReadOnlyLeaseBased +One-line change: `ReadOnlyOption: etcdraft.ReadOnlyLeaseBased`. The leader +serves ReadIndex from local state without heartbeat broadcast, relying on +`CheckQuorum` for safety. + +Rejected because: +- Lease semantics are implicit and tied to etcd/raft internals. +- The lease boundary is not surfaced to the elastickv layer, so we cannot + track it for metrics or use it for non-engine reads. +- Future divergence: if elastickv ever needs to apply lease semantics to + read paths that do not call into the engine, this option does not help. + +The proposed design is essentially `ReadOnlyLeaseBased` reimplemented one +level up, with explicit timeout tracking. + +### 6.2 Term cache only, no time bound +Cache the current term; skip ReadIndex if term has not changed. + +Rejected because: +- Term changes are not the only safety trigger. A leader that loses quorum + but has not yet stepped down keeps the same term while serving stale + reads. CheckQuorum eventually catches it, but a term-only check has no + bound on the stale-read window. +- The proposed lease design subsumes the term check: leader transition + invalidates the lease, and time bounds the window even before a + transition is detected. + +### 6.3 Per-call heartbeat ack tracking inside engine +Hook into `handleHeartbeatResp` to refresh lease on every quorum ack. + +Rejected for the initial implementation because: +- Requires deeper integration with etcd/raft message handling. +- The marginal latency benefit over `Propose`-driven refresh is small under + any non-trivial write load. +- Can be added later without changing the API. + +--- + +## 7. Open Questions + +1. Should `LeaseDuration` be configurable per deployment, or kept as a + derived constant? Proposal: derived constant, exposed as a method. + Operators tune `defaultElectionTick` instead. +2. Should `LeaseRead` return the engine applied index or the store + `LastCommitTS()`? Proposal: applied index (matches `LinearizableRead`), + callers convert as needed. +3. Should a metric be added for lease hit/miss ratio? Proposal: yes, + `elastickv_lease_read_total{outcome="hit|miss|error"}`. diff --git a/internal/raftengine/engine.go b/internal/raftengine/engine.go index 5c1bbba1..e97afd6a 100644 --- a/internal/raftengine/engine.go +++ b/internal/raftengine/engine.go @@ -67,6 +67,39 @@ type LeaderView interface { LinearizableRead(ctx context.Context) (uint64, error) } +// LeaseProvider is an optional capability implemented by engines that support +// leader-local lease reads. Callers that want lease-based reads should +// type-assert to this interface and fall back to LinearizableRead when the +// underlying engine does not implement it. +type LeaseProvider interface { + // LeaseDuration returns the time during which a lease holder can serve + // reads from local state without re-confirming leadership via ReadIndex. + LeaseDuration() time.Duration + // AppliedIndex returns the highest log index applied to the local FSM. + AppliedIndex() uint64 + // RegisterLeaderLossCallback registers fn to be invoked whenever the + // local node leaves the leader role (graceful transfer, partition + // step-down, or shutdown). Callers use this to invalidate any + // leader-local lease they hold so the next read takes the slow path. + // Multiple callbacks can be registered. 
+ // + // Callbacks fire synchronously from the engine's status-refresh + // / shutdown path and MUST be non-blocking -- each should be a + // lock-free flag flip (e.g. atomic invalidate). A panicking + // callback is contained so a bug in one holder cannot break + // others, but a blocking callback would stall the engine's main + // loop, so the contract is strict. Lease-read fast paths also + // guard on engine.State() to close the narrow race between a + // transition and this callback completing. + // + // The returned function deregisters this callback and is safe to + // call multiple times. Callers whose lifetime is shorter than the + // engine's (ephemeral Coordinators in tests, for example) MUST + // invoke the returned deregister when they are done so the engine + // does not accumulate dead callbacks. + RegisterLeaderLossCallback(fn func()) (deregister func()) +} + type StatusReader interface { Status() Status } diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index 28d7f24a..fddfb9c4 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -7,6 +7,7 @@ import ( "io" "log/slog" "path/filepath" + "runtime/debug" "sort" "strconv" "sync" @@ -24,6 +25,12 @@ const ( defaultTickInterval = 10 * time.Millisecond defaultHeartbeatTick = 10 // 100ms at 10ms interval defaultElectionTick = 100 // 1s at 10ms interval (10x heartbeat, etcd/raft recommended ratio) + // leaseSafetyMargin is subtracted from electionTimeout when computing the + // duration of a leader-local read lease. It absorbs goroutine scheduling + // delay between heartbeat ack and lease refresh, GC pauses on the leader, + // and bounded wall-clock skew between the leader and a partition's new + // leader candidate. See docs/lease_read_design.md for the safety argument. + leaseSafetyMargin = 300 * time.Millisecond // defaultMaxInflightMsg controls how many in-flight MsgApp messages Raft // allows per peer before waiting for an ACK (etcd/raft default: 256). // It also sets the per-peer dispatch channel capacity; total buffered memory @@ -108,6 +115,7 @@ type Engine struct { dataDir string fsmSnapDir string tickInterval time.Duration + electionTick int storage *etcdraft.MemoryStorage rawNode *etcdraft.RawNode @@ -151,6 +159,13 @@ type Engine struct { runErr error closed bool applied uint64 + // appliedIndex mirrors the current applied-entry index for + // lock-free readers on the lease-read fast path. Writers inside + // the Raft run loop update both `applied` (protected by the run + // loop's single-writer invariant) and `appliedIndex.Store(...)`. + // AppliedIndex() reads via atomic.Load so it does not contend + // with refreshStatus's write lock. + appliedIndex atomic.Uint64 // configIndex tracks the highest configuration index durably published to // local raft snapshot state and peer metadata. configIndex atomic.Uint64 @@ -165,6 +180,19 @@ type Engine struct { dispatchDropCount atomic.Uint64 dispatchErrorCount atomic.Uint64 + // leaderLossCbsMu guards the slice of callbacks invoked when the node + // transitions out of the leader role (graceful transfer, partition + // step-down, shutdown). Callbacks fire synchronously from the + // leader-loss handling path and MUST be non-blocking; a slow + // callback would hold up refreshStatus / shutdown / fail. See + // RegisterLeaderLossCallback for the full contract. 
Each entry + // carries a sentinel pointer so that the deregister closure + // returned by RegisterLeaderLossCallback can identify THIS + // specific registration even if the same fn is registered + // multiple times. + leaderLossCbsMu sync.Mutex + leaderLossCbs []leaderLossSlot + pendingProposals map[uint64]proposalRequest pendingReads map[uint64]readRequest pendingConfigs map[uint64]adminRequest @@ -290,6 +318,7 @@ func Open(ctx context.Context, cfg OpenConfig) (*Engine, error) { dataDir: prepared.cfg.DataDir, fsmSnapDir: filepath.Join(prepared.cfg.DataDir, fsmSnapDirName), tickInterval: prepared.cfg.TickInterval, + electionTick: prepared.cfg.ElectionTick, storage: prepared.disk.Storage, rawNode: rawNode, persist: prepared.disk.Persist, @@ -314,9 +343,22 @@ func Open(ctx context.Context, cfg OpenConfig) (*Engine, error) { pendingConfigs: map[uint64]adminRequest{}, } engine.configIndex.Store(maxAppliedIndex(prepared.disk.LocalSnap)) + engine.appliedIndex.Store(maxAppliedIndex(prepared.disk.LocalSnap)) engine.initTransport(prepared.cfg) engine.initSnapshotWorker() engine.refreshStatus() + // Surface a misconfiguration where the tick settings produce a + // non-positive lease window: lease reads would never hit the fast + // path. Don't fail Open -- the engine is still functional via the + // slow LinearizableRead path -- but make the degradation visible. + if lease := engine.LeaseDuration(); lease <= 0 { + slog.Warn("etcd raft engine: lease read disabled (non-positive LeaseDuration)", + slog.Duration("tick_interval", engine.tickInterval), + slog.Int("election_tick", engine.electionTick), + slog.Duration("lease_safety_margin", leaseSafetyMargin), + slog.Duration("computed_lease", lease), + ) + } go engine.run() @@ -521,6 +563,159 @@ func (e *Engine) LinearizableRead(ctx context.Context) (uint64, error) { return e.submitRead(ctx, true) } +// LeaseDuration returns the time during which a lease holder can serve +// reads from local state without re-confirming leadership via ReadIndex. +// It is bounded by electionTimeout - leaseSafetyMargin so that the lease +// expires before a successor leader could realistically be elected and +// accept new writes elsewhere. +func (e *Engine) LeaseDuration() time.Duration { + if e == nil { + return 0 + } + tick := e.tickInterval + if tick <= 0 { + tick = defaultTickInterval + } + election := e.electionTick + if election <= 0 { + election = defaultElectionTick + } + d := time.Duration(election)*tick - leaseSafetyMargin + if d < 0 { + return 0 + } + return d +} + +// AppliedIndex returns the highest log index applied to the local FSM. +// Suitable for callers that need a non-blocking read fence equivalent +// to what LinearizableRead would have returned, paired with an +// external quorum confirmation (e.g. a valid lease). +// +// Lock-free: reads the mirrored atomic.Uint64 written by the run +// loop's apply path (and by Restore's snapshot installation), so the +// lease-read fast path does not contend with refreshStatus's write +// lock under high read concurrency. +func (e *Engine) AppliedIndex() uint64 { + if e == nil { + return 0 + } + return e.appliedIndex.Load() +} + +// RegisterLeaderLossCallback registers fn to fire every time the local +// node's Raft state transitions out of leader (CheckQuorum step-down, +// graceful transfer completion, partition-induced demotion) and also +// on shutdown() while the node was still leader. 
Callbacks are NOT +// fired at the moment a transfer starts (LeadTransferee != 0); they +// only fire once the transfer completes and state flips to follower. +// Lease-read callers use this to invalidate cached lease state so the +// next read takes the slow path. +// +// Callbacks run synchronously from refreshStatus / shutdown / fail +// and MUST be non-blocking (each should be a fast, lock-free +// invalidation). A panic inside a callback is contained and logged +// so a bug in one holder cannot crash the engine or break other +// callbacks. LeaseRead also guards its fast path on +// engine.State() == StateLeader so the small window between the +// transition and this callback completing cannot serve stale reads. +// +// The returned deregister function removes this specific registration +// and is safe to call multiple times. Long-lived callers (coordinators +// whose lifetime matches the engine's) may ignore it; shorter-lived +// callers MUST invoke it to avoid accumulating dead callbacks in the +// engine's slice. +func (e *Engine) RegisterLeaderLossCallback(fn func()) (deregister func()) { + if e == nil || fn == nil { + return func() {} + } + // Allocate a unique sentinel pointer so the deregister closure can + // identify THIS specific registration even if the same fn is + // registered multiple times. + slot := &struct{ fn func() }{fn: fn} + e.leaderLossCbsMu.Lock() + e.leaderLossCbs = append(e.leaderLossCbs, leaderLossSlot{id: slot, fn: fn}) + e.leaderLossCbsMu.Unlock() + var once sync.Once + return func() { + once.Do(func() { + e.leaderLossCbsMu.Lock() + defer e.leaderLossCbsMu.Unlock() + for i, c := range e.leaderLossCbs { + if c.id != slot { + continue + } + // Remove without leaving a dangling reference at the + // tail of the underlying array. The removed slot's fn + // typically captures a *Coordinate; a plain + // `append(cbs[:i], cbs[i+1:]...)` would keep the old + // backing cell alive and prevent GC of the associated + // Coordinate until the engine itself is dropped. + last := len(e.leaderLossCbs) - 1 + copy(e.leaderLossCbs[i:], e.leaderLossCbs[i+1:]) + e.leaderLossCbs[last] = leaderLossSlot{} + e.leaderLossCbs = e.leaderLossCbs[:last] + return + } + }) + } +} + +// leaderLossSlot pairs a registered callback with an id-only sentinel +// pointer so deregister can distinguish identical fn values. +type leaderLossSlot struct { + id *struct{ fn func() } + fn func() +} + +// fireLeaderLossCallbacks invokes all registered callbacks +// synchronously. The registered-callback contract requires each fn +// to be non-blocking (a lock-free lease-invalidate flag flip), so +// inline execution is safe and avoids spawning an unbounded number +// of goroutines per leader-loss event when many shards / coordinators +// are registered. +// +// A panicking callback is still contained (see +// invokeLeaderLossCallback) so a bug in one holder cannot break +// subsequent callbacks or crash the process. +func (e *Engine) fireLeaderLossCallbacks() { + e.leaderLossCbsMu.Lock() + cbs := make([]func(), len(e.leaderLossCbs)) + for i, c := range e.leaderLossCbs { + cbs[i] = c.fn + } + e.leaderLossCbsMu.Unlock() + for _, fn := range cbs { + e.invokeLeaderLossCallback(fn) + } +} + +func (e *Engine) invokeLeaderLossCallback(fn func()) { + defer func() { + if r := recover(); r != nil { + // A buggy lease holder must not crash the node. 
Log the + // recovery so operators can see lease-invalidation hooks + // misbehaving in production; swallow the panic so the + // engine status loop / shutdown path continues. + // + // Note: if a callback panics before it invalidates its + // lease, fast-path reads on that lease keep succeeding + // until wall-clock expiry. Safety is then bounded by the + // lease duration (strictly shorter than electionTimeout), + // not by the slow-path re-verification. The slow path + // re-verifies leadership only once the lease has + // naturally expired. + slog.Error("etcd raft engine: leader-loss callback panicked", + slog.String("node_id", e.localID), + slog.Uint64("raft_node_id", e.nodeID), + slog.Any("panic", r), + slog.String("stack", string(debug.Stack())), + ) + } + }() + fn() +} + func (e *Engine) submitRead(ctx context.Context, waitApplied bool) (uint64, error) { if err := contextErr(ctx); err != nil { return 0, err @@ -1116,6 +1311,7 @@ func (e *Engine) applyReadySnapshot(snapshot raftpb.Snapshot) error { snapshot.Metadata.Index, snapshot.Metadata.Term) } e.applied = snapshot.Metadata.Index + e.appliedIndex.Store(snapshot.Metadata.Index) e.setConfigurationFromConfState(snapshot.Metadata.ConfState, snapshot.Metadata.Index) return nil } @@ -1186,7 +1382,7 @@ func (e *Engine) applyCommitted(entries []raftpb.Entry) error { switch entry.Type { case raftpb.EntryNormal: response := e.applyNormalEntry(entry) - e.applied = entry.Index + e.setApplied(entry.Index) e.resolveProposal(entry.Index, entry.Data, response) case raftpb.EntryConfChange: var cc raftpb.ConfChange @@ -1199,7 +1395,7 @@ func (e *Engine) applyCommitted(entries []raftpb.Entry) error { return err } e.applyConfigChange(cc.Type, cc.NodeID, cc.Context, entry.Index) - e.applied = entry.Index + e.setApplied(entry.Index) case raftpb.EntryConfChangeV2: var cc raftpb.ConfChangeV2 if err := cc.Unmarshal(entry.Data); err != nil { @@ -1211,14 +1407,23 @@ func (e *Engine) applyCommitted(entries []raftpb.Entry) error { return err } e.applyConfigChangeV2(cc, entry.Index) - e.applied = entry.Index + e.setApplied(entry.Index) default: - e.applied = entry.Index + e.setApplied(entry.Index) } } return nil } +// setApplied advances both the run-loop-owned `applied` field and the +// lock-free atomic mirror in a single place. Called exclusively from +// the Raft run loop, so no synchronization between the two writes is +// required beyond the single-writer invariant. +func (e *Engine) setApplied(index uint64) { + e.applied = index + e.appliedIndex.Store(index) +} + func (e *Engine) applyNormalEntry(entry raftpb.Entry) any { if len(entry.Data) == 0 { return nil @@ -1610,6 +1815,11 @@ func (e *Engine) refreshStatus() { } if previous == raftengine.StateLeader && status.State != raftengine.StateLeader { e.failPending(errors.WithStack(errNotLeader)) + // Notify lease holders so they invalidate any cached lease; + // without this hook, a former leader keeps serving fast-path + // reads from local state for up to LeaseDuration after a + // successor leader is already accepting writes. 
+ e.fireLeaderLossCallbacks() } } @@ -1632,14 +1842,29 @@ func (e *Engine) requestShutdown() { func (e *Engine) shutdown() { e.mu.Lock() + wasLeader := e.status.State == raftengine.StateLeader e.closed = true e.status.State = raftengine.StateShutdown e.mu.Unlock() e.stopDispatchWorkers() e.stopSnapshotWorker() _ = closePersist(e.persist) - _ = e.transport.Close() + if err := e.transport.Close(); err != nil { + slog.Warn("etcd raft engine: transport close", + slog.String("node_id", e.localID), + slog.Any("err", err), + ) + } e.failPending(errors.WithStack(errClosed)) + // LeaseProvider contract promises callbacks fire on shutdown too. + // refreshStatus only fires them on the leader -> non-leader edge, + // which can be missed when shutdown short-circuits the status loop. + // Always fire here so lease holders invalidate even on engine close + // initiated while still leader, on shutdown after fail(), or via + // Close() racing against the run loop. + if wasLeader { + e.fireLeaderLossCallbacks() + } } func (e *Engine) fail(err error) { @@ -1672,6 +1897,7 @@ func (e *Engine) fail(err error) { ) } e.mu.Lock() + wasLeader := e.status.State == raftengine.StateLeader if err != nil { e.runErr = err } @@ -1684,8 +1910,21 @@ func (e *Engine) fail(err error) { e.stopDispatchWorkers() e.stopSnapshotWorker() _ = closePersist(e.persist) - _ = e.transport.Close() + if err := e.transport.Close(); err != nil { + slog.Warn("etcd raft engine: transport close", + slog.String("node_id", e.localID), + slog.Any("err", err), + ) + } e.failPending(e.currentErrorOrClosed()) + // LeaseProvider contract: fire leader-loss callbacks on shutdown if + // we were leader. fail() is the error-shutdown twin of shutdown(); + // without firing here, a run() -> fail() path that bypasses + // refreshStatus's Leader -> non-Leader edge leaves lease holders + // serving fast-path reads from stale state for up to LeaseDuration. + if wasLeader { + e.fireLeaderLossCallbacks() + } } func (e *Engine) failPending(err error) { diff --git a/internal/raftengine/etcd/leader_loss_callback_test.go b/internal/raftengine/etcd/leader_loss_callback_test.go new file mode 100644 index 00000000..7db6029f --- /dev/null +++ b/internal/raftengine/etcd/leader_loss_callback_test.go @@ -0,0 +1,64 @@ +package etcd + +import ( + "sync/atomic" + "testing" + + "github.com/stretchr/testify/require" +) + +// TestFireLeaderLossCallbacks_ContainsPanic verifies that a panicking +// callback does NOT take down the raft engine loop: the remaining +// callbacks still fire synchronously and the method returns normally. +func TestFireLeaderLossCallbacks_ContainsPanic(t *testing.T) { + t.Parallel() + + e := &Engine{} + var before, after atomic.Int32 + e.RegisterLeaderLossCallback(func() { before.Add(1) }) + e.RegisterLeaderLossCallback(func() { panic("lease holder bug") }) + e.RegisterLeaderLossCallback(func() { after.Add(1) }) + + require.NotPanics(t, e.fireLeaderLossCallbacks) + + require.Equal(t, int32(1), before.Load(), + "callbacks registered before the panicking one must have fired") + require.Equal(t, int32(1), after.Load(), + "callbacks registered after the panicking one must still fire") +} + +// TestFireLeaderLossCallbacks_NoCallbacksIsSafe exercises the empty-list +// fast path so the helper can be called unconditionally from shutdown +// and refreshStatus paths without a guard. 
+func TestFireLeaderLossCallbacks_NoCallbacksIsSafe(t *testing.T) { + t.Parallel() + e := &Engine{} + require.NotPanics(t, e.fireLeaderLossCallbacks) +} + +// TestAppliedIndex_LockFreeLoad confirms that AppliedIndex() reads the +// atomic mirror and does NOT acquire the engine's read-lock. +// Acquiring e.mu for write before calling AppliedIndex would deadlock +// if it were still RLock-based; the atomic path must return +// immediately regardless of lock state. +func TestAppliedIndex_LockFreeLoad(t *testing.T) { + t.Parallel() + e := &Engine{} + e.appliedIndex.Store(42) + + // Hold the engine mutex exclusively. The atomic reader must not + // block on this. + e.mu.Lock() + defer e.mu.Unlock() + + got := e.AppliedIndex() + require.Equal(t, uint64(42), got) +} + +// TestAppliedIndex_NilReceiver mirrors the other lease-related +// nil-receiver guards. +func TestAppliedIndex_NilReceiver(t *testing.T) { + t.Parallel() + var e *Engine + require.Equal(t, uint64(0), e.AppliedIndex()) +} diff --git a/kv/coordinator.go b/kv/coordinator.go index 8335c928..fa4033a6 100644 --- a/kv/coordinator.go +++ b/kv/coordinator.go @@ -55,9 +55,35 @@ func NewCoordinatorWithEngine(txm Transactional, engine raftengine.Engine, opts for _, opt := range opts { opt(c) } + // Register a leader-loss hook so the lease is invalidated the instant + // the engine notices a state transition out of the leader role, + // rather than waiting for wall-clock expiry of the current lease. + // Keep the deregister func so Close() can release the callback + // slot; owners with a shorter lifetime than the engine (tests, + // one-shot tools) MUST call Close() to avoid leaking a closure + // pointing into this Coordinate. + if lp, ok := engine.(raftengine.LeaseProvider); ok { + c.deregisterLeaseCb = lp.RegisterLeaderLossCallback(c.lease.invalidate) + } return c } +// Close releases any engine-side registrations (currently the +// leader-loss callback) held by this Coordinate. It is safe to call +// on a nil receiver and multiple times. Owners whose lifetime matches +// the engine's do not need to call Close; owners who discard the +// Coordinate before closing the engine MUST. +func (c *Coordinate) Close() error { + if c == nil { + return nil + } + if c.deregisterLeaseCb != nil { + c.deregisterLeaseCb() + c.deregisterLeaseCb = nil + } + return nil +} + // hlcLeaseEntryLen is the byte length of a serialised HLC lease Raft entry: // 1 tag byte + 8 bytes big-endian int64 ceiling ms. const hlcLeaseEntryLen = 9 //nolint:mnd @@ -81,6 +107,13 @@ type Coordinate struct { clock *HLC connCache GRPCConnCache log *slog.Logger + lease leaseState + // deregisterLeaseCb removes the leader-loss callback registered + // against engine at construction. Long-lived Coordinates don't + // need to call it (the engine will be closed after them), but + // short-lived test coordinators sharing an engine MUST invoke + // Close() to release the callback slot. + deregisterLeaseCb func() } var _ Coordinator = (*Coordinate)(nil) @@ -97,6 +130,43 @@ type Coordinator interface { Clock() *HLC } +// LeaseReadableCoordinator is the optional capability implemented by +// coordinators that participate in the leader-local lease read path +// (see docs/lease_read_design.md). Callers that want lease reads +// should type-assert to this interface and fall back to +// LinearizableRead when the assertion fails, following the same +// pattern as raftengine.LeaseProvider. 
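+//
+// For example, the fallback pattern (identical to what the
+// LeaseReadThrough helpers below implement):
+//
+//	if lr, ok := c.(LeaseReadableCoordinator); ok {
+//		return lr.LeaseRead(ctx)
+//	}
+//	return c.LinearizableRead(ctx)
+//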
+// Keeping the lease methods OFF
+// the Coordinator interface avoids breaking existing external
+// implementations that predate the lease-read feature.
+type LeaseReadableCoordinator interface {
+ LeaseRead(ctx context.Context) (uint64, error)
+ LeaseReadForKey(ctx context.Context, key []byte) (uint64, error)
+}
+
+// LeaseReadThrough is a helper that calls LeaseRead when the
+// coordinator supports it, falling back to LinearizableRead otherwise.
+// Adapter call sites use this so they don't have to repeat the
+// type-assertion dance.
+func LeaseReadThrough(c Coordinator, ctx context.Context) (uint64, error) {
+ if lr, ok := c.(LeaseReadableCoordinator); ok {
+ idx, err := lr.LeaseRead(ctx)
+ return idx, errors.WithStack(err)
+ }
+ idx, err := c.LinearizableRead(ctx)
+ return idx, errors.WithStack(err)
+}
+
+// LeaseReadForKeyThrough is the key-routed counterpart of
+// LeaseReadThrough.
+func LeaseReadForKeyThrough(c Coordinator, ctx context.Context, key []byte) (uint64, error) {
+ if lr, ok := c.(LeaseReadableCoordinator); ok {
+ idx, err := lr.LeaseReadForKey(ctx, key)
+ return idx, errors.WithStack(err)
+ }
+ idx, err := c.LinearizableRead(ctx)
+ return idx, errors.WithStack(err)
+}
+
 func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*CoordinateResponse, error) {
 if ctx == nil {
 ctx = context.Background()
@@ -121,11 +191,52 @@ func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*C
 reqs.CommitTS = 0
 }
+ // Sample the clock AND the lease generation BEFORE dispatching.
+ // * dispatchStart: any real quorum confirmation happens at or
+ // after this instant, so using it as the lease-extension base
+ // is strictly conservative (window can only be SHORTER than
+ // the actual safety window, never longer).
+ // * expectedGen: if a leader-loss callback fires between this
+ // sample and the post-dispatch extend, the generation will
+ // have advanced; extend(expectedGen) will see the mismatch
+ // and refuse to resurrect the lease. Capturing gen INSIDE
+ // extend would observe the post-invalidate value as current.
+ dispatchStart := time.Now()
+ expectedGen := c.lease.generation()
+ var resp *CoordinateResponse
+ var err error
 if reqs.IsTxn {
- return c.dispatchTxn(reqs.Elems, reqs.ReadKeys, reqs.StartTS, reqs.CommitTS)
+ resp, err = c.dispatchTxn(reqs.Elems, reqs.ReadKeys, reqs.StartTS, reqs.CommitTS)
+ } else {
+ resp, err = c.dispatchRaw(reqs.Elems)
 }
+ c.refreshLeaseAfterDispatch(resp, err, dispatchStart, expectedGen)
+ return resp, err
+}
-
- return c.dispatchRaw(reqs.Elems)
+// refreshLeaseAfterDispatch extends the lease only when the dispatch
+// produced a real Raft commit. CommitIndex == 0 means the underlying
+// transaction manager short-circuited (empty-input Commit, no-op
+// Abort), and refreshing would be unsound because no quorum
+// confirmation happened.
+//
+// On err != nil the lease is invalidated: a Propose error commonly
+// signals leadership loss (non-leader rejection, transfer in
+// progress, quorum lost, etc.) and the design doc lists
+// "any error from engine.Propose" as an invalidation trigger.
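+//
+// Invalidating on every error is deliberately conservative: a
+// transient failure costs at most one extra LinearizableRead on the
+// next read, while missing a genuine leadership loss would permit
+// stale fast-path reads for up to LeaseDuration.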
+func (c *Coordinate) refreshLeaseAfterDispatch(resp *CoordinateResponse, err error, dispatchStart time.Time, expectedGen uint64) { + if err != nil { + c.lease.invalidate() + return + } + if resp == nil || resp.CommitIndex == 0 { + return + } + lp, ok := c.engine.(raftengine.LeaseProvider) + if !ok { + return + } + c.lease.extend(dispatchStart.Add(lp.LeaseDuration()), expectedGen) } func (c *Coordinate) IsLeader() bool { @@ -215,6 +326,56 @@ func (c *Coordinate) LinearizableReadForKey(ctx context.Context, _ []byte) (uint return c.LinearizableRead(ctx) } +// LeaseRead returns a read fence backed by a leader-local lease when +// available, falling back to a full LinearizableRead when the lease has +// expired or the underlying engine does not implement LeaseProvider. +// +// The returned index is the engine's current applied index (fast path) or +// the index returned by LinearizableRead (slow path). Callers that resolve +// timestamps via store.LastCommitTS may discard the value. +func (c *Coordinate) LeaseRead(ctx context.Context) (uint64, error) { + lp, ok := c.engine.(raftengine.LeaseProvider) + if !ok { + return c.LinearizableRead(ctx) + } + leaseDur := lp.LeaseDuration() + if leaseDur <= 0 { + // Misconfigured tick settings (Engine.Open warned about this): + // the lease can never be valid. Fall back without touching + // lease state so we do not waste extend/invalidate work. + return c.LinearizableRead(ctx) + } + // Capture time.Now() and the lease generation exactly once before + // any quorum work. `now` is reused for both the fast-path validity + // check and (on slow path) the extend base; `expectedGen` guards + // against a leader-loss invalidation that fires during + // LinearizableRead from being overwritten by this caller's extend. + // See Coordinate.Dispatch for the same rationale. + now := time.Now() + expectedGen := c.lease.generation() + // Defense-in-depth against the narrow race between an engine + // state transition out of leader and the async leader-loss + // callback flipping the lease: check the engine's current view + // too. State() is updated every Raft tick (~10 ms), which is + // tighter than the lease's time-bound. If the engine already + // knows it's not leader, force the slow path (which will fail + // fast via LinearizableRead and invalidate the lease). 
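+ // Fast path: no Raft round-trip. AppliedIndex is a lock-free
+ // atomic load of the engine's applied-index mirror.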
+ if c.lease.valid(now) && c.engine.State() == raftengine.StateLeader { + return lp.AppliedIndex(), nil + } + idx, err := c.LinearizableRead(ctx) + if err != nil { + c.lease.invalidate() + return 0, err + } + c.lease.extend(now.Add(leaseDur), expectedGen) + return idx, nil +} + +func (c *Coordinate) LeaseReadForKey(ctx context.Context, _ []byte) (uint64, error) { + return c.LeaseRead(ctx) +} + func (c *Coordinate) nextStartTS() uint64 { return c.clock.Next() } diff --git a/kv/leader_routed_store_test.go b/kv/leader_routed_store_test.go index 446a12a7..63b55583 100644 --- a/kv/leader_routed_store_test.go +++ b/kv/leader_routed_store_test.go @@ -62,6 +62,14 @@ func (s *stubLeaderCoordinator) LinearizableReadForKey(ctx context.Context, _ [] return s.LinearizableRead(ctx) } +func (s *stubLeaderCoordinator) LeaseRead(ctx context.Context) (uint64, error) { + return s.LinearizableRead(ctx) +} + +func (s *stubLeaderCoordinator) LeaseReadForKey(ctx context.Context, _ []byte) (uint64, error) { + return s.LinearizableRead(ctx) +} + func (s *stubLeaderCoordinator) Clock() *HLC { if s.clock == nil { s.clock = NewHLC() diff --git a/kv/lease_read_test.go b/kv/lease_read_test.go new file mode 100644 index 00000000..6d545e9a --- /dev/null +++ b/kv/lease_read_test.go @@ -0,0 +1,314 @@ +package kv + +import ( + "context" + "errors" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/bootjp/elastickv/internal/raftengine" + "github.com/stretchr/testify/require" +) + +// fakeLeaseEngine implements raftengine.Engine + raftengine.LeaseProvider +// with controllable applied index, lease duration, and LinearizableRead +// behaviour, plus call counters for assertions. +type fakeLeaseEngine struct { + applied uint64 + leaseDur time.Duration + linearizableErr error + linearizableCalls atomic.Int32 + state atomic.Value // stores raftengine.State; default Leader + leaderLossCallbacksMu sync.Mutex + leaderLossCallbacks []fakeLeaseEngineCb + registerLeaderLossCalled atomic.Int32 +} + +// fakeLeaseEngineCb pairs a callback with a unique sentinel pointer so +// deregister can target THIS specific registration even when callbacks +// are removed out of order, matching the production etcd engine. 
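+// Deregistration is exercised end-to-end by
+// TestCoordinate_CloseDeregistersLeaderLossCallback below.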
+type fakeLeaseEngineCb struct { + id *struct{} + fn func() +} + +func (e *fakeLeaseEngine) State() raftengine.State { + if v := e.state.Load(); v != nil { + return v.(raftengine.State) //nolint:forcetypeassert + } + return raftengine.StateLeader +} +func (e *fakeLeaseEngine) Leader() raftengine.LeaderInfo { + return raftengine.LeaderInfo{ID: "n1", Address: "127.0.0.1:0"} +} +func (e *fakeLeaseEngine) VerifyLeader(context.Context) error { return nil } +func (e *fakeLeaseEngine) LinearizableRead(context.Context) (uint64, error) { + e.linearizableCalls.Add(1) + if e.linearizableErr != nil { + return 0, e.linearizableErr + } + return e.applied, nil +} +func (e *fakeLeaseEngine) Status() raftengine.Status { + return raftengine.Status{State: raftengine.StateLeader, AppliedIndex: e.applied} +} +func (e *fakeLeaseEngine) Configuration(context.Context) (raftengine.Configuration, error) { + return raftengine.Configuration{}, nil +} +func (e *fakeLeaseEngine) Propose(context.Context, []byte) (*raftengine.ProposalResult, error) { + return &raftengine.ProposalResult{}, nil +} +func (e *fakeLeaseEngine) Close() error { return nil } +func (e *fakeLeaseEngine) LeaseDuration() time.Duration { return e.leaseDur } +func (e *fakeLeaseEngine) AppliedIndex() uint64 { return e.applied } +func (e *fakeLeaseEngine) RegisterLeaderLossCallback(fn func()) func() { + e.registerLeaderLossCalled.Add(1) + // Unique sentinel per registration so deregister can target THIS + // entry even after earlier entries were removed. Mirrors the + // production etcd engine semantics; a naive index-based remover + // would drop the wrong callback under out-of-order deregister. + slot := &struct{}{} + e.leaderLossCallbacksMu.Lock() + e.leaderLossCallbacks = append(e.leaderLossCallbacks, fakeLeaseEngineCb{id: slot, fn: fn}) + e.leaderLossCallbacksMu.Unlock() + var once sync.Once + return func() { + once.Do(func() { + e.leaderLossCallbacksMu.Lock() + defer e.leaderLossCallbacksMu.Unlock() + for i, c := range e.leaderLossCallbacks { + if c.id != slot { + continue + } + // Zero the tail before truncating so the removed + // callback's captured *Coordinate can be GC'd. + // Mirrors the production etcd engine. + last := len(e.leaderLossCallbacks) - 1 + copy(e.leaderLossCallbacks[i:], e.leaderLossCallbacks[i+1:]) + e.leaderLossCallbacks[last] = fakeLeaseEngineCb{} + e.leaderLossCallbacks = e.leaderLossCallbacks[:last] + return + } + }) + } +} + +func (e *fakeLeaseEngine) fireLeaderLoss() { + e.leaderLossCallbacksMu.Lock() + cbs := make([]func(), len(e.leaderLossCallbacks)) + for i, c := range e.leaderLossCallbacks { + cbs[i] = c.fn + } + e.leaderLossCallbacksMu.Unlock() + for _, cb := range cbs { + cb() + } +} + +// nonLeaseEngine implements only raftengine.Engine, not LeaseProvider. +// Used to verify the type-assertion fallback. 
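+// It deliberately omits LeaseDuration, AppliedIndex, and
+// RegisterLeaderLossCallback, so the raftengine.LeaseProvider
+// assertion fails and every read takes the LinearizableRead path.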
+type nonLeaseEngine struct { + linearizableCalls atomic.Int32 + linearizableErr error +} + +func (e *nonLeaseEngine) State() raftengine.State { return raftengine.StateLeader } +func (e *nonLeaseEngine) Leader() raftengine.LeaderInfo { + return raftengine.LeaderInfo{ID: "n1", Address: "127.0.0.1:0"} +} +func (e *nonLeaseEngine) VerifyLeader(context.Context) error { return nil } +func (e *nonLeaseEngine) LinearizableRead(context.Context) (uint64, error) { + e.linearizableCalls.Add(1) + if e.linearizableErr != nil { + return 0, e.linearizableErr + } + return 42, nil +} +func (e *nonLeaseEngine) Status() raftengine.Status { + return raftengine.Status{State: raftengine.StateLeader, AppliedIndex: 42} +} +func (e *nonLeaseEngine) Configuration(context.Context) (raftengine.Configuration, error) { + return raftengine.Configuration{}, nil +} +func (e *nonLeaseEngine) Propose(context.Context, []byte) (*raftengine.ProposalResult, error) { + return &raftengine.ProposalResult{}, nil +} +func (e *nonLeaseEngine) Close() error { return nil } + +// --- Coordinate.LeaseRead ----------------------------------------------- + +func TestCoordinate_LeaseRead_FastPathSkipsEngine(t *testing.T) { + t.Parallel() + eng := &fakeLeaseEngine{applied: 100, leaseDur: time.Hour} + c := NewCoordinatorWithEngine(nil, eng) + + c.lease.extend(time.Now().Add(time.Hour), c.lease.generation()) + + idx, err := c.LeaseRead(context.Background()) + require.NoError(t, err) + require.Equal(t, uint64(100), idx) + require.Equal(t, int32(0), eng.linearizableCalls.Load()) +} + +func TestCoordinate_LeaseRead_SlowPathRefreshesLease(t *testing.T) { + t.Parallel() + eng := &fakeLeaseEngine{applied: 50, leaseDur: time.Hour} + c := NewCoordinatorWithEngine(nil, eng) + + idx, err := c.LeaseRead(context.Background()) + require.NoError(t, err) + require.Equal(t, uint64(50), idx) + require.Equal(t, int32(1), eng.linearizableCalls.Load()) + + require.True(t, c.lease.valid(time.Now())) + + idx2, err := c.LeaseRead(context.Background()) + require.NoError(t, err) + require.Equal(t, uint64(50), idx2) + require.Equal(t, int32(1), eng.linearizableCalls.Load(), "second read should hit fast path") +} + +func TestCoordinate_LeaseRead_ErrorInvalidatesLease(t *testing.T) { + t.Parallel() + sentinel := errors.New("read-index failed") + eng := &fakeLeaseEngine{applied: 7, leaseDur: time.Hour, linearizableErr: sentinel} + c := NewCoordinatorWithEngine(nil, eng) + + c.lease.extend(time.Now().Add(time.Hour), c.lease.generation()) + c.lease.invalidate() // force slow path + + _, err := c.LeaseRead(context.Background()) + require.ErrorIs(t, err, sentinel) + require.False(t, c.lease.valid(time.Now())) + require.Equal(t, int32(1), eng.linearizableCalls.Load()) + + // Subsequent call also takes slow path because lease is invalidated. + _, err = c.LeaseRead(context.Background()) + require.ErrorIs(t, err, sentinel) + require.Equal(t, int32(2), eng.linearizableCalls.Load()) +} + +func TestCoordinate_LeaseRead_FallbackWhenEngineNotLeader(t *testing.T) { + t.Parallel() + // Even with a currently-valid lease, if the engine already reports + // a non-leader state (e.g. a leader-loss transition that has not + // yet triggered the async invalidation callback), LeaseRead must + // NOT return the fast-path AppliedIndex -- it must fall through + // to LinearizableRead, which will fail fast on a non-leader. 
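+ // groupLeaseRead applies the same engine-state guard on the
+ // sharded path.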
+ sentinel := errors.New("not leader") + eng := &fakeLeaseEngine{applied: 7, leaseDur: time.Hour, linearizableErr: sentinel} + c := NewCoordinatorWithEngine(nil, eng) + + // Warm the lease so valid() returns true. + c.lease.extend(time.Now().Add(time.Hour), c.lease.generation()) + require.True(t, c.lease.valid(time.Now())) + + // Engine transitioned to follower (or unknown); async invalidate + // hasn't run yet. + eng.state.Store(raftengine.StateFollower) + + _, err := c.LeaseRead(context.Background()) + require.ErrorIs(t, err, sentinel, + "fast path must not hide an already-known non-leader state") + require.Equal(t, int32(1), eng.linearizableCalls.Load(), + "non-leader state must force the slow path") +} + +func TestCoordinate_LeaseRead_FallbackWhenLeaseDurationZero(t *testing.T) { + t.Parallel() + // Misconfigured tick settings can produce LeaseDuration <= 0. + // The implementation must short-circuit to LinearizableRead + // without touching lease state; otherwise extend(now+0, ...) would + // run on every slow-path call for no benefit. + eng := &fakeLeaseEngine{applied: 3, leaseDur: 0} + c := NewCoordinatorWithEngine(nil, eng) + + idx, err := c.LeaseRead(context.Background()) + require.NoError(t, err) + require.Equal(t, uint64(3), idx) + require.Equal(t, int32(1), eng.linearizableCalls.Load()) + require.False(t, c.lease.valid(time.Now()), + "lease must not have been extended when LeaseDuration <= 0") + + // Every subsequent call must still take the slow path. + _, err = c.LeaseRead(context.Background()) + require.NoError(t, err) + require.Equal(t, int32(2), eng.linearizableCalls.Load()) +} + +func TestCoordinate_LeaseRead_FallbackWhenEngineLacksLeaseProvider(t *testing.T) { + t.Parallel() + eng := &nonLeaseEngine{} + c := NewCoordinatorWithEngine(nil, eng) + + idx, err := c.LeaseRead(context.Background()) + require.NoError(t, err) + require.Equal(t, uint64(42), idx) + require.Equal(t, int32(1), eng.linearizableCalls.Load()) + + // Without LeaseProvider the lease never becomes valid; every call + // goes through LinearizableRead. + _, err = c.LeaseRead(context.Background()) + require.NoError(t, err) + require.Equal(t, int32(2), eng.linearizableCalls.Load()) +} + +// --- Leader-loss invalidation hook -------------------------------------- + +func TestCoordinate_CloseDeregistersLeaderLossCallback(t *testing.T) { + t.Parallel() + eng := &fakeLeaseEngine{applied: 1, leaseDur: time.Hour} + c := NewCoordinatorWithEngine(nil, eng) + require.Equal(t, int32(1), eng.registerLeaderLossCalled.Load()) + + require.NoError(t, c.Close()) + + // After Close, firing leader-loss must NOT invoke this Coordinate's + // invalidate (it must have been removed from the engine's slice). + c.lease.extend(time.Now().Add(time.Hour), c.lease.generation()) + require.True(t, c.lease.valid(time.Now())) + eng.fireLeaderLoss() + require.True(t, c.lease.valid(time.Now()), + "Close must remove the callback so subsequent leader-loss firings do NOT touch this Coordinate's lease") + + // Close is idempotent. 
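+ // (a second call finds deregisterLeaseCb already nil and
+ // returns immediately)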
+ require.NoError(t, c.Close()) +} + +func TestCoordinate_RegistersLeaderLossCallback(t *testing.T) { + t.Parallel() + eng := &fakeLeaseEngine{applied: 1, leaseDur: time.Hour} + c := NewCoordinatorWithEngine(nil, eng) + require.Equal(t, int32(1), eng.registerLeaderLossCalled.Load()) + + c.lease.extend(time.Now().Add(time.Hour), c.lease.generation()) + require.True(t, c.lease.valid(time.Now())) + + eng.fireLeaderLoss() + require.False(t, c.lease.valid(time.Now()), + "leader-loss callback must invalidate the lease") +} + +// --- Amortization end-to-end --------------------------------------------- + +// TestCoordinate_LeaseRead_AmortizesLinearizableRead is the Phase-4 design +// item proving the lease actually amortizes the cost: N calls within a +// single lease window must trigger only the first slow-path +// LinearizableRead and N-1 fast-path returns. +func TestCoordinate_LeaseRead_AmortizesLinearizableRead(t *testing.T) { + t.Parallel() + const N = 100 + eng := &fakeLeaseEngine{applied: 9, leaseDur: time.Hour} + c := NewCoordinatorWithEngine(nil, eng) + + for i := 0; i < N; i++ { + idx, err := c.LeaseRead(context.Background()) + require.NoError(t, err) + require.Equal(t, uint64(9), idx) + } + + require.Equal(t, int32(1), eng.linearizableCalls.Load(), + "100 LeaseRead calls inside the lease window should trigger exactly 1 LinearizableRead") +} diff --git a/kv/lease_state.go b/kv/lease_state.go new file mode 100644 index 00000000..c30cd3d4 --- /dev/null +++ b/kv/lease_state.go @@ -0,0 +1,93 @@ +package kv + +import ( + "sync/atomic" + "time" +) + +// leaseState tracks the wall-clock expiry of a leader-local read lease. +// All operations are lock-free via atomic.Pointer plus a generation +// counter that prevents an in-flight extend from resurrecting a lease +// that a concurrent invalidate has cleared. +// +// A nil expiry means the lease has never been issued or has been +// invalidated. A non-nil expiry is the wall-clock instant after which +// the lease is considered expired; a caller comparing time.Now() against +// the loaded value can decide whether to skip a quorum confirmation. +type leaseState struct { + gen atomic.Uint64 + expiry atomic.Pointer[time.Time] +} + +// valid reports whether the lease is unexpired at now. +func (s *leaseState) valid(now time.Time) bool { + if s == nil { + return false + } + exp := s.expiry.Load() + if exp == nil { + return false + } + return now.Before(*exp) +} + +// generation returns the current invalidation counter. Callers MUST +// sample this BEFORE issuing the quorum-confirming operation (Propose +// / LinearizableRead) and pass the result to extend. Sampling inside +// extend (after the operation returned) would see any leader-loss +// invalidation that fired DURING the operation as the "current" +// generation and let a stale lease resurrect. +func (s *leaseState) generation() uint64 { + if s == nil { + return 0 + } + return s.gen.Load() +} + +// extend sets the lease expiry to until iff (a) until is strictly +// after the currently stored expiry (or no expiry is stored) and +// (b) no invalidate has happened since the caller captured +// expectedGen via generation() BEFORE the quorum operation. The +// generation guard prevents a Dispatch that returned successfully +// *just before* a leader-loss invalidate from resurrecting the +// lease milliseconds after invalidation. 
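+//
+// Caller pattern, as used by Coordinate.LeaseRead (quorumOp and dur
+// stand in for LinearizableRead/Propose and the provider's
+// LeaseDuration):
+//
+//	now := time.Now()          // base: at or before any quorum work
+//	gen := s.generation()      // sample BEFORE the quorum operation
+//	idx, err := quorumOp(ctx)  // invalidate may fire concurrently
+//	if err != nil {
+//		s.invalidate()
+//	} else {
+//		s.extend(now.Add(dur), gen) // no-op if gen went stale
+//	}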
+func (s *leaseState) extend(until time.Time, expectedGen uint64) { + if s == nil { + return + } + for { + // Pre-CAS gate: if invalidate already advanced the generation + // past expectedGen, skip the CAS entirely. + if s.gen.Load() != expectedGen { + return + } + current := s.expiry.Load() + if current != nil && !until.After(*current) { + return + } + if !s.expiry.CompareAndSwap(current, &until) { + continue + } + // CAS landed. If invalidate raced in between the pre-CAS gate + // and the CAS itself, undo our write iff no later writer has + // replaced it. Using CAS with our own pointer means a fresh + // extend that captured the post-invalidate generation is left + // intact. + if s.gen.Load() != expectedGen { + s.expiry.CompareAndSwap(&until, nil) + } + return + } +} + +// invalidate clears the lease so the next read takes the slow path. +// Bumping the generation first ensures any concurrent extend that +// captured the previous generation will undo its own CAS rather than +// resurrect the lease. +func (s *leaseState) invalidate() { + if s == nil { + return + } + s.gen.Add(1) + s.expiry.Store(nil) +} diff --git a/kv/lease_state_test.go b/kv/lease_state_test.go new file mode 100644 index 00000000..b4b7c268 --- /dev/null +++ b/kv/lease_state_test.go @@ -0,0 +1,161 @@ +package kv + +import ( + "runtime" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestLeaseState_NilReceiverIsAlwaysExpired(t *testing.T) { + t.Parallel() + var s *leaseState + require.False(t, s.valid(time.Now())) + s.extend(time.Now().Add(time.Hour), s.generation()) // must not panic + s.invalidate() // must not panic + require.False(t, s.valid(time.Now())) +} + +func TestLeaseState_ZeroValueIsExpired(t *testing.T) { + t.Parallel() + var s leaseState + require.False(t, s.valid(time.Now())) +} + +func TestLeaseState_ExtendAndExpire(t *testing.T) { + t.Parallel() + var s leaseState + now := time.Now() + s.extend(now.Add(50*time.Millisecond), s.generation()) + + require.True(t, s.valid(now)) + require.True(t, s.valid(now.Add(49*time.Millisecond))) + require.False(t, s.valid(now.Add(50*time.Millisecond))) + require.False(t, s.valid(now.Add(time.Hour))) +} + +func TestLeaseState_InvalidateClears(t *testing.T) { + t.Parallel() + var s leaseState + now := time.Now() + s.extend(now.Add(time.Hour), s.generation()) + require.True(t, s.valid(now)) + + s.invalidate() + require.False(t, s.valid(now)) +} + +func TestLeaseState_ExtendIsMonotonic(t *testing.T) { + t.Parallel() + var s leaseState + now := time.Now() + + s.extend(now.Add(time.Hour), s.generation()) + require.True(t, s.valid(now.Add(30*time.Minute))) + + // A shorter extension must NOT regress the lease: an out-of-order + // writer that sampled time.Now() earlier could otherwise prematurely + // expire a freshly extended lease and force callers into the slow + // path while the leader is still confirmed. + s.extend(now.Add(time.Minute), s.generation()) + require.True(t, s.valid(now.Add(30*time.Minute))) + + // A strictly longer extension wins. + s.extend(now.Add(2*time.Hour), s.generation()) + require.True(t, s.valid(now.Add(90*time.Minute))) +} + +func TestLeaseState_InvalidateBeatsConcurrentExtend(t *testing.T) { + t.Parallel() + var s leaseState + now := time.Now() + s.extend(now.Add(time.Hour), s.generation()) + + // invalidate stores nil unconditionally, even when the current expiry + // is in the future. Otherwise leadership-loss callbacks would be + // powerless once a lease is in place. 
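+ // (extend's monotonicity guard applies only to competing
+ // extends; invalidate always wins)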
+ s.invalidate() + require.False(t, s.valid(now)) +} + +// TestLeaseState_ExtendCannotResurrectAfterInvalidate exercises the +// generation-guard invariant: an extend that captured the pre-invalidate +// generation must not install a fresh lease after a concurrent +// invalidate has bumped the generation. +func TestLeaseState_ExtendCannotResurrectAfterInvalidate(t *testing.T) { + t.Parallel() + var s leaseState + now := time.Now() + + // Caller pattern: sample generation BEFORE the quorum operation. + expectedGen := s.generation() + + // Leader-loss callback fires during the "quorum operation". + s.invalidate() + require.NotEqual(t, expectedGen, s.generation(), + "invalidate must bump the generation") + + // Caller returns with success and calls extend with the stale + // expected-generation. Must be a no-op. + s.extend(now.Add(time.Hour), expectedGen) + require.False(t, s.valid(now), + "stale-generation extend must NOT resurrect the lease") +} + +// TestLeaseState_ExtendWithFreshGenSucceedsAfterInvalidate verifies the +// dual to the above: a caller that captured the post-invalidate +// generation CAN install a fresh lease, so recovery from a brief +// leader-loss is possible. +func TestLeaseState_ExtendWithFreshGenSucceedsAfterInvalidate(t *testing.T) { + t.Parallel() + var s leaseState + now := time.Now() + + s.invalidate() + freshGen := s.generation() + s.extend(now.Add(time.Hour), freshGen) + require.True(t, s.valid(now)) +} + +func TestLeaseState_ConcurrentExtendAndRead(t *testing.T) { + t.Parallel() + var s leaseState + stop := make(chan struct{}) + done := make(chan struct{}, 2) + + // Cooperative scheduling: runtime.Gosched() between iterations keeps + // the workers from pegging a core while still interleaving enough + // extend/valid pairs under `-race` to exercise the atomic-pointer + // invariants. + go func() { + defer func() { done <- struct{}{} }() + for { + select { + case <-stop: + return + default: + gen := s.generation() + s.extend(time.Now().Add(time.Second), gen) + runtime.Gosched() + } + } + }() + go func() { + defer func() { done <- struct{}{} }() + for { + select { + case <-stop: + return + default: + _ = s.valid(time.Now()) + runtime.Gosched() + } + } + }() + + time.Sleep(20 * time.Millisecond) + close(stop) + <-done + <-done +} diff --git a/kv/sharded_coordinator.go b/kv/sharded_coordinator.go index 81bfab15..eafb6f79 100644 --- a/kv/sharded_coordinator.go +++ b/kv/sharded_coordinator.go @@ -3,6 +3,7 @@ package kv import ( "bytes" "context" + "io" "log/slog" "slices" "sync" @@ -21,6 +22,83 @@ type ShardGroup struct { Engine raftengine.Engine Store store.MVCCStore Txn Transactional + lease leaseState +} + +// leaseRefreshingTxn wraps a Transactional so every Commit / Abort that +// produced a real Raft commit extends its shard's lease. Mirrors +// Coordinate.Dispatch's lease hook for the per-shard case. +// +// Both TransactionManager.Commit and .Abort can return success WITHOUT +// going through Raft -- Commit short-circuits on empty input, Abort +// short-circuits when every request's abortRequestFor is nil (nothing +// to release). Refreshing the lease in those cases would be unsound: +// no quorum confirmation happened. We gate the refresh on +// resp.CommitIndex > 0, which the underlying manager sets to the +// last applied index only when at least one proposal went through. 
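+//
+// NewShardedCoordinator installs this wrapper exactly once per group:
+// it type-checks for an existing *leaseRefreshingTxn before wrapping,
+// so repeated construction over the same groups cannot stack wrappers.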
+type leaseRefreshingTxn struct { + inner Transactional + g *ShardGroup +} + +func (t *leaseRefreshingTxn) Commit(reqs []*pb.Request) (*TransactionResponse, error) { + start := time.Now() + expectedGen := t.g.lease.generation() + resp, err := t.inner.Commit(reqs) + if err != nil { + // Propose failures commonly signal leadership loss; follow + // the design doc and invalidate so the next read takes the + // slow path and re-verifies. + t.g.lease.invalidate() + return resp, errors.WithStack(err) + } + t.maybeRefresh(resp, start, expectedGen) + return resp, nil +} + +func (t *leaseRefreshingTxn) Abort(reqs []*pb.Request) (*TransactionResponse, error) { + start := time.Now() + expectedGen := t.g.lease.generation() + resp, err := t.inner.Abort(reqs) + if err != nil { + t.g.lease.invalidate() + return resp, errors.WithStack(err) + } + t.maybeRefresh(resp, start, expectedGen) + return resp, nil +} + +// maybeRefresh extends the per-shard lease only when the operation +// actually produced a Raft commit. expectedGen is sampled BEFORE the +// underlying Commit/Abort so an invalidation that fires during that +// call observes a generation mismatch inside extend and the refresh +// is rejected. See the struct doc comment for why. +func (t *leaseRefreshingTxn) maybeRefresh(resp *TransactionResponse, start time.Time, expectedGen uint64) { + if resp == nil || resp.CommitIndex == 0 { + return + } + lp, ok := t.g.Engine.(raftengine.LeaseProvider) + if !ok { + return + } + t.g.lease.extend(start.Add(lp.LeaseDuration()), expectedGen) +} + +// Close forwards to the wrapped Transactional if it implements +// io.Closer. ShardStore.closeGroup relies on the type assertion +// `g.Txn.(io.Closer)` to release per-shard resources (e.g. the gRPC +// connection cached by LeaderProxy). Without this pass-through, the +// wrapping would silently swallow the Closer capability and leak +// connections / goroutines at shutdown. +func (t *leaseRefreshingTxn) Close() error { + closer, ok := t.inner.(io.Closer) + if !ok { + return nil + } + if err := closer.Close(); err != nil { + return errors.WithStack(err) + } + return nil } const ( @@ -40,26 +118,59 @@ type ShardedCoordinator struct { clock *HLC store store.MVCCStore log *slog.Logger + // deregisterLeaseCbs removes the per-shard leader-loss callbacks + // registered at construction. See Coordinate.Close for the + // rationale. + deregisterLeaseCbs []func() } // NewShardedCoordinator builds a coordinator for the provided shard groups. // The defaultGroup is used for non-keyed leader checks. func NewShardedCoordinator(engine *distribution.Engine, groups map[uint64]*ShardGroup, defaultGroup uint64, clock *HLC, st store.MVCCStore) *ShardedCoordinator { router := NewShardRouter(engine) + var deregisters []func() for gid, g := range groups { + // Wrap Txn so every successful Commit/Abort refreshes the + // per-shard lease. Leave nil transactions unchanged, and skip + // if already wrapped so repeat calls don't stack wrappers. + if g.Txn != nil { + if _, already := g.Txn.(*leaseRefreshingTxn); !already { + g.Txn = &leaseRefreshingTxn{inner: g.Txn, g: g} + } + } router.Register(gid, g.Txn, g.Store) + // Per-shard leader-loss hook: when this group's engine notices + // a state transition out of leader, drop the lease so the next + // LeaseReadForKey on that shard takes the slow path. 
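+ // The returned deregister funcs are collected below and
+ // released by ShardedCoordinator.Close.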
+ if lp, ok := g.Engine.(raftengine.LeaseProvider); ok { + deregisters = append(deregisters, lp.RegisterLeaderLossCallback(g.lease.invalidate)) + } } return &ShardedCoordinator{ - engine: engine, - router: router, - groups: groups, - defaultGroup: defaultGroup, - clock: clock, - store: st, - log: slog.Default(), + engine: engine, + router: router, + groups: groups, + defaultGroup: defaultGroup, + clock: clock, + store: st, + log: slog.Default(), + deregisterLeaseCbs: deregisters, } } +// Close releases per-shard engine-side registrations. Idempotent. +func (c *ShardedCoordinator) Close() error { + if c == nil { + return nil + } + cbs := c.deregisterLeaseCbs + c.deregisterLeaseCbs = nil + for _, fn := range cbs { + fn() + } + return nil +} + func (c *ShardedCoordinator) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*CoordinateResponse, error) { if ctx == nil { ctx = context.Background() @@ -599,6 +710,60 @@ func (c *ShardedCoordinator) LinearizableReadForKey(ctx context.Context, key []b return linearizableReadEngineCtx(ctx, engineForGroup(g)) } +// LeaseRead routes through the default group's lease. See Coordinate.LeaseRead +// for semantics. +func (c *ShardedCoordinator) LeaseRead(ctx context.Context) (uint64, error) { + g, ok := c.groups[c.defaultGroup] + if !ok { + return 0, errors.WithStack(ErrLeaderNotFound) + } + return groupLeaseRead(ctx, g) +} + +// LeaseReadForKey performs the lease check on the shard group that owns key. +// Each group maintains its own lease since each group has independent +// leadership and term. +func (c *ShardedCoordinator) LeaseReadForKey(ctx context.Context, key []byte) (uint64, error) { + g, ok := c.groupForKey(key) + if !ok { + return 0, errors.WithStack(ErrLeaderNotFound) + } + return groupLeaseRead(ctx, g) +} + +func groupLeaseRead(ctx context.Context, g *ShardGroup) (uint64, error) { + engine := engineForGroup(g) + lp, ok := engine.(raftengine.LeaseProvider) + if !ok { + return linearizableReadEngineCtx(ctx, engine) + } + leaseDur := lp.LeaseDuration() + if leaseDur <= 0 { + // Lease disabled by tick configuration. Always take the slow + // path without mutating g.lease. + return linearizableReadEngineCtx(ctx, engine) + } + // Single time.Now() and generation sample before any quorum work, + // mirroring Coordinate.LeaseRead. expectedGen guards against a + // leader-loss invalidation that fires during LinearizableRead. + now := time.Now() + expectedGen := g.lease.generation() + // Defense-in-depth: also check the shard engine's current state. + // Async callbacks may not have flipped the lease yet, but + // State() is refreshed every tick and catches transitions + // sooner. See Coordinate.LeaseRead for details. 
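+ // Fast path: consults only this group's lease and the shard
+ // engine's atomic applied index; no cross-shard state is touched.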
+ if g.lease.valid(now) && engine.State() == raftengine.StateLeader { + return lp.AppliedIndex(), nil + } + idx, err := linearizableReadEngineCtx(ctx, engine) + if err != nil { + g.lease.invalidate() + return 0, err + } + g.lease.extend(now.Add(leaseDur), expectedGen) + return idx, nil +} + func (c *ShardedCoordinator) Clock() *HLC { return c.clock } diff --git a/kv/sharded_lease_test.go b/kv/sharded_lease_test.go new file mode 100644 index 00000000..bdec9223 --- /dev/null +++ b/kv/sharded_lease_test.go @@ -0,0 +1,216 @@ +package kv + +import ( + "context" + "errors" + "io" + "sync/atomic" + "testing" + "time" + + "github.com/bootjp/elastickv/distribution" + pb "github.com/bootjp/elastickv/proto" + "github.com/stretchr/testify/require" +) + +// shardedLeaseEngine is a minimal raftengine.Engine + LeaseProvider used +// by sharded lease tests. It records LinearizableRead invocations and +// the registered leader-loss callback so tests can fire it on demand. +type shardedLeaseEngine struct { + *fakeLeaseEngine +} + +func newShardedLeaseEngine(applied uint64) *shardedLeaseEngine { + return &shardedLeaseEngine{ + fakeLeaseEngine: &fakeLeaseEngine{ + applied: applied, + leaseDur: time.Hour, + }, + } +} + +func mustShardedLeaseCoord(t *testing.T, eng1, eng2 *shardedLeaseEngine) *ShardedCoordinator { + t.Helper() + distEngine := distribution.NewEngine() + // Route a..m -> group 1, m..end -> group 2 so per-key tests can pick + // a key landing on each shard. + distEngine.UpdateRoute([]byte("a"), []byte("m"), 1) + distEngine.UpdateRoute([]byte("m"), nil, 2) + + g1Txn := &recordingTransactional{ + responses: []*TransactionResponse{{CommitIndex: 5}}, + } + g2Txn := &recordingTransactional{ + responses: []*TransactionResponse{{CommitIndex: 12}}, + } + return NewShardedCoordinator(distEngine, map[uint64]*ShardGroup{ + 1: {Engine: eng1, Txn: g1Txn}, + 2: {Engine: eng2, Txn: g2Txn}, + }, 1, NewHLC(), nil) +} + +func TestShardedCoordinator_LeaseReadForKey_PerShardIsolation(t *testing.T) { + t.Parallel() + + eng1 := newShardedLeaseEngine(100) + eng2 := newShardedLeaseEngine(200) + coord := mustShardedLeaseCoord(t, eng1, eng2) + + // Pre-extend shard 1's lease only. + g1 := coord.groups[1] + g1.lease.extend(time.Now().Add(time.Hour), g1.lease.generation()) + + idx, err := coord.LeaseReadForKey(context.Background(), []byte("apple")) + require.NoError(t, err) + require.Equal(t, uint64(100), idx) + require.Equal(t, int32(0), eng1.linearizableCalls.Load(), + "shard 1 lease is valid; engine 1 should not be called") + + idx, err = coord.LeaseReadForKey(context.Background(), []byte("zebra")) + require.NoError(t, err) + require.Equal(t, uint64(200), idx) + require.Equal(t, int32(1), eng2.linearizableCalls.Load(), + "shard 2 lease was never extended; engine 2 must take the slow path") + + // After the slow path, shard 2's lease is now valid; engine 1 must + // remain untouched. 
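+ // (a repeat read of "zebra" would now take shard 2's fast path)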
+ require.Equal(t, int32(0), eng1.linearizableCalls.Load()) +} + +func TestShardedCoordinator_LeaseReadForKey_ErrorOnlyInvalidatesShard(t *testing.T) { + t.Parallel() + + sentinel := errors.New("read-index failed") + eng1 := newShardedLeaseEngine(100) + eng2 := newShardedLeaseEngine(200) + eng2.linearizableErr = sentinel + coord := mustShardedLeaseCoord(t, eng1, eng2) + + g1 := coord.groups[1] + g2 := coord.groups[2] + g1.lease.extend(time.Now().Add(time.Hour), g1.lease.generation()) + g2.lease.extend(time.Now().Add(time.Hour), g2.lease.generation()) + g2.lease.invalidate() // force shard 2 onto slow path + + _, err := coord.LeaseReadForKey(context.Background(), []byte("zebra")) + require.ErrorIs(t, err, sentinel) + require.False(t, g2.lease.valid(time.Now()), + "shard 2 lease must be invalidated after error") + require.True(t, g1.lease.valid(time.Now()), + "shard 1 lease must NOT be touched by shard 2's failure") +} + +func TestShardedCoordinator_LeaseRefreshingTxn_SkipsWhenCommitIndexZero(t *testing.T) { + t.Parallel() + eng1 := newShardedLeaseEngine(100) + eng2 := newShardedLeaseEngine(200) + coord := mustShardedLeaseCoord(t, eng1, eng2) + + g1 := coord.groups[1] + // A response with CommitIndex == 0 signals "no Raft proposal + // happened" (TransactionManager short-circuits on empty input / + // no-op abort). Refreshing in that case would be unsound. + noRaftResp := &TransactionResponse{CommitIndex: 0} + txn, ok := g1.Txn.(*leaseRefreshingTxn) + require.True(t, ok, "NewShardedCoordinator wraps Txn in leaseRefreshingTxn") + txn.inner = &fixedTransactional{response: noRaftResp} + + require.False(t, g1.lease.valid(time.Now())) + + // Commit with empty input returns success with CommitIndex=0. + _, err := g1.Txn.Commit(nil) + require.NoError(t, err) + require.False(t, g1.lease.valid(time.Now()), + "lease must NOT be refreshed when no Raft commit happened") + + // Same for Abort. + _, err = g1.Txn.Abort(nil) + require.NoError(t, err) + require.False(t, g1.lease.valid(time.Now())) + + // A response with CommitIndex > 0 refreshes the lease. + realResp := &TransactionResponse{CommitIndex: 42} + txn.inner = &fixedTransactional{response: realResp} + _, err = g1.Txn.Commit(nil) + require.NoError(t, err) + require.True(t, g1.lease.valid(time.Now()), + "lease must be refreshed after a real Raft commit") +} + +// fixedTransactional is a minimal Transactional whose Commit/Abort +// always return the same response. Used to drive the lease-refresh +// gating tests deterministically. +type fixedTransactional struct { + response *TransactionResponse +} + +func (f *fixedTransactional) Commit(_ []*pb.Request) (*TransactionResponse, error) { + return f.response, nil +} + +func (f *fixedTransactional) Abort(_ []*pb.Request) (*TransactionResponse, error) { + return f.response, nil +} + +// closableTransactional satisfies both Transactional and io.Closer so +// the Close-delegation test can observe whether the wrapper forwards +// Close to the inner value. +type closableTransactional struct { + fixedTransactional + closed atomic.Bool +} + +func (c *closableTransactional) Close() error { + c.closed.Store(true) + return nil +} + +func TestLeaseRefreshingTxn_ForwardsClose(t *testing.T) { + t.Parallel() + inner := &closableTransactional{ + fixedTransactional: fixedTransactional{response: &TransactionResponse{}}, + } + wrapper := &leaseRefreshingTxn{inner: inner, g: &ShardGroup{}} + + // ShardStore.closeGroup does a guarded type assertion + // `if closer, ok := g.Txn.(io.Closer); ok { closer.Close() }`. 
+ // After wrapping, that `ok` must still be true and the resulting
+ // Close must reach the inner Transactional.
+ closer, ok := interface{}(wrapper).(io.Closer)
+ require.True(t, ok, "leaseRefreshingTxn must implement io.Closer")
+ require.NoError(t, closer.Close())
+ require.True(t, inner.closed.Load(),
+ "Close must delegate to the wrapped Transactional so ShardStore.closeGroup can release its resources")
+}
+
+func TestLeaseRefreshingTxn_CloseNoopWhenInnerIsNotCloser(t *testing.T) {
+ t.Parallel()
+ // fixedTransactional does NOT implement io.Closer. The wrapper's
+ // Close must be a safe no-op rather than panicking.
+ inner := &fixedTransactional{response: &TransactionResponse{}}
+ wrapper := &leaseRefreshingTxn{inner: inner, g: &ShardGroup{}}
+ require.NoError(t, wrapper.Close())
+}
+
+func TestShardedCoordinator_RegistersPerShardLeaderLossCallback(t *testing.T) {
+ t.Parallel()
+
+ eng1 := newShardedLeaseEngine(100)
+ eng2 := newShardedLeaseEngine(200)
+ coord := mustShardedLeaseCoord(t, eng1, eng2)
+
+ require.Equal(t, int32(1), eng1.registerLeaderLossCalled.Load(),
+ "NewShardedCoordinator must register a callback per shard engine")
+ require.Equal(t, int32(1), eng2.registerLeaderLossCalled.Load())
+
+ g1 := coord.groups[1]
+ g2 := coord.groups[2]
+ g1.lease.extend(time.Now().Add(time.Hour), g1.lease.generation())
+ g2.lease.extend(time.Now().Add(time.Hour), g2.lease.generation())
+
+ eng1.fireLeaderLoss()
+ require.False(t, g1.lease.valid(time.Now()),
+ "shard 1 leader-loss callback must invalidate shard 1's lease")
+ require.True(t, g2.lease.valid(time.Now()),
+ "shard 2 lease must remain valid; only its own engine's callback affects it")
+}