From 443ceb5f9c57f5676796677dc5fe98045628f425 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sun, 3 May 2026 00:03:37 +0900 Subject: [PATCH 1/7] feat(sqs): wire partitioned-FIFO data plane through dispatch helpers (Phase 3.D PR 5b-2) Stage 2 of PR 5b: wire SendMessage / ReceiveMessage / DeleteMessage / ChangeMessageVisibility through the per-key dispatch helpers landed in PR 5b-1. The section 11 PR 2 dormancy gate still rejects PartitionCount > 1 at CreateQueue, so production queues remain on the legacy keyspace and observable behaviour is byte-identical. PR 5b-3 lifts the gate atomically with the capability check. Wiring - SendMessage (sendMessage, sendBatchStandardOnce): partition is a compile-time 0 (Standard queues reject PartitionCount > 1 via the cross-attribute validator). Dispatch helpers route to legacy on the steady-state path. - sendFifoMessage: hashes MessageGroupId once via partitionFor and threads the result through every key construction (data, vis, byage, dedup, group-lock). - ReceiveMessage: scanAndDeliverOnce wraps the scan loop with a per-partition fanout over effectivePartitionCount(meta) iterations. The shared wall-clock + per-call max budget caps apply across the whole call, not per-partition. sqsMsgCandidate carries the partition field stamped at scan time so downstream rotate / delete / expire helpers route to the right partition. - DeleteMessage / ChangeMessageVisibility: handle.Partition from the v2 receipt handle drives every key construction. Receipt-handle dispatch encodeReceiptHandleDispatch(meta, partition, ...) is the single point that picks v1 vs v2. ReceiveMessage on a partitioned queue records cand.partition into the handle so DeleteMessage routes back to the same partition. decodeClientReceiptHandle is now version-agnostic; the queue-aware version check moved to validateReceiptHandleVersion which runs in loadMessageForDelete / loadAndVerifyMessage once meta is in scope. v1 on a partitioned queue and v2 on a non-partitioned queue both surface as ReceiptHandleIsInvalid -- preserving the dormancy promise (no v2 wire-format probeability from the public API) under the new contract. The section 11 PR 2 dormancy gate, still in force in this PR, keeps every queue non-partitioned, so v2 handles still surface as ReceiptHandleIsInvalid downstream -- exactly the PR 5a observable behaviour. (A standalone sketch of these routing rules follows the self-review below.) Reaper Reaper iterates legacy byAge keys only -- partition-iterating enumeration ships in the later partition reaper PR. buildReapOps / reapOneRecord pass nil meta + partition 0 through dispatch helpers so the keys are byte-identical to the pre-PR-5b layout. Tests (12 added) Unit tests (sqs_keys_dispatch_v2_test.go): - TestEncodeReceiptHandleDispatch_PicksVersionByPartitionCount: pins the version dispatch decision across nil/0/1/4 metas. - TestEncodeReceiptHandleDispatch_LegacyByteIdenticalToV1: protects the byte-identical guarantee on legacy queues. - TestEncodeReceiptHandleDispatch_PerQueueUsesV2: pins that perQueue + PartitionCount=4 still produces v2 handles (keyspace is partitioned even when partitionFor collapses every group to partition 0). - TestValidateReceiptHandleVersion_QueueAwareRules: 10 sub-cases covering the (meta.PartitionCount x handle.Version) matrix, including perQueue + PartitionCount=4. - TestValidateReceiptHandleVersion_NilHandle: defensive nil. - TestValidateReceiptHandleVersion_RejectsV2OnNonPartitioned: named regression for the dormancy guarantee under the new contract.
- TestSQSMsgVisScanBoundsDispatch_LegacyMatchesLegacy: byte-identical to legacy sqsMsgVisScanBounds on legacy meta. - TestSQSMsgVisScanBoundsDispatch_PartitionedUsesPartitionedPrefix: different partitions yield disjoint scan ranges. - TestSQSMsgVisScanBoundsDispatch_PerQueueOnPartitionedKeyspace: pins the PR 731 round 2 forward-note invariant -- perQueue + PartitionCount=4 keeps the partitioned vis prefix at partition 0; collapsing to legacy would silently strand send writes. Integration tests (sqs_partitioned_dispatch_test.go) install a partitioned meta directly on a queue created via the public API, short-circuiting the dormancy gate without disabling it for production CreateQueue: - TestSQSServer_PartitionedFIFO_SendReceiveDeleteRoundTrip: end-to-end smoke test (send 6 groups, receive surfaces all via fanout, every handle is v2, delete via v2 handle, queue is empty afterwards, legacy keyspace stays empty). - TestSQSServer_PartitionedFIFO_RejectsV1Handle: forged v1 handle on a partitioned queue surfaces as ReceiptHandleIsInvalid via DeleteMessage and ChangeMessageVisibility. - TestSQSServer_PartitionedFIFO_PerQueueCollapsesToPartitionZero: perQueue + PartitionCount=4 receive surfaces every message in one fanout pass; every v2 handle records Partition=0. Renamed TestDecodeClientReceiptHandle_RejectsV2 to TestDecodeClientReceiptHandle_AcceptsV2 to reflect the contract shift (rejection moved from the API boundary to the meta-aware validateReceiptHandleVersion). Self-review (per CLAUDE.md) 1. Data loss -- Dispatch helpers byte-identical on legacy queues (PartitionCount<=1 routes to legacy constructors). DLQ FIFO computes dlqPartition via partitionFor(dlqMeta, srcRec.MessageGroupId) so cross-queue redrive lands in the right partition. Receive fanout scans every partition. No issue. 2. Concurrency -- Receive fanout iterates partitions sequentially under one shared wall-clock + max budget, so the original per-call SLA is preserved. meta is loaded once and passed by pointer through helpers; no concurrent meta refresh in a single call. validateReceiptHandleVersion runs after the gen check so a SetQueueAttributes race cannot flip the answer (PartitionCount is immutable). go test -race clean. 3. Performance -- Hot path adds 1 nil-check + 1 PartitionCount compare per dispatch. Receive fanout iterates effectivePartitionCount(meta) times: 1 on legacy / perQueue, N on perMessageGroupId, with N partitions splitting roughly 1/N of the messages each. 4. Data consistency -- Receipt handle's recorded partition matches the storage partition (commitReceiveRotation uses cand.partition for both newVisKey and the handle). Cross-version handles rejected as ReceiptHandleIsInvalid -- no leak between keyspaces. perQueue + PartitionCount=4 invariant pinned by both unit and integration tests. 5. Test coverage -- 12 new tests across the contract surface. Existing legacy-path SQS tests pass unchanged. go test -race ./adapter/... and golangci-lint clean.
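Routing-rule sketch (illustrative only -- this is NOT code from this patch; the real partitionFor and per-key helpers landed in PR 5b-1's adapter/sqs_keys_dispatch.go, the *Sketch names are hypothetical, and the FNV hash below is an assumption rather than the adapter's actual hash):

    // Standalone restatement of the routing rules this message
    // describes: partition 0 on legacy / non-partitioned metas and
    // in perQueue mode; otherwise a stable hash of MessageGroupId
    // modulo PartitionCount. Handle wire format is keyed on
    // PartitionCount > 1 regardless of the perQueue collapse.
    package sketch

    import "hash/fnv" // hash choice is an assumption, not the adapter's

    type queueMetaSketch struct {
        PartitionCount      uint32
        FifoThroughputLimit string // "perQueue" or "perMessageGroupId" (illustrative values)
    }

    func partitionForSketch(m *queueMetaSketch, groupID string) uint32 {
        if m == nil || m.PartitionCount <= 1 {
            return 0 // legacy keyspace; dispatch output stays byte-identical
        }
        if m.FifoThroughputLimit == "perQueue" {
            return 0 // keyspace stays partitioned; every group collapses to 0
        }
        h := fnv.New32a()
        _, _ = h.Write([]byte(groupID)) // fnv's Write never returns an error
        return h.Sum32() % m.PartitionCount
    }

    func handleVersionSketch(m *queueMetaSketch) byte {
        if m != nil && m.PartitionCount > 1 {
            return 2 // v2 carries the partition index
        }
        return 1 // v1, byte-identical to the legacy wire format
    }

The two functions deliberately branch on different predicates: partitionForSketch may collapse to 0 while handleVersionSketch still answers v2, which is exactly the perQueue + PartitionCount=4 invariant the tests pin.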
--- adapter/sqs_fifo.go | 35 ++- adapter/sqs_keys_dispatch.go | 78 ++++++ adapter/sqs_keys_dispatch_v2_test.go | 282 ++++++++++++++++++++ adapter/sqs_messages.go | 232 ++++++++++------- adapter/sqs_messages_batch.go | 11 +- adapter/sqs_partitioned_dispatch_test.go | 318 +++++++++++++++++++++++ adapter/sqs_reaper.go | 22 +- adapter/sqs_receipt_handle_v2_test.go | 58 ++--- adapter/sqs_redrive.go | 24 +- 9 files changed, 898 insertions(+), 162 deletions(-) create mode 100644 adapter/sqs_keys_dispatch_v2_test.go create mode 100644 adapter/sqs_partitioned_dispatch_test.go diff --git a/adapter/sqs_fifo.go b/adapter/sqs_fifo.go index c603a8f6d..a532c3553 100644 --- a/adapter/sqs_fifo.go +++ b/adapter/sqs_fifo.go @@ -91,8 +91,8 @@ func resolveFifoDedupID(meta *sqsQueueMeta, in sqsSendMessageInput) string { // or (nil, nil) when there is no live record for this dedup-id. // Expired records are surfaced as nil so a stale entry does not block // a fresh send within the same FIFO queue. -func (s *SQSServer) loadFifoDedupRecord(ctx context.Context, queueName string, gen uint64, dedupID string, readTS uint64) (*sqsFifoDedupRecord, []byte, error) { - key := sqsMsgDedupKey(queueName, gen, dedupID) +func (s *SQSServer) loadFifoDedupRecord(ctx context.Context, queueName string, meta *sqsQueueMeta, partition uint32, gen uint64, dedupID string, readTS uint64) (*sqsFifoDedupRecord, []byte, error) { + key := sqsMsgDedupKeyDispatch(meta, queueName, partition, gen, dedupID) raw, err := s.store.GetAt(ctx, key, readTS) if err != nil { if errors.Is(err, store.ErrKeyNotFound) { @@ -130,11 +130,11 @@ func (s *SQSServer) loadFifoSequence(ctx context.Context, queueName string, read // loadFifoGroupLock fetches the in-flight lock for a group, if any. // Returns nil when no lock is held. Callers that also need the key -// can recompute it via sqsMsgGroupKey — the helper used to return it -// alongside the lock, but every caller already had the key in scope -// from a different code path. +// can recompute it via sqsMsgGroupKeyDispatch — the helper used to +// return it alongside the lock, but every caller already had the +// key in scope from a different code path. -func (s *SQSServer) loadFifoGroupLock(ctx context.Context, queueName string, gen uint64, groupID string, readTS uint64) (*sqsFifoGroupLock, error) { - key := sqsMsgGroupKey(queueName, gen, groupID) +func (s *SQSServer) loadFifoGroupLock(ctx context.Context, queueName string, meta *sqsQueueMeta, partition uint32, gen uint64, groupID string, readTS uint64) (*sqsFifoGroupLock, error) { + key := sqsMsgGroupKeyDispatch(meta, queueName, partition, gen, groupID) raw, err := s.store.GetAt(ctx, key, readTS) if err != nil { if errors.Is(err, store.ErrKeyNotFound) { @@ -172,7 +172,14 @@ func (s *SQSServer) sendFifoMessage( delay int64, readTS uint64, ) (map[string]string, bool, error) { - dedup, dedupKey, err := s.loadFifoDedupRecord(ctx, queueName, meta.Generation, dedupID, readTS) + // HT-FIFO: hash the MessageGroupId once so every partition-scoped + // key in this transaction (data, vis, byage, dedup, group-lock) + // lands in the same partition (the sequence key stays queue-level). + // partitionFor returns 0 on legacy / non-partitioned queues, where + // dispatch round-trips to legacy output, and on the perQueue + // short-circuit, where every key lands in partition 0 of the partitioned keyspace.
+ partition := partitionFor(meta, in.MessageGroupId) + dedup, dedupKey, err := s.loadFifoDedupRecord(ctx, queueName, meta, partition, meta.Generation, dedupID, readTS) if err != nil { return nil, false, err } @@ -216,9 +223,9 @@ func (s *SQSServer) sendFifoMessage( return nil, false, errors.WithStack(err) } - dataKey := sqsMsgDataKey(queueName, meta.Generation, rec.MessageID) - visKey := sqsMsgVisKey(queueName, meta.Generation, rec.AvailableAtMillis, rec.MessageID) - byAgeKey := sqsMsgByAgeKey(queueName, meta.Generation, rec.SendTimestampMillis, rec.MessageID) + dataKey := sqsMsgDataKeyDispatch(meta, queueName, partition, meta.Generation, rec.MessageID) + visKey := sqsMsgVisKeyDispatch(meta, queueName, partition, meta.Generation, rec.AvailableAtMillis, rec.MessageID) + byAgeKey := sqsMsgByAgeKeyDispatch(meta, queueName, partition, meta.Generation, rec.SendTimestampMillis, rec.MessageID) seqKey := sqsQueueSeqKey(queueName) metaKey := sqsQueueMetaKey(queueName) genKey := sqsQueueGenKey(queueName) @@ -270,9 +277,9 @@ const ( // classifyFifoGroupLock decides whether a FIFO candidate is eligible // for delivery. Standard queues bypass the function entirely. -func (s *SQSServer) classifyFifoGroupLock(ctx context.Context, queueName string, gen uint64, rec *sqsMessageRecord, readTS uint64) (fifoCandidateLockState, []byte, error) { - lockKey := sqsMsgGroupKey(queueName, gen, rec.MessageGroupId) - lock, err := s.loadFifoGroupLock(ctx, queueName, gen, rec.MessageGroupId, readTS) +func (s *SQSServer) classifyFifoGroupLock(ctx context.Context, queueName string, meta *sqsQueueMeta, partition uint32, gen uint64, rec *sqsMessageRecord, readTS uint64) (fifoCandidateLockState, []byte, error) { + lockKey := sqsMsgGroupKeyDispatch(meta, queueName, partition, gen, rec.MessageGroupId) + lock, err := s.loadFifoGroupLock(ctx, queueName, meta, partition, gen, rec.MessageGroupId, readTS) if err != nil { return fifoLockSkip, lockKey, err } diff --git a/adapter/sqs_keys_dispatch.go b/adapter/sqs_keys_dispatch.go index 0b57877b0..53faf44a7 100644 --- a/adapter/sqs_keys_dispatch.go +++ b/adapter/sqs_keys_dispatch.go @@ -1,5 +1,11 @@ package adapter +import ( + "bytes" + + "github.com/cockroachdb/errors" +) + // Per-key dispatch helpers that route to the legacy single-partition // constructor or the partitioned-FIFO constructor based on // meta.PartitionCount. Phase 3.D PR 5b's central abstraction: @@ -116,3 +122,75 @@ func effectivePartitionCount(meta *sqsQueueMeta) uint32 { } return meta.PartitionCount } + +// sqsMsgVisScanBoundsDispatch returns the start/end byte ranges that +// ReceiveMessage's per-partition visibility-index scan iterates. +// Mirrors sqsMsgVisScanBounds (legacy keyspace) but parametrises the +// prefix on partition when the queue is partitioned. The bounds are +// always [prefix||u64(0), prefix||u64(maxVisibleAtMillis+1)) so +// messages with visible_at == maxVisibleAtMillis are included. +func sqsMsgVisScanBoundsDispatch(meta *sqsQueueMeta, queueName string, partition uint32, gen uint64, maxVisibleAtMillis int64) (start, end []byte) { + prefix := sqsMsgVisPrefixForQueueDispatch(meta, queueName, partition, gen) + start = append(bytes.Clone(prefix), zeroU64()...) + upper := uint64MaxZero(maxVisibleAtMillis) + if upper < ^uint64(0) { + upper++ + } + end = append(bytes.Clone(prefix), encodedU64(upper)...) + return start, end +} + +// encodeReceiptHandleDispatch picks the receipt-handle wire format +// based on meta.PartitionCount: v1 on legacy / non-partitioned +// queues, v2 on partitioned ones. 
The partition argument is only +// consulted on the v2 branch — callers may pass 0 on the legacy +// branch. +// +// This is the single point where a fresh receipt handle commits to +// a wire version. Pairing the choice with the same meta.PartitionCount +// the dispatch helpers used to build keys keeps the handle's +// recorded partition consistent with the partition the message was +// stored under, so a later DeleteMessage / ChangeMessageVisibility +// routes to the right keyspace. +func encodeReceiptHandleDispatch(meta *sqsQueueMeta, partition uint32, queueGen uint64, messageIDHex string, receiptToken []byte) (string, error) { + if meta != nil && meta.PartitionCount > 1 { + return encodeReceiptHandleV2(partition, queueGen, messageIDHex, receiptToken) + } + return encodeReceiptHandle(queueGen, messageIDHex, receiptToken) +} + +// validateReceiptHandleVersion enforces the queue-aware version +// rule that replaced the dormancy gate from PR 5a: +// +// - meta.PartitionCount <= 1 (legacy / non-partitioned queue): +// handle MUST be v1. A v2 handle on a non-partitioned queue +// is structurally impossible (SendMessage would never have +// produced one) and accepting it would let a malicious caller +// re-encode a v1 handle as v2 to probe / corrupt the v2 layout +// before any partitioned queue exists. +// - meta.PartitionCount > 1 (partitioned queue): handle MUST be +// v2. A v1 handle on a partitioned queue carries no partition +// index, so dispatch would default to partition 0 and the +// delete / change-visibility would silently miss messages on +// other partitions. +// +// Mismatches surface as ReceiptHandleIsInvalid (the same AWS error +// shape used for malformed handles), so a misrouted client cannot +// distinguish "wrong version" from "garbled bytes" — preserving the +// PR 5a / PR 724 round 3 dormancy guarantee that the v2 wire format +// is not probeable from the public API. +func validateReceiptHandleVersion(meta *sqsQueueMeta, handle *decodedReceiptHandle) error { + if handle == nil { + return errors.New("receipt handle is nil") + } + if meta != nil && meta.PartitionCount > 1 { + if handle.Version != sqsReceiptHandleVersion2 { + return errors.New("receipt handle version mismatch for partitioned queue") + } + return nil + } + if handle.Version != sqsReceiptHandleVersion1 { + return errors.New("receipt handle version mismatch for non-partitioned queue") + } + return nil +} diff --git a/adapter/sqs_keys_dispatch_v2_test.go b/adapter/sqs_keys_dispatch_v2_test.go new file mode 100644 index 000000000..196b24bd4 --- /dev/null +++ b/adapter/sqs_keys_dispatch_v2_test.go @@ -0,0 +1,282 @@ +package adapter + +import ( + "bytes" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +// Tests for the PR 5b-2 dispatch helpers added on top of the PR 5b-1 +// per-key wrappers: encodeReceiptHandleDispatch (handle wire format +// dispatch), validateReceiptHandleVersion (queue-aware version check +// that replaces PR 5a's blanket rejection), and +// sqsMsgVisScanBoundsDispatch (per-partition scan bounds). + +const ( + dispatchTestMsgIDHex = "deadbeefdeadbeefdeadbeefdeadbeef" + dispatchTestQueue = "orders.fifo" +) + +// TestEncodeReceiptHandleDispatch_PicksVersionByPartitionCount pins +// the wire-format dispatch decision: meta.PartitionCount > 1 → v2, +// otherwise v1. The partition argument is only consulted on the v2 +// branch (a v1 handle has no partition field). 
+func TestEncodeReceiptHandleDispatch_PicksVersionByPartitionCount(t *testing.T) { + t.Parallel() + token := make([]byte, sqsReceiptTokenBytes) + + // Legacy queue: nil meta → v1. + h, err := encodeReceiptHandleDispatch(nil, 0, 7, dispatchTestMsgIDHex, token) + require.NoError(t, err) + parsed, err := decodeReceiptHandle(h) + require.NoError(t, err) + require.Equal(t, sqsReceiptHandleVersion1, parsed.Version, + "nil meta must dispatch to v1 — the legacy single-partition layout") + + // PartitionCount = 0 → v1 (canonical legacy / unset). + h, err = encodeReceiptHandleDispatch(&sqsQueueMeta{PartitionCount: 0}, 0, 7, dispatchTestMsgIDHex, token) + require.NoError(t, err) + parsed, err = decodeReceiptHandle(h) + require.NoError(t, err) + require.Equal(t, sqsReceiptHandleVersion1, parsed.Version) + + // PartitionCount = 1 → v1. + h, err = encodeReceiptHandleDispatch(&sqsQueueMeta{PartitionCount: 1}, 0, 7, dispatchTestMsgIDHex, token) + require.NoError(t, err) + parsed, err = decodeReceiptHandle(h) + require.NoError(t, err) + require.Equal(t, sqsReceiptHandleVersion1, parsed.Version) + + // PartitionCount = 4 → v2; partition is preserved. + h, err = encodeReceiptHandleDispatch(&sqsQueueMeta{PartitionCount: 4}, 3, 7, dispatchTestMsgIDHex, token) + require.NoError(t, err) + parsed, err = decodeReceiptHandle(h) + require.NoError(t, err) + require.Equal(t, sqsReceiptHandleVersion2, parsed.Version) + require.Equal(t, uint32(3), parsed.Partition) + require.Equal(t, uint64(7), parsed.QueueGeneration) +} + +// TestEncodeReceiptHandleDispatch_LegacyByteIdenticalToV1 protects +// the byte-identical-output guarantee on legacy queues: the +// dispatch helper's output must equal what encodeReceiptHandle +// would produce directly. Without this, a future refactor that +// added a v1.5 layout via the dispatch path could change the wire +// format on existing deployments without anyone noticing in the +// test suite. +func TestEncodeReceiptHandleDispatch_LegacyByteIdenticalToV1(t *testing.T) { + t.Parallel() + token := bytes.Repeat([]byte{0xAB}, sqsReceiptTokenBytes) + want, err := encodeReceiptHandle(7, dispatchTestMsgIDHex, token) + require.NoError(t, err) + + for _, meta := range []*sqsQueueMeta{nil, {PartitionCount: 0}, {PartitionCount: 1}} { + got, err := encodeReceiptHandleDispatch(meta, 0, 7, dispatchTestMsgIDHex, token) + require.NoError(t, err) + require.Equal(t, want, got, + "legacy dispatch (PartitionCount=%d) must be byte-identical to encodeReceiptHandle", + func() uint32 { + if meta == nil { + return 0 + } + return meta.PartitionCount + }()) + } +} + +// TestValidateReceiptHandleVersion_QueueAwareRules covers the four +// (handle.Version, meta.PartitionCount) cells of the PR 5b-2 +// version-validation matrix. The PR 5a blanket rejection moved +// here from decodeClientReceiptHandle, so this test pins the +// dormancy guarantee under the new contract. 
+func TestValidateReceiptHandleVersion_QueueAwareRules(t *testing.T) { + t.Parallel() + v1Handle := &decodedReceiptHandle{Version: sqsReceiptHandleVersion1} + v2Handle := &decodedReceiptHandle{Version: sqsReceiptHandleVersion2, Partition: 3} + + cases := []struct { + name string + meta *sqsQueueMeta + handle *decodedReceiptHandle + wantError bool + }{ + {"nil_meta_v1", nil, v1Handle, false}, + {"nil_meta_v2", nil, v2Handle, true}, + {"legacy_count0_v1", &sqsQueueMeta{PartitionCount: 0}, v1Handle, false}, + {"legacy_count0_v2", &sqsQueueMeta{PartitionCount: 0}, v2Handle, true}, + {"legacy_count1_v1", &sqsQueueMeta{PartitionCount: 1}, v1Handle, false}, + {"legacy_count1_v2", &sqsQueueMeta{PartitionCount: 1}, v2Handle, true}, + {"partitioned4_v1", &sqsQueueMeta{PartitionCount: 4}, v1Handle, true}, + {"partitioned4_v2", &sqsQueueMeta{PartitionCount: 4}, v2Handle, false}, + // perQueue throughput on PartitionCount=4 still requires v2 on + // the wire — partitionFor collapses every group to partition + // 0, but the keyspace is still partitioned, and a v1 handle + // would route through the legacy data-key constructor. + {"partitioned4_perQueue_v1", &sqsQueueMeta{PartitionCount: 4, FifoThroughputLimit: htfifoThroughputPerQueue}, v1Handle, true}, + {"partitioned4_perQueue_v2", &sqsQueueMeta{PartitionCount: 4, FifoThroughputLimit: htfifoThroughputPerQueue}, v2Handle, false}, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + err := validateReceiptHandleVersion(tc.meta, tc.handle) + if tc.wantError { + require.Error(t, err) + return + } + require.NoError(t, err) + }) + } +} + +// TestValidateReceiptHandleVersion_NilHandle pins the defensive nil +// branch so a typed-nil mistake does not silently pass. +func TestValidateReceiptHandleVersion_NilHandle(t *testing.T) { + t.Parallel() + err := validateReceiptHandleVersion(&sqsQueueMeta{PartitionCount: 4}, nil) + require.Error(t, err) +} + +// TestSQSMsgVisScanBoundsDispatch_LegacyMatchesLegacy pins that on +// legacy / non-partitioned metas the dispatch helper produces the +// same start/end pair as the legacy sqsMsgVisScanBounds — the +// receive fanout's byte-identical guarantee on the steady-state +// path. +func TestSQSMsgVisScanBoundsDispatch_LegacyMatchesLegacy(t *testing.T) { + t.Parallel() + const gen uint64 = 7 + const maxTS int64 = 1_700_000_000_000 + + wantStart, wantEnd := sqsMsgVisScanBounds(dispatchTestQueue, gen, maxTS) + + for _, meta := range []*sqsQueueMeta{nil, {PartitionCount: 0}, {PartitionCount: 1}} { + gotStart, gotEnd := sqsMsgVisScanBoundsDispatch(meta, dispatchTestQueue, 0, gen, maxTS) + require.Equal(t, wantStart, gotStart, "start must match legacy sqsMsgVisScanBounds") + require.Equal(t, wantEnd, gotEnd, "end must match legacy sqsMsgVisScanBounds") + } +} + +// TestSQSMsgVisScanBoundsDispatch_PartitionedUsesPartitionedPrefix +// pins that on a partitioned queue the bounds carry the +// partitioned vis prefix — different partitions yield disjoint +// ranges, which is what makes the receive fanout safe (no scan +// overlap, no cross-partition leakage). 
+func TestSQSMsgVisScanBoundsDispatch_PartitionedUsesPartitionedPrefix(t *testing.T) { + t.Parallel() + const gen uint64 = 7 + const maxTS int64 = 1_700_000_000_000 + meta := &sqsQueueMeta{PartitionCount: 4} + + startP0, endP0 := sqsMsgVisScanBoundsDispatch(meta, dispatchTestQueue, 0, gen, maxTS) + startP1, endP1 := sqsMsgVisScanBoundsDispatch(meta, dispatchTestQueue, 1, gen, maxTS) + + require.NotEqual(t, startP0, startP1, + "different partitions must produce disjoint scan ranges") + require.Equal(t, sqsPartitionedMsgVisPrefixForQueue(dispatchTestQueue, 0, gen), + startP0[:len(sqsPartitionedMsgVisPrefixForQueue(dispatchTestQueue, 0, gen))], + "partition 0 start must be prefixed by sqsPartitionedMsgVisPrefixForQueue(_, 0, _)") + require.Equal(t, sqsPartitionedMsgVisPrefixForQueue(dispatchTestQueue, 1, gen), + startP1[:len(sqsPartitionedMsgVisPrefixForQueue(dispatchTestQueue, 1, gen))], + "partition 1 start must be prefixed by sqsPartitionedMsgVisPrefixForQueue(_, 1, _)") + require.True(t, bytes.HasPrefix(startP0, []byte(SqsMsgVisPrefix)), + "partitioned start key still begins with the SqsMsgVisPrefix marker") + require.True(t, bytes.HasPrefix(endP0, []byte(SqsMsgVisPrefix)), + "partitioned end key still begins with the SqsMsgVisPrefix marker") + require.True(t, bytes.Compare(startP0, endP0) < 0, + "start must compare strictly less than end") + require.True(t, bytes.Compare(startP1, endP1) < 0, + "start must compare strictly less than end") +} + +// TestSQSMsgVisScanBoundsDispatch_PerQueueOnPartitionedKeyspace +// pins the PR 731 round 2 forward note: when meta.PartitionCount=4 +// and FifoThroughputLimit=perQueue, partitionFor collapses every +// group to partition 0, effectivePartitionCount returns 1, but the +// partition-0 vis prefix is still the partitioned-keyspace one +// (not legacy) because dispatch is keyed on PartitionCount > 1. +// +// Without this invariant a perQueue queue would write to the +// partitioned vis prefix at partition 0 (under sendFifoMessage's +// dispatch helpers) but the receive fanout — collapsing to a +// single iteration — would scan the legacy prefix and miss +// everything. +func TestSQSMsgVisScanBoundsDispatch_PerQueueOnPartitionedKeyspace(t *testing.T) { + t.Parallel() + const gen uint64 = 7 + const maxTS int64 = 1_700_000_000_000 + meta := &sqsQueueMeta{ + PartitionCount: 4, + FifoThroughputLimit: htfifoThroughputPerQueue, + } + + // effectivePartitionCount collapses to 1 (perQueue short-circuit). + require.Equal(t, uint32(1), effectivePartitionCount(meta)) + + // But the partition-0 scan bounds still use the partitioned + // prefix — the dispatch helpers route by PartitionCount, not + // by the throughput-limit short-circuit. + gotStart, gotEnd := sqsMsgVisScanBoundsDispatch(meta, dispatchTestQueue, 0, gen, maxTS) + wantPrefix := sqsPartitionedMsgVisPrefixForQueue(dispatchTestQueue, 0, gen) + require.True(t, bytes.HasPrefix(gotStart, wantPrefix), + "perQueue + PartitionCount=4 must scan the partitioned "+ + "prefix at partition 0 (where send writes), not the "+ + "legacy prefix — without this the fanout misses everything") + require.True(t, bytes.HasPrefix(gotEnd, wantPrefix)) + + // And the legacy bounds for this same (queue, gen, maxTS) must + // be byte-distinct from the partitioned partition-0 bounds, so + // a future refactor that accidentally read the legacy prefix + // here would be caught by direct byte comparison. 
+ legacyStart, _ := sqsMsgVisScanBounds(dispatchTestQueue, gen, maxTS) + require.NotEqual(t, legacyStart, gotStart, + "perQueue + PartitionCount=4 must NOT collapse to the "+ + "legacy keyspace — that would silently strand send writes") +} + +// TestEncodeReceiptHandleDispatch_PerQueueUsesV2 pins that perQueue +// + PartitionCount=4 still produces v2 handles (because the +// keyspace is partitioned). The handle records the partition the +// message was actually stored under — partitionFor returns 0 in +// perQueue mode, so every handle carries Partition=0, and the +// later DeleteMessage / ChangeMessageVisibility routes back to +// partition 0 of the partitioned keyspace. +func TestEncodeReceiptHandleDispatch_PerQueueUsesV2(t *testing.T) { + t.Parallel() + token := make([]byte, sqsReceiptTokenBytes) + meta := &sqsQueueMeta{ + PartitionCount: 4, + FifoThroughputLimit: htfifoThroughputPerQueue, + } + h, err := encodeReceiptHandleDispatch(meta, 0, 7, dispatchTestMsgIDHex, token) + require.NoError(t, err) + parsed, err := decodeReceiptHandle(h) + require.NoError(t, err) + require.Equal(t, sqsReceiptHandleVersion2, parsed.Version, + "perQueue + PartitionCount=4 must use the v2 wire format — "+ + "the partition the send wrote to is meaningful even "+ + "when partitionFor collapses every group to partition 0") + require.Equal(t, uint32(0), parsed.Partition) +} + +// TestValidateReceiptHandleVersion_RejectsV2OnNonPartitioned is a +// named regression for the dormancy guarantee under the new +// contract. PR 5a rejected v2 at the public API; PR 5b-2 moves +// that to validateReceiptHandleVersion against the loaded meta. +// In PR 5b-2 the §11 PR 2 dormancy gate still rejects +// PartitionCount > 1 at CreateQueue, so every queue is +// non-partitioned, so every v2 handle still surfaces as +// ReceiptHandleIsInvalid downstream — exactly the PR 5a +// observable behaviour. +func TestValidateReceiptHandleVersion_RejectsV2OnNonPartitioned(t *testing.T) { + t.Parallel() + v2 := &decodedReceiptHandle{Version: sqsReceiptHandleVersion2, Partition: 3} + for _, meta := range []*sqsQueueMeta{nil, {PartitionCount: 0}, {PartitionCount: 1}} { + err := validateReceiptHandleVersion(meta, v2) + require.Error(t, err, + "v2 handle on a non-partitioned queue must be rejected") + require.True(t, strings.Contains(err.Error(), "version"), + "error message should reference version mismatch") + } +} diff --git a/adapter/sqs_messages.go b/adapter/sqs_messages.go index 4f3ee6d06..2151af780 100644 --- a/adapter/sqs_messages.go +++ b/adapter/sqs_messages.go @@ -369,45 +369,28 @@ func decodeReceiptHandleV1(b []byte) (*decodedReceiptHandle, error) { } // decodeClientReceiptHandle is the public-API entry point for -// decoding a client-supplied receipt handle. It wraps -// decodeReceiptHandle with the dormancy gate that keeps the v2 -// codec inert until Phase 3.D PR 5b wires the partitioned-FIFO -// data plane. +// decoding a client-supplied receipt handle. It returns the parsed +// shape without validating the version against the target queue — +// PR 5a's blanket v2 rejection has been replaced with the queue- +// aware check in validateReceiptHandleVersion, which the meta- +// loading callers (loadMessageForDelete / loadAndVerifyMessage) +// invoke once they have the queue's PartitionCount in scope. 
// -// # Why the gate +// Splitting decode from version validation lets DeleteMessage / +// ChangeMessageVisibility produce a single ReceiptHandleIsInvalid +// shape for both "garbled bytes" and "wrong version for this +// queue" without leaking which one happened — preserving the +// dormancy promise of PR 724 round 3 (no v2 wire-format +// probeability from the public API) under the new contract. // -// PR 5a adds the v2 codec to the binary but does NOT yet wire any -// production path that produces v2 handles — SendMessage on a -// partitioned queue is rejected by the §11 PR 2 dormancy gate -// (PartitionCount > 1 → InvalidAttributeValue). Without this -// helper, a client could craft a v2 handle (re-encoding a -// legitimately-issued v1 handle's queue_gen / message_id / -// receipt_token under the v2 layout) and DeleteMessage / -// ChangeMessageVisibility would accept it, since the downstream -// validation only checks queue_gen + receipt_token. The behaviour -// is technically correct (the v1 keyspace lookup still finds the -// message) but it leaks the new wire format before PR 5b lands — -// breaking the "no behavior change yet" guarantee of this PR -// (codex/coderabbit major on PR #724). -// -// PR 5b lifts this gate together with the rest of the data-plane -// fanout: it replaces the != v1 check with a queue-aware version -// (v1 required on non-partitioned queues, v2 required on -// partitioned ones), so neither version leaks into the wrong -// keyspace. Until then, any v2 handle on the public API surfaces -// as ReceiptHandleIsInvalid. +// On partitioned queues (PartitionCount > 1) the §11 PR 2 +// dormancy gate is still in force in PR 5b-2 — CreateQueue +// rejects PartitionCount > 1, so no production queue can be in +// the partitioned branch yet, and validateReceiptHandleVersion +// against a non-partitioned meta still rejects every v2 handle. +// PR 5b-3 lifts the gate together with the capability check. func decodeClientReceiptHandle(raw string) (*decodedReceiptHandle, error) { - handle, err := decodeReceiptHandle(raw) - if err != nil { - return nil, err - } - if handle.Version != sqsReceiptHandleVersion1 { - // v2 codec is added but dormant until PR 5b. Reject any - // non-v1 handle on the public API so the wire format - // does not leak. - return nil, errors.New("receipt handle version is not yet enabled on the public API") - } - return handle, nil + return decodeReceiptHandle(raw) } func decodeReceiptHandleV2(b []byte) (*decodedReceiptHandle, error) { @@ -533,9 +516,14 @@ func (s *SQSServer) sendMessage(w http.ResponseWriter, r *http.Request) { return } - dataKey := sqsMsgDataKey(queueName, meta.Generation, rec.MessageID) - visKey := sqsMsgVisKey(queueName, meta.Generation, rec.AvailableAtMillis, rec.MessageID) - byAgeKey := sqsMsgByAgeKey(queueName, meta.Generation, rec.SendTimestampMillis, rec.MessageID) + // Standard queues never have PartitionCount > 1 (the cross- + // attribute validator rejects it), so partition is always 0 + // here and the dispatch helpers route to the legacy keyspace + // — byte-identical to the pre-PR-5b output.
+ const partition uint32 = 0 + dataKey := sqsMsgDataKeyDispatch(meta, queueName, partition, meta.Generation, rec.MessageID) + visKey := sqsMsgVisKeyDispatch(meta, queueName, partition, meta.Generation, rec.AvailableAtMillis, rec.MessageID) + byAgeKey := sqsMsgByAgeKeyDispatch(meta, queueName, partition, meta.Generation, rec.SendTimestampMillis, rec.MessageID) metaKey := sqsQueueMetaKey(queueName) genKey := sqsQueueGenKey(queueName) // StartTS + ReadKeys fence against a concurrent DeleteQueue / @@ -867,19 +855,57 @@ func (s *SQSServer) scanAndDeliverOnce(ctx context.Context, queueName string, op if !exists { return nil, newSQSAPIError(http.StatusBadRequest, sqsErrQueueDoesNotExist, "queue does not exist") } - now := time.Now().UnixMilli() - start, end := sqsMsgVisScanBounds(queueName, meta.Generation, now) pageSize := opts.Max * sqsReceiveScanOverfetchFactor if pageSize > sqsVisScanPageLimit { pageSize = sqsVisScanPageLimit } deadline := time.Now().Add(sqsVisScanWallClockBudget) delivered := make([]map[string]any, 0, opts.Max) + // Per-partition fanout. effectivePartitionCount returns 1 on + // legacy / non-partitioned queues (and on perQueue throughput + // mode, where every group hashes to partition 0 — see + // effectivePartitionCount doc), so this loop is byte-identical + // to the pre-fanout path on those queues. Partitions are + // scanned sequentially so the wall-clock + opts.Max budgets are + // shared across the whole receive call instead of being + // multiplied by N. + partitions := effectivePartitionCount(meta) + for partition := uint32(0); partition < partitions; partition++ { + if len(delivered) >= opts.Max { + break + } + if time.Now().After(deadline) { + return delivered, nil + } + fresh, err := s.scanAndDeliverPartition(ctx, queueName, meta, partition, readTS, deadline, pageSize, sqsReceiveOptions{ + Max: opts.Max - len(delivered), + VisibilityTimeout: opts.VisibilityTimeout, + WaitSeconds: opts.WaitSeconds, + MessageAttributeNames: opts.MessageAttributeNames, + }) + if err != nil { + return delivered, err + } + delivered = append(delivered, fresh...) + } + return delivered, nil +} + +// scanAndDeliverPartition pages the visibility index for one +// partition under the shared wall-clock + per-call max budget. On +// legacy / non-partitioned queues the caller invokes this exactly +// once with partition=0 and the bounds come from +// sqsMsgVisScanBoundsDispatch's legacy branch — byte-identical to +// the pre-fanout scanAndDeliverOnce body. +func (s *SQSServer) scanAndDeliverPartition(ctx context.Context, queueName string, meta *sqsQueueMeta, partition uint32, readTS uint64, deadline time.Time, pageSize int, opts sqsReceiveOptions) ([]map[string]any, error) { + now := time.Now().UnixMilli() + start, end := sqsMsgVisScanBoundsDispatch(meta, queueName, partition, meta.Generation, now) + delivered := make([]map[string]any, 0, opts.Max) for len(delivered) < opts.Max { if time.Now().After(deadline) { return delivered, nil } - page, next, done, scanErr := s.scanOneVisibleMessagePage(ctx, start, end, pageSize, readTS) + page, next, done, scanErr := s.scanOneVisibleMessagePage(ctx, start, end, pageSize, readTS, partition) if scanErr != nil { return nil, scanErr } @@ -923,11 +949,15 @@ func resolveReceiveMaxMessages(requested *int) (int, error) { // scanVisibleMessageCandidates returns vis-index entries with // visible_at <= now, up to limit. Each entry carries the key (needed -// for the delete-old-vis step) and the message_id pointed at by its -// value. 
+// for the delete-old-vis step), the message_id pointed at by its +// value, and the partition the vis-index entry was found under (so +// downstream rotate / delete / expire helpers can look up data / +// byage / group-lock keys in the same partition the message was +// originally stored under). On legacy queues partition is always 0. type sqsMsgCandidate struct { visKey []byte messageID string + partition uint32 } // sqsVisScanWallClockBudget caps how long the scan + deliver loop may @@ -941,8 +971,11 @@ const sqsVisScanWallClockBudget = 100 * time.Millisecond // scanOneVisibleMessagePage reads a single page of the visibility // index starting at `start`, returning the parsed candidates plus the // cursor for the next page. `done=true` means the scan range is -// drained and the caller should stop paging. -func (s *SQSServer) scanOneVisibleMessagePage(ctx context.Context, start, end []byte, pageSize int, readTS uint64) ([]sqsMsgCandidate, []byte, bool, error) { +// drained and the caller should stop paging. `partition` is stamped +// onto every candidate so downstream rotate / delete / expire +// helpers route to the same partition the vis-index entry was found +// under (legacy queues always pass 0). +func (s *SQSServer) scanOneVisibleMessagePage(ctx context.Context, start, end []byte, pageSize int, readTS uint64, partition uint32) ([]sqsMsgCandidate, []byte, bool, error) { kvs, err := s.store.ScanAt(ctx, start, end, pageSize, readTS) if err != nil { return nil, start, true, errors.WithStack(err) @@ -952,7 +985,7 @@ func (s *SQSServer) scanOneVisibleMessagePage(ctx context.Context, start, end [] } out := make([]sqsMsgCandidate, 0, len(kvs)) for _, kvp := range kvs { - out = append(out, sqsMsgCandidate{visKey: bytes.Clone(kvp.Key), messageID: string(kvp.Value)}) + out = append(out, sqsMsgCandidate{visKey: bytes.Clone(kvp.Key), messageID: string(kvp.Value), partition: partition}) } if len(kvs) < pageSize { return out, start, true, nil @@ -979,8 +1012,8 @@ func (s *SQSServer) scanOneVisibleMessagePage(ctx context.Context, start, end [] // - skip=true, err=nil : ErrKeyNotFound race; caller skips this one. // - skip=false, err!=nil : non-retryable; propagate. // - skip=false, err=nil : record loaded. -func (s *SQSServer) loadCandidateRecord(ctx context.Context, queueName string, gen uint64, cand sqsMsgCandidate, readTS uint64) (*sqsMessageRecord, []byte, bool, error) { - dataKey := sqsMsgDataKey(queueName, gen, cand.messageID) +func (s *SQSServer) loadCandidateRecord(ctx context.Context, queueName string, meta *sqsQueueMeta, gen uint64, cand sqsMsgCandidate, readTS uint64) (*sqsMessageRecord, []byte, bool, error) { + dataKey := sqsMsgDataKeyDispatch(meta, queueName, cand.partition, gen, cand.messageID) raw, err := s.store.GetAt(ctx, dataKey, readTS) if err != nil { if errors.Is(err, store.ErrKeyNotFound) { @@ -1002,8 +1035,8 @@ func (s *SQSServer) loadCandidateRecord(ctx context.Context, queueName string, g // no longer our responsibility either way. Any other error propagates // so a coordinator / storage failure does not silently fall through // to "delivered empty", matching the receive-error policy. 
-func (s *SQSServer) expireMessage(ctx context.Context, queueName string, gen uint64, visKey, dataKey []byte, rec *sqsMessageRecord, readTS uint64) error { - byAgeKey := sqsMsgByAgeKey(queueName, gen, rec.SendTimestampMillis, rec.MessageID) +func (s *SQSServer) expireMessage(ctx context.Context, queueName string, meta *sqsQueueMeta, partition uint32, gen uint64, visKey, dataKey []byte, rec *sqsMessageRecord, readTS uint64) error { + byAgeKey := sqsMsgByAgeKeyDispatch(meta, queueName, partition, gen, rec.SendTimestampMillis, rec.MessageID) readKeys := [][]byte{visKey, dataKey, sqsQueueMetaKey(queueName), sqsQueueGenKey(queueName)} elems := []*kv.Elem[kv.OP]{ {Op: kv.Del, Key: visKey}, @@ -1014,8 +1047,8 @@ func (s *SQSServer) expireMessage(ctx context.Context, queueName string, gen uin // in the same group can become deliverable. This mirrors the delete // and redrive paths. if rec.MessageGroupId != "" { - lockKey := sqsMsgGroupKey(queueName, gen, rec.MessageGroupId) - lock, err := s.loadFifoGroupLock(ctx, queueName, gen, rec.MessageGroupId, readTS) + lockKey := sqsMsgGroupKeyDispatch(meta, queueName, partition, gen, rec.MessageGroupId) + lock, err := s.loadFifoGroupLock(ctx, queueName, meta, partition, gen, rec.MessageGroupId, readTS) if err != nil { return err } @@ -1061,7 +1094,7 @@ func (s *SQSServer) rotateMessagesForDelivery( if len(delivered) >= opts.Max { break } - msg, skip, err := s.tryDeliverCandidate(ctx, queueName, meta.Generation, cand, meta.MessageRetentionSeconds, readTS, opts, redrive) + msg, skip, err := s.tryDeliverCandidate(ctx, queueName, meta, cand, readTS, opts, redrive) if err != nil { return delivered, err } @@ -1087,18 +1120,18 @@ func (s *SQSServer) rotateMessagesForDelivery( func (s *SQSServer) tryDeliverCandidate( ctx context.Context, queueName string, - gen uint64, + meta *sqsQueueMeta, cand sqsMsgCandidate, - retentionSeconds int64, readTS uint64, opts sqsReceiveOptions, redrive *parsedRedrivePolicy, ) (map[string]any, bool, error) { - rec, dataKey, skip, err := s.loadCandidateRecord(ctx, queueName, gen, cand, readTS) + gen := meta.Generation + rec, dataKey, skip, err := s.loadCandidateRecord(ctx, queueName, meta, gen, cand, readTS) if skip || err != nil { return nil, skip, err } - if expired, err := s.handleRetentionExpiry(ctx, queueName, gen, cand, dataKey, rec, retentionSeconds, readTS); expired || err != nil { + if expired, err := s.handleRetentionExpiry(ctx, queueName, meta, cand, dataKey, rec, meta.MessageRetentionSeconds, readTS); expired || err != nil { return nil, expired, err } if shouldRedrive(rec, redrive) { @@ -1107,7 +1140,7 @@ func (s *SQSServer) tryDeliverCandidate( // receive response intentionally omits redriven messages — // AWS does the same and consumers polling the source queue // must not observe a poison message past the limit. 
- moved, err := s.redriveCandidateToDLQ(ctx, queueName, gen, cand, dataKey, rec, redrive, s.queueArn(queueName), readTS) + moved, err := s.redriveCandidateToDLQ(ctx, queueName, meta, cand, dataKey, rec, redrive, s.queueArn(queueName), readTS) if err != nil { return nil, false, err } @@ -1119,7 +1152,7 @@ func (s *SQSServer) tryDeliverCandidate( lockState := fifoLockAcquire var lockKey []byte if rec.MessageGroupId != "" { - state, key, err := s.classifyFifoGroupLock(ctx, queueName, gen, rec, readTS) + state, key, err := s.classifyFifoGroupLock(ctx, queueName, meta, cand.partition, gen, rec, readTS) if err != nil { return nil, false, err } @@ -1129,7 +1162,7 @@ func (s *SQSServer) tryDeliverCandidate( lockState = state lockKey = key } - return s.commitReceiveRotation(ctx, queueName, gen, cand, dataKey, rec, readTS, opts, lockKey, lockState) + return s.commitReceiveRotation(ctx, queueName, meta, cand, dataKey, rec, readTS, opts, lockKey, lockState) } // handleRetentionExpiry deletes the candidate inline when its @@ -1137,7 +1170,7 @@ func (s *SQSServer) tryDeliverCandidate( // does not keep re-finding it. Returns (expired, err): expired=true // means the candidate has been (or is being) reaped and the caller // must skip. -func (s *SQSServer) handleRetentionExpiry(ctx context.Context, queueName string, gen uint64, cand sqsMsgCandidate, dataKey []byte, rec *sqsMessageRecord, retentionSeconds int64, readTS uint64) (bool, error) { +func (s *SQSServer) handleRetentionExpiry(ctx context.Context, queueName string, meta *sqsQueueMeta, cand sqsMsgCandidate, dataKey []byte, rec *sqsMessageRecord, retentionSeconds int64, readTS uint64) (bool, error) { if retentionSeconds <= 0 { return false, nil } @@ -1145,7 +1178,7 @@ func (s *SQSServer) handleRetentionExpiry(ctx context.Context, queueName string, if now-rec.SendTimestampMillis <= retentionSeconds*sqsMillisPerSecond { return false, nil } - if err := s.expireMessage(ctx, queueName, gen, cand.visKey, dataKey, rec, readTS); err != nil { + if err := s.expireMessage(ctx, queueName, meta, cand.partition, meta.Generation, cand.visKey, dataKey, rec, readTS); err != nil { return false, err } return true, nil @@ -1156,7 +1189,8 @@ func (s *SQSServer) handleRetentionExpiry(ctx context.Context, queueName string, // the candidate carries a MessageGroupId the transaction also // installs (or refreshes) the per-group lock so a later message in // the same group cannot overtake it on the next receive. 
-func (s *SQSServer) commitReceiveRotation(ctx context.Context, queueName string, gen uint64, cand sqsMsgCandidate, dataKey []byte, rec *sqsMessageRecord, readTS uint64, opts sqsReceiveOptions, lockKey []byte, lockState fifoCandidateLockState) (map[string]any, bool, error) { +func (s *SQSServer) commitReceiveRotation(ctx context.Context, queueName string, meta *sqsQueueMeta, cand sqsMsgCandidate, dataKey []byte, rec *sqsMessageRecord, readTS uint64, opts sqsReceiveOptions, lockKey []byte, lockState fifoCandidateLockState) (map[string]any, bool, error) { + gen := meta.Generation newToken, err := newReceiptToken() if err != nil { return nil, false, err @@ -1173,7 +1207,7 @@ func (s *SQSServer) commitReceiveRotation(ctx context.Context, queueName string, if err != nil { return nil, false, err } - newVisKey := sqsMsgVisKey(queueName, gen, newVisibleAt, cand.messageID) + newVisKey := sqsMsgVisKeyDispatch(meta, queueName, cand.partition, gen, newVisibleAt, cand.messageID) req, err := buildReceiveRotationOps(queueName, cand, dataKey, recordBytes, newVisKey, lockKey, lockState, newVisibleAt, readTS) if err != nil { return nil, false, err @@ -1185,7 +1219,7 @@ func (s *SQSServer) commitReceiveRotation(ctx context.Context, queueName string, return nil, false, errors.WithStack(err) } - handle, err := encodeReceiptHandle(gen, cand.messageID, newToken) + handle, err := encodeReceiptHandleDispatch(meta, cand.partition, gen, cand.messageID, newToken) if err != nil { return nil, false, err } @@ -1335,14 +1369,14 @@ func (s *SQSServer) deleteMessageWithRetry(ctx context.Context, queueName string backoff := transactRetryInitialBackoff deadline := time.Now().Add(transactRetryMaxDuration) for range transactRetryMaxAttempts { - rec, dataKey, readTS, outcome, err := s.loadMessageForDelete(ctx, queueName, handle) + meta, rec, dataKey, readTS, outcome, err := s.loadMessageForDelete(ctx, queueName, handle) if err != nil { return err } if outcome == sqsDeleteNoOp { return nil } - req, err := s.buildDeleteOps(ctx, queueName, handle, rec, dataKey, readTS) + req, err := s.buildDeleteOps(ctx, queueName, meta, handle, rec, dataKey, readTS) if err != nil { return err } @@ -1362,9 +1396,9 @@ func (s *SQSServer) deleteMessageWithRetry(ctx context.Context, queueName string // buildDeleteOps assembles the OCC OperationGroup for a DeleteMessage // commit. The FIFO group-lock release branch lives here so the // retry-loop wrapper stays readable and within the cyclomatic budget. 
-func (s *SQSServer) buildDeleteOps(ctx context.Context, queueName string, handle *decodedReceiptHandle, rec *sqsMessageRecord, dataKey []byte, readTS uint64) (*kv.OperationGroup[kv.OP], error) { - visKey := sqsMsgVisKey(queueName, handle.QueueGeneration, rec.VisibleAtMillis, rec.MessageID) - byAgeKey := sqsMsgByAgeKey(queueName, handle.QueueGeneration, rec.SendTimestampMillis, rec.MessageID) +func (s *SQSServer) buildDeleteOps(ctx context.Context, queueName string, meta *sqsQueueMeta, handle *decodedReceiptHandle, rec *sqsMessageRecord, dataKey []byte, readTS uint64) (*kv.OperationGroup[kv.OP], error) { + visKey := sqsMsgVisKeyDispatch(meta, queueName, handle.Partition, handle.QueueGeneration, rec.VisibleAtMillis, rec.MessageID) + byAgeKey := sqsMsgByAgeKeyDispatch(meta, queueName, handle.Partition, handle.QueueGeneration, rec.SendTimestampMillis, rec.MessageID) readKeys := [][]byte{dataKey, visKey, sqsQueueMetaKey(queueName), sqsQueueGenKey(queueName)} elems := []*kv.Elem[kv.OP]{ {Op: kv.Del, Key: dataKey}, @@ -1372,8 +1406,8 @@ func (s *SQSServer) buildDeleteOps(ctx context.Context, queueName string, handle {Op: kv.Del, Key: byAgeKey}, } if rec.MessageGroupId != "" { - lockKey := sqsMsgGroupKey(queueName, handle.QueueGeneration, rec.MessageGroupId) - lock, err := s.loadFifoGroupLock(ctx, queueName, handle.QueueGeneration, rec.MessageGroupId, readTS) + lockKey := sqsMsgGroupKeyDispatch(meta, queueName, handle.Partition, handle.QueueGeneration, rec.MessageGroupId) + lock, err := s.loadFifoGroupLock(ctx, queueName, meta, handle.Partition, handle.QueueGeneration, rec.MessageGroupId, readTS) if err != nil { return nil, err } @@ -1414,34 +1448,37 @@ const ( // to a different (or recreated) queue and we reject it as a structural // error — silently succeeding would let misrouted deletes ack messages // that cannot possibly be deleted on this queue. 
-func (s *SQSServer) loadMessageForDelete(ctx context.Context, queueName string, handle *decodedReceiptHandle) (*sqsMessageRecord, []byte, uint64, sqsDeleteOutcome, error) { +func (s *SQSServer) loadMessageForDelete(ctx context.Context, queueName string, handle *decodedReceiptHandle) (*sqsQueueMeta, *sqsMessageRecord, []byte, uint64, sqsDeleteOutcome, error) { readTS := s.nextTxnReadTS(ctx) meta, exists, err := s.loadQueueMetaAt(ctx, queueName, readTS) if err != nil { - return nil, nil, readTS, sqsDeleteProceed, errors.WithStack(err) + return nil, nil, nil, readTS, sqsDeleteProceed, errors.WithStack(err) } if !exists { - return nil, nil, readTS, sqsDeleteProceed, newSQSAPIError(http.StatusBadRequest, sqsErrQueueDoesNotExist, "queue does not exist") + return nil, nil, nil, readTS, sqsDeleteProceed, newSQSAPIError(http.StatusBadRequest, sqsErrQueueDoesNotExist, "queue does not exist") } if meta.Generation != handle.QueueGeneration { - return nil, nil, readTS, sqsDeleteProceed, newSQSAPIError(http.StatusBadRequest, sqsErrReceiptHandleInvalid, "receipt handle does not belong to this queue") + return nil, nil, nil, readTS, sqsDeleteProceed, newSQSAPIError(http.StatusBadRequest, sqsErrReceiptHandleInvalid, "receipt handle does not belong to this queue") + } + if err := validateReceiptHandleVersion(meta, handle); err != nil { + return nil, nil, nil, readTS, sqsDeleteProceed, newSQSAPIError(http.StatusBadRequest, sqsErrReceiptHandleInvalid, "receipt handle is not valid for this queue") } - dataKey := sqsMsgDataKey(queueName, handle.QueueGeneration, handle.MessageIDHex) + dataKey := sqsMsgDataKeyDispatch(meta, queueName, handle.Partition, handle.QueueGeneration, handle.MessageIDHex) raw, err := s.store.GetAt(ctx, dataKey, readTS) if err != nil { if errors.Is(err, store.ErrKeyNotFound) { - return nil, nil, readTS, sqsDeleteNoOp, nil + return meta, nil, nil, readTS, sqsDeleteNoOp, nil } - return nil, nil, readTS, sqsDeleteProceed, errors.WithStack(err) + return nil, nil, nil, readTS, sqsDeleteProceed, errors.WithStack(err) } rec, err := decodeSQSMessageRecord(raw) if err != nil { - return nil, nil, readTS, sqsDeleteProceed, errors.WithStack(err) + return nil, nil, nil, readTS, sqsDeleteProceed, errors.WithStack(err) } if !bytes.Equal(rec.CurrentReceiptToken, handle.ReceiptToken) { - return nil, nil, readTS, sqsDeleteNoOp, nil + return meta, nil, nil, readTS, sqsDeleteNoOp, nil } - return rec, dataKey, readTS, sqsDeleteProceed, nil + return meta, rec, dataKey, readTS, sqsDeleteProceed, nil } func (s *SQSServer) changeMessageVisibility(w http.ResponseWriter, r *http.Request) { @@ -1485,7 +1522,7 @@ func (s *SQSServer) changeVisibilityWithRetry(ctx context.Context, queueName str backoff := transactRetryInitialBackoff deadline := time.Now().Add(transactRetryMaxDuration) for range transactRetryMaxAttempts { - rec, dataKey, readTS, apiErr := s.loadAndVerifyMessage(ctx, queueName, handle) + meta, rec, dataKey, readTS, apiErr := s.loadAndVerifyMessage(ctx, queueName, handle) if apiErr != nil { return apiErr } @@ -1493,13 +1530,13 @@ func (s *SQSServer) changeVisibilityWithRetry(ctx context.Context, queueName str if rec.VisibleAtMillis <= now { return newSQSAPIError(http.StatusBadRequest, sqsErrMessageNotInflight, "message is not currently in flight") } - oldVisKey := sqsMsgVisKey(queueName, handle.QueueGeneration, rec.VisibleAtMillis, rec.MessageID) + oldVisKey := sqsMsgVisKeyDispatch(meta, queueName, handle.Partition, handle.QueueGeneration, rec.VisibleAtMillis, rec.MessageID) rec.VisibleAtMillis = now + 
newTimeout*sqsMillisPerSecond recordBytes, err := encodeSQSMessageRecord(rec) if err != nil { return errors.WithStack(err) } - newVisKey := sqsMsgVisKey(queueName, handle.QueueGeneration, rec.VisibleAtMillis, rec.MessageID) + newVisKey := sqsMsgVisKeyDispatch(meta, queueName, handle.Partition, handle.QueueGeneration, rec.VisibleAtMillis, rec.MessageID) // StartTS pins OCC to the snapshot; without it the coordinator // would auto-assign a newer StartTS and a concurrent receive / // delete that commits between our load and dispatch could slip @@ -1555,34 +1592,37 @@ func (s *SQSServer) parseQueueAndReceipt(queueUrl, receiptHandle string) (string // cleans them up, so a handle from a deleted / recreated queue must // be rejected with ReceiptHandleIsInvalid instead of silently // mutating the orphan record. -func (s *SQSServer) loadAndVerifyMessage(ctx context.Context, queueName string, handle *decodedReceiptHandle) (*sqsMessageRecord, []byte, uint64, error) { +func (s *SQSServer) loadAndVerifyMessage(ctx context.Context, queueName string, handle *decodedReceiptHandle) (*sqsQueueMeta, *sqsMessageRecord, []byte, uint64, error) { readTS := s.nextTxnReadTS(ctx) meta, exists, err := s.loadQueueMetaAt(ctx, queueName, readTS) if err != nil { - return nil, nil, readTS, errors.WithStack(err) + return nil, nil, nil, readTS, errors.WithStack(err) } if !exists { - return nil, nil, readTS, newSQSAPIError(http.StatusBadRequest, sqsErrQueueDoesNotExist, "queue does not exist") + return nil, nil, nil, readTS, newSQSAPIError(http.StatusBadRequest, sqsErrQueueDoesNotExist, "queue does not exist") } if meta.Generation != handle.QueueGeneration { - return nil, nil, readTS, newSQSAPIError(http.StatusBadRequest, sqsErrReceiptHandleInvalid, "receipt handle does not belong to this queue") + return nil, nil, nil, readTS, newSQSAPIError(http.StatusBadRequest, sqsErrReceiptHandleInvalid, "receipt handle does not belong to this queue") + } + if err := validateReceiptHandleVersion(meta, handle); err != nil { + return nil, nil, nil, readTS, newSQSAPIError(http.StatusBadRequest, sqsErrReceiptHandleInvalid, "receipt handle is not valid for this queue") } - dataKey := sqsMsgDataKey(queueName, handle.QueueGeneration, handle.MessageIDHex) + dataKey := sqsMsgDataKeyDispatch(meta, queueName, handle.Partition, handle.QueueGeneration, handle.MessageIDHex) raw, err := s.store.GetAt(ctx, dataKey, readTS) if err != nil { if errors.Is(err, store.ErrKeyNotFound) { - return nil, nil, readTS, newSQSAPIError(http.StatusBadRequest, sqsErrReceiptHandleInvalid, "message not found") + return nil, nil, nil, readTS, newSQSAPIError(http.StatusBadRequest, sqsErrReceiptHandleInvalid, "message not found") } - return nil, nil, readTS, errors.WithStack(err) + return nil, nil, nil, readTS, errors.WithStack(err) } rec, err := decodeSQSMessageRecord(raw) if err != nil { - return nil, nil, readTS, errors.WithStack(err) + return nil, nil, nil, readTS, errors.WithStack(err) } if !bytes.Equal(rec.CurrentReceiptToken, handle.ReceiptToken) { - return nil, nil, readTS, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidReceiptHandle, "receipt handle token does not match") + return nil, nil, nil, readTS, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidReceiptHandle, "receipt handle token does not match") } - return rec, dataKey, readTS, nil + return meta, rec, dataKey, readTS, nil } // ------------------------ small helpers ------------------------ diff --git a/adapter/sqs_messages_batch.go b/adapter/sqs_messages_batch.go index e837750c7..6a9c61cec 100644 
--- a/adapter/sqs_messages_batch.go +++ b/adapter/sqs_messages_batch.go @@ -193,9 +193,14 @@ func (s *SQSServer) sendBatchStandardOnce( failed = append(failed, batchErrorEntryFromAPIErr(entry.Id, apiErr)) continue } - dataKey := sqsMsgDataKey(queueName, meta.Generation, rec.MessageID) - visKey := sqsMsgVisKey(queueName, meta.Generation, rec.AvailableAtMillis, rec.MessageID) - byAgeKey := sqsMsgByAgeKey(queueName, meta.Generation, rec.SendTimestampMillis, rec.MessageID) + // Standard batch send: PartitionCount > 1 is rejected on + // non-FIFO queues by the cross-attribute validator, so + // partition is always 0 and dispatch routes to legacy + // keys — byte-identical to the pre-PR-5b output. + const partition uint32 = 0 + dataKey := sqsMsgDataKeyDispatch(meta, queueName, partition, meta.Generation, rec.MessageID) + visKey := sqsMsgVisKeyDispatch(meta, queueName, partition, meta.Generation, rec.AvailableAtMillis, rec.MessageID) + byAgeKey := sqsMsgByAgeKeyDispatch(meta, queueName, partition, meta.Generation, rec.SendTimestampMillis, rec.MessageID) elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Put, Key: dataKey, Value: recordBytes}, &kv.Elem[kv.OP]{Op: kv.Put, Key: visKey, Value: []byte(rec.MessageID)}, diff --git a/adapter/sqs_partitioned_dispatch_test.go b/adapter/sqs_partitioned_dispatch_test.go new file mode 100644 index 000000000..237fd6a82 --- /dev/null +++ b/adapter/sqs_partitioned_dispatch_test.go @@ -0,0 +1,318 @@ +package adapter + +import ( + "context" + "net/http" + "strings" + "testing" + + "github.com/bootjp/elastickv/kv" + "github.com/stretchr/testify/require" +) + +// Integration tests for the PR 5b-2 partitioned-FIFO data plane +// wiring. The §11 PR 2 dormancy gate still rejects PartitionCount +// > 1 at CreateQueue (lifted atomically with the capability check +// in PR 5b-3), so these tests reach below the public CreateQueue +// surface to install a partitioned meta record directly. That +// short-circuits the dormancy gate for the duration of the test +// without disabling it for production CreateQueue calls — which +// is the exact split the design doc envisaged for "data plane +// landed but not user-creatable yet." + +// installPartitionedMetaForTest overwrites the queue's meta record +// with a partitioned shape (PartitionCount > 1) by dispatching a +// raw OCC put against the meta key. The queue is created via the +// normal CreateQueue path first (so generation / incarnation +// counters and the catalog index are populated correctly); only +// the partition-shape attributes are mutated. +// +// The dormancy gate intercepts CreateQueue, not the data plane, +// so once the meta record carries PartitionCount > 1 every +// SendMessage / ReceiveMessage / DeleteMessage call routes +// through the partitioned dispatch helpers exactly as it would +// after PR 5b-3 lifts the gate. 
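+//
+// Typical call, as the round-trip test below uses it:
+//
+//	installPartitionedMetaForTest(t, node, "orders.fifo", 4, htfifoThroughputPerMessageGroupID)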
+func installPartitionedMetaForTest(t *testing.T, node Node, queueName string, partitionCount uint32, throughputLimit string) { + t.Helper() + s := node.sqsServer + require.NotNil(t, s, "test must run on a node with sqsServer wired") + + ctx := context.Background() + readTS := s.nextTxnReadTS(ctx) + meta, exists, err := s.loadQueueMetaAt(ctx, queueName, readTS) + require.NoError(t, err) + require.True(t, exists, "queue %q must exist before partition meta override", queueName) + + meta.PartitionCount = partitionCount + meta.FifoThroughputLimit = throughputLimit + meta.DeduplicationScope = htfifoDedupeScopeMessageGroup + + body, err := encodeSQSQueueMeta(meta) + require.NoError(t, err) + + req := &kv.OperationGroup[kv.OP]{ + IsTxn: true, + StartTS: readTS, + ReadKeys: [][]byte{sqsQueueMetaKey(queueName)}, + Elems: []*kv.Elem[kv.OP]{ + {Op: kv.Put, Key: sqsQueueMetaKey(queueName), Value: body}, + }, + } + _, err = s.coordinator.Dispatch(ctx, req) + require.NoError(t, err) +} + +// TestSQSServer_PartitionedFIFO_SendReceiveDeleteRoundTrip is the +// end-to-end smoke test for PR 5b-2's wiring: SendMessage on a +// partitioned FIFO queue stores the message under the partitioned +// keyspace, ReceiveMessage's fanout finds it via the partitioned +// vis prefix, the returned ReceiptHandle is in v2 wire format, and +// DeleteMessage routes back to the same partition via +// handle.Partition. Failure of any step is a wiring break. +func TestSQSServer_PartitionedFIFO_SendReceiveDeleteRoundTrip(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + const queueName = "orders.fifo" + status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{ + "QueueName": queueName, + "Attributes": map[string]string{"FifoQueue": "true"}, + }) + require.Equal(t, http.StatusOK, status, "create FIFO queue: %v", out) + queueURL, _ := out["QueueUrl"].(string) + require.NotEmpty(t, queueURL) + + installPartitionedMetaForTest(t, node, queueName, 4, htfifoThroughputPerMessageGroupID) + + // Send a few messages spread across distinct group ids so + // partitionFor gets to actually pick different partitions. + groups := []string{"alpha", "beta", "gamma", "delta", "epsilon", "zeta"} + sent := make(map[string]string, len(groups)) // group -> messageID + for i, g := range groups { + body := "body-" + g + dedup := "dedup-" + g + status, out := callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MessageBody": body, + "MessageGroupId": g, + "MessageDeduplicationId": dedup, + }) + require.Equal(t, http.StatusOK, status, + "send #%d (group=%s): %v", i, g, out) + msgID, _ := out["MessageId"].(string) + require.NotEmpty(t, msgID, "send #%d: empty MessageId", i) + sent[g] = msgID + } + + // Receive them all; the fanout walks all 4 partitions to find + // every message. Receive enough times to drain the queue; + // each receive bumps a per-message visibility deadline so a + // second receive is a no-op for already-rotated messages + // inside the same call. 
+ collected := make(map[string]string, len(groups)) // messageID -> receiptHandle + for range len(groups) * 4 { // generous budget + if len(collected) == len(groups) { + break + } + status, out := callSQS(t, node, sqsReceiveMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MaxNumberOfMessages": 10, + "VisibilityTimeout": 60, + }) + require.Equal(t, http.StatusOK, status, "receive: %v", out) + msgs, _ := out["Messages"].([]any) + for _, m := range msgs { + mm, _ := m.(map[string]any) + id, _ := mm["MessageId"].(string) + handle, _ := mm["ReceiptHandle"].(string) + + // The handle must be v2 since the queue is partitioned — + // this is the wiring guarantee. + parsed, err := decodeReceiptHandle(handle) + require.NoError(t, err, "decode receipt handle: %v", err) + require.Equal(t, sqsReceiptHandleVersion2, parsed.Version, + "partitioned queue must produce v2 handles, got version=0x%02x for message=%s", + parsed.Version, id) + require.Less(t, parsed.Partition, uint32(4), + "v2 handle partition out of range") + + collected[id] = handle + } + } + require.Len(t, collected, len(groups), + "fanout receive must surface every message on the partitioned queue") + + // Delete each one via its v2 handle. The DeleteMessage path + // uses handle.Partition to dispatch the data-key lookup; if + // that wire is broken these calls return ReceiptHandleIsInvalid + // (no record found at the legacy data key). + for id, handle := range collected { + status, out := callSQS(t, node, sqsDeleteMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "ReceiptHandle": handle, + }) + require.Equal(t, http.StatusOK, status, + "delete (msg=%s): %v", id, out) + } + + // Queue must now be empty even after the visibility timeout + // would have re-exposed it. + status, out = callSQS(t, node, sqsReceiveMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MaxNumberOfMessages": 10, + }) + require.Equal(t, http.StatusOK, status, "post-delete receive: %v", out) + if msgs, _ := out["Messages"].([]any); len(msgs) > 0 { + t.Fatalf("expected empty queue after delete; got %d messages", len(msgs)) + } + + // Sanity: the legacy keyspace must be empty (every send on + // this queue went to the partitioned keyspace, never the + // legacy one). We probe the legacy data prefix at this + // queue's generation: it must yield zero entries. + ctx := context.Background() + readTS := node.sqsServer.nextTxnReadTS(ctx) + meta, exists, err := node.sqsServer.loadQueueMetaAt(ctx, queueName, readTS) + require.NoError(t, err) + require.True(t, exists) + legacyDataPrefix := sqsMsgDataKey(queueName, meta.Generation, "") + // Cap the prefix scan at the generation byte so we do not + // drag in unrelated queues. + end := append([]byte(nil), legacyDataPrefix...) + end = append(end, 0xFF, 0xFF, 0xFF, 0xFF) + page, err := node.sqsServer.store.ScanAt(ctx, legacyDataPrefix, end, 32, readTS) + require.NoError(t, err) + for _, kvp := range page { + require.False(t, strings.HasPrefix(string(kvp.Key), string(legacyDataPrefix)), + "legacy data key found on a partitioned queue (key=%q) — wiring leaked into the wrong keyspace", + string(kvp.Key)) + } +} + +// TestSQSServer_PartitionedFIFO_RejectsV1Handle pins the queue- +// aware version validation: a v1 handle on a partitioned queue +// surfaces as ReceiptHandleIsInvalid. PR 5a's blanket v2 rejection +// at the API boundary moved into validateReceiptHandleVersion, +// which is invoked once meta is loaded. 
+func TestSQSServer_PartitionedFIFO_RejectsV1Handle(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + const queueName = "rejectv1.fifo" + status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{ + "QueueName": queueName, + "Attributes": map[string]string{"FifoQueue": "true"}, + }) + require.Equal(t, http.StatusOK, status, "create: %v", out) + queueURL, _ := out["QueueUrl"].(string) + + // Send one message under legacy meta so we know the queue + // generation. Then upgrade meta to partitioned. + status, _ = callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MessageBody": "x", + "MessageGroupId": "g", + "MessageDeduplicationId": "d", + }) + require.Equal(t, http.StatusOK, status) + + installPartitionedMetaForTest(t, node, queueName, 4, htfifoThroughputPerMessageGroupID) + + // Forge a v1 handle that would have been valid before the + // upgrade. The partitioned-queue receipt-version check must + // reject it as ReceiptHandleIsInvalid (not pass it through to + // dispatch where it would route to the legacy keyspace and + // silently miss). + token := make([]byte, sqsReceiptTokenBytes) + v1Handle, err := encodeReceiptHandle(1, "deadbeefdeadbeefdeadbeefdeadbeef", token) + require.NoError(t, err) + status, out = callSQS(t, node, sqsDeleteMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "ReceiptHandle": v1Handle, + }) + require.Equal(t, http.StatusBadRequest, status, + "v1 handle on partitioned queue must surface as ReceiptHandleIsInvalid: %v", out) + require.Equal(t, sqsErrReceiptHandleInvalid, out["__type"]) + + // ChangeMessageVisibility must reject the same v1 handle. + status, out = callSQS(t, node, sqsChangeMessageVisibilityTarget, map[string]any{ + "QueueUrl": queueURL, + "ReceiptHandle": v1Handle, + "VisibilityTimeout": int64(30), + }) + require.Equal(t, http.StatusBadRequest, status, "%v", out) + require.Equal(t, sqsErrReceiptHandleInvalid, out["__type"]) +} + +// TestSQSServer_PartitionedFIFO_PerQueueCollapsesToPartitionZero +// is the named regression for the PR 731 round 2 forward note: a +// queue with PartitionCount=4 and FifoThroughputLimit=perQueue +// has every group hashed to partition 0 by partitionFor, but the +// keyspace is still partitioned. The receive fanout must collapse +// to 1 iteration (effectivePartitionCount=1) AND that iteration +// must scan the partitioned vis prefix — the perQueue +// short-circuit narrows the iteration count, not the keyspace. +func TestSQSServer_PartitionedFIFO_PerQueueCollapsesToPartitionZero(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + const queueName = "perqueue.fifo" + status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{ + "QueueName": queueName, + "Attributes": map[string]string{"FifoQueue": "true"}, + }) + require.Equal(t, http.StatusOK, status, "create: %v", out) + queueURL, _ := out["QueueUrl"].(string) + + installPartitionedMetaForTest(t, node, queueName, 4, htfifoThroughputPerQueue) + + // Send messages from many distinct groups; in perQueue mode + // they all land in partition 0. 
+ groups := []string{"alpha", "beta", "gamma", "delta", "epsilon"} + for _, g := range groups { + status, out := callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MessageBody": "body-" + g, + "MessageGroupId": g, + "MessageDeduplicationId": "d-" + g, + }) + require.Equal(t, http.StatusOK, status, "send group=%s: %v", g, out) + } + + // One receive call must surface every message — the fanout + // only iterates partition 0 (effectivePartitionCount=1) but + // that single iteration must use the partitioned vis prefix. + collected := make(map[string]bool, len(groups)) + for range 4 { + if len(collected) == len(groups) { + break + } + status, out := callSQS(t, node, sqsReceiveMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MaxNumberOfMessages": 10, + "VisibilityTimeout": 60, + }) + require.Equal(t, http.StatusOK, status, "receive: %v", out) + msgs, _ := out["Messages"].([]any) + for _, m := range msgs { + mm, _ := m.(map[string]any) + body, _ := mm["Body"].(string) + collected[body] = true + handle, _ := mm["ReceiptHandle"].(string) + parsed, err := decodeReceiptHandle(handle) + require.NoError(t, err) + require.Equal(t, sqsReceiptHandleVersion2, parsed.Version, + "perQueue + PartitionCount=4 must still produce v2 handles") + require.Equal(t, uint32(0), parsed.Partition, + "perQueue mode pins every group to partition 0, so every handle must record Partition=0") + } + } + require.Len(t, collected, len(groups), + "perQueue receive must surface every message in one fanout pass over partition 0") +} diff --git a/adapter/sqs_reaper.go b/adapter/sqs_reaper.go index c2a078a4f..c870917a1 100644 --- a/adapter/sqs_reaper.go +++ b/adapter/sqs_reaper.go @@ -370,7 +370,13 @@ func (s *SQSServer) reapPage(ctx context.Context, queueName string, currentGen u // success — the message has just been touched (received, deleted, // redriven) by another path and is no longer ours to reap. func (s *SQSServer) reapOneRecord(ctx context.Context, queueName string, gen uint64, byAgeKey []byte, messageID string, readTS uint64) error { - dataKey := sqsMsgDataKey(queueName, gen, messageID) + // Reaper iterates legacy byAge entries only in PR 5b-2; the + // partitioned-byAge enumeration ships in a later PR. nil meta + // + partition 0 routes the dispatch helper to the legacy + // constructor so the data-key matches the pre-PR-5b layout + // byte-for-byte. + const partition uint32 = 0 + dataKey := sqsMsgDataKeyDispatch(nil, queueName, partition, gen, messageID) parsed, found, err := s.loadDataForReaper(ctx, dataKey, readTS) if err != nil { return err @@ -513,7 +519,15 @@ func (s *SQSServer) dispatchDedupDelete(ctx context.Context, key []byte, readTS } func (s *SQSServer) buildReapOps(ctx context.Context, queueName string, gen uint64, byAgeKey, dataKey []byte, parsed *sqsMessageRecord, readTS uint64) (*kv.OperationGroup[kv.OP], error) { - visKey := sqsMsgVisKey(queueName, gen, parsed.VisibleAtMillis, parsed.MessageID) + // Reaper currently iterates the legacy byAge keyspace only — the + // partitioned-byAge enumeration is wired in a later PR (Phase 3.D + // PR 6, partition-iterating reaper). Dispatch helpers receive + // nil meta + partition 0 so they deterministically route to the + // legacy constructor and produce byte-identical keys to the + // pre-PR-5b reaper. When PR 6 lands the caller switches to the + // real meta + parsed partition. 
+ const partition uint32 = 0 + visKey := sqsMsgVisKeyDispatch(nil, queueName, partition, gen, parsed.VisibleAtMillis, parsed.MessageID) readKeys := [][]byte{byAgeKey, dataKey, visKey, sqsQueueMetaKey(queueName), sqsQueueGenKey(queueName)} elems := []*kv.Elem[kv.OP]{ {Op: kv.Del, Key: byAgeKey}, @@ -521,8 +535,8 @@ func (s *SQSServer) buildReapOps(ctx context.Context, queueName string, gen uint {Op: kv.Del, Key: visKey}, } if parsed.MessageGroupId != "" { - lockKey := sqsMsgGroupKey(queueName, gen, parsed.MessageGroupId) - lock, err := s.loadFifoGroupLock(ctx, queueName, gen, parsed.MessageGroupId, readTS) + lockKey := sqsMsgGroupKeyDispatch(nil, queueName, partition, gen, parsed.MessageGroupId) + lock, err := s.loadFifoGroupLock(ctx, queueName, nil, partition, gen, parsed.MessageGroupId, readTS) if err != nil { return nil, err } diff --git a/adapter/sqs_receipt_handle_v2_test.go b/adapter/sqs_receipt_handle_v2_test.go index aa02da4ad..9ca18a05a 100644 --- a/adapter/sqs_receipt_handle_v2_test.go +++ b/adapter/sqs_receipt_handle_v2_test.go @@ -278,53 +278,39 @@ func TestDecodeClientReceiptHandle_AcceptsV1(t *testing.T) { require.Equal(t, uint64(7), back.QueueGeneration) } -// TestDecodeClientReceiptHandle_RejectsV2 pins the codex/coderabbit -// major fix on PR #724 round 1: the v2 codec is added but -// dormant. A client-supplied v2 handle MUST be rejected at the -// public API boundary so the wire format does not leak before -// PR 5b wires the partitioned-FIFO data plane. -// -// Without this gate, a malicious / curious client could re-encode -// a legitimately-issued v1 handle's (queue_gen, message_id, -// receipt_token) under the v2 layout, and DeleteMessage / -// ChangeMessageVisibility would accept it (since downstream -// validation only checks queue_gen + receipt_token). PR 5b -// replaces this gate with a queue-aware version (v1 required on -// non-partitioned queues, v2 required on partitioned ones), so -// the gate-removal lands together with the partitioned data plane. -func TestDecodeClientReceiptHandle_RejectsV2(t *testing.T) { +// TestDecodeClientReceiptHandle_AcceptsV2 pins the PR 5b-2 contract +// shift: the public API wrapper no longer enforces the PR 5a +// blanket v2 rejection. Version validation moved into +// validateReceiptHandleVersion, which the meta-loading callers +// (loadMessageForDelete / loadAndVerifyMessage) invoke once they +// have the queue's PartitionCount in scope. The dormancy promise +// is preserved by the queue-aware check downstream — v2 handles +// against a non-partitioned queue still surface as +// ReceiptHandleIsInvalid (see +// TestValidateReceiptHandleVersion_RejectsV2OnNonPartitioned), +// and the §11 PR 2 dormancy gate still rejects PartitionCount > 1 +// at CreateQueue, so no production queue can be in the +// partitioned branch until PR 5b-3 lifts the gate atomically. +func TestDecodeClientReceiptHandle_AcceptsV2(t *testing.T) { t.Parallel() token := make([]byte, sqsReceiptTokenBytes) h, err := encodeReceiptHandleV2(3, 7, "deadbeefdeadbeefdeadbeefdeadbeef", token) - require.NoError(t, err, - "v2 encoder must succeed even though the public API "+ - "wrapper rejects the result — the codec is dormant, "+ - "not absent") - - // Low-level decoder still accepts v2 (it's pure codec). 
- back, err := decodeReceiptHandle(h) require.NoError(t, err) - require.Equal(t, sqsReceiptHandleVersion2, back.Version, - "low-level decodeReceiptHandle must keep working — the "+ - "gate is at the public API boundary, not in the codec") - // Public API wrapper rejects v2. - _, err = decodeClientReceiptHandle(h) - require.Error(t, err, - "v2 handle on the public API must fail until PR 5b lifts "+ - "the dormancy gate") - require.Contains(t, err.Error(), "not yet enabled") + back, err := decodeClientReceiptHandle(h) + require.NoError(t, err, + "v2 handle must decode at the public API; version "+ + "validation moved to validateReceiptHandleVersion") + require.Equal(t, sqsReceiptHandleVersion2, back.Version) + require.Equal(t, uint32(3), back.Partition) + require.Equal(t, uint64(7), back.QueueGeneration) } // TestDecodeClientReceiptHandle_PassesThroughDecodeErrors pins // that decode-error propagation is unchanged — a malformed blob -// still surfaces the underlying base64 / length error rather than -// being masked by the dormancy-gate message. +// still surfaces the underlying base64 / length error. func TestDecodeClientReceiptHandle_PassesThroughDecodeErrors(t *testing.T) { t.Parallel() _, err := decodeClientReceiptHandle("!!!" + strings.Repeat("?", 50)) require.Error(t, err) - require.NotContains(t, err.Error(), "not yet enabled", - "a decode-step error must NOT be reported as the "+ - "dormancy-gate message") } diff --git a/adapter/sqs_redrive.go b/adapter/sqs_redrive.go index 4e1084862..634c50582 100644 --- a/adapter/sqs_redrive.go +++ b/adapter/sqs_redrive.go @@ -114,7 +114,7 @@ func shouldRedrive(rec *sqsMessageRecord, policy *parsedRedrivePolicy) bool { func (s *SQSServer) redriveCandidateToDLQ( ctx context.Context, srcQueueName string, - srcGen uint64, + srcMeta *sqsQueueMeta, cand sqsMsgCandidate, srcDataKey []byte, srcRec *sqsMessageRecord, @@ -147,7 +147,7 @@ func (s *SQSServer) redriveCandidateToDLQ( if err != nil { return false, err } - req, err := s.buildRedriveOps(ctx, srcQueueName, srcGen, cand, srcDataKey, srcRec, policy, dlqMeta, dlqRec, dlqRecordBytes, dlqSeq, readTS) + req, err := s.buildRedriveOps(ctx, srcQueueName, srcMeta, cand, srcDataKey, srcRec, policy, dlqMeta, dlqRec, dlqRecordBytes, dlqSeq, readTS) if err != nil { return false, err } @@ -278,7 +278,7 @@ func buildDLQRecord(srcRec *sqsMessageRecord, dlqMeta *sqsQueueMeta, srcArn stri func (s *SQSServer) buildRedriveOps( ctx context.Context, srcQueueName string, - srcGen uint64, + srcMeta *sqsQueueMeta, cand sqsMsgCandidate, srcDataKey []byte, srcRec *sqsMessageRecord, @@ -289,11 +289,17 @@ func (s *SQSServer) buildRedriveOps( dlqSeq uint64, readTS uint64, ) (*kv.OperationGroup[kv.OP], error) { + srcGen := srcMeta.Generation now := dlqRec.SendTimestampMillis - dlqDataKey := sqsMsgDataKey(policy.DLQName, dlqMeta.Generation, dlqRec.MessageID) - dlqVisKey := sqsMsgVisKey(policy.DLQName, dlqMeta.Generation, now, dlqRec.MessageID) - dlqByAgeKey := sqsMsgByAgeKey(policy.DLQName, dlqMeta.Generation, now, dlqRec.MessageID) - srcByAgeKey := sqsMsgByAgeKey(srcQueueName, srcGen, srcRec.SendTimestampMillis, srcRec.MessageID) + // DLQ partition for FIFO sources: redrive carries the source's + // MessageGroupId forward, so the DLQ partition is the result of + // hashing that group through the DLQ's partitionFor. Standard + // DLQs (or any DLQ with PartitionCount <= 1) collapse this to 0. 
+ dlqPartition := partitionFor(dlqMeta, dlqRec.MessageGroupId) + dlqDataKey := sqsMsgDataKeyDispatch(dlqMeta, policy.DLQName, dlqPartition, dlqMeta.Generation, dlqRec.MessageID) + dlqVisKey := sqsMsgVisKeyDispatch(dlqMeta, policy.DLQName, dlqPartition, dlqMeta.Generation, now, dlqRec.MessageID) + dlqByAgeKey := sqsMsgByAgeKeyDispatch(dlqMeta, policy.DLQName, dlqPartition, dlqMeta.Generation, now, dlqRec.MessageID) + srcByAgeKey := sqsMsgByAgeKeyDispatch(srcMeta, srcQueueName, cand.partition, srcGen, srcRec.SendTimestampMillis, srcRec.MessageID) readKeys := [][]byte{ cand.visKey, srcDataKey, sqsQueueMetaKey(srcQueueName), sqsQueueGenKey(srcQueueName), @@ -317,8 +323,8 @@ func (s *SQSServer) buildRedriveOps( }) } if srcRec.MessageGroupId != "" { - lockKey := sqsMsgGroupKey(srcQueueName, srcGen, srcRec.MessageGroupId) - lock, err := s.loadFifoGroupLock(ctx, srcQueueName, srcGen, srcRec.MessageGroupId, readTS) + lockKey := sqsMsgGroupKeyDispatch(srcMeta, srcQueueName, cand.partition, srcGen, srcRec.MessageGroupId) + lock, err := s.loadFifoGroupLock(ctx, srcQueueName, srcMeta, cand.partition, srcGen, srcRec.MessageGroupId, readTS) if err != nil { return nil, err } From 52c449f5b3b7070cf3f8463a94d54e57fffca022 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sun, 3 May 2026 05:14:43 +0900 Subject: [PATCH 2/7] sqs(receive): rotate fanout starting partition by readTS (PR #732, round 1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex P1: with the fanout always starting at partition 0, a sustained backlog on partition 0 would fill opts.Max before the loop reached partition 1 — messages in higher-index partitions were never observed under load. Rotate the starting partition by readTS so consecutive receives spread across every partition. Helper startPartitionOffset folds the upper and lower 32 bits of the HLC-derived readTS and mask-ANDs by partitions-1 (PartitionCount is power-of-two by validator invariant). The byte-slice fold preserves entropy from both HLC halves while keeping the conversion in uint32 space — no //nolint needed for gosec G115. FIFO ordering is unaffected: partitionFor is deterministic, so a MessageGroupId always lands in one partition and cross-partition iteration order does not reorder messages within any group. Caller audit: scanAndDeliverOnce / scanAndDeliverPartition signatures and error semantics are unchanged; only the internal iteration order rotates. New helper has only the one in-package caller. --- adapter/sqs_messages.go | 44 +++++++++++++++++++++++- adapter/sqs_partitioned_dispatch_test.go | 40 +++++++++++++++++++++ 2 files changed, 83 insertions(+), 1 deletion(-) diff --git a/adapter/sqs_messages.go b/adapter/sqs_messages.go index 2151af780..bc9647b9f 100644 --- a/adapter/sqs_messages.go +++ b/adapter/sqs_messages.go @@ -869,14 +869,27 @@ func (s *SQSServer) scanAndDeliverOnce(ctx context.Context, queueName string, op // scanned sequentially so the wall-clock + opts.Max budgets are // shared across the whole receive call instead of being // multiplied by N. + // + // Rotate the starting partition by readTS so a sustained backlog + // on a single partition cannot permanently starve the others: a + // fixed start at 0 lets opts.Max fill from partition 0 on every + // call, so messages in higher-index partitions are never observed + // while the head queue stays hot. readTS is HLC-derived and + // advances per call, giving a uniform-enough rotation without + // any per-server state. 
FIFO ordering is unaffected because a + // MessageGroupId hashes to exactly one partition (partitionFor is + // deterministic), so cross-partition iteration order does not + // reorder messages within any group. partitions := effectivePartitionCount(meta) - for partition := uint32(0); partition < partitions; partition++ { + startOffset := startPartitionOffset(partitions, readTS) + for i := uint32(0); i < partitions; i++ { if len(delivered) >= opts.Max { break } if time.Now().After(deadline) { return delivered, nil } + partition := (startOffset + i) % partitions fresh, err := s.scanAndDeliverPartition(ctx, queueName, meta, partition, readTS, deadline, pageSize, sqsReceiveOptions{ Max: opts.Max - len(delivered), VisibilityTimeout: opts.VisibilityTimeout, @@ -891,6 +904,35 @@ func (s *SQSServer) scanAndDeliverOnce(ctx context.Context, queueName string, op return delivered, nil } +// startPartitionOffset rotates the receive-fanout starting partition +// by readTS so a sustained backlog on a single partition cannot +// starve the others. With a fixed start at 0, opts.Max would fill +// from partition 0 on every call and messages in higher-index +// partitions would never be observed while the head queue stays +// hot — the regression Codex flagged as P1 on PR #732. readTS is +// HLC-derived and advances per call, so consecutive Receives land +// on different partitions without any per-server state. +// +// Returns 0 on legacy / non-partitioned queues (partitions <= 1). +// +// PartitionCount is power-of-two by the validator invariant +// (htfifoValidatePartitionCount), so mask-AND is equivalent to +// modulo. The byte-slice fold spreads entropy from both halves of +// readTS (HLC physical-ms in the upper bits, logical counter in the +// lower bits) into the final uint32 without an explicit uint64 → +// uint32 narrowing — which gosec G115 would flag on a safe-by- +// construction conversion. Mirrors the entirely-in-uint32 pattern +// in partitionFor (sqs_partitioning.go). +func startPartitionOffset(partitions uint32, readTS uint64) uint32 { + if partitions <= 1 { + return 0 + } + var buf [8]byte + binary.BigEndian.PutUint64(buf[:], readTS) + folded := binary.BigEndian.Uint32(buf[0:4]) ^ binary.BigEndian.Uint32(buf[4:8]) + return folded & (partitions - 1) +} + // scanAndDeliverPartition pages the visibility index for one // partition under the shared wall-clock + per-call max budget. On // legacy / non-partitioned queues the caller invokes this exactly diff --git a/adapter/sqs_partitioned_dispatch_test.go b/adapter/sqs_partitioned_dispatch_test.go index 237fd6a82..80526b508 100644 --- a/adapter/sqs_partitioned_dispatch_test.go +++ b/adapter/sqs_partitioned_dispatch_test.go @@ -316,3 +316,43 @@ func TestSQSServer_PartitionedFIFO_PerQueueCollapsesToPartitionZero(t *testing.T require.Len(t, collected, len(groups), "perQueue receive must surface every message in one fanout pass over partition 0") } + +// TestStartPartitionOffset_RotatesByReadTS pins the regression for +// the Codex P1 finding on PR #732: with a fixed starting partition +// of 0, a sustained backlog on partition 0 fills opts.Max before +// the fanout reaches partition 1, so messages in higher-index +// partitions are never observed under load. The rotation helper +// distributes consecutive readTS values across all partitions. +func TestStartPartitionOffset_RotatesByReadTS(t *testing.T) { + t.Parallel() + + // Legacy / non-partitioned / perQueue queues collapse to 0. 
+ require.Equal(t, uint32(0), startPartitionOffset(0, 12345)) + require.Equal(t, uint32(0), startPartitionOffset(1, 12345)) + + // Partitioned queues spread consecutive readTS values across + // every partition. HLC advances per receive call, so over + // consecutive readTS values every partition appears at least + // once as the starting point — without that, partition 0 would + // permanently win the fanout race under sustained backlog. + const partitions uint32 = 4 + seen := make(map[uint32]int, partitions) + for ts := uint64(1); ts <= 64; ts++ { + seen[startPartitionOffset(partitions, ts)]++ + } + require.Len(t, seen, int(partitions), + "rotation must cover every partition across consecutive readTS; got %v", seen) + for p := uint32(0); p < partitions; p++ { + require.NotZero(t, seen[p], + "partition %d never selected as fanout start; got distribution %v", p, seen) + } + + // The output must always fall inside [0, partitions). Validates + // the mask-AND contract — partitions is power-of-two, so the + // AND with (partitions-1) is equivalent to modulo. + for ts := uint64(0); ts < 1024; ts++ { + off := startPartitionOffset(partitions, ts) + require.Less(t, off, partitions, + "offset %d out of range [0, %d) for ts=%d", off, partitions, ts) + } +} From 11fbd45b1836646c834783ab2c2dbe953558a06b Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sun, 3 May 2026 17:41:06 +0900 Subject: [PATCH 3/7] sqs(receive): bounds-check v2 handle.Partition + tighten partition assertions (PR #732, round 2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CodeRabbit P1/Major: validateReceiptHandleVersion accepted any v2 handle on a partitioned queue without checking handle.Partition < meta.PartitionCount. With decodeClientReceiptHandle now accepting v2 on the wire, an out-of-range partition would fall through to sqsMsg*KeyDispatch and depend on downstream routing failure semantics rather than returning ReceiptHandleIsInvalid at the documented choke point. Add the bounds check and surface the same "receipt handle is invalid" public error. Caller audit: both production callers (loadMessageForDelete, loadAndVerifyMessage in sqs_messages.go) already wrap any non-nil return from validateReceiptHandleVersion as sqsErrReceiptHandleInvalid HTTP 400, so the new error case slots into the existing semantic bucket — no caller-side change required. Two test tightenings on PR #732 round 2: - TestSQSServer_PartitionedFIFO_SendReceiveDeleteRoundTrip previously asserted only that parsed.Partition < 4. That would still pass if a dispatch regression sent every group to partition 0. Reverse the group->messageID map to messageID->group and require parsed.Partition == partitionFor(meta, group) per message — this is the assertion that actually catches misrouting. - The post-delete emptiness check ran a fresh receive immediately while every prior receive used VisibilityTimeout=60. A regression that turned DeleteMessage into "leave the record invisible but not removed" would still pass under the active visibility window. Probe the partitioned data keyspace directly per partition (which DeleteMessage targets), and also issue the public-API receive after sleeping past a 1s visibility window so any in-flight invisible record would re-expose. Add a regression test (TestValidateReceiptHandleVersion_RejectsOutOfRangePartition) covering several out-of-range partitions and the count-1 boundary. 
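Condensed to a truth table, the queue-aware contract after this round looks as follows (an editorial sketch, not the production code: the helper name here is invented, and the real validateReceiptHandleVersion in the diff below returns descriptive errors rather than a bool):

    // handleVersionOK condenses the accept/reject matrix: legacy queues
    // require v1; partitioned queues require v2 plus an in-range,
    // client-controlled partition index — the bound this commit adds.
    func handleVersionOK(partitionCount uint32, isV2 bool, partition uint32) bool {
        if partitionCount > 1 {
            // Partitioned queue: v2 only, partition inside [0, partitionCount).
            return isV2 && partition < partitionCount
        }
        // Legacy / non-partitioned queue: v1 only — v2 handles keep
        // surfacing as ReceiptHandleIsInvalid (the dormancy promise).
        return !isV2
    }

handleVersionOK(4, true, 7) == false is exactly the forged-partition case this round closes.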
--- adapter/sqs_keys_dispatch.go | 7 +++ adapter/sqs_keys_dispatch_v2_test.go | 28 ++++++++++++ adapter/sqs_partitioned_dispatch_test.go | 57 +++++++++++++++++++----- 3 files changed, 82 insertions(+), 10 deletions(-) diff --git a/adapter/sqs_keys_dispatch.go b/adapter/sqs_keys_dispatch.go index 53faf44a7..0a1834468 100644 --- a/adapter/sqs_keys_dispatch.go +++ b/adapter/sqs_keys_dispatch.go @@ -187,6 +187,13 @@ func validateReceiptHandleVersion(meta *sqsQueueMeta, handle *decodedReceiptHand if handle.Version != sqsReceiptHandleVersion2 { return errors.New("receipt handle version mismatch for partitioned queue") } + // handle.Partition is client-controlled once decodeClientReceiptHandle + // accepts v2. Without this bound check, an out-of-range partition + // falls through to sqsMsg*KeyDispatch and depends on downstream + // routing failure semantics instead of returning ReceiptHandleIsInvalid. + if handle.Partition >= meta.PartitionCount { + return errors.New("receipt handle partition out of range for queue") + } return nil } if handle.Version != sqsReceiptHandleVersion1 { diff --git a/adapter/sqs_keys_dispatch_v2_test.go b/adapter/sqs_keys_dispatch_v2_test.go index 196b24bd4..deb5baf1b 100644 --- a/adapter/sqs_keys_dispatch_v2_test.go +++ b/adapter/sqs_keys_dispatch_v2_test.go @@ -139,6 +139,34 @@ func TestValidateReceiptHandleVersion_NilHandle(t *testing.T) { require.Error(t, err) } +// TestValidateReceiptHandleVersion_RejectsOutOfRangePartition pins +// the round-2 fix: handle.Partition is client-controlled once +// decodeClientReceiptHandle accepts v2, so the queue-aware validator +// must bounds-check it against meta.PartitionCount. Without this an +// out-of-range partition would fall through to sqsMsg*KeyDispatch +// and depend on downstream routing failure semantics rather than +// returning ReceiptHandleIsInvalid at the validation choke point. +func TestValidateReceiptHandleVersion_RejectsOutOfRangePartition(t *testing.T) { + t.Parallel() + meta := &sqsQueueMeta{PartitionCount: 4} + for _, partition := range []uint32{4, 5, 17, 1 << 30} { + err := validateReceiptHandleVersion(meta, &decodedReceiptHandle{ + Version: sqsReceiptHandleVersion2, + Partition: partition, + }) + require.Error(t, err, "partition=%d on PartitionCount=4 must be rejected", partition) + require.True(t, + strings.Contains(err.Error(), "out of range"), + "error must reference out-of-range partition, got %v", err) + } + // Boundary: partition == count-1 is the last legal value. 
+ err := validateReceiptHandleVersion(meta, &decodedReceiptHandle{ + Version: sqsReceiptHandleVersion2, + Partition: 3, + }) + require.NoError(t, err) +} + // TestSQSMsgVisScanBoundsDispatch_LegacyMatchesLegacy pins that on // legacy / non-partitioned metas the dispatch helper produces the // same start/end pair as the legacy sqsMsgVisScanBounds — the diff --git a/adapter/sqs_partitioned_dispatch_test.go b/adapter/sqs_partitioned_dispatch_test.go index 80526b508..6ba5a6a41 100644 --- a/adapter/sqs_partitioned_dispatch_test.go +++ b/adapter/sqs_partitioned_dispatch_test.go @@ -5,6 +5,7 @@ import ( "net/http" "strings" "testing" + "time" "github.com/bootjp/elastickv/kv" "github.com/stretchr/testify/require" @@ -86,10 +87,24 @@ func TestSQSServer_PartitionedFIFO_SendReceiveDeleteRoundTrip(t *testing.T) { installPartitionedMetaForTest(t, node, queueName, 4, htfifoThroughputPerMessageGroupID) + // Load meta once so the test can assert each handle lands on the + // partition partitionFor would compute for its group — a loose + // "Partition < 4" check would still pass if every group landed + // on partition 0, masking the dispatch regression this test is + // meant to catch. + ctx := context.Background() + readTS := node.sqsServer.nextTxnReadTS(ctx) + meta, exists, err := node.sqsServer.loadQueueMetaAt(ctx, queueName, readTS) + require.NoError(t, err) + require.True(t, exists) + require.Equal(t, uint32(4), meta.PartitionCount, + "meta override must have installed PartitionCount=4") + // Send a few messages spread across distinct group ids so // partitionFor gets to actually pick different partitions. groups := []string{"alpha", "beta", "gamma", "delta", "epsilon", "zeta"} - sent := make(map[string]string, len(groups)) // group -> messageID + sent := make(map[string]string, len(groups)) // group -> messageID + byMessageID := make(map[string]string, len(groups)) // messageID -> group for i, g := range groups { body := "body-" + g dedup := "dedup-" + g @@ -104,6 +119,7 @@ func TestSQSServer_PartitionedFIFO_SendReceiveDeleteRoundTrip(t *testing.T) { msgID, _ := out["MessageId"].(string) require.NotEmpty(t, msgID, "send #%d: empty MessageId", i) sent[g] = msgID + byMessageID[msgID] = g } // Receive them all; the fanout walks all 4 partitions to find @@ -135,8 +151,12 @@ func TestSQSServer_PartitionedFIFO_SendReceiveDeleteRoundTrip(t *testing.T) { require.Equal(t, sqsReceiptHandleVersion2, parsed.Version, "partitioned queue must produce v2 handles, got version=0x%02x for message=%s", parsed.Version, id) - require.Less(t, parsed.Partition, uint32(4), - "v2 handle partition out of range") + group, ok := byMessageID[id] + require.True(t, ok, "received unknown messageId %s", id) + require.Equal(t, + partitionFor(meta, group), + parsed.Partition, + "message %s (group=%s) routed to wrong partition", id, group) collected[id] = handle } @@ -157,11 +177,32 @@ func TestSQSServer_PartitionedFIFO_SendReceiveDeleteRoundTrip(t *testing.T) { "delete (msg=%s): %v", id, out) } - // Queue must now be empty even after the visibility timeout - // would have re-exposed it. + // Queue must now be empty. Probe the partitioned data keyspace + // directly so a regression that turned DeleteMessage into a + // "leave the record invisible but not removed" no-op cannot + // false-pass under the still-active 60s visibility window. + for p := uint32(0); p < meta.PartitionCount; p++ { + dataPrefix := sqsMsgDataKeyDispatch(meta, queueName, p, meta.Generation, "") + end := append([]byte(nil), dataPrefix...) 
+ end = append(end, 0xFF, 0xFF, 0xFF, 0xFF) + page, err := node.sqsServer.store.ScanAt(ctx, dataPrefix, end, 32, node.sqsServer.nextTxnReadTS(ctx)) + require.NoError(t, err, "post-delete scan partition %d: %v", p, err) + for _, kvp := range page { + require.False(t, + strings.HasPrefix(string(kvp.Key), string(dataPrefix)), + "partition %d still holds key %q after delete — DeleteMessage left a tombstone-less record", + p, string(kvp.Key)) + } + } + + // And the public API must agree: a fresh receive with a short + // visibility timeout (after sleeping past it, so any in-flight + // invisible record would re-expose) returns no messages. + time.Sleep(1100 * time.Millisecond) status, out = callSQS(t, node, sqsReceiveMessageTarget, map[string]any{ "QueueUrl": queueURL, "MaxNumberOfMessages": 10, + "VisibilityTimeout": 1, }) require.Equal(t, http.StatusOK, status, "post-delete receive: %v", out) if msgs, _ := out["Messages"].([]any); len(msgs) > 0 { @@ -172,11 +213,7 @@ func TestSQSServer_PartitionedFIFO_SendReceiveDeleteRoundTrip(t *testing.T) { // this queue went to the partitioned keyspace, never the // legacy one). We probe the legacy data prefix at this // queue's generation: it must yield zero entries. - ctx := context.Background() - readTS := node.sqsServer.nextTxnReadTS(ctx) - meta, exists, err := node.sqsServer.loadQueueMetaAt(ctx, queueName, readTS) - require.NoError(t, err) - require.True(t, exists) + readTS = node.sqsServer.nextTxnReadTS(ctx) legacyDataPrefix := sqsMsgDataKey(queueName, meta.Generation, "") // Cap the prefix scan at the generation byte so we do not // drag in unrelated queues. From ea0cf4ba2581a0eb94424ea7ac4f1895c2cca34e Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sun, 3 May 2026 18:15:06 +0900 Subject: [PATCH 4/7] sqs(fifo): scope partitioned dedup by MessageGroupId + counter-driven fanout (PR #732, round 3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two Codex P1s on commit 11fbd45b: 1. Dedup scoping (sqs_fifo.go:95) — On a partitioned queue DeduplicationScope is messageGroup (the validator rejects queue-scope on PartitionCount > 1), but the dedup key was being constructed with only (queue, partition, dedupID). Two distinct MessageGroupIds whose FNV-1a hashes happen to collide onto the same partition would then share a dedup namespace, so a fresh send in group B reusing group A's dedup-id would be silently acked with group A's MessageId — a data-loss outcome. Per the design doc §4.1 line 200, the dedup key keys on (queue, partition, MessageGroupId, dedupID); partition is redundant given the deterministic group→partition map but is kept for keyspace organisation (every key belonging to a group lives under the same partition prefix). Thread groupID through sqsPartitionedMsgDedupKey, sqsMsgDedupKeyDispatch, and loadFifoDedupRecord. Caller audit: loadFifoDedupRecord has exactly one production caller (sendFifoMessage at sqs_fifo.go:186); the dedupKey it returns is reused by the same OCC transaction's ReadKeys + Put list, so the read-guard and the write share one key. sqsMsgDedupKeyDispatch is only reached via loadFifoDedupRecord. Legacy (non-partitioned) keys are unchanged on disk — the legacy branch ignores groupID, preserving the on-disk shape for queues created before partitioning landed. 2. Fanout aliasing (sqs_messages.go:933) — The round-1 fix derived the fanout starting partition from a XOR-fold of readTS bits and masked by (partitions - 1). 
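A self-contained sketch makes the stride aliasing described next concrete (editorial illustration mirroring startPartitionOffset; not part of the patch):

    package main

    import (
        "encoding/binary"
        "fmt"
    )

    // fold reproduces the round-1 rotation: XOR the two 32-bit halves
    // of readTS, then mask by partitions-1 (power-of-two invariant).
    func fold(readTS uint64, partitions uint32) uint32 {
        var buf [8]byte
        binary.BigEndian.PutUint64(buf[:], readTS)
        f := binary.BigEndian.Uint32(buf[0:4]) ^ binary.BigEndian.Uint32(buf[4:8])
        return f & (partitions - 1)
    }

    func main() {
        // readTS advancing by a fixed stride of 10 per receive call:
        // only two of the four partitions ever appear as the start.
        for ts := uint64(100); ts < 180; ts += 10 {
            fmt.Print(fold(ts, 4), " ") // prints: 0 2 0 2 0 2 0 2
        }
        fmt.Println()
    }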
Codex P1 round 2 flagged a real aliasing concern: HLC packs a 16-bit logical counter in the low bits of readTS and ReceiveMessage commits a fixed number of per-message transactions per call, so consecutive readTS deltas exhibit a structured stride. With partitions=4 and stride=10 the masked fold alternates between two starts (0,2,0,2,…) — partitions 1 and 3 never appear as the start, and opts.Max can fill from those two before the loop reaches the others. Replace the bit-fold with a per-server atomic counter (receiveFanoutCounter on SQSServer). Consecutive partitioned receives walk every partition in strict round-robin regardless of HLC behaviour. The counter is per-server (not Raft-replicated) because fanout distribution is a local-fairness property: every node already sees every message via its own receive scans, and "this node's distribution is balanced" is the only invariant that matters. Add regression tests: - TestSQSDedupKeyDispatch_PartitionedScopesByMessageGroupId pins that distinct MessageGroupIds on the same (queue, partition, dedupID) produce distinct dedup keys, and that legacy keys are unaffected. - TestNextReceiveFanoutStart_RoundRobin pins that 16 consecutive calls over 4 partitions produce exactly 4 hits per partition (strict round-robin), and that concurrent receivers each get a valid offset under the race detector. --- adapter/sqs.go | 10 +++ adapter/sqs_fifo.go | 10 ++- adapter/sqs_keys.go | 19 ++++-- adapter/sqs_keys_dispatch.go | 15 +++-- adapter/sqs_keys_dispatch_test.go | 49 ++++++++++++-- adapter/sqs_keys_test.go | 2 +- adapter/sqs_messages.go | 48 +++++++------- adapter/sqs_partition_resolver_test.go | 2 +- adapter/sqs_partitioned_dispatch_test.go | 81 +++++++++++++++++------- 9 files changed, 170 insertions(+), 66 deletions(-) diff --git a/adapter/sqs.go b/adapter/sqs.go index 112153bb4..9095b1c50 100644 --- a/adapter/sqs.go +++ b/adapter/sqs.go @@ -7,6 +7,7 @@ import ( "net/http" "strconv" "strings" + "sync/atomic" "time" "github.com/bootjp/elastickv/kv" @@ -167,6 +168,15 @@ type SQSServer struct { // throttle config so unconfigured queues pay one nil-check per // request and nothing else (see sqs_throttle.go). throttle *bucketStore + // receiveFanoutCounter advances once per partitioned receive call + // to pick the fanout starting partition. Using a per-server + // atomic counter (instead of the readTS-bit fold the round-1 fix + // shipped) guarantees true round-robin coverage across partitions + // even when readTS advances by a structured stride — the case + // Codex flagged as P1 round 2 (HLC's 16-bit logical counter + + // fixed per-receive transaction count can produce stride patterns + // that masked-AND alias to a subset of partitions). + receiveFanoutCounter atomic.Uint32 } // WithSQSLeaderMap configures the Raft-address-to-SQS-address mapping used to diff --git a/adapter/sqs_fifo.go b/adapter/sqs_fifo.go index a532c3553..4db6dec21 100644 --- a/adapter/sqs_fifo.go +++ b/adapter/sqs_fifo.go @@ -91,8 +91,12 @@ func resolveFifoDedupID(meta *sqsQueueMeta, in sqsSendMessageInput) string { // or (nil, nil) when there is no live record for this dedup-id. // Expired records are surfaced as nil so a stale entry does not block // a fresh send within the same FIFO queue. 
-func (s *SQSServer) loadFifoDedupRecord(ctx context.Context, queueName string, meta *sqsQueueMeta, partition uint32, gen uint64, dedupID string, readTS uint64) (*sqsFifoDedupRecord, []byte, error) { - key := sqsMsgDedupKeyDispatch(meta, queueName, partition, gen, dedupID) +// +// groupID is the MessageGroupId; it participates in the partitioned +// dedup key so that two groups colliding onto the same partition keep +// disjoint dedup namespaces (the AWS messageGroup-scope contract). +func (s *SQSServer) loadFifoDedupRecord(ctx context.Context, queueName string, meta *sqsQueueMeta, partition uint32, gen uint64, groupID, dedupID string, readTS uint64) (*sqsFifoDedupRecord, []byte, error) { + key := sqsMsgDedupKeyDispatch(meta, queueName, partition, gen, groupID, dedupID) raw, err := s.store.GetAt(ctx, key, readTS) if err != nil { if errors.Is(err, store.ErrKeyNotFound) { @@ -179,7 +183,7 @@ func (s *SQSServer) sendFifoMessage( // perQueue throughput short-circuit, so the dispatch helpers // round-trip to legacy output for those cases. partition := partitionFor(meta, in.MessageGroupId) - dedup, dedupKey, err := s.loadFifoDedupRecord(ctx, queueName, meta, partition, meta.Generation, dedupID, readTS) + dedup, dedupKey, err := s.loadFifoDedupRecord(ctx, queueName, meta, partition, meta.Generation, in.MessageGroupId, dedupID, readTS) if err != nil { return nil, false, err } diff --git a/adapter/sqs_keys.go b/adapter/sqs_keys.go index 5cc46c00a..38abf55ac 100644 --- a/adapter/sqs_keys.go +++ b/adapter/sqs_keys.go @@ -312,18 +312,25 @@ func sqsPartitionedMsgVisPrefixForQueue(queueName string, partition uint32, gen } // sqsPartitionedMsgDedupKey builds the FIFO dedup key for a -// partitioned queue. The dedup window is per-partition by design -// (DeduplicationScope=messageGroup with PartitionCount>1) — the -// validator in adapter/sqs_partitioning.go rejects the queue-scoped -// scope on partitioned queues, so this key shape is always reachable -// from the same partition that ran the dedup check. -func sqsPartitionedMsgDedupKey(queueName string, partition uint32, gen uint64, dedupID string) []byte { +// partitioned queue. DeduplicationScope=messageGroup (the only +// scope reachable on partitioned queues — the validator in +// adapter/sqs_partitioning.go rejects queue-scoped dedup) requires +// the dedup window to be per (queue, group, dedupID): two distinct +// MessageGroupIds that happen to FNV-collide onto the same partition +// must NOT share a dedup namespace, otherwise a fresh send in group B +// would be silently acked with group A's MessageId. Per design doc +// §4.1 line 200, the dedup key keys on (queue, partition, group, +// dedupID); partition is redundant given the deterministic group→ +// partition map but is kept for keyspace organization (every key +// belonging to a group lives under the same partition prefix). +func sqsPartitionedMsgDedupKey(queueName string, partition uint32, gen uint64, groupID, dedupID string) []byte { buf := make([]byte, 0, len(SqsPartitionedMsgDedupPrefix)+sqsKeyCapLarge) buf = append(buf, SqsPartitionedMsgDedupPrefix...) buf = append(buf, encodeSQSSegment(queueName)...) buf = append(buf, sqsPartitionedQueueTerminator) buf = appendU32(buf, partition) buf = appendU64(buf, gen) + buf = append(buf, encodeSQSSegment(groupID)...) buf = append(buf, encodeSQSSegment(dedupID)...) 
return buf } diff --git a/adapter/sqs_keys_dispatch.go b/adapter/sqs_keys_dispatch.go index 0a1834468..6734caeb3 100644 --- a/adapter/sqs_keys_dispatch.go +++ b/adapter/sqs_keys_dispatch.go @@ -53,12 +53,17 @@ func sqsMsgVisKeyDispatch(meta *sqsQueueMeta, queueName string, partition uint32 } // sqsMsgDedupKeyDispatch builds the FIFO dedup key for either -// keyspace. Dedup scope is per-partition on partitioned queues -// (DeduplicationScope = messageGroup is enforced by the validator -// on PartitionCount > 1). -func sqsMsgDedupKeyDispatch(meta *sqsQueueMeta, queueName string, partition uint32, gen uint64, dedupID string) []byte { +// keyspace. On a partitioned queue (DeduplicationScope = messageGroup +// — enforced by the validator on PartitionCount > 1) the dedup window +// must be per (queue, group, dedupID): two distinct MessageGroupIds +// that FNV-collide onto the same partition must NOT share a dedup +// namespace, otherwise a fresh send in group B is silently acked +// with group A's MessageId. The legacy (non-partitioned) branch +// keeps the legacy (queue, gen, dedupID) shape — there is only one +// implicit "group" so no collision is possible there. +func sqsMsgDedupKeyDispatch(meta *sqsQueueMeta, queueName string, partition uint32, gen uint64, groupID, dedupID string) []byte { if meta != nil && meta.PartitionCount > 1 { - return sqsPartitionedMsgDedupKey(queueName, partition, gen, dedupID) + return sqsPartitionedMsgDedupKey(queueName, partition, gen, groupID, dedupID) } return sqsMsgDedupKey(queueName, gen, dedupID) } diff --git a/adapter/sqs_keys_dispatch_test.go b/adapter/sqs_keys_dispatch_test.go index 27de6399e..f2b0b1d15 100644 --- a/adapter/sqs_keys_dispatch_test.go +++ b/adapter/sqs_keys_dispatch_test.go @@ -35,7 +35,7 @@ func TestSQSKeysDispatch_LegacyMatchesLegacyConstructor(t *testing.T) { sqsMsgVisKeyDispatch(nil, queue, 0, gen, ts, msgID), sqsMsgVisKey(queue, gen, ts, msgID)}, {"meta=nil dedup", - sqsMsgDedupKeyDispatch(nil, queue, 0, gen, dedupID), + sqsMsgDedupKeyDispatch(nil, queue, 0, gen, groupID, dedupID), sqsMsgDedupKey(queue, gen, dedupID)}, {"meta=nil group", sqsMsgGroupKeyDispatch(nil, queue, 0, gen, groupID), @@ -66,7 +66,7 @@ func TestSQSKeysDispatch_LegacyMatchesLegacyConstructor(t *testing.T) { sqsMsgVisKeyDispatch(&sqsQueueMeta{PartitionCount: 1}, queue, 0, gen, ts, msgID), sqsMsgVisKey(queue, gen, ts, msgID)}, {"meta.PartitionCount=1 dedup", - sqsMsgDedupKeyDispatch(&sqsQueueMeta{PartitionCount: 1}, queue, 0, gen, dedupID), + sqsMsgDedupKeyDispatch(&sqsQueueMeta{PartitionCount: 1}, queue, 0, gen, groupID, dedupID), sqsMsgDedupKey(queue, gen, dedupID)}, {"meta.PartitionCount=1 group", sqsMsgGroupKeyDispatch(&sqsQueueMeta{PartitionCount: 1}, queue, 0, gen, groupID), @@ -119,8 +119,8 @@ func TestSQSKeysDispatch_PartitionedMatchesPartitionedConstructor(t *testing.T) sqsMsgVisKeyDispatch(meta, queue, partition, gen, ts, msgID), sqsPartitionedMsgVisKey(queue, partition, gen, ts, msgID)}, {"dedup", - sqsMsgDedupKeyDispatch(meta, queue, partition, gen, dedupID), - sqsPartitionedMsgDedupKey(queue, partition, gen, dedupID)}, + sqsMsgDedupKeyDispatch(meta, queue, partition, gen, groupID, dedupID), + sqsPartitionedMsgDedupKey(queue, partition, gen, groupID, dedupID)}, {"group", sqsMsgGroupKeyDispatch(meta, queue, partition, gen, groupID), sqsPartitionedMsgGroupKey(queue, partition, gen, groupID)}, @@ -167,6 +167,47 @@ func TestSQSKeysDispatch_BoundaryAtPartitionCount2(t *testing.T) { "PartitionCount=2 must NOT route to the legacy keyspace") } +// 
TestSQSDedupKeyDispatch_PartitionedScopesByMessageGroupId is the +// regression for the round-3 P1 (Codex) on PR #732: with +// DeduplicationScope=messageGroup on a partitioned queue the dedup +// key MUST include MessageGroupId so two distinct groups that +// FNV-collide onto the same partition do NOT share a dedup +// namespace. Without the group segment, a fresh send in group "B" +// reusing group "A"'s dedup-id would be silently acked with group +// "A"'s MessageId — that is a data-loss outcome. +func TestSQSDedupKeyDispatch_PartitionedScopesByMessageGroupId(t *testing.T) { + t.Parallel() + meta := &sqsQueueMeta{PartitionCount: 4} + const ( + queue = "events.fifo" + gen = uint64(11) + partition = uint32(2) + dedupID = "shared-token" + ) + keyA := sqsMsgDedupKeyDispatch(meta, queue, partition, gen, "groupA", dedupID) + keyB := sqsMsgDedupKeyDispatch(meta, queue, partition, gen, "groupB", dedupID) + require.NotEqual(t, keyA, keyB, + "distinct MessageGroupIds on the same (queue, partition, dedupID) "+ + "must produce distinct dedup keys — otherwise a fresh send in "+ + "groupB is silently dropped as a duplicate of groupA") + + // Same group + same dedupID must round-trip to the same key (the + // idempotency contract we DO want to keep). + keyA2 := sqsMsgDedupKeyDispatch(meta, queue, partition, gen, "groupA", dedupID) + require.Equal(t, keyA, keyA2, + "same (group, dedupID) must produce the same dedup key — "+ + "AWS idempotent-by-design retries depend on this") + + // Legacy (non-partitioned) path is unaffected: groupID is ignored + // because there is only one implicit group on a non-partitioned + // queue and the legacy key shape predates partitioning. + legacyA := sqsMsgDedupKeyDispatch(nil, queue, 0, gen, "groupA", dedupID) + legacyB := sqsMsgDedupKeyDispatch(nil, queue, 0, gen, "groupB", dedupID) + require.Equal(t, legacyA, legacyB, + "legacy keyspace ignores groupID — preserves the on-disk shape "+ + "for queues created before partitioning landed") +} + // TestSQSKeysDispatch_LegacyAndPartitionedAreDistinct pins the // keyspace-isolation invariant at the dispatch level: a legacy // (PartitionCount=1) key and a partitioned (PartitionCount>1) key diff --git a/adapter/sqs_keys_test.go b/adapter/sqs_keys_test.go index b1b8f0c60..103f64db8 100644 --- a/adapter/sqs_keys_test.go +++ b/adapter/sqs_keys_test.go @@ -46,7 +46,7 @@ func TestSqsPartitionedMsgKeys_DistinctFromLegacy(t *testing.T) { { name: "dedup", legacy: sqsMsgDedupKey(queue, gen, dedupID), - partitioned: sqsPartitionedMsgDedupKey(queue, partition, gen, dedupID), + partitioned: sqsPartitionedMsgDedupKey(queue, partition, gen, groupID, dedupID), }, { name: "group", diff --git a/adapter/sqs_messages.go b/adapter/sqs_messages.go index bc9647b9f..3e4879a58 100644 --- a/adapter/sqs_messages.go +++ b/adapter/sqs_messages.go @@ -881,7 +881,7 @@ func (s *SQSServer) scanAndDeliverOnce(ctx context.Context, queueName string, op // deterministic), so cross-partition iteration order does not // reorder messages within any group. 
partitions := effectivePartitionCount(meta) - startOffset := startPartitionOffset(partitions, readTS) + startOffset := s.nextReceiveFanoutStart(partitions) for i := uint32(0); i < partitions; i++ { if len(delivered) >= opts.Max { break @@ -904,33 +904,35 @@ func (s *SQSServer) scanAndDeliverOnce(ctx context.Context, queueName string, op return delivered, nil } -// startPartitionOffset rotates the receive-fanout starting partition -// by readTS so a sustained backlog on a single partition cannot -// starve the others. With a fixed start at 0, opts.Max would fill -// from partition 0 on every call and messages in higher-index -// partitions would never be observed while the head queue stays -// hot — the regression Codex flagged as P1 on PR #732. readTS is -// HLC-derived and advances per call, so consecutive Receives land -// on different partitions without any per-server state. +// nextReceiveFanoutStart advances the per-server atomic counter and +// returns the starting partition index for this receive call. With +// PartitionCount power-of-two (validator invariant), mask-AND is +// equivalent to modulo, so consecutive receives walk every partition +// in strict round-robin — no aliasing to a subset of partitions even +// when readTS advances by a structured stride. // -// Returns 0 on legacy / non-partitioned queues (partitions <= 1). +// The round-1 fix used a readTS-bit fold for rotation. Codex P1 +// round 2 flagged a real aliasing concern: HLC packs a 16-bit logical +// counter in the low bits of readTS and ReceiveMessage commits a +// fixed number of per-message transactions per call, so consecutive +// readTS deltas exhibit a structured stride. With partitions=4 and +// stride=10 the masked fold alternates between two starts (0,2,0,2,…) +// — partitions 1 and 3 never appear as the start. A per-server +// counter sidesteps the entire HLC-bit-pattern question. // -// PartitionCount is power-of-two by the validator invariant -// (htfifoValidatePartitionCount), so mask-AND is equivalent to -// modulo. The byte-slice fold spreads entropy from both halves of -// readTS (HLC physical-ms in the upper bits, logical counter in the -// lower bits) into the final uint32 without an explicit uint64 → -// uint32 narrowing — which gosec G115 would flag on a safe-by- -// construction conversion. Mirrors the entirely-in-uint32 pattern -// in partitionFor (sqs_partitioning.go). -func startPartitionOffset(partitions uint32, readTS uint64) uint32 { +// Returns 0 on legacy / non-partitioned queues (partitions <= 1) so +// the counter is not perturbed for non-partitioned receives — those +// always start at the only partition that exists. +// +// The counter is per-server (not Raft-replicated) because fanout +// distribution is a local-fairness property: every node already sees +// every message via its own receive scans, and "this node's +// distribution is balanced" is the only invariant that matters. 
+func (s *SQSServer) nextReceiveFanoutStart(partitions uint32) uint32 { if partitions <= 1 { return 0 } - var buf [8]byte - binary.BigEndian.PutUint64(buf[:], readTS) - folded := binary.BigEndian.Uint32(buf[0:4]) ^ binary.BigEndian.Uint32(buf[4:8]) - return folded & (partitions - 1) + return s.receiveFanoutCounter.Add(1) & (partitions - 1) } // scanAndDeliverPartition pages the visibility index for one diff --git a/adapter/sqs_partition_resolver_test.go b/adapter/sqs_partition_resolver_test.go index d9cb9fe23..c8dc4fb9f 100644 --- a/adapter/sqs_partition_resolver_test.go +++ b/adapter/sqs_partition_resolver_test.go @@ -69,7 +69,7 @@ func TestSQSPartitionResolver_ResolveByPartition(t *testing.T) { for familyName, key := range map[string][]byte{ "data": sqsPartitionedMsgDataKey(tc.queue, tc.partition, 1, "msg-id"), "vis": sqsPartitionedMsgVisKey(tc.queue, tc.partition, 1, 1700000000000, "msg-id"), - "dedup": sqsPartitionedMsgDedupKey(tc.queue, tc.partition, 1, "dedup-id"), + "dedup": sqsPartitionedMsgDedupKey(tc.queue, tc.partition, 1, "group-id", "dedup-id"), "group": sqsPartitionedMsgGroupKey(tc.queue, tc.partition, 1, "group-id"), "byage": sqsPartitionedMsgByAgeKey(tc.queue, tc.partition, 1, 1700000000000, "msg-id"), } { diff --git a/adapter/sqs_partitioned_dispatch_test.go b/adapter/sqs_partitioned_dispatch_test.go index 6ba5a6a41..9a4355bbd 100644 --- a/adapter/sqs_partitioned_dispatch_test.go +++ b/adapter/sqs_partitioned_dispatch_test.go @@ -4,6 +4,7 @@ import ( "context" "net/http" "strings" + "sync" "testing" "time" @@ -354,42 +355,76 @@ func TestSQSServer_PartitionedFIFO_PerQueueCollapsesToPartitionZero(t *testing.T "perQueue receive must surface every message in one fanout pass over partition 0") } -// TestStartPartitionOffset_RotatesByReadTS pins the regression for -// the Codex P1 finding on PR #732: with a fixed starting partition -// of 0, a sustained backlog on partition 0 fills opts.Max before -// the fanout reaches partition 1, so messages in higher-index -// partitions are never observed under load. The rotation helper -// distributes consecutive readTS values across all partitions. -func TestStartPartitionOffset_RotatesByReadTS(t *testing.T) { +// TestNextReceiveFanoutStart_RoundRobin pins the round-2 regression: +// the original PR #732 round-1 fix derived the starting partition +// from masked readTS bits (XOR-fold of the upper and lower 32-bit +// halves). Codex P1 round 2 flagged that this can alias to a subset +// of partitions when readTS advances by a structured stride — HLC +// packs a 16-bit logical counter into the low bits and ReceiveMessage +// commits a fixed number of per-message transactions per call, so +// consecutive readTS deltas exhibit a structured stride. +// +// nextReceiveFanoutStart replaces the bit-fold with a per-server +// atomic counter, so consecutive calls walk every partition in +// strict round-robin regardless of HLC behaviour. This test pins +// both the round-robin contract and the explicit aliasing case +// Codex flagged. +func TestNextReceiveFanoutStart_RoundRobin(t *testing.T) { t.Parallel() - // Legacy / non-partitioned / perQueue queues collapse to 0. - require.Equal(t, uint32(0), startPartitionOffset(0, 12345)) - require.Equal(t, uint32(0), startPartitionOffset(1, 12345)) + s := &SQSServer{} + + // Legacy / non-partitioned / perQueue queues collapse to 0 and + // must not perturb the counter. 
+ require.Equal(t, uint32(0), s.nextReceiveFanoutStart(0)) + require.Equal(t, uint32(0), s.nextReceiveFanoutStart(1)) - // Partitioned queues spread consecutive readTS values across - // every partition. HLC advances per receive call, so over - // consecutive readTS values every partition appears at least - // once as the starting point — without that, partition 0 would - // permanently win the fanout race under sustained backlog. + // Partitioned queues walk every partition in strict round-robin. + // 16 consecutive calls over 4 partitions must produce exactly + // 4 hits per partition. const partitions uint32 = 4 seen := make(map[uint32]int, partitions) - for ts := uint64(1); ts <= 64; ts++ { - seen[startPartitionOffset(partitions, ts)]++ + for range 16 { + seen[s.nextReceiveFanoutStart(partitions)]++ } require.Len(t, seen, int(partitions), - "rotation must cover every partition across consecutive readTS; got %v", seen) + "counter-driven rotation must cover every partition; got %v", seen) for p := uint32(0); p < partitions; p++ { - require.NotZero(t, seen[p], - "partition %d never selected as fanout start; got distribution %v", p, seen) + require.Equal(t, 4, seen[p], + "partition %d hit %d times in 16 calls; round-robin must be exact", + p, seen[p]) } // The output must always fall inside [0, partitions). Validates // the mask-AND contract — partitions is power-of-two, so the // AND with (partitions-1) is equivalent to modulo. - for ts := uint64(0); ts < 1024; ts++ { - off := startPartitionOffset(partitions, ts) + for range 1024 { + off := s.nextReceiveFanoutStart(partitions) require.Less(t, off, partitions, - "offset %d out of range [0, %d) for ts=%d", off, partitions, ts) + "offset %d out of range [0, %d)", off, partitions) + } + + // Concurrent receivers must each get a valid offset (no torn + // reads, no out-of-range values). Atomicity check. + var wg sync.WaitGroup + const goroutines = 8 + const perGoroutine = 256 + results := make([][]uint32, goroutines) + for i := 0; i < goroutines; i++ { + results[i] = make([]uint32, perGoroutine) + wg.Add(1) + go func(idx int) { + defer wg.Done() + for j := 0; j < perGoroutine; j++ { + results[idx][j] = s.nextReceiveFanoutStart(partitions) + } + }(i) + } + wg.Wait() + for _, batch := range results { + for _, off := range batch { + require.Less(t, off, partitions, + "concurrent offset %d out of range", off) + } } } From 667dea9c37b911d2e8d0654c53474e1064b944a6 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sun, 3 May 2026 18:29:57 +0900 Subject: [PATCH 5/7] sqs(receive): scope fanout rotation counter per queue (PR #732, round 4) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex P1 round 4 on commit ea0cf4ba: the round-3 fix used a single server-wide receiveFanoutCounter, but that aliases across queues. When other queues call ReceiveMessage in between, the queue-of-interest observes only a strided subsequence of counter values; if that stride shares a factor with PartitionCount the queue cycles through only a subset of partitions, and with MaxNumberOfMessages filling on the earlier-scanned partitions the rest can be starved indefinitely. Concrete example: 4-partition queue receiving every other tick → its counter values are 1,3,5,7,… and 1,3,5,7 mod 4 = 1,3,1,3 → partitions 0 and 2 never appear as the start. Replace the server-wide atomic.Uint32 with a sync.Map keyed by queue name; each queue owns its own *atomic.Uint32. 
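Illustrative sketch (not part of the diff; the queue names, partition
count, and interleaving pattern are made up): two simulated queues
alternate receives against one shared counter versus queueB owning its
own. The shared counter hands queueB the subsequence 1,3,5,7,… whose
mask with 3 alternates 1,3,1,3; the per-queue counter walks all four
starts.

    package main

    import "fmt"

    func main() {
    	const partitions = 4
    	var shared uint32 // round-3 shape: one counter for every queue
    	var queueB uint32 // round-4 shape: queueB owns its counter
    	sharedStarts := map[uint32]bool{}
    	isolatedStarts := map[uint32]bool{}
    	for i := 0; i < 16; i++ {
    		shared++ // queueB's receive ticks the shared counter...
    		sharedStarts[shared&(partitions-1)] = true
    		shared++ // ...and queueC's interleaved receive ticks it again

    		queueB++ // queueB's own counter sees only its own cadence
    		isolatedStarts[queueB&(partitions-1)] = true
    	}
    	fmt.Println(len(sharedStarts))   // prints 2: only starts 1 and 3
    	fmt.Println(len(isolatedStarts)) // prints 4: strict round-robin
    }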
nextReceiveFanoutStart takes (queueName, partitions) and looks up / installs the per-queue counter. Each queue's rotation now depends solely on its own receive cadence — strict round-robin regardless of cross-queue interleaving. Caller audit: nextReceiveFanoutStart has exactly one production caller (scanAndDeliverOnce at sqs_messages.go:885), which now passes queueName from its existing scope. Return type and range are unchanged ([0, partitions)); no caller-side semantic shift. sync.Map is the right shape: lookups are read-mostly (the same queue keeps getting the same counter) and the keyset grows only with the number of distinct queues this server has handled receives for in-process — bounded by the operator-controlled CreateQueue rate. Update TestNextReceiveFanoutStart_RoundRobin → renamed to TestNextReceiveFanoutStart_PerQueueRoundRobin. New assertions: - queueB and queueC interleaved must each independently see strict round-robin (the round-4 isolation contract); the legacy server-wide counter would have surfaced 1,3,1,3 on queueB → fails. - Concurrent receivers spread across multiple queue names exercise the sync.Map LoadOrStore path under -race so any missed synchronisation on the insert is caught. --- adapter/sqs.go | 27 +++++--- adapter/sqs_messages.go | 46 ++++++------- adapter/sqs_partitioned_dispatch_test.go | 82 ++++++++++++++++-------- 3 files changed, 92 insertions(+), 63 deletions(-) diff --git a/adapter/sqs.go b/adapter/sqs.go index 9095b1c50..670f440f7 100644 --- a/adapter/sqs.go +++ b/adapter/sqs.go @@ -7,7 +7,7 @@ import ( "net/http" "strconv" "strings" - "sync/atomic" + "sync" "time" "github.com/bootjp/elastickv/kv" @@ -168,15 +168,22 @@ type SQSServer struct { // throttle config so unconfigured queues pay one nil-check per // request and nothing else (see sqs_throttle.go). throttle *bucketStore - // receiveFanoutCounter advances once per partitioned receive call - // to pick the fanout starting partition. Using a per-server - // atomic counter (instead of the readTS-bit fold the round-1 fix - // shipped) guarantees true round-robin coverage across partitions - // even when readTS advances by a structured stride — the case - // Codex flagged as P1 round 2 (HLC's 16-bit logical counter + - // fixed per-receive transaction count can produce stride patterns - // that masked-AND alias to a subset of partitions). - receiveFanoutCounter atomic.Uint32 + // receiveFanoutCounters maps queueName → *atomic.Uint32 so each + // partitioned queue gets its own round-robin starting partition. + // Codex P1 round 4 flagged that a server-wide counter aliases + // across queues: when other queues' receives interleave with a + // stride that shares a factor with PartitionCount, the queue's + // observed counter subsequence cycles through only a subset of + // partitions, which can starve the rest under MaxNumberOfMessages + // pressure on the early-scanned ones. Per-queue isolation makes + // each queue's rotation depend solely on its own receive cadence. + // + // sync.Map is the right shape: lookups are read-mostly (the same + // queue keeps getting the same counter), and the keyset grows + // only with the number of distinct queues this server has handled + // receives for in-process — bounded by the operator-controlled + // CreateQueue rate. 
+ receiveFanoutCounters sync.Map } // WithSQSLeaderMap configures the Raft-address-to-SQS-address mapping used to diff --git a/adapter/sqs_messages.go b/adapter/sqs_messages.go index 3e4879a58..e1d734893 100644 --- a/adapter/sqs_messages.go +++ b/adapter/sqs_messages.go @@ -12,6 +12,7 @@ import ( "sort" "strconv" "strings" + "sync/atomic" "time" "github.com/bootjp/elastickv/kv" @@ -881,7 +882,7 @@ func (s *SQSServer) scanAndDeliverOnce(ctx context.Context, queueName string, op // deterministic), so cross-partition iteration order does not // reorder messages within any group. partitions := effectivePartitionCount(meta) - startOffset := s.nextReceiveFanoutStart(partitions) + startOffset := s.nextReceiveFanoutStart(queueName, partitions) for i := uint32(0); i < partitions; i++ { if len(delivered) >= opts.Max { break @@ -904,35 +905,28 @@ func (s *SQSServer) scanAndDeliverOnce(ctx context.Context, queueName string, op return delivered, nil } -// nextReceiveFanoutStart advances the per-server atomic counter and -// returns the starting partition index for this receive call. With -// PartitionCount power-of-two (validator invariant), mask-AND is -// equivalent to modulo, so consecutive receives walk every partition -// in strict round-robin — no aliasing to a subset of partitions even -// when readTS advances by a structured stride. +// nextReceiveFanoutStart returns the starting partition index for a +// partitioned ReceiveMessage call. The counter is keyed by queueName +// so each queue's rotation depends only on its own receive cadence +// — a server-wide counter (round 3) aliases when other queues +// interleave with strides that share a factor with PartitionCount +// (Codex P1 round 4). // -// The round-1 fix used a readTS-bit fold for rotation. Codex P1 -// round 2 flagged a real aliasing concern: HLC packs a 16-bit logical -// counter in the low bits of readTS and ReceiveMessage commits a -// fixed number of per-message transactions per call, so consecutive -// readTS deltas exhibit a structured stride. With partitions=4 and -// stride=10 the masked fold alternates between two starts (0,2,0,2,…) -// — partitions 1 and 3 never appear as the start. A per-server -// counter sidesteps the entire HLC-bit-pattern question. -// -// Returns 0 on legacy / non-partitioned queues (partitions <= 1) so -// the counter is not perturbed for non-partitioned receives — those -// always start at the only partition that exists. -// -// The counter is per-server (not Raft-replicated) because fanout -// distribution is a local-fairness property: every node already sees -// every message via its own receive scans, and "this node's -// distribution is balanced" is the only invariant that matters. -func (s *SQSServer) nextReceiveFanoutStart(partitions uint32) uint32 { +// PartitionCount is power-of-two (validator), so mask-AND is modulo; +// consecutive receives on the same queue walk every partition in +// strict round-robin. Returns 0 on legacy / non-partitioned queues +// (partitions <= 1) without touching the map so unconfigured queues +// pay no per-call allocation. 
+func (s *SQSServer) nextReceiveFanoutStart(queueName string, partitions uint32) uint32 { if partitions <= 1 { return 0 } - return s.receiveFanoutCounter.Add(1) & (partitions - 1) + v, ok := s.receiveFanoutCounters.Load(queueName) + if !ok { + v, _ = s.receiveFanoutCounters.LoadOrStore(queueName, &atomic.Uint32{}) + } + counter, _ := v.(*atomic.Uint32) + return counter.Add(1) & (partitions - 1) } // scanAndDeliverPartition pages the visibility index for one diff --git a/adapter/sqs_partitioned_dispatch_test.go b/adapter/sqs_partitioned_dispatch_test.go index 9a4355bbd..85cdf2c16 100644 --- a/adapter/sqs_partitioned_dispatch_test.go +++ b/adapter/sqs_partitioned_dispatch_test.go @@ -2,6 +2,7 @@ package adapter import ( "context" + "fmt" "net/http" "strings" "sync" @@ -355,37 +356,40 @@ func TestSQSServer_PartitionedFIFO_PerQueueCollapsesToPartitionZero(t *testing.T "perQueue receive must surface every message in one fanout pass over partition 0") } -// TestNextReceiveFanoutStart_RoundRobin pins the round-2 regression: -// the original PR #732 round-1 fix derived the starting partition -// from masked readTS bits (XOR-fold of the upper and lower 32-bit -// halves). Codex P1 round 2 flagged that this can alias to a subset -// of partitions when readTS advances by a structured stride — HLC -// packs a 16-bit logical counter into the low bits and ReceiveMessage -// commits a fixed number of per-message transactions per call, so -// consecutive readTS deltas exhibit a structured stride. +// TestNextReceiveFanoutStart_PerQueueRoundRobin pins the round-4 +// regression: the round-3 fix used a server-wide atomic counter, +// which Codex P1 round 4 flagged because it aliases across queues +// — when other queues' receives interleave with strides that share +// a factor with PartitionCount, the queue's observed counter +// subsequence cycles through only a subset of partitions, which can +// starve the rest. Per-queue counters make each queue's rotation +// depend only on its own receive cadence. // -// nextReceiveFanoutStart replaces the bit-fold with a per-server -// atomic counter, so consecutive calls walk every partition in -// strict round-robin regardless of HLC behaviour. This test pins -// both the round-robin contract and the explicit aliasing case -// Codex flagged. -func TestNextReceiveFanoutStart_RoundRobin(t *testing.T) { +// This test pins: +// - legacy / non-partitioned queues collapse to 0 without +// perturbing any counter, +// - a single queue's consecutive calls walk every partition in +// strict round-robin, +// - interleaved calls across two queues do NOT cross-pollute each +// other's counter (the round-4 isolation contract), +// - concurrent receives across many queues remain atomic and +// in-range under -race. +func TestNextReceiveFanoutStart_PerQueueRoundRobin(t *testing.T) { t.Parallel() s := &SQSServer{} // Legacy / non-partitioned / perQueue queues collapse to 0 and - // must not perturb the counter. - require.Equal(t, uint32(0), s.nextReceiveFanoutStart(0)) - require.Equal(t, uint32(0), s.nextReceiveFanoutStart(1)) + // must not perturb any counter. + require.Equal(t, uint32(0), s.nextReceiveFanoutStart("legacy", 0)) + require.Equal(t, uint32(0), s.nextReceiveFanoutStart("legacy", 1)) - // Partitioned queues walk every partition in strict round-robin. - // 16 consecutive calls over 4 partitions must produce exactly - // 4 hits per partition. + // Single-queue strict round-robin: 16 consecutive calls over 4 + // partitions must produce exactly 4 hits per partition. 
const partitions uint32 = 4 seen := make(map[uint32]int, partitions) for range 16 { - seen[s.nextReceiveFanoutStart(partitions)]++ + seen[s.nextReceiveFanoutStart("queueA", partitions)]++ } require.Len(t, seen, int(partitions), "counter-driven rotation must cover every partition; got %v", seen) @@ -395,17 +399,40 @@ func TestNextReceiveFanoutStart_RoundRobin(t *testing.T) { p, seen[p]) } + // Per-queue isolation — the round-4 contract. Interleaving calls + // across queueB and queueC must not cross-pollute their counters: + // each queue must still see strict round-robin in the order ITS + // calls were issued. With a server-wide counter (round 3), B's + // consecutive calls would observe values 1, 3, 5, 7, … (every + // other tick because C is calling in between) — and 1,3,5,7 mod 4 + // is 1,3,1,3 → partitions 0 and 2 never appear as B's start. + bSeen := make(map[uint32]int, partitions) + cSeen := make(map[uint32]int, partitions) + for range 16 { + bSeen[s.nextReceiveFanoutStart("queueB", partitions)]++ + cSeen[s.nextReceiveFanoutStart("queueC", partitions)]++ + } + for p := uint32(0); p < partitions; p++ { + require.Equal(t, 4, bSeen[p], + "queueB partition %d hit %d times; round-4 per-queue isolation broken (cross-queue stride aliasing)", + p, bSeen[p]) + require.Equal(t, 4, cSeen[p], + "queueC partition %d hit %d times; round-4 per-queue isolation broken", + p, cSeen[p]) + } + // The output must always fall inside [0, partitions). Validates - // the mask-AND contract — partitions is power-of-two, so the - // AND with (partitions-1) is equivalent to modulo. + // the mask-AND contract — partitions is power-of-two. for range 1024 { - off := s.nextReceiveFanoutStart(partitions) + off := s.nextReceiveFanoutStart("queueA", partitions) require.Less(t, off, partitions, "offset %d out of range [0, %d)", off, partitions) } - // Concurrent receivers must each get a valid offset (no torn - // reads, no out-of-range values). Atomicity check. + // Concurrent receivers across many queues must each get a valid + // offset (no torn reads, no out-of-range values, no nil-counter + // races on the LoadOrStore path). The race detector will catch + // any missed synchronisation on the sync.Map insert. var wg sync.WaitGroup const goroutines = 8 const perGoroutine = 256 @@ -415,8 +442,9 @@ func TestNextReceiveFanoutStart_RoundRobin(t *testing.T) { wg.Add(1) go func(idx int) { defer wg.Done() + queueName := fmt.Sprintf("concurrent-%d", idx%3) for j := 0; j < perGoroutine; j++ { - results[idx][j] = s.nextReceiveFanoutStart(partitions) + results[idx][j] = s.nextReceiveFanoutStart(queueName, partitions) } }(i) } From 3961d4aa1b3f5c8ca42b4ee1c802597a08d3874f Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sun, 3 May 2026 23:31:33 +0900 Subject: [PATCH 6/7] sqs(receive): evict per-queue fanout counter on DeleteQueue (PR #732, round 5) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex P2 (user-elevated): nextReceiveFanoutStart populates a sync.Map entry per queue name on the first partitioned receive, but DeleteQueue never removes it. Repeated create/delete of unique queue names then leaks one entry per name for the lifetime of the process — a real problem for multi-tenant or high-churn deployments. 
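Minimal sketch of the leak shape and the eviction (stand-in types: the
server struct, method names, and tenant-churn loop are illustrative,
not the adapter's API):

    package main

    import (
    	"fmt"
    	"sync"
    	"sync/atomic"
    )

    type server struct{ counters sync.Map } // stand-in for receiveFanoutCounters

    func (s *server) next(queue string, partitions uint32) uint32 {
    	v, _ := s.counters.LoadOrStore(queue, &atomic.Uint32{})
    	return v.(*atomic.Uint32).Add(1) & (partitions - 1)
    }

    // evict mirrors the delete-side cleanup: without it, every distinct
    // queue name ever received on stays resident for the process lifetime.
    func (s *server) evict(queue string) { s.counters.Delete(queue) }

    func main() {
    	s := &server{}
    	for i := 0; i < 1000; i++ {
    		q := fmt.Sprintf("tenant-%d", i)
    		_ = s.next(q, 4) // first partitioned receive populates an entry
    		s.evict(q)       // delete-side cleanup drops it again
    	}
    	n := 0
    	s.counters.Range(func(_, _ any) bool { n++; return true })
    	fmt.Println("entries after churn:", n) // prints 0; without evict: 1000
    }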
Mirror throttle.invalidateQueue's two call sites: - createQueue genuine-create branch — drops a counter that survived a delete-then-create race (a partitioned receive holding pre-delete meta could repopulate the entry between the delete-side cleanup and this create commit). Tied to the same "fresh state on a true create" guarantee the throttle invalidate provides. - deleteQueue HTTP handler — drops the counter alongside the throttle bucket so DeleteQueue+CreateQueue resets every piece of in-memory queue state, not just the rate-limit bucket. setQueueAttributes is intentionally NOT mirrored: it only mutates throttle config, never PartitionCount, so the counter remains valid for the same queue across attribute changes — same conditional shape the throttle invalidate already uses there. Caller audit: the new dropReceiveFanoutCounter helper has exactly two production callers, both adjacent to the existing throttle.invalidateQueue calls. AdminDeleteQueue does not call throttle.invalidateQueue today either; staying consistent rather than fixing that pre-existing inconsistency in this round. Test: TestDropReceiveFanoutCounter_ClearsEntry pins - no-op on never-touched queues (must not allocate or panic), - removal after a populated counter, - per-queue isolation (dropping queueB leaves queueC's counter intact), - fresh allocation after drop (the recreated queue starts from a zero counter, not a recycled pointer — pinned by asserting the first offset == 1 on a 4-partition queue). --- adapter/sqs_catalog.go | 12 +++++ adapter/sqs_messages.go | 12 +++++ adapter/sqs_partitioned_dispatch_test.go | 58 ++++++++++++++++++++++++ 3 files changed, 82 insertions(+) diff --git a/adapter/sqs_catalog.go b/adapter/sqs_catalog.go index 44bbadfb2..abb179a22 100644 --- a/adapter/sqs_catalog.go +++ b/adapter/sqs_catalog.go @@ -1012,6 +1012,12 @@ func (s *SQSServer) tryCreateQueueOnce(ctx context.Context, requested *sqsQueueM // the new queue starts with a fresh full-capacity bucket // regardless of in-flight traffic to the prior incarnation. s.throttle.invalidateQueue(requested.Name) + // Mirror the throttle invalidate for the per-queue fanout-rotation + // counter. A delete-then-create race could otherwise leave the + // new queue starting partitioned receives at the previous + // incarnation's counter offset (harmless for correctness, but + // ties new-queue routing to old-queue history). + s.dropReceiveFanoutCounter(requested.Name) return true, nil } @@ -1038,6 +1044,12 @@ func (s *SQSServer) deleteQueue(w http.ResponseWriter, r *http.Request) { // surprising operators who use DeleteQueue+CreateQueue to reset // queue state. s.throttle.invalidateQueue(name) + // Drop the per-queue fanout-rotation counter as well. Without + // this, repeated DeleteQueue of unique queue names retains one + // receiveFanoutCounters entry per name for the process lifetime + // — a leak in multi-tenant / high-churn deployments. Codex P2, + // PR #732 round 5. + s.dropReceiveFanoutCounter(name) // SQS DeleteQueue returns 200 with an empty body. writeSQSJSON(w, map[string]any{}) } diff --git a/adapter/sqs_messages.go b/adapter/sqs_messages.go index e1d734893..b3994fc5c 100644 --- a/adapter/sqs_messages.go +++ b/adapter/sqs_messages.go @@ -929,6 +929,18 @@ func (s *SQSServer) nextReceiveFanoutStart(queueName string, partitions uint32) return counter.Add(1) & (partitions - 1) } +// dropReceiveFanoutCounter removes any per-queue fanout-rotation +// counter for queueName. 
Mirrors throttle.invalidateQueue: called +// from the same DeleteQueue and genuine-CreateQueue paths so the +// sync.Map does not retain a counter for every queue name the +// process has ever served. Without this, repeated create/delete of +// unique queue names (multi-tenant or high-churn workloads) leaks +// one entry per name for the lifetime of the process. No-op when +// the queue never produced a partitioned receive (entry absent). +func (s *SQSServer) dropReceiveFanoutCounter(queueName string) { + s.receiveFanoutCounters.Delete(queueName) +} + // scanAndDeliverPartition pages the visibility index for one // partition under the shared wall-clock + per-call max budget. On // legacy / non-partitioned queues the caller invokes this exactly diff --git a/adapter/sqs_partitioned_dispatch_test.go b/adapter/sqs_partitioned_dispatch_test.go index 85cdf2c16..3b04324f3 100644 --- a/adapter/sqs_partitioned_dispatch_test.go +++ b/adapter/sqs_partitioned_dispatch_test.go @@ -456,3 +456,61 @@ func TestNextReceiveFanoutStart_PerQueueRoundRobin(t *testing.T) { } } } + +// TestDropReceiveFanoutCounter_ClearsEntry pins the round-5 +// cleanup contract: a queue's per-queue fanout counter must be +// removed from receiveFanoutCounters once dropReceiveFanoutCounter +// is called, so repeated create/delete of unique queue names +// cannot leak one sync.Map entry per name for the process +// lifetime (Codex P2 on PR #732). The test also asserts the +// no-op shape on absent / never-partitioned queues so the same +// helper is safe to call from the catalog paths regardless of +// whether the queue ever issued a partitioned receive. +func TestDropReceiveFanoutCounter_ClearsEntry(t *testing.T) { + t.Parallel() + + s := &SQSServer{} + + // Drop on an absent queue must be a no-op (no panic, no + // allocation in the map). Calling it before any receive is + // the common path on legacy / non-partitioned queues that + // never populate the counter. + s.dropReceiveFanoutCounter("never-touched") + _, ok := s.receiveFanoutCounters.Load("never-touched") + require.False(t, ok, "no entry must exist for an unused queue name") + + // Populate a counter via the public path, then verify it is + // observable in the underlying sync.Map. + const partitions uint32 = 4 + _ = s.nextReceiveFanoutStart("queueA", partitions) + _, ok = s.receiveFanoutCounters.Load("queueA") + require.True(t, ok, "nextReceiveFanoutStart must populate the counter") + + // Drop and verify removal. A stale entry surviving here is the + // leak Codex flagged. + s.dropReceiveFanoutCounter("queueA") + _, ok = s.receiveFanoutCounters.Load("queueA") + require.False(t, ok, "dropReceiveFanoutCounter must remove the queue's entry") + + // Dropping an unrelated queue must not affect a different + // queue's counter (per-queue isolation contract from round 4 + // must hold across cleanup). + _ = s.nextReceiveFanoutStart("queueB", partitions) + _ = s.nextReceiveFanoutStart("queueC", partitions) + s.dropReceiveFanoutCounter("queueB") + _, ok = s.receiveFanoutCounters.Load("queueB") + require.False(t, ok, "queueB entry must be removed") + _, ok = s.receiveFanoutCounters.Load("queueC") + require.True(t, ok, "queueC entry must survive an unrelated drop") + + // After drop, a new receive on the same queue must allocate a + // fresh counter starting from 0+1 (the Add is unconditional), + // not pick up where the old counter left off. 
The first call's
+	// returned offset must be 1 (counter = 1, mask 3 → 1) on a
+	// fresh atomic.Uint32 — pinning this catches any regression
+	// that recycled the old pointer.
+	first := s.nextReceiveFanoutStart("queueA", partitions)
+	require.Equal(t, uint32(1), first,
+		"a queue recreated after drop must start from a zero counter (got first offset %d)",
+		first)
+}

From 5047223acc81018d07d5c85e6612f48ea497a493 Mon Sep 17 00:00:00 2001
From: "Yoshiaki Ueda (bootjp)"
Date: Mon, 4 May 2026 00:47:15 +0900
Subject: [PATCH 7/7] sqs(keys): add segment terminator between groupID and dedupID (PR #732, round 6)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

CodeRabbit major: sqsPartitionedMsgDedupKey appends
encodeSQSSegment(groupID) and encodeSQSSegment(dedupID) back-to-back.
encodeSQSSegment uses base64.RawURLEncoding (no padding, alphabet
[A-Za-z0-9_-]), so the boundary between the two segments is
variable-length and ambiguous: distinct (groupID, dedupID) pairs can
collapse onto the same byte sequence — most trivially when one of the
two is empty: ("", "abcd") and ("abcd", "") both produce "...YWJjZA".
Even for non-empty IDs the boundary can vanish: segment lengths that
are multiples of 3 encode with no partial base64 block at the join,
so ("abc", "defghi") and ("abcdef", "ghi") encode back-to-back to the
same bytes ("YWJjZGVmZ2hp"). This is the on-disk dedup-key format
PR 5b-3 will rely on once the dormancy gate lifts; fixing it now
avoids shipping an ambiguous format.

Fix: insert the sqsPartitionedQueueTerminator pipe byte between the
two segments. That byte is outside RawURLEncoding's alphabet so
neither segment can contribute one of its own — the boundary becomes
unambiguous regardless of input length. Mirrors how
sqsPartitionedMsgDataKey, MsgVisKey, MsgGroupKey, and MsgByAgeKey
already use the same terminator at the queue/partition boundary.

Caller audit: sqsPartitionedMsgDedupKey is reached only via
sqsMsgDedupKeyDispatch, which has exactly two production callers —
loadFifoDedupRecord (read-side) and sendFifoMessage (write-side, via
the key returned from the same loadFifoDedupRecord call). Read and
write share one helper invocation, so the new format takes effect
symmetrically with no read/write skew window. parsePartitionedSQSKey
only inspects the key through the partition field and stops; the new
terminator after groupID does not affect parsing.

Note on dormancy: partitioned queues are still gated at CreateQueue
(§11 PR 2 dormancy gate, lifted atomically with the capability check
in PR 5b-3), so no production data carries the old format. This is
purely a forward-format correction.

Test: TestSqsPartitionedMsgDedupKey_GroupDedupSeparator covers
- the empty-segment collision class (the most blatant ambiguity),
- the new terminator's exact placement (key ends with the
  b64(group)+pipe+b64(dedup) suffix, proving the byte sits at the
  boundary),
- the non-empty collision class (("abc", "defghi") vs
  ("abcdef", "ghi") collide back-to-back; the terminator keeps them
  distinct),
- read-write symmetry through sqsMsgDedupKeyDispatch (the dispatch
  helper produces the same bytes as the underlying constructor, so
  a regression that diverged the two would fail here).
---
 adapter/sqs_keys.go               | 10 +++++
 adapter/sqs_keys_dispatch_test.go | 69 +++++++++++++++++++++++++++++++
 2 files changed, 79 insertions(+)

diff --git a/adapter/sqs_keys.go b/adapter/sqs_keys.go
index 38abf55ac..44105c0e0 100644
--- a/adapter/sqs_keys.go
+++ b/adapter/sqs_keys.go
@@ -331,6 +331,16 @@ func sqsPartitionedMsgDedupKey(queueName string, partition uint32, gen uint64, g
 	buf = appendU32(buf, partition)
 	buf = appendU64(buf, gen)
 	buf = append(buf, encodeSQSSegment(groupID)...)
+	// Terminator between the variable-length groupID and dedupID
+	// segments. encodeSQSSegment uses base64.RawURLEncoding (no
+	// padding), so back-to-back segments are not unambiguously
+	// splittable and distinct (groupID, dedupID) pairs can collapse
+	// onto the same key — reintroducing the cross-group false-
+	// duplicate class the round-3 dedup-scoping fix closed. The
+	// terminator '|' is safe because RawURLEncoding never emits it
+	// (alphabet is A-Z a-z 0-9 - _; see sqsPartitionedQueueTerminator
+	// docs). CodeRabbit major, PR #732 round 6.
+	buf = append(buf, sqsPartitionedQueueTerminator)
 	buf = append(buf, encodeSQSSegment(dedupID)...)
 	return buf
 }
diff --git a/adapter/sqs_keys_dispatch_test.go b/adapter/sqs_keys_dispatch_test.go
index f2b0b1d15..f4337d54f 100644
--- a/adapter/sqs_keys_dispatch_test.go
+++ b/adapter/sqs_keys_dispatch_test.go
@@ -208,6 +208,75 @@ func TestSQSDedupKeyDispatch_PartitionedScopesByMessageGroupId(t *testing.T) {
 		"for queues created before partitioning landed")
 }
 
+// TestSqsPartitionedMsgDedupKey_GroupDedupSeparator pins the round-6
+// fix for the CodeRabbit major: encodeSQSSegment uses RawURLEncoding
+// (no padding, alphabet [A-Za-z0-9_-]), so when groupID and dedupID
+// segments are concatenated WITHOUT a separator, distinct (group,
+// dedup) pairs can collapse onto the same byte sequence — most
+// trivially when one of the two is empty (("", "abcd") vs ("abcd",
+// "") encode identically as base64). Even with non-empty inputs the
+// boundary can vanish: segment lengths that are multiples of 3
+// encode with no partial base64 block, so the concatenation point
+// leaves no trace. The fix is a single sqsPartitionedQueueTerminator
+// '|' between the two segments; '|' is outside RawURLEncoding's
+// alphabet so it cannot be produced by either encodeSQSSegment call,
+// making the boundary unambiguous regardless of input length.
+func TestSqsPartitionedMsgDedupKey_GroupDedupSeparator(t *testing.T) {
+	t.Parallel()
+	const (
+		queue     = "events.fifo"
+		gen       = uint64(11)
+		partition = uint32(2)
+	)
+
+	// (1) The empty-segment collision class. SQS validation
+	// rejects empty group / dedup IDs at the public API, but the
+	// key constructor must still be unambiguous on its own — a
+	// future code path that passes an empty string (intentionally
+	// or via a bug) must NOT silently merge keyspaces. Without the
+	// terminator: ("", "abcd") and ("abcd", "") produce the same
+	// "...YWJjZA" suffix.
+	keyEmptyGroup := sqsPartitionedMsgDedupKey(queue, partition, gen, "", "abcd")
+	keyEmptyDedup := sqsPartitionedMsgDedupKey(queue, partition, gen, "abcd", "")
+	require.NotEqual(t, keyEmptyGroup, keyEmptyDedup,
+		"('', 'abcd') and ('abcd', '') must produce distinct keys — "+
+			"the terminator is the only thing that disambiguates them")
+
+	// (2) The separator must appear AFTER the encoded groupID,
+	// BEFORE the encoded dedupID. Build the expected suffix
+	// b64("groupA") + '|' + b64("dedup-token") and assert the key
+	// ends with it. RawURLEncoding's alphabet (A-Z a-z 0-9 - _)
+	// excludes '|' so neither segment can contribute one of its
+	// own — finding the literal '|' between them therefore proves
+	// the round-6 terminator is in place. (We can't just count '|'
+	// because the SqsPartitionedMsgDedupPrefix constant itself
+	// contains '|' separators inside the family-name marker.)
+	key := sqsPartitionedMsgDedupKey(queue, partition, gen, "groupA", "dedup-token")
+	wantTail := append(append([]byte(encodeSQSSegment("groupA")), '|'), encodeSQSSegment("dedup-token")...)
+	require.True(t, bytes.HasSuffix(key, wantTail),
+		"key must end with b64(group)|b64(dedup); got key=%q want suffix=%q",
+		key, wantTail)
+
+	// (3) The non-empty collision class: segment lengths that are
+	// multiples of 3 encode with no partial base64 block, so the
+	// back-to-back concatenations are byte-identical:
+	// enc("abc")+enc("defghi") and enc("abcdef")+enc("ghi") both
+	// yield "YWJjZGVmZ2hp". Only the terminator keeps the two
+	// keys distinct.
+	keyShortGroup := sqsPartitionedMsgDedupKey(queue, partition, gen, "abc", "defghi")
+	keyLongGroup := sqsPartitionedMsgDedupKey(queue, partition, gen, "abcdef", "ghi")
+	require.NotEqual(t, keyShortGroup, keyLongGroup,
+		"('abc', 'defghi') and ('abcdef', 'ghi') must produce distinct keys")
+
+	// (4) Read-write symmetry: the dispatch helper used by
+	// loadFifoDedupRecord (read) and sendFifoMessage (write) MUST
+	// route to the same constructor so both sides observe the same
+	// new format simultaneously — no read/write skew window.
+	meta := &sqsQueueMeta{PartitionCount: 4}
+	viaDispatch := sqsMsgDedupKeyDispatch(meta, queue, partition, gen, "groupA", "dedup-token")
+	require.Equal(t, key, viaDispatch,
+		"dispatch helper must produce the same bytes as the underlying "+
+			"constructor — round-6 format change must be picked up "+
+			"symmetrically on read and write paths")
+}
+
 // TestSQSKeysDispatch_LegacyAndPartitionedAreDistinct pins the
 // keyspace-isolation invariant at the dispatch level: a legacy
 // (PartitionCount=1) key and a partitioned (PartitionCount>1) key