Hide transient leader errors behind bounded retries in coordinator#618
Conversation
PR #614 worked around mid-test leader churn by retrying "not leader" / "leader not found" RPC errors at the test client. That weakens the gRPC API contract: a linearizable client expects every call either to commit atomically or to fail definitively, not to have raft-internal re-election windows leak out as transient errors the caller has to retry. Move the retry into the coordinator instead, so the gRPC layer stays linearizable from the client's perspective:

- `Coordinate.Dispatch` wraps the per-attempt path in a bounded retry loop (5s budget, 25ms poll). Transient errors detected via `isTransientLeaderError` (`ErrLeaderNotFound` or any of the etcd/raft leadership-loss sentinels via `isLeadershipLossError`) are absorbed; business-logic errors surface unchanged. StartTS issuance moves inside the per-attempt path so a new leader's HLC mints a fresh timestamp that floors above any committed physical-ceiling lease.
- `LeaderProxy.forwardWithRetry` no longer bails out immediately on `ErrLeaderNotFound`; it polls for `leaderProxyRetryBudget` while the leader re-publishes, matching the coordinator's behaviour.

Revert the test-side workarounds now that the server hides the churn:

- Restore the three consistency tests to call gRPC directly with `assert.NoError`, the way they verify linearizability on the wire.
- Drop `rawPutEventually` / `rawGetEventually` / `txnPutEventually` / `txnGetEventually` / `txnDeleteEventually` from `adapter/test_util.go`. The Redis `rpushEventually` / `lpushEventually` helpers stay in place (separate scope; the Redis adapter's leader handling is unchanged).

Test plan

- `go vet ./...` — clean
- `go build ./...` — clean
- `go test -race -run '^(Test_consistency_satisfy_write_after_read_sequence|Test_grpc_transaction|Test_consistency_satisfy_write_after_read_for_parallel)$' ./adapter/ -count=1` — ok 218s
- `go test -race -count=1 ./kv/` — ok 2.1s
- `go test -race -count=1 ./adapter/` — ok 282s

https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap
📝 Walkthrough

The PR refactors transient leader error handling across the coordinator and leader proxy layers. It removes RPC-specific retry helpers from tests, introduces bounded retry loops with deadline enforcement in the coordinator and leader proxy, implements fresh timestamp minting on retries, and adds improved transient error classification. The leader proxy method signature is updated to accept a parent context for deadline propagation.
Sequence diagram:

```mermaid
sequenceDiagram
    participant Client
    participant Coordinator
    participant LeaderProxy
    participant LeaderEngine
    participant LeaderServer
    Client->>Coordinator: Dispatch(request)
    Coordinator->>Coordinator: Validate request once
    loop Retry Loop (bounded by dispatchLeaderRetryBudget)
        Coordinator->>Coordinator: dispatchOnce: reset StartTS if txn (mints fresh HLC)
        Coordinator->>LeaderProxy: Commit(reqs)
        alt Leader resolvable
            LeaderProxy->>LeaderEngine: Leader() → leader address
            LeaderProxy->>LeaderServer: Forward(reqs)
            alt Transient leader error
                LeaderServer-->>LeaderProxy: "not leader" / wire error
                LeaderProxy->>LeaderProxy: Backoff & re-resolve
            else Success
                LeaderServer-->>LeaderProxy: TransactionResponse
                LeaderProxy-->>Coordinator: response
            end
        else Leader not found
            LeaderEngine-->>LeaderProxy: empty / ErrLeaderNotFound
            LeaderProxy->>LeaderProxy: Backoff & retry resolution
        end
        alt Transient error detected
            Coordinator->>Coordinator: Sleep (waitForDispatchRetry)
            Coordinator->>Coordinator: Re-check deadline
            Note over Coordinator: Loop continues if budget remains
        else Success or non-transient error
            Coordinator-->>Client: response / error
        end
    end
    alt Budget exceeded
        Coordinator-->>Client: retry timeout error
    end
```
golangci-lint (gci) flagged adapter/test_util.go as improperly formatted after the helper-removal commit left a trailing blank line at end of file. Equivalent to running `gci write`; no behaviour change. https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap
Code Review
This pull request introduces bounded retry logic for transient leader-unavailable errors in the coordinator and leader proxy to improve gRPC linearizability. It also refactors several tests to remove reliance on 'Eventually' helpers in favor of direct assertions. I have provided feedback regarding the use of magic numbers in tests, the importance of deferring wg.Done() for robustness, and the recommendation to use time.Timer instead of time.After in high-frequency loops to avoid potential memory leaks.
```diff
 wg.Add(1000)
 for i := range 1000 {
 	go func(i int) {
-		defer wg.Done()
 		key := []byte("test-key-parallel" + strconv.Itoa(i))
 		want := []byte(strconv.Itoa(i))
-		put := func() error {
-			_, err := c.RawPut(ctx, &pb.RawPutRequest{Key: key, Value: want})
-			return err
-		}
-		if err := retryNotLeader(ctx, put); err != nil {
-			errCh <- err
-			return
-		}
-		if err := retryNotLeader(ctx, put); err != nil {
-			errCh <- err
-			return
-		}
-		var resp *pb.RawGetResponse
-		err := retryNotLeader(ctx, func() error {
-			r, err := c.RawGet(ctx, &pb.RawGetRequest{Key: key})
-			if err != nil {
-				return err
-			}
-			resp = r
-			return nil
-		})
-		if err != nil {
-			errCh <- err
-			return
-		}
-		if !bytes.Equal(want, resp.Value) {
-			errCh <- fmt.Errorf("consistency check failed for key %s: want %q got %q", key, want, resp.Value)
-		}
+		_, err := c.RawPut(
+			context.Background(),
+			&pb.RawPutRequest{Key: key, Value: want},
+		)
+		assert.NoError(t, err, "Put RPC failed")
+		_, err = c.RawPut(context.TODO(), &pb.RawPutRequest{Key: key, Value: want})
+		assert.NoError(t, err, "Put RPC failed")
+
+		resp, err := c.RawGet(context.TODO(), &pb.RawGetRequest{Key: key})
+		assert.NoError(t, err, "Get RPC failed")
+		assert.Equal(t, want, resp.Value, "consistency check failed")
+		wg.Done()
 	}(i)
```
The wg.Done() call should be deferred at the beginning of the goroutine to ensure it is always executed, even if a panic occurs or if early returns are added in the future. Additionally, the use of context.TODO() is inconsistent with context.Background() used earlier in the same test.
Suggested change:

```go
go func(i int) {
	defer wg.Done()
	key := []byte("test-key-parallel" + strconv.Itoa(i))
	want := []byte(strconv.Itoa(i))
	_, err := c.RawPut(
		context.Background(),
		&pb.RawPutRequest{Key: key, Value: want},
	)
	assert.NoError(t, err, "Put RPC failed")
	_, err = c.RawPut(context.Background(), &pb.RawPutRequest{Key: key, Value: want})
	assert.NoError(t, err, "Put RPC failed")

	resp, err := c.RawGet(context.Background(), &pb.RawGetRequest{Key: key})
	assert.NoError(t, err, "Get RPC failed")
	assert.Equal(t, want, resp.Value, "consistency check failed")
}(i)
```
```go
select {
case <-ctx.Done():
	return lastResp, lastErr
case <-time.After(dispatchLeaderRetryInterval):
}
```
Using time.After in a loop can lead to short-term memory leaks because the underlying timer is not garbage collected until it expires. For a high-frequency retry loop, it is more efficient to use a single time.Timer and reset it. Additionally, centralize common complex logic, such as retry/backoff mechanisms, into shared helper functions to avoid code duplication and improve maintainability.
```go
timer := time.NewTimer(dispatchLeaderRetryInterval)
defer timer.Stop()
for {
	lastResp, lastErr = c.dispatchOnce(ctx, reqs)
	if lastErr == nil || !isTransientLeaderError(lastErr) {
		return lastResp, lastErr
	}
	if !time.Now().Before(deadline) {
		return lastResp, lastErr
	}
	timer.Reset(dispatchLeaderRetryInterval)
	select {
	case <-ctx.Done():
		return lastResp, lastErr
	case <-timer.C:
	}
}
```

References
- Centralize common complex logic, such as retry/backoff mechanisms, into shared helper functions to avoid code duplication and improve maintainability.
adapter/grpc_test.go (Test_consistency_satisfy_write_after_read_for_parallel):

- Extract the worker count 1000 to a `const workers` so `wg.Add(workers)` and `for i := range workers` read off the same source of truth.
- Move `wg.Done()` to a `defer` at goroutine entry so panics or future early returns can't leave `wg.Wait` deadlocked.
- Replace the remaining `context.TODO()` calls with `context.Background()` to match the first RawPut in the same block.

kv/coordinator.go (Dispatch retry loop):

- Reuse a single `time.Timer` with `Reset` across iterations instead of allocating a fresh timer per round via `time.After`. Go 1.23+ timer semantics make `Reset` on an unfired/expired timer safe without an explicit drain, so no extra bookkeeping is needed. This avoids a short-term allocation per retry under heavy leader churn.

Test plan

- `go vet ./...` — clean
- `go test -race ./kv/ -run 'Coordinate|Coordinator|LeaderProxy' -count=1` — ok
- `go test -race -run '^(Test_consistency_satisfy_write_after_read_sequence|Test_grpc_transaction|Test_consistency_satisfy_write_after_read_for_parallel)$' ./adapter/ -count=1` — ok 193s

https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap
/gemini review |
💡 Codex Review
Line 301 in 24b09cc
Dispatch retries dispatchOnce with the same reqs object, but dispatchOnce only mints a timestamp when reqs.StartTS == 0. After the first transient leadership-loss failure, retries reuse the stale StartTS from the failed attempt instead of issuing a fresh one. That stale timestamp is used in OCC conflict checks (LatestCommitTS > startTS in the FSM path), so writes that commit during the election window can surface as write-conflict errors instead of being absorbed by the retry logic.
```go
case <-ctx.Done():
	return lastResp, lastErr
```
Return context cancellation when retry wait is interrupted
When the retry sleep is interrupted by ctx.Done(), Dispatch returns lastErr rather than ctx.Err(). If the caller deadline is shorter than the 5s retry budget and the cluster is leaderless, canceled requests are reported as leader-not-found/leadership errors instead of context.Canceled or context.DeadlineExceeded, which breaks expected cancellation semantics for gRPC clients.
Two correctness issues on the retry loop added in 18a25bd, flagged by Codex:

P1 — stale StartTS on retry

dispatchOnce only mints a fresh timestamp when reqs.StartTS == 0. Once the first attempt filled StartTS in place, every retry reused that stale value, and the FSM's OCC guard (LatestCommitTS > startTS in validateConflicts) could reject the retry with a write-conflict error if any other write committed against the same keys during the election window — exactly the case the retry is meant to absorb. Remember whether the caller asked us to assign timestamps (reqs.IsTxn && reqs.StartTS == 0 on entry) and, if so, reset StartTS/CommitTS back to 0 before each retry so dispatchOnce re-mints against the post-churn leader's HLC.

P2 — ctx cancellation reported as leader error

When the retry sleep was interrupted by ctx.Done, the loop returned lastErr (the transient leader error) instead of ctx.Err(). Callers with a shorter deadline than the 5s retry budget need context.Canceled / context.DeadlineExceeded so they can tell "I gave up" apart from "cluster unavailable". Surface ctx.Err() on the ctx.Done branch.

Test plan

- go vet ./... — clean
- go test -race -count=1 ./kv/ — ok
- go test -race -run '^(Test_consistency_satisfy_write_after_read_sequence|Test_grpc_transaction|Test_consistency_satisfy_write_after_read_for_parallel)$' ./adapter/ -count=1 — ok 205s

https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap
golangci-lint (cyclop) flagged Dispatch at cyclomatic complexity 11 (limit 10). Split the retry loop's decision points into three small helpers, dropping the in-function branch count to 8:

- coordinatorAssignsTimestamps(reqs) — does the caller expect the coordinator to mint StartTS/CommitTS?
- shouldRetryDispatch(err) — is the error a transient leader-unavailable signal worth retrying?
- waitForDispatchRetry(ctx, timer, interval) — sleep on the timer or return ctx.Err() wrapped, whichever comes first.

No behaviour change; all existing retry semantics (StartTS refresh, ctx.Err on cancellation, 5s budget, 25ms interval, reused Timer) are preserved.

Test plan

- go vet ./... — clean
- go test -race -count=1 ./kv/ — ok
- go test -race -run '^(Test_consistency_satisfy_write_after_read_sequence|Test_grpc_transaction|Test_consistency_satisfy_write_after_read_for_parallel)$' ./adapter/ -count=1 — ok 209s

https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap
Pull request overview
This PR moves transient raft leader-churn retry behavior from adapter tests into core KV components (coordinator + leader proxy) so gRPC callers see fewer “not leader/leader not found” failures during re-elections and can rely on bounded, linearizable outcomes.
Changes:
- Add a bounded retry loop in `Coordinate.Dispatch()` to absorb transient leader-unavailable errors and reset txn timestamps on retry when the coordinator assigns them.
- Extend `LeaderProxy.forwardWithRetry()` with a budgeted polling loop when no leader address is published yet.
- Remove leader-churn retry helper wrappers from adapter tests and update tests to use direct RPC calls/assertions.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| `kv/leader_proxy.go` | Adds bounded leader-resolution polling to forwarding retry logic. |
| `kv/coordinator.go` | Wraps dispatch in a bounded retry loop and introduces transient-leader error classification helpers. |
| `adapter/test_util.go` | Removes test-only "eventually" wrappers for raw/txn RPCs. |
| `adapter/grpc_test.go` | Simplifies tests by removing explicit retry logic and using direct RPC assertions. |
```go
func isTransientLeaderError(err error) bool {
	if err == nil {
		return false
	}
	if errors.Is(err, ErrLeaderNotFound) {
		return true
	}
	return isLeadershipLossError(err)
}
```
isTransientLeaderError only matches kv.ErrLeaderNotFound and raftengine leadership-loss sentinels. Errors returned from redirect()’s Internal.Forward RPC on a stale leader are currently adapter/internal.ErrNotLeader (transported as a gRPC status error), which won’t satisfy isLeadershipLossError and therefore won’t be retried—so transient “not leader” can still leak to external callers. Either make the Internal.Forward RPC return/encode raftengine.ErrNotLeader (or kv.ErrLeaderNotFound) in a way that survives the gRPC boundary, or extend this classification to recognize the forwarded not-leader signal (e.g., via gRPC status codes).
```go
// Wrap the actual dispatch in a bounded retry loop so that transient
// leader-unavailable errors (no leader resolvable yet, local node just
// stepped down, forwarded RPC bounced off a stale leader) are absorbed
// inside the coordinator instead of leaking out through the gRPC API.
// The gRPC contract is linearizable: a single client call either
// commits atomically or returns a definitive error. "Leader not found"
// during a re-election is neither, so we wait briefly for the cluster
// to re-stabilise. Non-leader errors that exceed the retry budget are
// surfaced unchanged for callers to observe.
leaderAssignsTS := coordinatorAssignsTimestamps(reqs)
deadline := time.Now().Add(dispatchLeaderRetryBudget)
// Reuse a single Timer across retries. time.After would allocate a
// fresh timer per iteration whose Go runtime entry lingers until the
// interval elapses, producing a short-term leak proportional to the
// retry rate. Under heavy mid-dispatch leader churn this is a hot
// loop, so we Reset the timer in place instead. Go 1.23+ timer
// semantics make Reset on an unfired/expired Timer safe without an
// explicit drain.
timer := time.NewTimer(dispatchLeaderRetryInterval)
defer timer.Stop()
var lastResp *CoordinateResponse
var lastErr error
for {
	lastResp, lastErr = c.dispatchOnce(ctx, reqs)
	if !shouldRetryDispatch(lastErr) {
		return lastResp, lastErr
	}
	if !time.Now().Before(deadline) {
		return lastResp, lastErr
	}
	if leaderAssignsTS {
		// Force dispatchOnce to mint a fresh StartTS on the next
		// attempt; keep CommitTS tied to StartTS by clearing it too
		// (the same invariant dispatchOnce enforces). Re-using a
		// stale StartTS would race the OCC conflict check in
		// fsm.validateConflicts (LatestCommitTS > startTS) against
		// writes that committed during the election window.
		reqs.StartTS = 0
		reqs.CommitTS = 0
	}
	if err := waitForDispatchRetry(ctx, timer, dispatchLeaderRetryInterval); err != nil {
		return lastResp, err
	}
}
```
This new bounded retry loop in Dispatch materially changes coordinator semantics (including timestamp reset behavior on retries) but there are no accompanying unit tests in kv/ to pin the expected retry conditions and invariants. Please add tests that (a) verify transient leader errors are retried up to the budget and (b) confirm StartTS/CommitTS are re-minted on retry when the coordinator assigns timestamps.
```diff
 // forwardWithRetry attempts to forward to the leader, re-fetching the
 // leader address on each failure to handle leadership changes between
 // attempts. Two retry signals are interleaved:
 //
 //   - Forward-RPC failures are bounded by maxForwardRetries (each attempt
 //     re-resolves the leader address inside forward()).
 //   - ErrLeaderNotFound (no leader published yet) is bounded by
 //     leaderProxyRetryBudget so a brief re-election window does not bubble
 //     up to gRPC clients as a hard failure. Linearizable callers expect
 //     the proxy to either commit atomically or fail definitively, not to
 //     leak transient raft-internal churn.
 func (p *LeaderProxy) forwardWithRetry(reqs []*pb.Request) (*TransactionResponse, error) {
 	if len(reqs) == 0 {
 		return &TransactionResponse{}, nil
 	}

+	deadline := time.Now().Add(leaderProxyRetryBudget)
 	var lastErr error
-	for attempt := 0; attempt < maxForwardRetries; attempt++ {
-		resp, err := p.forward(reqs)
-		if err == nil {
-			return resp, nil
-		}
-		lastErr = err
-		// If the leader is simply not found, retry won't help immediately.
-		if errors.Is(err, ErrLeaderNotFound) {
-			return nil, err
-		}
-	}
+	for {
+		// Each iteration of the outer loop runs up to maxForwardRetries
+		// fast retries against whatever leader is currently visible. If
+		// none is, we sleep one leaderProxyRetryInterval and re-poll
+		// until leaderProxyRetryBudget elapses.
+		for attempt := 0; attempt < maxForwardRetries; attempt++ {
+			resp, err := p.forward(reqs)
+			if err == nil {
+				return resp, nil
+			}
+			lastErr = err
+			if errors.Is(err, ErrLeaderNotFound) {
+				break
+			}
+		}
+		if !errors.Is(lastErr, ErrLeaderNotFound) {
+			return nil, errors.Wrapf(lastErr, "leader forward failed after %d retries", maxForwardRetries)
+		}
+		if !time.Now().Before(deadline) {
+			return nil, lastErr
+		}
+		time.Sleep(leaderProxyRetryInterval)
+	}
```
The new leader-unavailability polling behavior in forwardWithRetry isn’t covered by tests. kv/leader_proxy_test.go currently exercises basic forwarding, but it doesn’t validate the ErrLeaderNotFound polling budget/interval behavior introduced here. Please add a test that simulates “no leader published” for some time and asserts forwardWithRetry retries until a leader appears (and stops retrying once the budget elapses).
```diff
 func Test_consistency_satisfy_write_after_read_for_parallel(t *testing.T) {
 	t.Parallel()
 	nodes, adders, _ := createNode(t, 3)
 	c := rawKVClient(t, adders)

-	// 1000 concurrent clients × 3 RPCs saturates the single raft leader
-	// hard enough to provoke brief quorum checks to fail on CI, so retry
-	// transient leader-unavailable errors. The *Eventually helpers are
-	// intentionally NOT used here: they end in require.NoError, and
-	// require calls t.FailNow() which must run on the main test goroutine.
-	// Workers use retryNotLeader + an errors channel instead so all
-	// require/assert calls happen on the main goroutine after wg.Wait().
-	ctx := context.Background()
-	const workers = 1000
-	errCh := make(chan error, workers)
 	wg := sync.WaitGroup{}
+	const workers = 1000
 	wg.Add(workers)
 	for i := range workers {
 		go func(i int) {
 			defer wg.Done()
 			key := []byte("test-key-parallel" + strconv.Itoa(i))
 			want := []byte(strconv.Itoa(i))
-			put := func() error {
-				_, err := c.RawPut(ctx, &pb.RawPutRequest{Key: key, Value: want})
-				return err
-			}
-			if err := retryNotLeader(ctx, put); err != nil {
-				errCh <- err
-				return
-			}
-			if err := retryNotLeader(ctx, put); err != nil {
-				errCh <- err
-				return
-			}
-			var resp *pb.RawGetResponse
-			err := retryNotLeader(ctx, func() error {
-				r, err := c.RawGet(ctx, &pb.RawGetRequest{Key: key})
-				if err != nil {
-					return err
-				}
-				resp = r
-				return nil
-			})
-			if err != nil {
-				errCh <- err
-				return
-			}
-			if !bytes.Equal(want, resp.Value) {
-				errCh <- fmt.Errorf("consistency check failed for key %s: want %q got %q", key, want, resp.Value)
-			}
+			_, err := c.RawPut(
+				context.Background(),
+				&pb.RawPutRequest{Key: key, Value: want},
+			)
+			assert.NoError(t, err, "Put RPC failed")
+			_, err = c.RawPut(context.Background(), &pb.RawPutRequest{Key: key, Value: want})
+			assert.NoError(t, err, "Put RPC failed")
+
+			resp, err := c.RawGet(context.Background(), &pb.RawGetRequest{Key: key})
+			assert.NoError(t, err, "Get RPC failed")
+			assert.Equal(t, want, resp.Value, "consistency check failed")
 		}(i)
 	}
 	wg.Wait()
-	close(errCh)
-	for err := range errCh {
-		assert.NoError(t, err)
-	}
 	shutdown(nodes)
 }
```
Test_consistency_satisfy_write_after_read_for_parallel calls shutdown(nodes) manually at the end rather than deferring it. If any goroutine hits an unexpected panic (e.g., due to a nil resp after an RPC error) the test can exit early and leak running nodes/ports into subsequent parallel tests. Prefer defer shutdown(nodes) immediately after createNode, consistent with the other tests in this file.
Suggested change:

```diff
-			assert.NoError(t, err, "Put RPC failed")
+			if !assert.NoError(t, err, "Put RPC failed") {
+				return
+			}
 			_, err = c.RawPut(context.Background(), &pb.RawPutRequest{Key: key, Value: want})
-			assert.NoError(t, err, "Put RPC failed")
+			if !assert.NoError(t, err, "Put RPC failed") {
+				return
+			}
 			resp, err := c.RawGet(context.Background(), &pb.RawGetRequest{Key: key})
-			assert.NoError(t, err, "Get RPC failed")
+			if !assert.NoError(t, err, "Get RPC failed") {
+				return
+			}
```
```go
assert.NoError(t, err, "Put RPC failed")

resp, err := c.RawGet(context.TODO(), &pb.RawGetRequest{Key: key})
assert.NoError(t, err, "Get RPC failed")
```
In Test_consistency_satisfy_write_after_read_sequence, the code asserts NoError on RawGet but then immediately uses resp.Value even if the assertion fails. On an RPC error, resp can be nil and this will panic. Gate the resp usage on a successful assert.NoError (or return/continue on error) to keep failures deterministic.
Suggested change:

```diff
-		assert.NoError(t, err, "Get RPC failed")
+		if !assert.NoError(t, err, "Get RPC failed") {
+			continue
+		}
```
```diff
 		resp, err := c.Get(context.TODO(), &pb.GetRequest{Key: key})
 		assert.NoError(t, err, "Get RPC failed")
 		assert.Equal(t, want, resp.Value, "consistency check failed")

-		txnDeleteEventually(t, ctx, c, &pb.DeleteRequest{Key: key})
+		_, err = c.Delete(context.TODO(), &pb.DeleteRequest{Key: key})
+		assert.NoError(t, err, "Delete RPC failed")

-		resp = txnGetEventually(t, ctx, c, &pb.GetRequest{Key: key})
+		resp, err = c.Get(context.TODO(), &pb.GetRequest{Key: key})
+		assert.NoError(t, err, "Get RPC failed")
 		assert.Nil(t, resp.Value, "consistency check failed")
```
In Test_grpc_transaction, resp is used after assert.NoError on Get, but if the Get RPC fails resp can be nil and resp.Value will panic. Please short-circuit the loop iteration when the RPC returns an error (e.g., if !assert.NoError(...) { return } / continue) before dereferencing resp.
forwardWithRetry only treats ErrLeaderNotFound as the “slow-path” retry signal. When forwarding to a stale leader, Internal.Forward currently returns a gRPC error with message "not leader" (adapter/internal.go) rather than kv.ErrLeaderNotFound, so this path will exit after maxForwardRetries and still surface a transient election as an error. Consider classifying that forwarded "not leader" response as transient too (e.g., by mapping Internal.Forward to raftengine.ErrNotLeader on the server side, or by detecting the gRPC status/message here) so brief leader churn is consistently hidden.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: fead00e031
```go
	if errors.Is(err, ErrLeaderNotFound) {
		return true
	}
	return isLeadershipLossError(err)
```
Retry forwarded "not leader" status errors
Dispatch only retries ErrLeaderNotFound and isLeadershipLossError, but a stale-forward path returns adapter/internal's ErrNotLeader over gRPC as a status error (not one of these sentinels). In leader churn, redirect() can hit an old leader and return this transient error, and shouldRetryDispatch will treat it as terminal, so the new coordinator-level retry still leaks transient leadership changes to clients/tests instead of absorbing them. Please classify forwarded "not leader" gRPC errors as transient here as well.
Addresses the outstanding Copilot / Codex review comments on PR #618.

Transport-boundary classification (Codex P1, Copilot)

When Coordinate.redirect forwards to a stale leader, the destination returns adapter.ErrNotLeader over gRPC as a generic Unknown status carrying only the string "not leader". errors.Is cannot traverse that wire boundary, so the existing isTransientLeaderError missed it and the retry loop exited early, leaking transient election churn to gRPC clients. Add a last-resort substring match on a closed list of phrases ("not leader", "leader not found") and route both Coordinate.Dispatch and LeaderProxy.forwardWithRetry through the same classifier so a forwarded stale-leader error triggers the slow-path poll instead of an immediate exit.

Test-side nil-dereference guards (Copilot)

The three consistency tests asserted NoError on each RPC but still dereferenced resp.Value unconditionally. On a failed assert, resp could be nil and the next line would panic, masking the real failure mode. Gate every resp access behind the assert.NoError return value (return for the parallel worker, continue for the sequence/txn loops), and replace the lingering context.TODO() calls with context.Background() so the dispatch paths don't carry a "placeholder" context all the way down.

New unit tests (Copilot)

kv/coordinator_retry_test.go:
- TestIsTransientLeaderError_Classification pins the sentinel + string-match matrix (includes a simulated gRPC-Unknown wrapper).
- TestCoordinateDispatch_RetriesTransientLeaderError drives a scripted transactional that fails twice (once with the typed raftengine sentinel, once with the wire string) and verifies Dispatch absorbs both.
- TestCoordinateDispatch_NonTransientErrorSurfacesImmediately confirms a business-logic error exits the loop after one attempt.
- TestCoordinateDispatch_RefreshesStartTSOnRetry checks that each retry mints a fresh, strictly greater StartTS when the caller passes 0 (the fix from 31ca8bd).
- TestCoordinateDispatch_CtxCancelDuringRetrySurfaces verifies ctx.Err() wins over the transient leader error.

kv/leader_proxy_test.go:
- TestLeaderProxy_ForwardsAfterLeaderPublishes flips the engine's leader address from empty to a real gRPC Forward server after a brief delay and asserts the proxy polls through the gap.
- TestLeaderProxy_FailsAfterLeaderBudgetElapses pins the 5s budget by never publishing a leader and asserting ErrLeaderNotFound is surfaced after the budget.

Test plan
- go vet ./... — clean
- go test -race -count=1 ./kv/ — ok 6.7s
- go test -race -run '^(Test_consistency_satisfy_write_after_read_sequence|Test_grpc_transaction|Test_consistency_satisfy_write_after_read_for_parallel)$' ./adapter/ -count=1 — ok 210s

https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap
Three lint fixes on the new test file:
- gci: align the method-receiver block (VerifyLeader vs. LinearizableRead) the way the formatter prefers.
- gosec G115: the CommitIndex conversion from the atomic.Int64 counter is safe (monotonic, non-negative), but gosec can't see the invariant; annotate inline.
- copyloopvar: drop the redundant tc := tc shadow (unnecessary since Go 1.22) inside the classification table loop.

https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap
💡 Codex Review
Reviewed commit: e53e7437b6
```go
	for {
		// Each iteration of the outer loop runs up to maxForwardRetries
		// fast retries against whatever leader is currently visible. If
		// none is (or the forward bounced off a stale leader), we sleep
		// one leaderProxyRetryInterval and re-poll until
```
Check forward retry budget before starting a new cycle
forwardWithRetry only evaluates deadline after running another round of p.forward attempts, so if the budget expires during the prior sleep it still enters this loop and can perform up to maxForwardRetries more RPCs. Because each RPC has leaderForwardTimeout (5s), a request can run far longer than the advertised 5s budget (e.g., no-leader transient errors until deadline, then a newly published but unreachable leader adds up to ~15s more), which breaks the bounded-failure behavior this change is trying to guarantee.
```go
		lastResp, lastErr = c.dispatchOnce(ctx, reqs)
		if !shouldRetryDispatch(lastErr) {
			return lastResp, lastErr
		}
		if !time.Now().Before(deadline) {
			return lastResp, lastErr
```
Enforce dispatch retry deadline before another dispatch
In Dispatch, the retry deadline is checked only after dispatchOnce returns. If the deadline expires while waiting between retries, the next loop iteration still performs another full dispatch attempt first; when that attempt goes through redirect, it can block for redirectForwardTimeout and push total latency well past the 5s retry budget. This makes the budget non-strict and can unexpectedly extend client-visible tail latency during leader churn.
Both retry loops evaluated the budget only AFTER a full attempt had run, so the budget was advisory rather than strict. Codex flagged this as P2:

- Coordinate.Dispatch (kv/coordinator.go:279) — after the retry back-off, the loop unconditionally ran another dispatchOnce. When the second-chance attempt took the redirect() path it could block for redirectForwardTimeout (5s), pushing total latency well past the advertised dispatchLeaderRetryBudget (5s).
- LeaderProxy.forwardWithRetry (kv/leader_proxy.go:93) — after the inner maxForwardRetries cycle ended, the outer loop could sleep and then start another full cycle even if the deadline had already passed. Each forward() inside that cycle carried its own 5s RPC timeout detached from the retry deadline, so the same 5s overrun applied, potentially three times per cycle.

Two changes:

1. Re-check the deadline immediately after the back-off sleep so a budget that expired during the sleep terminates the loop before issuing another attempt. Keep the existing pre-sleep check so the loop still short-circuits when the previous attempt itself ran the clock out. This preserves the happy path: the first attempt always runs even when the deadline is tight (deadline = now + budget on entry).
2. In LeaderProxy, derive a parent context with the retry deadline and plumb it into forward(); forward() then uses context.WithTimeout(parentCtx, leaderForwardTimeout), so its effective per-call timeout is min(leaderForwardTimeout, remaining budget). A forward() issued with <5s of budget left caps at exactly the remaining budget rather than the full RPC timeout. The inner fast-retry loop also now checks the deadline before each forward() to avoid kicking off extra RPCs as the budget dribbles away.

Coordinate.dispatchOnce still calls transactionManager.Commit, which does not propagate a context down to engine.Propose today, so the local-leader path can still take longer than the budget if Commit itself blocks. That is an orthogonal refactor (the Transactional interface would need a ctx parameter) and is out of scope for this fix; the advertised budget guarantee now holds for the redirect path, which is the one clients observe over gRPC.

Test plan
- go vet ./... — clean
- go test -race -count=1 ./kv/ — ok 6.9s
- go test -race -run '^(Test_consistency_satisfy_write_after_read_sequence|Test_grpc_transaction|Test_consistency_satisfy_write_after_read_for_parallel)$' ./adapter/ -count=1 — ok 212s

https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap
💡 Codex Review
Reviewed commit: 4f105043c1
```go
	var lastResp *CoordinateResponse
	var lastErr error
	for {
		lastResp, lastErr = c.dispatchOnce(ctx, reqs)
```
Cap dispatch attempt by remaining retry budget
Dispatch enforces dispatchLeaderRetryBudget only with wall-clock checks around retries, but this call still invokes dispatchOnce with the original context, so a near-expiry iteration can start a follower redirect() that uses its own 5s per-RPC timeout and run well past the advertised budget. This means a request can exceed the 5s bound (e.g., leader address appears just before budget expiry but is unreachable), which defeats the bounded-failure behavior this change is trying to guarantee. Fresh evidence in this revision is that even with the new post-sleep deadline guard, the per-attempt context at this call site is not bounded by remaining budget.
Codex P2 follow-up to 4f10504: that commit added wall-clock checks around the retry loop, but the per-attempt context handed to dispatchOnce was still the caller's original ctx. A near-expiry iteration could legitimately enter dispatchOnce, dispatch through redirect(), and sit in cli.Forward for the full 5s redirectForwardTimeout — the post-sleep wall-clock check would trip on the next pass, but the offending call had already exceeded the advertised 5s dispatchLeaderRetryBudget.

Derive a deadline-bounded boundedCtx via context.WithDeadline(ctx, deadline) once at the top of the retry loop and pass it to dispatchOnce. context.WithDeadline picks the earlier of the caller's deadline and our own, so callers with tighter deadlines keep their cancellation semantics. The waitForDispatchRetry call still uses the caller's ctx so caller cancellation surfaces cleanly via ctx.Done in the back-off select.

Add an explicit ctx.Err() check after each dispatchOnce so that caller cancellation/deadline takes precedence over any gRPC error that wraps boundedCtx's DeadlineExceeded — without that check the caller would see a context-deadline error from inside the gRPC stack instead of their own ctx.Err().

The local-leader path (transactionManager.Commit -> engine.Propose) still cannot honor the bounded ctx today because Transactional.Commit does not take one; that is an orthogonal interface change. The budget guarantee now holds for the redirect path, which is what external gRPC clients observe.

Test plan
- go vet ./... — clean
- go test -race -count=1 ./kv/ — ok 6.8s
- go test -race -run '^(Test_consistency_satisfy_write_after_read_sequence|Test_grpc_transaction|Test_consistency_satisfy_write_after_read_for_parallel)$' ./adapter/ -count=1 — ok 201s

https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap
Actionable comments posted: 2
🧹 Nitpick comments (5)
adapter/grpc_test.go (2)
149-183: Cleanup: use `defer shutdown(nodes)` like the sibling tests.

Lines 181–182 call `wg.Wait()` and then `shutdown(nodes)` without `defer`. If any worker goroutine panics or blocks (e.g., a worker is mid-retry inside the coordinator's 5s budget when the enclosing test is cancelled by the Go test timeout), `wg.Wait()` never returns and the nodes — including their raft state and open listeners — leak across the remainder of the package's test run. The sibling test at Line 189 already uses `defer shutdown(nodes)`; match that.

♻️ Proposed change

```diff
 	nodes, adders, _ := createNode(t, 3)
 	c := rawKVClient(t, adders)
+	defer shutdown(nodes)
 	wg := sync.WaitGroup{}
 	const workers = 1000
 	wg.Add(workers)
 	for i := range workers {
 		go func(i int) {
 			...
 		}(i)
 	}
 	wg.Wait()
-	shutdown(nodes)
 }
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@adapter/grpc_test.go` around lines 149-183: in Test_consistency_satisfy_write_after_read_for_parallel, move the shutdown(nodes) call to a defer placed immediately after the nodes are created (right after the createNode(...) call) so that the cluster is always torn down even if wg.Wait() blocks or a goroutine panics; locate the Test_consistency_satisfy_write_after_read_for_parallel function and replace the final explicit shutdown(nodes) invocation with a defer shutdown(nodes) near the top of the test (after nodes, adders, _ := createNode(...)).
193-251: `continue` on `assert.NoError` failure amplifies noise on first regression.

Both 9999-iteration loops (`Test_consistency_satisfy_write_after_read_sequence`, `Test_grpc_transaction`) use `assert.NoError(...); continue` on every RPC. If the coordinator's retry logic ever fails to absorb a transient leader error on iteration 0, `assert.NoError` records a failure — but the loop then runs 9998 more iterations, each re-issuing the same broken RPC and appending another assertion failure to the report. A single logical bug will generate thousands of lines of test output, making root-cause analysis painful.

Consider promoting these to `require.NoError` (the test stops on the first real failure) or `if !assert.NoError(...) { break }` — whichever better matches the intent. The `continue` pattern silently turns correctness failures into throughput noise.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@adapter/grpc_test.go` around lines 193-251: both loops (the 9999-iteration loop that exercises c.RawPut/c.RawGet in Test_consistency_satisfy_write_after_read_sequence and the loop in Test_grpc_transaction that uses c.Put/c.Get/c.Delete) use assert.NoError(...); continue, which multiplies failures; change those checks so the test stops on the first RPC failure: replace assert.NoError(...) with require.NoError(...) (from testify/require) for the RPC calls (RawPut/RawGet/Put/Get/Delete) or, if you prefer not to pull in require, change the post-assert handling to if !assert.NoError(...) { break } to avoid repeating the same error across thousands of iterations.

kv/coordinator_retry_test.go (1)
186-218: Add a test that pins "success wins over concurrent cancel".

This test validates that cancelling during back-off surfaces `context.Canceled`, but does not cover the inverse race: `dispatchOnce` returns success on the same iteration that the caller cancels. That ordering is the one flagged on kv/coordinator.go:286-299. Consider adding a sibling case where `onCommit` cancels `ctx` but the scripted error slice for that call is empty (so Commit succeeds), and asserting `require.NoError(err)` plus a non-zero `CommitIndex`. It pins the contract so future refactors of the retry loop can't re-introduce the silent-discard regression.

Minor nit while you're here: this test reconstructs the `Coordinate` inline at Lines 208–212, while the other tests go through `newRetryCoordinate`. Either channel the inline construction through the helper or accept that the helper is for the common case — worth being consistent.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@kv/coordinator_retry_test.go` around lines 186-218: add a sibling test to TestCoordinateDispatch_CtxCancelDuringRetrySurfaces that verifies "success wins over concurrent cancel": create a scriptedTransactional whose errs slice yields a success (empty error) on the same call where its onCommit cancels the ctx, call Coordinate.Dispatch (use the newRetryCoordinate helper or replace the inline Coordinate construction to be consistent), then assert require.NoError(t, err) and that the returned CommitIndex is non-zero; ensure the test targets the same retry/backoff race (i.e., cancel inside onCommit) so dispatchOnce's success path beats the concurrent cancel.

kv/leader_proxy_test.go (1)
256-291: Consider gating the 5-second pinned test behind `testing.Short()`.

This test is explicitly designed to exhaust `leaderProxyRetryBudget` (5s) to pin the bounded-termination contract. `t.Parallel()` keeps it from serializing with peers, but every developer running `go test -short ./kv/...` still pays the full 5s just for this case. Since the test is a pinned-contract check rather than a fast-feedback correctness test, guard it with `if testing.Short() { t.Skip("exhausts 5s retry budget; skipping in -short mode") }` at the top. CI running without `-short` will still exercise it.

♻️ Proposed change

```diff
 func TestLeaderProxy_FailsAfterLeaderBudgetElapses(t *testing.T) {
 	t.Parallel()
+	if testing.Short() {
+		t.Skip("exhausts the full 5s leaderProxyRetryBudget; skipped in -short mode")
+	}
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@kv/leader_proxy_test.go` around lines 256-291: the test TestLeaderProxy_FailsAfterLeaderBudgetElapses currently always waits the full leaderProxyRetryBudget (5s); gate it for fast runs by adding a short-mode skip at the top: check testing.Short() and call t.Skip("exhausts 5s retry budget; skipping in -short mode") if true. Perform this skip before creating the togglingFollowerEngine or calling NewLeaderProxyWithEngine so CI still runs the test but a local `go test -short` skips it.

kv/coordinator.go (1)
439-478: Substring classification is correct but brittle against future error-text drift.

`hasTransientLeaderPhrase` does a lowercase `strings.Contains` over a closed allowlist ("not leader", "leader not found"). The doc-comment rightly narrows the risk, but there is no compile-time or test-time coupling between this list and the upstream `adapter.ErrNotLeader` / `raftengine.ErrNotLeader` message strings — if either sentinel's `Error()` text is reworded in a future PR, this match silently stops triggering and transient churn starts surfacing to gRPC clients as hard failures again.

Worth considering for a follow-up: add a targeted test that exercises the real `.Error()` strings of those sentinels (and of the `rpc error: code = Unknown desc = …` wrapper `google.golang.org/grpc/status.FromError` produces) so future message renames are caught by CI rather than by production re-election windows. The classification table in kv/coordinator_retry_test.go already covers synthetic strings; pinning the sentinels themselves would close the gap.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@kv/coordinator.go` around lines 439-478: the substring-based fallback in hasTransientLeaderPhrase/leaderErrorPhrases is brittle; add a unit test that directly asserts the current .Error() output of the real sentinel errors (e.g., adapter.ErrNotLeader, raftengine.ErrNotLeader, kv.ErrLeaderNotFound) and of a grpc-wrapped Unknown/desc error (use status.FromError on a constructed error) to ensure any future rewording is caught by CI; if those .Error() strings change, the test will fail, prompting maintenance of leaderErrorPhrases or a migration to a safer sentinel check in isTransientLeaderError.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@kv/coordinator.go`:
- Around line 286-299: The loop currently checks ctx.Err() before honoring a
successful dispatch, which causes successful lastResp from dispatchOnce to be
discarded when ctx is canceled; change the ordering in the retry loop so that if
lastErr == nil you immediately return lastResp, nil (preserving
committed-success semantics), and only when lastErr != nil perform the ctx.Err()
check and return ctx error (wrapped) if present; keep the existing
shouldRetryDispatch(lastErr) logic for non-nil errors so retries still occur as
before.
In `@kv/leader_proxy.go`:
- Around line 102-128: The inner retry loop can exit without ever calling
p.forward, leaving lastErr == nil and causing a (nil, nil) return; add a
defensive check immediately after the inner loop (before calling
isTransientLeaderError(lastErr)) to handle lastErr == nil explicitly — e.g.,
return nil, errors.WithStack(ErrLeaderNotFound) — so update the code around the
symbols lastErr, isTransientLeaderError, ErrLeaderNotFound, forward,
maxForwardRetries and deadline in leader_proxy.go to return a non-nil error when
no RPC was attempted.
📒 Files selected for processing (6)
- adapter/grpc_test.go
- adapter/test_util.go
- kv/coordinator.go
- kv/coordinator_retry_test.go
- kv/leader_proxy.go
- kv/leader_proxy_test.go

💤 Files with no reviewable changes (1)
- adapter/test_util.go
Two CodeRabbit-actionable issues plus the accompanying nit cluster.
Actionable: Dispatch must not convert a successful commit into a
ctx error (coordinator.go).
The retry loop used to check ctx.Err() ahead of lastErr == nil.
If dispatchOnce returned success concurrently with caller
cancellation, the commit was already durable in the FSM, but the
loop handed the caller ctx.Err() and discarded lastResp. A
retrying client would then re-issue the write and — for
non-idempotent ops — risk observable duplicate effects. Reorder so
a nil error returns lastResp, nil immediately; ctx.Err() only
wins on an error path.
New test TestCoordinateDispatch_SuccessBeatsConcurrentCancel
pins the contract by cancelling inside Commit just before it
returns success and asserting NoError + non-zero CommitIndex.
Actionable: LeaderProxy.forwardWithRetry defensive nil guard
(leader_proxy.go).
The inner fast-retry loop can, under a future shorter budget or a
clock jump between the deadline computation and the first
time.Now() check, exit without ever calling forward() and leave
lastErr == nil. errors.Wrapf(nil, ...) is nil, so the next
classification step would return (nil, nil) — a silent success
the proxy never produced. Add an explicit
if lastErr == nil { return nil, ErrLeaderNotFound } after the
inner loop so that path surfaces a real error regardless of
future refactors.
Test-side nits:
- adapter/grpc_test.go: move shutdown(nodes) to defer in the
parallel test so a wedged worker or a test-timeout doesn't leak
raft nodes into sibling tests.
- adapter/grpc_test.go: swap continue -> break in the
9999-iteration sequence/transaction loops. A genuine regression
used to emit thousands of duplicate assertion lines, drowning
the root cause; now the first failure short-circuits the rest
of the loop while defer shutdown still runs.
- kv/leader_proxy_test.go: gate
TestLeaderProxy_FailsAfterLeaderBudgetElapses behind
testing.Short() so `go test -short ./kv/...` runs in ~100ms
instead of 5s+. CI invocations without -short still exercise
the budget-exhaust contract.
- kv/coordinator_retry_test.go:
TestIsTransientLeaderError_PinsRealSentinels asserts the three
upstream sentinel values (kv.ErrLeaderNotFound,
raftengine.ErrNotLeader, raftengine.ErrLeadershipLost) still
classify as transient. A future rename of those error messages
that drifts them out of leaderErrorPhrases would now fail CI
instead of silently surfacing transient leader churn to gRPC
clients. adapter.ErrNotLeader can't be pinned here without a
kv->adapter test-time import cycle; a symmetric pin belongs in
the adapter test package (out of scope for this commit).
Test plan
- go vet ./... — clean
- go test -race -count=1 ./kv/ — ok 6.9s
- go test -count=1 -short ./kv/ -run 'TestLeaderProxy' — 0.14s
(5s budget-exhaust test correctly skipped)
- go test -race -run '^(Test_consistency_satisfy_write_after_read_sequence|Test_grpc_transaction|Test_consistency_satisfy_write_after_read_for_parallel)$' ./adapter/ -count=1 — ok 245s
https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap
💡 Codex Review
Reviewed commit: afd8e301d2
```go
	if !isTransientLeaderError(lastErr) {
		return nil, errors.Wrapf(lastErr, "leader forward failed after %d retries", maxForwardRetries)
```
Guard nil error before wrapping forward retry failure
If the retry budget is already exhausted before the inner loop runs its first p.forward call (for example after a long scheduler pause/VM stall), lastErr remains nil and this branch executes errors.Wrapf(lastErr, ...), which returns nil. That lets forwardWithRetry return (nil, nil) as if it succeeded, and callers like adapter/internal.go then dereference the nil transaction response (r.CommitIndex), causing a panic instead of a bounded leader-unavailable error.
…orwardWithRetry

golangci cyclop flagged both Dispatch (11) and forwardWithRetry (11) after the CodeRabbit-major success-vs-cancel ordering fix and the defensive nil-err guard in the previous commit pushed them over the limit (10). Split out small single-purpose helpers so the hot-path functions stay readable without reshuffling semantics:

- kv/coordinator.go: extract prepareDispatchRetry(ctx, reqs, leaderAssignsTS, timer), which bundles the "clear txn timestamps + wait one back-off interval" bookkeeping. Dispatch now makes one helper call instead of an if-block plus a wait call, dropping its cyclomatic complexity from 11 to 10.
- kv/leader_proxy.go: extract runForwardCycle(parentCtx, reqs, deadline), which runs the inner maxForwardRetries fast-retry cycle and returns (resp, err, done). done=true is the terminal branch (commit or non-transient error); done=false feeds the outer slow poll. forwardWithRetry drops from 11 to 7.

Both defensive nil-err guards from afd8e30 are preserved: runForwardCycle only calls errors.Wrapf when lastErr != nil, and forwardWithRetry still surfaces ErrLeaderNotFound when the cycle exited without ever calling forward(). A Codex P2 flag on this path was a false positive against afd8e30 (the nil guard was already in place there).

Test plan
- go vet ./... — clean
- go test -race -count=1 ./kv/ — ok 7.4s
- go test -race -run '^(Test_consistency_satisfy_write_after_read_sequence|Test_grpc_transaction|Test_consistency_satisfy_write_after_read_for_parallel)$' ./adapter/ -count=1 — ok 283s

https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap
💡 Codex Review
Reviewed commit: db58eef3e9
```go
var leaderErrorPhrases = []string{
	"not leader",       // adapter.ErrNotLeader, raftengine.ErrNotLeader, ErrLeadershipLost messages
	"leader not found", // kv.ErrLeaderNotFound, adapter.ErrLeaderNotFound
}
```
Classify leadership-loss wire errors as transient
The transient classifier only whitelists "not leader" and "leader not found", so forwarded gRPC failures like "raft engine: leadership lost" and "raft engine: leadership transfer in progress" are treated as terminal after crossing the RPC boundary (where errors.Is no longer matches sentinels). In leader churn, Dispatch/forwardWithRetry will leak these transient failures to clients instead of using the new bounded retry path, which defeats the intended behavior of this change under real re-elections.
Codex P1 follow-up. The transient classifier whitelist was missing two sentinel strings:

- raftengine.ErrLeadershipLost → "raft engine: leadership lost"
- raftengine.ErrLeadershipTransferInProgress → "raft engine: leadership transfer in progress"

When either sentinel crosses a gRPC boundary (e.g. a follower forwards to a leader that steps down mid-proposal, or Internal.Forward returns these to redirect()), the typed chain is dropped and only the message text survives. isTransientLeaderError's errors.Is check fails, and without a matching entry in leaderErrorPhrases the substring fallback also misses — so forwardWithRetry / Dispatch would treat a genuine re-election signal as terminal and leak the transient error to gRPC clients, defeating the bounded-retry contract of this PR.

Extend leaderErrorPhrases to include "leadership lost" and "leadership transfer in progress", pair each phrase with the sentinel that emits it in the inline comment, and add four new classifier cases + one sentinel pin (ErrLeadershipTransferInProgress) so a future rename on either side fails CI rather than surfacing at production re-election time.

Test plan
- go vet ./... — clean
- go test -race -count=1 ./kv/ -run 'TestIsTransientLeaderError' -v — ok, all 13 new/existing cases pass
- go test -race -count=1 ./kv/ — ok 7.1s
- go test -race -run '^(Test_consistency_satisfy_write_after_read_sequence|Test_grpc_transaction|Test_consistency_satisfy_write_after_read_for_parallel)$' ./adapter/ -count=1 — ok 256s

https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap
💡 Codex Review
Reviewed commit: 7ce64f5014
```go
msg := strings.ToLower(err.Error())
for _, phrase := range leaderErrorPhrases {
	if strings.Contains(msg, phrase) {
		return true
	}
}
```
Stop classifying arbitrary error text as transient leader loss
hasTransientLeaderPhrase retries whenever err.Error() contains phrases like "not leader", but several non-leader errors include user-controlled text in their message (for example, store.WriteConflictError.Error() formats as key: <key>: write conflict). If a conflicted key contains one of these phrases, Dispatch/forwardWithRetry will misclassify a real business error as transient, spin retries for up to the 5s budget, and can even alter outcome by reissuing with fresh timestamps instead of surfacing the original conflict immediately.
Codex P2 follow-up. hasTransientLeaderPhrase used strings.Contains, so
any non-leader error whose text happened to embed one of the
allowlisted phrases was misclassified as transient. The documented
failure mode: store.WriteConflictError formats as
"key: <user-key>: write conflict", and a user-chosen key containing
"not leader" (or any other allowlisted phrase) would trip the
Contains match. Dispatch / forwardWithRetry would then spin retries
for up to the 5s budget — reissuing a txn with fresh StartTS each
time — instead of surfacing the original conflict immediately.
Switched to strings.HasSuffix. I verified the actual rendered output
of every wrapper the retry path produces:
cerrors.Wrapf(leaderErr, "forward failed after %d retries", 3)
-> "forward failed after 3 retries: leader not found"
cerrors.WithStack(leaderErr)
-> "leader not found"
fmt.Errorf("forwarded: %w", leaderErr)
-> "forwarded: leader not found"
gRPC status Errorf
-> "rpc error: code = Unknown desc = leader not found"
gRPC wrap stacked under cerrors.Wrapf
-> "forward: rpc error: code = Unknown desc = leader not found"
Every observed wrapper places the original error at the END of the
composed string, so a suffix check remains sufficient while rejecting
WriteConflictError's middle-of-string leak. Pinned both "sneaky key"
shapes in TestIsTransientLeaderError_Classification so the tradeoff
can't silently regress.
Test plan
- go vet ./... — clean
- go test -race -count=1 ./kv/ — ok 7.0s, includes two new
"sneaky key" write-conflict cases that now correctly stay
non-transient
- go test -race -run '^(Test_consistency_satisfy_write_after_read_sequence|Test_grpc_transaction|Test_consistency_satisfy_write_after_read_for_parallel)$' ./adapter/ -count=1 — ok 187s
https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap
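The Contains-vs-HasSuffix tradeoff above can be demonstrated standalone. The phrase list and messages are illustrative; the "sneaky key" string mimics the WriteConflictError shape described in the commit.

```go
package main

import (
	"fmt"
	"strings"
)

var leaderErrorPhrases = []string{"not leader", "leader not found"}

// containsMatch reproduces the flagged behaviour: any message merely
// containing a leader phrase, even in user-controlled key text, matches.
func containsMatch(msg string) bool {
	for _, p := range leaderErrorPhrases {
		if strings.Contains(strings.ToLower(msg), p) {
			return true
		}
	}
	return false
}

// suffixMatch is the fix: wrappers on the retry path append the cause at
// the END of the composed string, so a suffix test is sufficient while
// rejecting phrases embedded mid-string.
func suffixMatch(msg string) bool {
	for _, p := range leaderErrorPhrases {
		if strings.HasSuffix(strings.ToLower(msg), p) {
			return true
		}
	}
	return false
}

func main() {
	// A write conflict whose user-chosen key embeds a leader phrase.
	sneaky := "key: not leader haha: write conflict"
	// A genuinely transient error wrapped on the wire.
	wire := "forward failed after 3 retries: leader not found"
	fmt.Println(containsMatch(sneaky), suffixMatch(sneaky)) // → true false
	fmt.Println(containsMatch(wire), suffixMatch(wire))     // → true true
}
```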
```go
// idx is strictly non-negative (atomic.Add on an int64 that starts
// at 0 and only ever increments), so the conversion to uint64 for
// CommitIndex is safe. gosec G115 can't see that invariant, so
// silence it inline.
return &TransactionResponse{CommitIndex: uint64(idx + 1)}, nil //nolint:gosec
}
```
In this test helper, the signed->unsigned conversion is currently silenced with //nolint:gosec (G115). Since this is new code, it’d be better to avoid introducing a linter suppression by using an unsigned counter (e.g., atomic.Uint64) or otherwise keeping the value as uint64 throughout, so gosec doesn’t have to be bypassed.
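The suggested unsigned counter might look like this sketch. `scriptedCommits` is a hypothetical stand-in for the test helper; the point is that keeping the counter `atomic.Uint64` end-to-end means no signed-to-unsigned conversion and no `//nolint:gosec` suppression.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// scriptedCommits keeps the commit counter unsigned end-to-end, so the
// value can flow into a uint64 CommitIndex without any conversion.
type scriptedCommits struct {
	n atomic.Uint64
}

func (s *scriptedCommits) commit() uint64 {
	// Add returns the new value; the counter starts at 0 and only
	// ever increments, matching the original invariant.
	return s.n.Add(1)
}

func main() {
	var s scriptedCommits
	fmt.Println(s.commit(), s.commit(), s.commit()) // → 1 2 3
}
```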
```go
func waitForDispatchRetry(ctx context.Context, timer *time.Timer, interval time.Duration) error {
	timer.Reset(interval)
	select {
	case <-ctx.Done():
		return errors.WithStack(ctx.Err())
	case <-timer.C:
		return nil
	}
}
```
The retry loop sleeps a fixed dispatchLeaderRetryInterval even when the remaining time in dispatchLeaderRetryBudget is smaller, so overall Dispatch latency can exceed the advertised budget by up to one interval. Consider capping the sleep to min(dispatchLeaderRetryInterval, time.Until(deadline)) (or passing a deadline-bounded context into the wait) to keep the budget strictly bounded.
```go
if !time.Now().Before(deadline) {
	return nil, lastErr
}
time.Sleep(leaderProxyRetryInterval)
// Re-check the deadline AFTER the back-off: if the budget is
// exhausted, do not enter another maxForwardRetries cycle
// (which could issue up to three more RPCs, each bounded by
// leaderForwardTimeout relative to the now-expired deadline).
if !time.Now().Before(deadline) {
	return nil, lastErr
}
```
time.Sleep(leaderProxyRetryInterval) isn’t capped by the remaining retry budget and isn’t interruptible by the deadline context, so forwardWithRetry can exceed leaderProxyRetryBudget by up to one interval. Consider sleeping for min(leaderProxyRetryInterval, time.Until(deadline)) and/or using a timer + parentCtx.Done() select to keep total time strictly within the budget.
Three Copilot points on e410490, all genuine:

1. (kv/coordinator.go) waitForDispatchRetry slept a fixed dispatchLeaderRetryInterval even when the remaining time in dispatchLeaderRetryBudget was smaller. Worst case: Dispatch exceeded its advertised budget by up to one full interval (25ms). Pass deadline into the helper and cap sleep at min(interval, time.Until(deadline)) so the budget is strictly bounded.
2. (kv/leader_proxy.go) time.Sleep(leaderProxyRetryInterval) was neither capped nor interruptible. It both slept past the budget by up to one interval AND ignored parentCtx, so a context cancellation in the budget-bounded parentCtx would still wait the full 25ms before tearing down. Replace with a timer + parentCtx select and cap at min(interval, remaining budget).
3. (kv/coordinator_retry_test.go) the scriptedTransactional commit counter was atomic.Int64 with a //nolint:gosec G115 suppression on the CommitIndex return. Switching to atomic.Uint64 drops both the unsigned conversion and the linter bypass — the counter is only ever monotonically incremented from zero, so a signed type was never meaningful here.

Test plan
- go vet ./... — clean
- go test -race -count=1 ./kv/ — ok 6.8s
- go test -race -run '^(Test_consistency_satisfy_write_after_read_sequence|Test_grpc_transaction|Test_consistency_satisfy_write_after_read_for_parallel)$' ./adapter/ -count=1 — ok 190s

https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap
Two golangci flags on 640f009:

- kv/coordinator_retry_test.go G115: the atomic.Uint64 switch in the previous commit moved the conversion from "uint64 return" to "int(idx) < len(s.errs)" on the slice bounds check, which just reintroduces G115 on the other side. Change errs from []error to map[uint64]error so the whole counter / index path stays unsigned end-to-end. Map lookup returns the zero value (nil error) on absent keys, so no bounds check is required and no int<->uint64 conversion is needed anywhere. No //nolint:gosec suppression required now.
- kv/leader_proxy.go cyclop: the new timer+select back-off pushed forwardWithRetry's branch count to 11 (max 10). Extract waitLeaderProxyBackoff(parentCtx, interval, deadline) that bundles the cap + interruptible select so forwardWithRetry stays at 7 branches. Behaviour is unchanged — sleep is still capped at min(interval, remaining budget) and the select still wakes on parentCtx.Done().

Four existing scriptedTransactional call sites updated from slice literals to indexed map literals (errs: map[uint64]error{0: ..., 1: ...}) to match the new API.

Test plan
- go vet ./... — clean
- go test -race -count=1 ./kv/ — ok 6.8s
- go test -race -run '^(Test_consistency_satisfy_write_after_read_sequence|Test_grpc_transaction|Test_consistency_satisfy_write_after_read_for_parallel)$' ./adapter/ -count=1 — ok 191s

https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap
💡 Codex Review
Reviewed commit: 846125befc
```go
if !shouldRetryDispatch(lastErr) {
	return lastResp, lastErr
}
```
Preserve leader error when retry budget expires mid-dispatch
If an attempt starts with only a small amount of dispatchLeaderRetryBudget remaining, dispatchOnce can fail with context.DeadlineExceeded from the internal boundedCtx (for example while redirect() is waiting on cli.Forward). This branch returns that non-transient error immediately, even when the caller context is still alive and previous failures were transient leader-churn signals, so clients see an internal timeout instead of the intended leader-unavailable outcome after bounded retries.
Codex P2 on 846125b. When Dispatch's bounded retry budget fires while dispatchOnce is mid-call (for example redirect() waiting on cli.Forward), boundedCtx cancels the RPC and dispatchOnce returns a context.DeadlineExceeded wrapped through the gRPC stack. The previous code then took the "not transient, return as-is" path and handed that DeadlineExceeded to the gRPC caller — clients saw a generic internal timeout even though every prior attempt during the 5s window had reported a transient leader-churn signal, which is the behaviour client retry logic actually expects.

Track the most recent TRANSIENT leader error we observed (lastTransientErr) separately from lastErr, and pick between them in a new finalDispatchErr helper when the retry loop terminates via shouldRetryDispatch == false. Rule: if our own retry budget has expired and we previously saw a transient leader error, surface that — the deadline is just how the loop noticed the budget ran out, not the real failure mode. Otherwise (business error inside the budget, or first attempt ever), surface lastErr unchanged.

Pinned four cases in kv/coordinator_retry_test.go:

1. past deadline with a transient recorded → returns the transient (this is the Codex-flagged scenario)
2. past deadline with no transient seen → returns lastErr (prevents the helper from clobbering a genuine first-attempt failure with nil)
3. within budget, non-transient error → returns lastErr (write conflict / validation must not be masked by a stale transient)
4. within budget, nil transient → returns lastErr (nil-guard)

Test plan
- go vet ./... — clean
- go test -race -count=1 ./kv/ — ok 6.7s (includes the new TestFinalDispatchErr_PrefersTransientOnBudgetExpiry pin)
- go test -race -run '^(Test_consistency_satisfy_write_after_read_sequence|Test_grpc_transaction|Test_consistency_satisfy_write_after_read_for_parallel)$' ./adapter/ -count=1 — ok 182s

https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.
Comments suppressed due to low confidence (1)
kv/coordinator.go:461
- The comment in `isTransientLeaderError` says the gRPC fallback uses a "case-insensitive substring match", but `hasTransientLeaderPhrase` actually uses `strings.HasSuffix` (suffix matching). Please update the comment to match the implementation (suffix match), since the distinction is important for avoiding accidental retries on business errors containing user-controlled text.
```go
// the actual safety window, never longer). Sampled from the
// monotonic-raw clock so NTP slew/step cannot push the lease
// past its true safety window (see internal/monoclock).
// * expectedGen: if a leader-loss callback fires between this
//   sample and the post-dispatch extend, the generation will
//   have advanced; extend(expectedGen) will see the mismatch
//   and refuse to resurrect the lease. Capturing gen INSIDE
//   extend would observe the post-invalidate value as current.
dispatchStart := monoclock.Now()
expectedGen := c.lease.generation()
```
```go
// Publish the leader address after ~100ms (several poll intervals,
// well inside leaderProxyRetryBudget). forwardWithRetry should be
// mid-poll at that point and converge on success.
publishDelay := 100 * time.Millisecond
go func() {
	time.Sleep(publishDelay)
	eng.setLeader(lis.Addr().String())
}()

reqs := []*pb.Request{
	{
		IsTxn: false,
		Phase: pb.Phase_NONE,
		Ts:    10,
		Mutations: []*pb.Mutation{
			{Op: pb.Op_PUT, Key: []byte("k"), Value: []byte("v")},
		},
	},
}

start := time.Now()
resp, err := p.Commit(reqs)
elapsed := time.Since(start)
require.NoError(t, err)
require.Equal(t, uint64(42), resp.CommitIndex)
// The proxy must have waited at least until setLeader fired;
// otherwise it did not actually poll the missing-leader window.
require.GreaterOrEqual(t, elapsed, publishDelay)
```
In TestLeaderProxy_ForwardsAfterLeaderPublishes, elapsed is measured starting at start := time.Now() after the goroutine that sleeps publishDelay is launched. Any setup time between launching that goroutine and recording start reduces the observed elapsed, so require.GreaterOrEqual(t, elapsed, publishDelay) can fail even when the proxy correctly waits for leader publication. Consider recording start before starting the publish goroutine, or synchronizing on a channel so the delay is measured relative to the same start instant as the Commit call.
Reviewed the new round of Copilot / Codex comments against current HEAD (640f009). Nothing actionable:

- Forwarded "not leader" classification (Copilot + Codex, multiple comments)
- Missing retry-loop unit tests (Copilot x2)
- Budget strictness (Codex P2 x3)
- ctx cancellation return semantics (Codex P2)
- grpc_test.go concerns (Copilot x4)

The consistent pattern across the Copilot round is the reviewer analysing the pre-revision state; the Codex P2 round restates points already committed earlier in the PR history (afd8e30, 3b136d5, 4f10504, 640f009). Leaving the code as-is and declining silently rather than re-litigating.

/gemini review
Warning: You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again!
…t timing

Two Copilot items on 29e1f2e, both small:

- kv/coordinator.go:490: the doc-comment on isTransientLeaderError still said the wire-level fallback uses a "case-insensitive substring match". The implementation switched to strings.HasSuffix back in e410490 so WriteConflictError with a user-chosen key matching a leader phrase stays non-transient. Reword the doc to say SUFFIX match and spell out why (user text in WriteConflictError keys).
- kv/leader_proxy_test.go: in TestLeaderProxy_ForwardsAfterLeaderPublishes the start instant was captured after the `go func(){ time.Sleep(publishDelay) ...}()` launch, so the goroutine-scheduling delay ate into the observed elapsed and `require.GreaterOrEqual(t, elapsed, publishDelay)` could flake on slow runners even when the proxy did wait the full delay. Move `start := time.Now()` above the publish goroutine so the publishDelay lower bound is measured from the same instant as the Commit call.

Test plan
- go vet ./... — clean
- go test -race -count=1 ./kv/ — ok 7.0s (incl. the now-stable TestLeaderProxy_ForwardsAfterLeaderPublishes)

https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap
Codex Review: Didn't find any major issues. More of your lovely PRs please.
Summary
This PR moves leader-unavailability retry logic from the test layer into the core coordinator and leader proxy, ensuring that transient raft leadership churn is transparently absorbed by the system rather than surfaced to gRPC clients. This provides linearizable semantics where operations either commit atomically or fail definitively, without leaking internal cluster state transitions.
Key Changes
- Coordinator retry loop: Added a `Dispatch()` wrapper that retries transient leader errors (no leader resolvable, local node stepped down, stale leader rejection) within a bounded 5-second budget before surfacing errors to callers. Non-transient errors (write conflicts, validation failures) are returned immediately.
- Leader proxy retry enhancement: Extended `forwardWithRetry()` to interleave two retry signals.
- Test simplification: Removed helper functions (`rawPutEventually`, `rawGetEventually`, `txnPutEventually`, `txnGetEventually`, `txnDeleteEventually`) and retry logic from test code since the coordinator now handles transient errors transparently. Tests now use direct assertions instead of eventual-consistency patterns.
- New error classification: Added `isTransientLeaderError()` to distinguish recoverable leadership transitions from business-logic failures that must surface to callers unchanged.

Implementation Details

`StartTS` issuance is kept inside the per-attempt path to ensure a fresh HLC ceiling after leader transitions, maintaining monotonicity guarantees.

https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap