Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 60 additions & 17 deletions adapter/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,32 +188,56 @@ func Test_consistency_satisfy_write_after_read_sequence(t *testing.T) {
c := rawKVClient(t, adders)
defer shutdown(nodes)

// Use t.Context() so a test-level cancel (timeout, parent test
// stopping) propagates into every RPC and the retry loop alike,
// rather than leaking work via context.Background() once the test
// goroutine returns.
ctx := t.Context()
key := []byte("test-key-sequence")

// Each RPC is wrapped in retryNotLeader so an in-flight Raft
// re-election (which can fire mid-loop on a busy CI runner — emit
// "leader not found" / "etcd raft engine is not leader" — and is
// purely an availability hiccup, not a consistency violation) does
// not abort the test. The post-RPC assert.Equal still pins the
// consistency invariant: once Put eventually succeeds, the
// subsequent Get must return the same value, otherwise we fail.
for i := range 9999 {
want := []byte("sequence" + strconv.Itoa(i))
_, err := c.RawPut(
context.Background(),
&pb.RawPutRequest{Key: key, Value: want},
)
// Stop at the first RPC failure instead of continuing: a
// genuine regression would otherwise cascade into 9998 more
// iterations, each reporting the same broken invariant, and
// drown the real cause in test-output noise.
err := retryNotLeader(ctx, func() error {
_, perr := c.RawPut(ctx, &pb.RawPutRequest{Key: key, Value: want})
return perr
})
// Stop at the first non-leader-churn RPC failure instead of
// continuing: a genuine regression would otherwise cascade
// into 9998 more iterations, each reporting the same broken
// invariant, and drown the real cause in test-output noise.
if !assert.NoError(t, err, "Put RPC failed") {
break
}

_, err = c.RawPut(context.Background(), &pb.RawPutRequest{Key: key, Value: want})
err = retryNotLeader(ctx, func() error {
_, perr := c.RawPut(ctx, &pb.RawPutRequest{Key: key, Value: want})
return perr
})
if !assert.NoError(t, err, "Put RPC failed") {
break
}

resp, err := c.RawGet(context.Background(), &pb.RawGetRequest{Key: key})
var resp *pb.RawGetResponse
err = retryNotLeader(ctx, func() error {
var gerr error
resp, gerr = c.RawGet(ctx, &pb.RawGetRequest{Key: key})
return gerr
})
if !assert.NoError(t, err, "Get RPC failed") {
break
}

// Consistency invariant — the entire reason this test exists.
// Wrapped RPCs only mask transport-layer flakes; if the
// cluster ever returns a stale Get result here it is still
// flagged loudly.
assert.Equal(t, want, resp.Value, "consistency check failed")
}
}
Expand All @@ -224,32 +248,51 @@ func Test_grpc_transaction(t *testing.T) {
c := transactionalKVClient(t, adders)
defer shutdown(nodes)

// See Test_consistency_satisfy_write_after_read_sequence for why
// we use t.Context() and retryNotLeader together.
ctx := t.Context()
key := []byte("test-key-sequence")

// Same retryNotLeader wrap as Test_consistency_satisfy_write_after_read
// _sequence: tolerate transient leader churn (purely availability,
// not consistency) while keeping the Put → Get → Delete → Get
// invariants strict.
for i := range 9999 {
want := []byte("sequence" + strconv.Itoa(i))
_, err := c.Put(
context.Background(),
&pb.PutRequest{Key: key, Value: want},
)
err := retryNotLeader(ctx, func() error {
_, perr := c.Put(ctx, &pb.PutRequest{Key: key, Value: want})
return perr
})
// See Test_consistency_satisfy_write_after_read_sequence:
// break on first RPC failure so a single broken invariant
// does not amplify into thousands of assertion lines.
if !assert.NoError(t, err, "Put RPC failed") {
break
}
resp, err := c.Get(context.Background(), &pb.GetRequest{Key: key})
var resp *pb.GetResponse
err = retryNotLeader(ctx, func() error {
var gerr error
resp, gerr = c.Get(ctx, &pb.GetRequest{Key: key})
return gerr
})
if !assert.NoError(t, err, "Get RPC failed") {
break
}
assert.Equal(t, want, resp.Value, "consistency check failed")

_, err = c.Delete(context.Background(), &pb.DeleteRequest{Key: key})
err = retryNotLeader(ctx, func() error {
_, derr := c.Delete(ctx, &pb.DeleteRequest{Key: key})
return derr
})
if !assert.NoError(t, err, "Delete RPC failed") {
break
}

resp, err = c.Get(context.Background(), &pb.GetRequest{Key: key})
err = retryNotLeader(ctx, func() error {
var gerr error
resp, gerr = c.Get(ctx, &pb.GetRequest{Key: key})
return gerr
})
if !assert.NoError(t, err, "Get RPC failed") {
break
}
Expand Down
Loading