diff --git a/adapter/grpc_test.go b/adapter/grpc_test.go index 3fd95dcd6..856e9c0b0 100644 --- a/adapter/grpc_test.go +++ b/adapter/grpc_test.go @@ -188,32 +188,56 @@ func Test_consistency_satisfy_write_after_read_sequence(t *testing.T) { c := rawKVClient(t, adders) defer shutdown(nodes) + // Use t.Context() so a test-level cancel (timeout, parent test + // stopping) propagates into every RPC and the retry loop alike, + // rather than leaking work via context.Background() once the test + // goroutine returns. + ctx := t.Context() key := []byte("test-key-sequence") + // Each RPC is wrapped in retryNotLeader so an in-flight Raft + // re-election (which can fire mid-loop on a busy CI runner — emit + // "leader not found" / "etcd raft engine is not leader" — and is + // purely an availability hiccup, not a consistency violation) does + // not abort the test. The post-RPC assert.Equal still pins the + // consistency invariant: once Put eventually succeeds, the + // subsequent Get must return the same value, otherwise we fail. for i := range 9999 { want := []byte("sequence" + strconv.Itoa(i)) - _, err := c.RawPut( - context.Background(), - &pb.RawPutRequest{Key: key, Value: want}, - ) - // Stop at the first RPC failure instead of continuing: a - // genuine regression would otherwise cascade into 9998 more - // iterations, each reporting the same broken invariant, and - // drown the real cause in test-output noise. + err := retryNotLeader(ctx, func() error { + _, perr := c.RawPut(ctx, &pb.RawPutRequest{Key: key, Value: want}) + return perr + }) + // Stop at the first non-leader-churn RPC failure instead of + // continuing: a genuine regression would otherwise cascade + // into 9998 more iterations, each reporting the same broken + // invariant, and drown the real cause in test-output noise. if !assert.NoError(t, err, "Put RPC failed") { break } - _, err = c.RawPut(context.Background(), &pb.RawPutRequest{Key: key, Value: want}) + err = retryNotLeader(ctx, func() error { + _, perr := c.RawPut(ctx, &pb.RawPutRequest{Key: key, Value: want}) + return perr + }) if !assert.NoError(t, err, "Put RPC failed") { break } - resp, err := c.RawGet(context.Background(), &pb.RawGetRequest{Key: key}) + var resp *pb.RawGetResponse + err = retryNotLeader(ctx, func() error { + var gerr error + resp, gerr = c.RawGet(ctx, &pb.RawGetRequest{Key: key}) + return gerr + }) if !assert.NoError(t, err, "Get RPC failed") { break } + // Consistency invariant — the entire reason this test exists. + // Wrapped RPCs only mask transport-layer flakes; if the + // cluster ever returns a stale Get result here it is still + // flagged loudly. assert.Equal(t, want, resp.Value, "consistency check failed") } } @@ -224,32 +248,51 @@ func Test_grpc_transaction(t *testing.T) { c := transactionalKVClient(t, adders) defer shutdown(nodes) + // See Test_consistency_satisfy_write_after_read_sequence for why + // we use t.Context() and retryNotLeader together. + ctx := t.Context() key := []byte("test-key-sequence") + // Same retryNotLeader wrap as Test_consistency_satisfy_write_after_read + // _sequence: tolerate transient leader churn (purely availability, + // not consistency) while keeping the Put → Get → Delete → Get + // invariants strict. for i := range 9999 { want := []byte("sequence" + strconv.Itoa(i)) - _, err := c.Put( - context.Background(), - &pb.PutRequest{Key: key, Value: want}, - ) + err := retryNotLeader(ctx, func() error { + _, perr := c.Put(ctx, &pb.PutRequest{Key: key, Value: want}) + return perr + }) // See Test_consistency_satisfy_write_after_read_sequence: // break on first RPC failure so a single broken invariant // does not amplify into thousands of assertion lines. if !assert.NoError(t, err, "Put RPC failed") { break } - resp, err := c.Get(context.Background(), &pb.GetRequest{Key: key}) + var resp *pb.GetResponse + err = retryNotLeader(ctx, func() error { + var gerr error + resp, gerr = c.Get(ctx, &pb.GetRequest{Key: key}) + return gerr + }) if !assert.NoError(t, err, "Get RPC failed") { break } assert.Equal(t, want, resp.Value, "consistency check failed") - _, err = c.Delete(context.Background(), &pb.DeleteRequest{Key: key}) + err = retryNotLeader(ctx, func() error { + _, derr := c.Delete(ctx, &pb.DeleteRequest{Key: key}) + return derr + }) if !assert.NoError(t, err, "Delete RPC failed") { break } - resp, err = c.Get(context.Background(), &pb.GetRequest{Key: key}) + err = retryNotLeader(ctx, func() error { + var gerr error + resp, gerr = c.Get(ctx, &pb.GetRequest{Key: key}) + return gerr + }) if !assert.NoError(t, err, "Get RPC failed") { break }