From 78b9d0914bc9967508039c9bd41d6625c14ac66a Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 25 Apr 2026 21:35:03 +0900 Subject: [PATCH 1/2] test(grpc): retry leader churn in consistency loops without weakening the check MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Test_consistency_satisfy_write_after_read_sequence and Test_grpc_transaction both fire 9999 Put/Get (and Delete/Get) round trips against a 3-node Raft cluster. On a busy CI runner Raft re-election can fire mid-loop; the in-flight RPC then surfaces as "rpc error: code = Unknown desc = leader not found" or "etcd raft engine is not leader". Today both tests treat this as a hard failure and abort, even though the error is purely an availability hiccup — nothing was committed, so consistency cannot be violated. Wrap every RPC in the existing retryNotLeader helper so transient leader churn is absorbed within leaderChurnRetryTimeout. The post-RPC assertions (assert.Equal for the value-read invariant, assert.Nil for the post-delete invariant) are unchanged: once the Put / Delete eventually commits, the subsequent Get must agree, and a stale read still fails the test loudly. This is the explicit user constraint — "一貫性確認そのものは損なわず" — and it holds because retryNotLeader only inspects RPC error codes, never the response payload. The fix is symmetric with how rpushEventually / lpushEventually already wrap Lua list tests for the same class of CI flake. Build / vet / lint clean. 
--- adapter/grpc_test.go | 71 +++++++++++++++++++++++++++++++++----------- 1 file changed, 54 insertions(+), 17 deletions(-) diff --git a/adapter/grpc_test.go b/adapter/grpc_test.go index 3fd95dcd6..7db900280 100644 --- a/adapter/grpc_test.go +++ b/adapter/grpc_test.go @@ -190,30 +190,51 @@ func Test_consistency_satisfy_write_after_read_sequence(t *testing.T) { key := []byte("test-key-sequence") + // Each RPC is wrapped in retryNotLeader so an in-flight Raft + // re-election (which can fire mid-loop on a busy CI runner — emit + // "leader not found" / "etcd raft engine is not leader" — and is + // purely an availability hiccup, not a consistency violation) does + // not abort the test. The post-RPC assert.Equal still pins the + // consistency invariant: once Put eventually succeeds, the + // subsequent Get must return the same value, otherwise we fail. for i := range 9999 { want := []byte("sequence" + strconv.Itoa(i)) - _, err := c.RawPut( - context.Background(), - &pb.RawPutRequest{Key: key, Value: want}, - ) - // Stop at the first RPC failure instead of continuing: a - // genuine regression would otherwise cascade into 9998 more - // iterations, each reporting the same broken invariant, and - // drown the real cause in test-output noise. + err := retryNotLeader(context.Background(), func() error { + _, perr := c.RawPut(context.Background(), + &pb.RawPutRequest{Key: key, Value: want}) + return perr + }) + // Stop at the first non-leader-churn RPC failure instead of + // continuing: a genuine regression would otherwise cascade + // into 9998 more iterations, each reporting the same broken + // invariant, and drown the real cause in test-output noise. 
if !assert.NoError(t, err, "Put RPC failed") { break } - _, err = c.RawPut(context.Background(), &pb.RawPutRequest{Key: key, Value: want}) + err = retryNotLeader(context.Background(), func() error { + _, perr := c.RawPut(context.Background(), + &pb.RawPutRequest{Key: key, Value: want}) + return perr + }) if !assert.NoError(t, err, "Put RPC failed") { break } - resp, err := c.RawGet(context.Background(), &pb.RawGetRequest{Key: key}) + var resp *pb.RawGetResponse + err = retryNotLeader(context.Background(), func() error { + var gerr error + resp, gerr = c.RawGet(context.Background(), &pb.RawGetRequest{Key: key}) + return gerr + }) if !assert.NoError(t, err, "Get RPC failed") { break } + // Consistency invariant — the entire reason this test exists. + // Wrapped RPCs only mask transport-layer flakes; if the + // cluster ever returns a stale Get result here it is still + // flagged loudly. assert.Equal(t, want, resp.Value, "consistency check failed") } } @@ -226,30 +247,46 @@ func Test_grpc_transaction(t *testing.T) { key := []byte("test-key-sequence") + // Same retryNotLeader wrap as Test_consistency_satisfy_write_after_read + // _sequence: tolerate transient leader churn (purely availability, + // not consistency) while keeping the Put → Get → Delete → Get + // invariants strict. for i := range 9999 { want := []byte("sequence" + strconv.Itoa(i)) - _, err := c.Put( - context.Background(), - &pb.PutRequest{Key: key, Value: want}, - ) + err := retryNotLeader(context.Background(), func() error { + _, perr := c.Put(context.Background(), &pb.PutRequest{Key: key, Value: want}) + return perr + }) // See Test_consistency_satisfy_write_after_read_sequence: // break on first RPC failure so a single broken invariant // does not amplify into thousands of assertion lines. 
if !assert.NoError(t, err, "Put RPC failed") { break } - resp, err := c.Get(context.Background(), &pb.GetRequest{Key: key}) + var resp *pb.GetResponse + err = retryNotLeader(context.Background(), func() error { + var gerr error + resp, gerr = c.Get(context.Background(), &pb.GetRequest{Key: key}) + return gerr + }) if !assert.NoError(t, err, "Get RPC failed") { break } assert.Equal(t, want, resp.Value, "consistency check failed") - _, err = c.Delete(context.Background(), &pb.DeleteRequest{Key: key}) + err = retryNotLeader(context.Background(), func() error { + _, derr := c.Delete(context.Background(), &pb.DeleteRequest{Key: key}) + return derr + }) if !assert.NoError(t, err, "Delete RPC failed") { break } - resp, err = c.Get(context.Background(), &pb.GetRequest{Key: key}) + err = retryNotLeader(context.Background(), func() error { + var gerr error + resp, gerr = c.Get(context.Background(), &pb.GetRequest{Key: key}) + return gerr + }) if !assert.NoError(t, err, "Get RPC failed") { break } From 1aed38660bfece9b30804e75777839771f6aaa17 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 25 Apr 2026 22:03:56 +0900 Subject: [PATCH 2/2] test(grpc): use t.Context() for the consistency-loop RPCs Replace context.Background() in Test_consistency_satisfy_write_after_read_sequence and Test_grpc_transaction with the per-test t.Context(). Both tests now derive a single ctx at the top of the test body and thread it through every RPC and retryNotLeader call. Two effects: - Test-level cancel (timeout, parent stopping the test, t.Cleanup draining) propagates into the in-flight RPC instead of leaking the goroutine through context.Background until the gRPC call eventually errors out on its own. - The retry helper observes the same cancel signal, so a stalled shutdown does not chase 9999 iterations of "leader not found" retries past the point the test has been told to stop. 
Behaviour-equivalent on the happy path; the consistency assertions (assert.Equal / assert.Nil) are unchanged. Build / vet / lint clean, both tests still pass at 9999 iterations locally. --- adapter/grpc_test.go | 38 ++++++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/adapter/grpc_test.go b/adapter/grpc_test.go index 7db900280..856e9c0b0 100644 --- a/adapter/grpc_test.go +++ b/adapter/grpc_test.go @@ -188,6 +188,11 @@ func Test_consistency_satisfy_write_after_read_sequence(t *testing.T) { c := rawKVClient(t, adders) defer shutdown(nodes) + // Use t.Context() so a test-level cancel (timeout, parent test + // stopping) propagates into every RPC and the retry loop alike, + // rather than leaking work via context.Background() once the test + // goroutine returns. + ctx := t.Context() key := []byte("test-key-sequence") // Each RPC is wrapped in retryNotLeader so an in-flight Raft @@ -199,9 +204,8 @@ func Test_consistency_satisfy_write_after_read_sequence(t *testing.T) { // subsequent Get must return the same value, otherwise we fail. 
for i := range 9999 { want := []byte("sequence" + strconv.Itoa(i)) - err := retryNotLeader(context.Background(), func() error { - _, perr := c.RawPut(context.Background(), - &pb.RawPutRequest{Key: key, Value: want}) + err := retryNotLeader(ctx, func() error { + _, perr := c.RawPut(ctx, &pb.RawPutRequest{Key: key, Value: want}) return perr }) // Stop at the first non-leader-churn RPC failure instead of @@ -212,9 +216,8 @@ func Test_consistency_satisfy_write_after_read_sequence(t *testing.T) { break } - err = retryNotLeader(context.Background(), func() error { - _, perr := c.RawPut(context.Background(), - &pb.RawPutRequest{Key: key, Value: want}) + err = retryNotLeader(ctx, func() error { + _, perr := c.RawPut(ctx, &pb.RawPutRequest{Key: key, Value: want}) return perr }) if !assert.NoError(t, err, "Put RPC failed") { @@ -222,9 +225,9 @@ func Test_consistency_satisfy_write_after_read_sequence(t *testing.T) { } var resp *pb.RawGetResponse - err = retryNotLeader(context.Background(), func() error { + err = retryNotLeader(ctx, func() error { var gerr error - resp, gerr = c.RawGet(context.Background(), &pb.RawGetRequest{Key: key}) + resp, gerr = c.RawGet(ctx, &pb.RawGetRequest{Key: key}) return gerr }) if !assert.NoError(t, err, "Get RPC failed") { @@ -245,6 +248,9 @@ func Test_grpc_transaction(t *testing.T) { c := transactionalKVClient(t, adders) defer shutdown(nodes) + // See Test_consistency_satisfy_write_after_read_sequence for why + // we use t.Context() and retryNotLeader together. + ctx := t.Context() key := []byte("test-key-sequence") // Same retryNotLeader wrap as Test_consistency_satisfy_write_after_read @@ -253,8 +259,8 @@ func Test_grpc_transaction(t *testing.T) { // invariants strict. 
for i := range 9999 { want := []byte("sequence" + strconv.Itoa(i)) - err := retryNotLeader(context.Background(), func() error { - _, perr := c.Put(context.Background(), &pb.PutRequest{Key: key, Value: want}) + err := retryNotLeader(ctx, func() error { + _, perr := c.Put(ctx, &pb.PutRequest{Key: key, Value: want}) return perr }) // See Test_consistency_satisfy_write_after_read_sequence: @@ -264,9 +270,9 @@ func Test_grpc_transaction(t *testing.T) { break } var resp *pb.GetResponse - err = retryNotLeader(context.Background(), func() error { + err = retryNotLeader(ctx, func() error { var gerr error - resp, gerr = c.Get(context.Background(), &pb.GetRequest{Key: key}) + resp, gerr = c.Get(ctx, &pb.GetRequest{Key: key}) return gerr }) if !assert.NoError(t, err, "Get RPC failed") { @@ -274,17 +280,17 @@ func Test_grpc_transaction(t *testing.T) { } assert.Equal(t, want, resp.Value, "consistency check failed") - err = retryNotLeader(context.Background(), func() error { - _, derr := c.Delete(context.Background(), &pb.DeleteRequest{Key: key}) + err = retryNotLeader(ctx, func() error { + _, derr := c.Delete(ctx, &pb.DeleteRequest{Key: key}) return derr }) if !assert.NoError(t, err, "Delete RPC failed") { break } - err = retryNotLeader(context.Background(), func() error { + err = retryNotLeader(ctx, func() error { var gerr error - resp, gerr = c.Get(context.Background(), &pb.GetRequest{Key: key}) + resp, gerr = c.Get(ctx, &pb.GetRequest{Key: key}) return gerr }) if !assert.NoError(t, err, "Get RPC failed") {