diff --git a/adapter/sqs.go b/adapter/sqs.go index 112153bb..670f440f 100644 --- a/adapter/sqs.go +++ b/adapter/sqs.go @@ -7,6 +7,7 @@ import ( "net/http" "strconv" "strings" + "sync" "time" "github.com/bootjp/elastickv/kv" @@ -167,6 +168,22 @@ type SQSServer struct { // throttle config so unconfigured queues pay one nil-check per // request and nothing else (see sqs_throttle.go). throttle *bucketStore + // receiveFanoutCounters maps queueName → *atomic.Uint32 so each + // partitioned queue gets its own round-robin starting partition. + // Codex P1 round 4 flagged that a server-wide counter aliases + // across queues: when other queues' receives interleave with a + // stride that shares a factor with PartitionCount, the queue's + // observed counter subsequence cycles through only a subset of + // partitions, which can starve the rest under MaxNumberOfMessages + // pressure on the early-scanned ones. Per-queue isolation makes + // each queue's rotation depend solely on its own receive cadence. + // + // sync.Map is the right shape: lookups are read-mostly (the same + // queue keeps getting the same counter), and the keyset grows + // only with the number of distinct queues this server has handled + // receives for in-process — bounded by the operator-controlled + // CreateQueue rate. + receiveFanoutCounters sync.Map } // WithSQSLeaderMap configures the Raft-address-to-SQS-address mapping used to diff --git a/adapter/sqs_catalog.go b/adapter/sqs_catalog.go index 44bbadfb..abb179a2 100644 --- a/adapter/sqs_catalog.go +++ b/adapter/sqs_catalog.go @@ -1012,6 +1012,12 @@ func (s *SQSServer) tryCreateQueueOnce(ctx context.Context, requested *sqsQueueM // the new queue starts with a fresh full-capacity bucket // regardless of in-flight traffic to the prior incarnation. s.throttle.invalidateQueue(requested.Name) + // Mirror the throttle invalidate for the per-queue fanout-rotation + // counter. 
A delete-then-create race could otherwise leave the + // new queue starting partitioned receives at the previous + // incarnation's counter offset (harmless for correctness, but + // ties new-queue routing to old-queue history). + s.dropReceiveFanoutCounter(requested.Name) return true, nil } @@ -1038,6 +1044,12 @@ func (s *SQSServer) deleteQueue(w http.ResponseWriter, r *http.Request) { // surprising operators who use DeleteQueue+CreateQueue to reset // queue state. s.throttle.invalidateQueue(name) + // Drop the per-queue fanout-rotation counter as well. Without + // this, repeated DeleteQueue of unique queue names retains one + // receiveFanoutCounters entry per name for the process lifetime + // — a leak in multi-tenant / high-churn deployments. Codex P2, + // PR #732 round 5. + s.dropReceiveFanoutCounter(name) // SQS DeleteQueue returns 200 with an empty body. writeSQSJSON(w, map[string]any{}) } diff --git a/adapter/sqs_fifo.go b/adapter/sqs_fifo.go index c603a8f6..4db6dec2 100644 --- a/adapter/sqs_fifo.go +++ b/adapter/sqs_fifo.go @@ -91,8 +91,12 @@ func resolveFifoDedupID(meta *sqsQueueMeta, in sqsSendMessageInput) string { // or (nil, nil) when there is no live record for this dedup-id. // Expired records are surfaced as nil so a stale entry does not block // a fresh send within the same FIFO queue. -func (s *SQSServer) loadFifoDedupRecord(ctx context.Context, queueName string, gen uint64, dedupID string, readTS uint64) (*sqsFifoDedupRecord, []byte, error) { - key := sqsMsgDedupKey(queueName, gen, dedupID) +// +// groupID is the MessageGroupId; it participates in the partitioned +// dedup key so that two groups colliding onto the same partition keep +// disjoint dedup namespaces (the AWS messageGroup-scope contract). 
+func (s *SQSServer) loadFifoDedupRecord(ctx context.Context, queueName string, meta *sqsQueueMeta, partition uint32, gen uint64, groupID, dedupID string, readTS uint64) (*sqsFifoDedupRecord, []byte, error) { + key := sqsMsgDedupKeyDispatch(meta, queueName, partition, gen, groupID, dedupID) raw, err := s.store.GetAt(ctx, key, readTS) if err != nil { if errors.Is(err, store.ErrKeyNotFound) { @@ -130,11 +134,11 @@ func (s *SQSServer) loadFifoSequence(ctx context.Context, queueName string, read // loadFifoGroupLock fetches the in-flight lock for a group, if any. // Returns nil when no lock is held. Callers that also need the key -// can recompute it via sqsMsgGroupKey — the helper used to return it -// alongside the lock, but every caller already had the key in scope -// from a different code path. -func (s *SQSServer) loadFifoGroupLock(ctx context.Context, queueName string, gen uint64, groupID string, readTS uint64) (*sqsFifoGroupLock, error) { - key := sqsMsgGroupKey(queueName, gen, groupID) +// can recompute it via sqsMsgGroupKeyDispatch — the helper used to +// return it alongside the lock, but every caller already had the +// key in scope from a different code path. +func (s *SQSServer) loadFifoGroupLock(ctx context.Context, queueName string, meta *sqsQueueMeta, partition uint32, gen uint64, groupID string, readTS uint64) (*sqsFifoGroupLock, error) { + key := sqsMsgGroupKeyDispatch(meta, queueName, partition, gen, groupID) raw, err := s.store.GetAt(ctx, key, readTS) if err != nil { if errors.Is(err, store.ErrKeyNotFound) { @@ -172,7 +176,14 @@ func (s *SQSServer) sendFifoMessage( delay int64, readTS uint64, ) (map[string]string, bool, error) { - dedup, dedupKey, err := s.loadFifoDedupRecord(ctx, queueName, meta.Generation, dedupID, readTS) + // HT-FIFO: hash the MessageGroupId once at the entry point so + // every key built in this transaction (data, vis, byage, dedup, + // group-lock, sequence) lands in the same partition. 
partitionFor + // returns 0 on legacy / non-partitioned queues and on the + // perQueue throughput short-circuit, so the dispatch helpers + // round-trip to legacy output for those cases. + partition := partitionFor(meta, in.MessageGroupId) + dedup, dedupKey, err := s.loadFifoDedupRecord(ctx, queueName, meta, partition, meta.Generation, in.MessageGroupId, dedupID, readTS) if err != nil { return nil, false, err } @@ -216,9 +227,9 @@ func (s *SQSServer) sendFifoMessage( return nil, false, errors.WithStack(err) } - dataKey := sqsMsgDataKey(queueName, meta.Generation, rec.MessageID) - visKey := sqsMsgVisKey(queueName, meta.Generation, rec.AvailableAtMillis, rec.MessageID) - byAgeKey := sqsMsgByAgeKey(queueName, meta.Generation, rec.SendTimestampMillis, rec.MessageID) + dataKey := sqsMsgDataKeyDispatch(meta, queueName, partition, meta.Generation, rec.MessageID) + visKey := sqsMsgVisKeyDispatch(meta, queueName, partition, meta.Generation, rec.AvailableAtMillis, rec.MessageID) + byAgeKey := sqsMsgByAgeKeyDispatch(meta, queueName, partition, meta.Generation, rec.SendTimestampMillis, rec.MessageID) seqKey := sqsQueueSeqKey(queueName) metaKey := sqsQueueMetaKey(queueName) genKey := sqsQueueGenKey(queueName) @@ -270,9 +281,9 @@ const ( // classifyFifoGroupLock decides whether a FIFO candidate is eligible // for delivery. Standard queues bypass the function entirely. 
-func (s *SQSServer) classifyFifoGroupLock(ctx context.Context, queueName string, gen uint64, rec *sqsMessageRecord, readTS uint64) (fifoCandidateLockState, []byte, error) { - lockKey := sqsMsgGroupKey(queueName, gen, rec.MessageGroupId) - lock, err := s.loadFifoGroupLock(ctx, queueName, gen, rec.MessageGroupId, readTS) +func (s *SQSServer) classifyFifoGroupLock(ctx context.Context, queueName string, meta *sqsQueueMeta, partition uint32, gen uint64, rec *sqsMessageRecord, readTS uint64) (fifoCandidateLockState, []byte, error) { + lockKey := sqsMsgGroupKeyDispatch(meta, queueName, partition, gen, rec.MessageGroupId) + lock, err := s.loadFifoGroupLock(ctx, queueName, meta, partition, gen, rec.MessageGroupId, readTS) if err != nil { return fifoLockSkip, lockKey, err } diff --git a/adapter/sqs_keys.go b/adapter/sqs_keys.go index 5cc46c00..44105c0e 100644 --- a/adapter/sqs_keys.go +++ b/adapter/sqs_keys.go @@ -312,18 +312,35 @@ func sqsPartitionedMsgVisPrefixForQueue(queueName string, partition uint32, gen } // sqsPartitionedMsgDedupKey builds the FIFO dedup key for a -// partitioned queue. The dedup window is per-partition by design -// (DeduplicationScope=messageGroup with PartitionCount>1) — the -// validator in adapter/sqs_partitioning.go rejects the queue-scoped -// scope on partitioned queues, so this key shape is always reachable -// from the same partition that ran the dedup check. -func sqsPartitionedMsgDedupKey(queueName string, partition uint32, gen uint64, dedupID string) []byte { +// partitioned queue. DeduplicationScope=messageGroup (the only +// scope reachable on partitioned queues — the validator in +// adapter/sqs_partitioning.go rejects queue-scoped dedup) requires +// the dedup window to be per (queue, group, dedupID): two distinct +// MessageGroupIds that happen to FNV-collide onto the same partition +// must NOT share a dedup namespace, otherwise a fresh send in group B +// would be silently acked with group A's MessageId. 
Per design doc +// §4.1 line 200, the dedup key keys on (queue, partition, group, +// dedupID); partition is redundant given the deterministic group→ +// partition map but is kept for keyspace organization (every key +// belonging to a group lives under the same partition prefix). +func sqsPartitionedMsgDedupKey(queueName string, partition uint32, gen uint64, groupID, dedupID string) []byte { buf := make([]byte, 0, len(SqsPartitionedMsgDedupPrefix)+sqsKeyCapLarge) buf = append(buf, SqsPartitionedMsgDedupPrefix...) buf = append(buf, encodeSQSSegment(queueName)...) buf = append(buf, sqsPartitionedQueueTerminator) buf = appendU32(buf, partition) buf = appendU64(buf, gen) + buf = append(buf, encodeSQSSegment(groupID)...) + // Terminator between the variable-length groupID and dedupID + // segments. encodeSQSSegment uses base64.RawURLEncoding (no + // padding), so back-to-back segments are not unambiguously + // splittable and distinct (groupID, dedupID) pairs can collapse + // onto the same key — reintroducing the cross-group false- + // duplicate class the round-3 dedup-scoping fix closed. The + // terminator '|' is safe because RawURLEncoding never emits it + // (alphabet is A-Z a-z 0-9 - _; see sqsPartitionedQueueTerminator + // docs). CodeRabbit major, PR #732 round 6. + buf = append(buf, sqsPartitionedQueueTerminator) buf = append(buf, encodeSQSSegment(dedupID)...) return buf } diff --git a/adapter/sqs_keys_dispatch.go b/adapter/sqs_keys_dispatch.go index 0b57877b..6734caeb 100644 --- a/adapter/sqs_keys_dispatch.go +++ b/adapter/sqs_keys_dispatch.go @@ -1,5 +1,11 @@ package adapter +import ( + "bytes" + + "github.com/cockroachdb/errors" +) + // Per-key dispatch helpers that route to the legacy single-partition // constructor or the partitioned-FIFO constructor based on // meta.PartitionCount. 
Phase 3.D PR 5b's central abstraction: @@ -47,12 +53,17 @@ func sqsMsgVisKeyDispatch(meta *sqsQueueMeta, queueName string, partition uint32 } // sqsMsgDedupKeyDispatch builds the FIFO dedup key for either -// keyspace. Dedup scope is per-partition on partitioned queues -// (DeduplicationScope = messageGroup is enforced by the validator -// on PartitionCount > 1). -func sqsMsgDedupKeyDispatch(meta *sqsQueueMeta, queueName string, partition uint32, gen uint64, dedupID string) []byte { +// keyspace. On a partitioned queue (DeduplicationScope = messageGroup +// — enforced by the validator on PartitionCount > 1) the dedup window +// must be per (queue, group, dedupID): two distinct MessageGroupIds +// that FNV-collide onto the same partition must NOT share a dedup +// namespace, otherwise a fresh send in group B is silently acked +// with group A's MessageId. The legacy (non-partitioned) branch +// keeps the legacy (queue, gen, dedupID) shape — there is only one +// implicit "group" so no collision is possible there. +func sqsMsgDedupKeyDispatch(meta *sqsQueueMeta, queueName string, partition uint32, gen uint64, groupID, dedupID string) []byte { if meta != nil && meta.PartitionCount > 1 { - return sqsPartitionedMsgDedupKey(queueName, partition, gen, dedupID) + return sqsPartitionedMsgDedupKey(queueName, partition, gen, groupID, dedupID) } return sqsMsgDedupKey(queueName, gen, dedupID) } @@ -116,3 +127,82 @@ func effectivePartitionCount(meta *sqsQueueMeta) uint32 { } return meta.PartitionCount } + +// sqsMsgVisScanBoundsDispatch returns the start/end byte ranges that +// ReceiveMessage's per-partition visibility-index scan iterates. +// Mirrors sqsMsgVisScanBounds (legacy keyspace) but parametrises the +// prefix on partition when the queue is partitioned. The bounds are +// always [prefix||u64(0), prefix||u64(maxVisibleAtMillis+1)) so +// messages with visible_at == maxVisibleAtMillis are included. 
+func sqsMsgVisScanBoundsDispatch(meta *sqsQueueMeta, queueName string, partition uint32, gen uint64, maxVisibleAtMillis int64) (start, end []byte) { + prefix := sqsMsgVisPrefixForQueueDispatch(meta, queueName, partition, gen) + start = append(bytes.Clone(prefix), zeroU64()...) + upper := uint64MaxZero(maxVisibleAtMillis) + if upper < ^uint64(0) { + upper++ + } + end = append(bytes.Clone(prefix), encodedU64(upper)...) + return start, end +} + +// encodeReceiptHandleDispatch picks the receipt-handle wire format +// based on meta.PartitionCount: v1 on legacy / non-partitioned +// queues, v2 on partitioned ones. The partition argument is only +// consulted on the v2 branch — callers may pass 0 on the legacy +// branch. +// +// This is the single point where a fresh receipt handle commits to +// a wire version. Pairing the choice with the same meta.PartitionCount +// the dispatch helpers used to build keys keeps the handle's +// recorded partition consistent with the partition the message was +// stored under, so a later DeleteMessage / ChangeMessageVisibility +// routes to the right keyspace. +func encodeReceiptHandleDispatch(meta *sqsQueueMeta, partition uint32, queueGen uint64, messageIDHex string, receiptToken []byte) (string, error) { + if meta != nil && meta.PartitionCount > 1 { + return encodeReceiptHandleV2(partition, queueGen, messageIDHex, receiptToken) + } + return encodeReceiptHandle(queueGen, messageIDHex, receiptToken) +} + +// validateReceiptHandleVersion enforces the queue-aware version +// rule that replaced the dormancy gate from PR 5a: +// +// - meta.PartitionCount <= 1 (legacy / non-partitioned queue): +// handle MUST be v1. A v2 handle on a non-partitioned queue +// is structurally impossible (SendMessage would never have +// produced one) and accepting it would let a malicious caller +// re-encode a v1 handle as v2 to probe / corrupt the v2 layout +// before any partitioned queue exists. 
+// - meta.PartitionCount > 1 (partitioned queue): handle MUST be +// v2. A v1 handle on a partitioned queue carries no partition +// index, so dispatch would default to partition 0 and the +// delete / change-visibility would silently miss messages on +// other partitions. +// +// Mismatches surface as ReceiptHandleIsInvalid (the same AWS error +// shape used for malformed handles), so a misrouted client cannot +// distinguish "wrong version" from "garbled bytes" — preserving the +// PR 5a / PR 724 round 3 dormancy guarantee that the v2 wire format +// is not probeable from the public API. +func validateReceiptHandleVersion(meta *sqsQueueMeta, handle *decodedReceiptHandle) error { + if handle == nil { + return errors.New("receipt handle is nil") + } + if meta != nil && meta.PartitionCount > 1 { + if handle.Version != sqsReceiptHandleVersion2 { + return errors.New("receipt handle version mismatch for partitioned queue") + } + // handle.Partition is client-controlled once decodeClientReceiptHandle + // accepts v2. Without this bound check, an out-of-range partition + // falls through to sqsMsg*KeyDispatch and depends on downstream + // routing failure semantics instead of returning ReceiptHandleIsInvalid. 
+ if handle.Partition >= meta.PartitionCount { + return errors.New("receipt handle partition out of range for queue") + } + return nil + } + if handle.Version != sqsReceiptHandleVersion1 { + return errors.New("receipt handle version mismatch for non-partitioned queue") + } + return nil +} diff --git a/adapter/sqs_keys_dispatch_test.go b/adapter/sqs_keys_dispatch_test.go index 27de6399..f4337d54 100644 --- a/adapter/sqs_keys_dispatch_test.go +++ b/adapter/sqs_keys_dispatch_test.go @@ -35,7 +35,7 @@ func TestSQSKeysDispatch_LegacyMatchesLegacyConstructor(t *testing.T) { sqsMsgVisKeyDispatch(nil, queue, 0, gen, ts, msgID), sqsMsgVisKey(queue, gen, ts, msgID)}, {"meta=nil dedup", - sqsMsgDedupKeyDispatch(nil, queue, 0, gen, dedupID), + sqsMsgDedupKeyDispatch(nil, queue, 0, gen, groupID, dedupID), sqsMsgDedupKey(queue, gen, dedupID)}, {"meta=nil group", sqsMsgGroupKeyDispatch(nil, queue, 0, gen, groupID), @@ -66,7 +66,7 @@ func TestSQSKeysDispatch_LegacyMatchesLegacyConstructor(t *testing.T) { sqsMsgVisKeyDispatch(&sqsQueueMeta{PartitionCount: 1}, queue, 0, gen, ts, msgID), sqsMsgVisKey(queue, gen, ts, msgID)}, {"meta.PartitionCount=1 dedup", - sqsMsgDedupKeyDispatch(&sqsQueueMeta{PartitionCount: 1}, queue, 0, gen, dedupID), + sqsMsgDedupKeyDispatch(&sqsQueueMeta{PartitionCount: 1}, queue, 0, gen, groupID, dedupID), sqsMsgDedupKey(queue, gen, dedupID)}, {"meta.PartitionCount=1 group", sqsMsgGroupKeyDispatch(&sqsQueueMeta{PartitionCount: 1}, queue, 0, gen, groupID), @@ -119,8 +119,8 @@ func TestSQSKeysDispatch_PartitionedMatchesPartitionedConstructor(t *testing.T) sqsMsgVisKeyDispatch(meta, queue, partition, gen, ts, msgID), sqsPartitionedMsgVisKey(queue, partition, gen, ts, msgID)}, {"dedup", - sqsMsgDedupKeyDispatch(meta, queue, partition, gen, dedupID), - sqsPartitionedMsgDedupKey(queue, partition, gen, dedupID)}, + sqsMsgDedupKeyDispatch(meta, queue, partition, gen, groupID, dedupID), + sqsPartitionedMsgDedupKey(queue, partition, gen, groupID, dedupID)}, {"group", 
sqsMsgGroupKeyDispatch(meta, queue, partition, gen, groupID), sqsPartitionedMsgGroupKey(queue, partition, gen, groupID)}, @@ -167,6 +167,116 @@ func TestSQSKeysDispatch_BoundaryAtPartitionCount2(t *testing.T) { "PartitionCount=2 must NOT route to the legacy keyspace") } +// TestSQSDedupKeyDispatch_PartitionedScopesByMessageGroupId is the +// regression for the round-3 P1 (Codex) on PR #732: with +// DeduplicationScope=messageGroup on a partitioned queue the dedup +// key MUST include MessageGroupId so two distinct groups that +// FNV-collide onto the same partition do NOT share a dedup +// namespace. Without the group segment, a fresh send in group "B" +// reusing group "A"'s dedup-id would be silently acked with group +// "A"'s MessageId — that is a data-loss outcome. +func TestSQSDedupKeyDispatch_PartitionedScopesByMessageGroupId(t *testing.T) { + t.Parallel() + meta := &sqsQueueMeta{PartitionCount: 4} + const ( + queue = "events.fifo" + gen = uint64(11) + partition = uint32(2) + dedupID = "shared-token" + ) + keyA := sqsMsgDedupKeyDispatch(meta, queue, partition, gen, "groupA", dedupID) + keyB := sqsMsgDedupKeyDispatch(meta, queue, partition, gen, "groupB", dedupID) + require.NotEqual(t, keyA, keyB, + "distinct MessageGroupIds on the same (queue, partition, dedupID) "+ + "must produce distinct dedup keys — otherwise a fresh send in "+ + "groupB is silently dropped as a duplicate of groupA") + + // Same group + same dedupID must round-trip to the same key (the + // idempotency contract we DO want to keep). + keyA2 := sqsMsgDedupKeyDispatch(meta, queue, partition, gen, "groupA", dedupID) + require.Equal(t, keyA, keyA2, + "same (group, dedupID) must produce the same dedup key — "+ + "AWS idempotent-by-design retries depend on this") + + // Legacy (non-partitioned) path is unaffected: groupID is ignored + // because there is only one implicit group on a non-partitioned + // queue and the legacy key shape predates partitioning. 
+ legacyA := sqsMsgDedupKeyDispatch(nil, queue, 0, gen, "groupA", dedupID) + legacyB := sqsMsgDedupKeyDispatch(nil, queue, 0, gen, "groupB", dedupID) + require.Equal(t, legacyA, legacyB, + "legacy keyspace ignores groupID — preserves the on-disk shape "+ + "for queues created before partitioning landed") +} + +// TestSqsPartitionedMsgDedupKey_GroupDedupSeparator pins the round-6 +// fix for the CodeRabbit major: encodeSQSSegment uses RawURLEncoding +// (no padding, alphabet [A-Za-z0-9_-]), so when groupID and dedupID +// segments are concatenated WITHOUT a separator, distinct (group, +// dedup) pairs can collapse onto the same byte sequence — most +// trivially when one of the two is empty (("", "abcd") vs ("abcd", +// "") encode identically as base64). Even with non-empty inputs the +// boundary is fragile because the per-segment length depends on the +// input length mod 3. The fix is a single sqsPartitionedQueueTerminator +// '|' between the two segments; '|' is outside RawURLEncoding's +// alphabet so it cannot be produced by either encodeSQSSegment call, +// making the boundary unambiguous regardless of input length. +func TestSqsPartitionedMsgDedupKey_GroupDedupSeparator(t *testing.T) { + t.Parallel() + const ( + queue = "events.fifo" + gen = uint64(11) + partition = uint32(2) + ) + + // (1) The empty-segment collision class. SQS validation + // rejects empty group / dedup IDs at the public API, but the + // key constructor must still be unambiguous on its own — a + // future code path that passes an empty string (intentionally + // or via a bug) must NOT silently merge keyspaces. Without the + // terminator: ("", "abcd") and ("abcd", "") produce the same + // "...YWJjZA" suffix (RawURLEncoding of "abcd"). 
+ keyEmptyGroup := sqsPartitionedMsgDedupKey(queue, partition, gen, "", "abcd") + keyEmptyDedup := sqsPartitionedMsgDedupKey(queue, partition, gen, "abcd", "") + require.NotEqual(t, keyEmptyGroup, keyEmptyDedup, + "('', 'abcd') and ('abcd', '') must produce distinct keys — "+ + "the terminator is the only thing that disambiguates them") + + // (2) The separator must appear AFTER the encoded groupID, + // BEFORE the encoded dedupID. Build the expected suffix + // "|" and assert the key ends with + // it. RawURLEncoding's alphabet (A-Z a-z 0-9 - _) excludes '|' + // so neither segment can contribute one of its own — finding + // the literal "|" between them therefore proves the round-6 + // terminator is in place. (We can't just count '|' because the + // SqsPartitionedMsgDedupPrefix constant itself contains '|' + // separators inside the family-name marker.) + key := sqsPartitionedMsgDedupKey(queue, partition, gen, "groupA", "dedup-token") + wantTail := append(append([]byte(encodeSQSSegment("groupA")), '|'), encodeSQSSegment("dedup-token")...) + require.True(t, bytes.HasSuffix(key, wantTail), + "key must end with |; got key=%q want suffix=%q", + key, wantTail) + + // (3) Round-trip: two non-empty pairs whose b64 lengths align + // at the segment boundary must still produce distinct keys. + // Without a terminator, a regression that re-introduces back- + // to-back encoding could (for some lengths) make these match. + keyAB := sqsPartitionedMsgDedupKey(queue, partition, gen, "ab", "cd") + keyABCD := sqsPartitionedMsgDedupKey(queue, partition, gen, "abcd", "") + require.NotEqual(t, keyAB, keyABCD, + "('ab', 'cd') and ('abcd', '') must produce distinct keys") + + // (4) Read-write symmetry: the dispatch helper used by + // loadFifoDedupRecord (read) and sendFifoMessage (write) MUST + // route to the same constructor so both sides observe the same + // new format simultaneously — no read/write skew window. 
+ meta := &sqsQueueMeta{PartitionCount: 4} + viaDispatch := sqsMsgDedupKeyDispatch(meta, queue, partition, gen, "groupA", "dedup-token") + require.Equal(t, key, viaDispatch, + "dispatch helper must produce the same bytes as the underlying "+ + "constructor — round-6 format change must be picked up "+ + "symmetrically on read and write paths") +} + // TestSQSKeysDispatch_LegacyAndPartitionedAreDistinct pins the // keyspace-isolation invariant at the dispatch level: a legacy // (PartitionCount=1) key and a partitioned (PartitionCount>1) key diff --git a/adapter/sqs_keys_dispatch_v2_test.go b/adapter/sqs_keys_dispatch_v2_test.go new file mode 100644 index 00000000..deb5baf1 --- /dev/null +++ b/adapter/sqs_keys_dispatch_v2_test.go @@ -0,0 +1,310 @@ +package adapter + +import ( + "bytes" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +// Tests for the PR 5b-2 dispatch helpers added on top of the PR 5b-1 +// per-key wrappers: encodeReceiptHandleDispatch (handle wire format +// dispatch), validateReceiptHandleVersion (queue-aware version check +// that replaces PR 5a's blanket rejection), and +// sqsMsgVisScanBoundsDispatch (per-partition scan bounds). + +const ( + dispatchTestMsgIDHex = "deadbeefdeadbeefdeadbeefdeadbeef" + dispatchTestQueue = "orders.fifo" +) + +// TestEncodeReceiptHandleDispatch_PicksVersionByPartitionCount pins +// the wire-format dispatch decision: meta.PartitionCount > 1 → v2, +// otherwise v1. The partition argument is only consulted on the v2 +// branch (a v1 handle has no partition field). +func TestEncodeReceiptHandleDispatch_PicksVersionByPartitionCount(t *testing.T) { + t.Parallel() + token := make([]byte, sqsReceiptTokenBytes) + + // Legacy queue: nil meta → v1. 
+ h, err := encodeReceiptHandleDispatch(nil, 0, 7, dispatchTestMsgIDHex, token) + require.NoError(t, err) + parsed, err := decodeReceiptHandle(h) + require.NoError(t, err) + require.Equal(t, sqsReceiptHandleVersion1, parsed.Version, + "nil meta must dispatch to v1 — the legacy single-partition layout") + + // PartitionCount = 0 → v1 (canonical legacy / unset). + h, err = encodeReceiptHandleDispatch(&sqsQueueMeta{PartitionCount: 0}, 0, 7, dispatchTestMsgIDHex, token) + require.NoError(t, err) + parsed, err = decodeReceiptHandle(h) + require.NoError(t, err) + require.Equal(t, sqsReceiptHandleVersion1, parsed.Version) + + // PartitionCount = 1 → v1. + h, err = encodeReceiptHandleDispatch(&sqsQueueMeta{PartitionCount: 1}, 0, 7, dispatchTestMsgIDHex, token) + require.NoError(t, err) + parsed, err = decodeReceiptHandle(h) + require.NoError(t, err) + require.Equal(t, sqsReceiptHandleVersion1, parsed.Version) + + // PartitionCount = 4 → v2; partition is preserved. + h, err = encodeReceiptHandleDispatch(&sqsQueueMeta{PartitionCount: 4}, 3, 7, dispatchTestMsgIDHex, token) + require.NoError(t, err) + parsed, err = decodeReceiptHandle(h) + require.NoError(t, err) + require.Equal(t, sqsReceiptHandleVersion2, parsed.Version) + require.Equal(t, uint32(3), parsed.Partition) + require.Equal(t, uint64(7), parsed.QueueGeneration) +} + +// TestEncodeReceiptHandleDispatch_LegacyByteIdenticalToV1 protects +// the byte-identical-output guarantee on legacy queues: the +// dispatch helper's output must equal what encodeReceiptHandle +// would produce directly. Without this, a future refactor that +// added a v1.5 layout via the dispatch path could change the wire +// format on existing deployments without anyone noticing in the +// test suite. 
+func TestEncodeReceiptHandleDispatch_LegacyByteIdenticalToV1(t *testing.T) { + t.Parallel() + token := bytes.Repeat([]byte{0xAB}, sqsReceiptTokenBytes) + want, err := encodeReceiptHandle(7, dispatchTestMsgIDHex, token) + require.NoError(t, err) + + for _, meta := range []*sqsQueueMeta{nil, {PartitionCount: 0}, {PartitionCount: 1}} { + got, err := encodeReceiptHandleDispatch(meta, 0, 7, dispatchTestMsgIDHex, token) + require.NoError(t, err) + require.Equal(t, want, got, + "legacy dispatch (PartitionCount=%d) must be byte-identical to encodeReceiptHandle", + func() uint32 { + if meta == nil { + return 0 + } + return meta.PartitionCount + }()) + } +} + +// TestValidateReceiptHandleVersion_QueueAwareRules covers the four +// (handle.Version, meta.PartitionCount) cells of the PR 5b-2 +// version-validation matrix. The PR 5a blanket rejection moved +// here from decodeClientReceiptHandle, so this test pins the +// dormancy guarantee under the new contract. +func TestValidateReceiptHandleVersion_QueueAwareRules(t *testing.T) { + t.Parallel() + v1Handle := &decodedReceiptHandle{Version: sqsReceiptHandleVersion1} + v2Handle := &decodedReceiptHandle{Version: sqsReceiptHandleVersion2, Partition: 3} + + cases := []struct { + name string + meta *sqsQueueMeta + handle *decodedReceiptHandle + wantError bool + }{ + {"nil_meta_v1", nil, v1Handle, false}, + {"nil_meta_v2", nil, v2Handle, true}, + {"legacy_count0_v1", &sqsQueueMeta{PartitionCount: 0}, v1Handle, false}, + {"legacy_count0_v2", &sqsQueueMeta{PartitionCount: 0}, v2Handle, true}, + {"legacy_count1_v1", &sqsQueueMeta{PartitionCount: 1}, v1Handle, false}, + {"legacy_count1_v2", &sqsQueueMeta{PartitionCount: 1}, v2Handle, true}, + {"partitioned4_v1", &sqsQueueMeta{PartitionCount: 4}, v1Handle, true}, + {"partitioned4_v2", &sqsQueueMeta{PartitionCount: 4}, v2Handle, false}, + // perQueue throughput on PartitionCount=4 still requires v2 on + // the wire — partitionFor collapses every group to partition + // 0, but the 
keyspace is still partitioned, and a v1 handle + // would route through the legacy data-key constructor. + {"partitioned4_perQueue_v1", &sqsQueueMeta{PartitionCount: 4, FifoThroughputLimit: htfifoThroughputPerQueue}, v1Handle, true}, + {"partitioned4_perQueue_v2", &sqsQueueMeta{PartitionCount: 4, FifoThroughputLimit: htfifoThroughputPerQueue}, v2Handle, false}, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + err := validateReceiptHandleVersion(tc.meta, tc.handle) + if tc.wantError { + require.Error(t, err) + return + } + require.NoError(t, err) + }) + } +} + +// TestValidateReceiptHandleVersion_NilHandle pins the defensive nil +// branch so a typed-nil mistake does not silently pass. +func TestValidateReceiptHandleVersion_NilHandle(t *testing.T) { + t.Parallel() + err := validateReceiptHandleVersion(&sqsQueueMeta{PartitionCount: 4}, nil) + require.Error(t, err) +} + +// TestValidateReceiptHandleVersion_RejectsOutOfRangePartition pins +// the round-2 fix: handle.Partition is client-controlled once +// decodeClientReceiptHandle accepts v2, so the queue-aware validator +// must bounds-check it against meta.PartitionCount. Without this an +// out-of-range partition would fall through to sqsMsg*KeyDispatch +// and depend on downstream routing failure semantics rather than +// returning ReceiptHandleIsInvalid at the validation choke point. 
+func TestValidateReceiptHandleVersion_RejectsOutOfRangePartition(t *testing.T) { + t.Parallel() + meta := &sqsQueueMeta{PartitionCount: 4} + for _, partition := range []uint32{4, 5, 17, 1 << 30} { + err := validateReceiptHandleVersion(meta, &decodedReceiptHandle{ + Version: sqsReceiptHandleVersion2, + Partition: partition, + }) + require.Error(t, err, "partition=%d on PartitionCount=4 must be rejected", partition) + require.True(t, + strings.Contains(err.Error(), "out of range"), + "error must reference out-of-range partition, got %v", err) + } + // Boundary: partition == count-1 is the last legal value. + err := validateReceiptHandleVersion(meta, &decodedReceiptHandle{ + Version: sqsReceiptHandleVersion2, + Partition: 3, + }) + require.NoError(t, err) +} + +// TestSQSMsgVisScanBoundsDispatch_LegacyMatchesLegacy pins that on +// legacy / non-partitioned metas the dispatch helper produces the +// same start/end pair as the legacy sqsMsgVisScanBounds — the +// receive fanout's byte-identical guarantee on the steady-state +// path. +func TestSQSMsgVisScanBoundsDispatch_LegacyMatchesLegacy(t *testing.T) { + t.Parallel() + const gen uint64 = 7 + const maxTS int64 = 1_700_000_000_000 + + wantStart, wantEnd := sqsMsgVisScanBounds(dispatchTestQueue, gen, maxTS) + + for _, meta := range []*sqsQueueMeta{nil, {PartitionCount: 0}, {PartitionCount: 1}} { + gotStart, gotEnd := sqsMsgVisScanBoundsDispatch(meta, dispatchTestQueue, 0, gen, maxTS) + require.Equal(t, wantStart, gotStart, "start must match legacy sqsMsgVisScanBounds") + require.Equal(t, wantEnd, gotEnd, "end must match legacy sqsMsgVisScanBounds") + } +} + +// TestSQSMsgVisScanBoundsDispatch_PartitionedUsesPartitionedPrefix +// pins that on a partitioned queue the bounds carry the +// partitioned vis prefix — different partitions yield disjoint +// ranges, which is what makes the receive fanout safe (no scan +// overlap, no cross-partition leakage). 
+func TestSQSMsgVisScanBoundsDispatch_PartitionedUsesPartitionedPrefix(t *testing.T) { + t.Parallel() + const gen uint64 = 7 + const maxTS int64 = 1_700_000_000_000 + meta := &sqsQueueMeta{PartitionCount: 4} + + startP0, endP0 := sqsMsgVisScanBoundsDispatch(meta, dispatchTestQueue, 0, gen, maxTS) + startP1, endP1 := sqsMsgVisScanBoundsDispatch(meta, dispatchTestQueue, 1, gen, maxTS) + + require.NotEqual(t, startP0, startP1, + "different partitions must produce disjoint scan ranges") + require.Equal(t, sqsPartitionedMsgVisPrefixForQueue(dispatchTestQueue, 0, gen), + startP0[:len(sqsPartitionedMsgVisPrefixForQueue(dispatchTestQueue, 0, gen))], + "partition 0 start must be prefixed by sqsPartitionedMsgVisPrefixForQueue(_, 0, _)") + require.Equal(t, sqsPartitionedMsgVisPrefixForQueue(dispatchTestQueue, 1, gen), + startP1[:len(sqsPartitionedMsgVisPrefixForQueue(dispatchTestQueue, 1, gen))], + "partition 1 start must be prefixed by sqsPartitionedMsgVisPrefixForQueue(_, 1, _)") + require.True(t, bytes.HasPrefix(startP0, []byte(SqsMsgVisPrefix)), + "partitioned start key still begins with the SqsMsgVisPrefix marker") + require.True(t, bytes.HasPrefix(endP0, []byte(SqsMsgVisPrefix)), + "partitioned end key still begins with the SqsMsgVisPrefix marker") + require.True(t, bytes.Compare(startP0, endP0) < 0, + "start must compare strictly less than end") + require.True(t, bytes.Compare(startP1, endP1) < 0, + "start must compare strictly less than end") +} + +// TestSQSMsgVisScanBoundsDispatch_PerQueueOnPartitionedKeyspace +// pins the PR 731 round 2 forward note: when meta.PartitionCount=4 +// and FifoThroughputLimit=perQueue, partitionFor collapses every +// group to partition 0, effectivePartitionCount returns 1, but the +// partition-0 vis prefix is still the partitioned-keyspace one +// (not legacy) because dispatch is keyed on PartitionCount > 1. 
+// +// Without this invariant a perQueue queue would write to the +// partitioned vis prefix at partition 0 (under sendFifoMessage's +// dispatch helpers) but the receive fanout — collapsing to a +// single iteration — would scan the legacy prefix and miss +// everything. +func TestSQSMsgVisScanBoundsDispatch_PerQueueOnPartitionedKeyspace(t *testing.T) { + t.Parallel() + const gen uint64 = 7 + const maxTS int64 = 1_700_000_000_000 + meta := &sqsQueueMeta{ + PartitionCount: 4, + FifoThroughputLimit: htfifoThroughputPerQueue, + } + + // effectivePartitionCount collapses to 1 (perQueue short-circuit). + require.Equal(t, uint32(1), effectivePartitionCount(meta)) + + // But the partition-0 scan bounds still use the partitioned + // prefix — the dispatch helpers route by PartitionCount, not + // by the throughput-limit short-circuit. + gotStart, gotEnd := sqsMsgVisScanBoundsDispatch(meta, dispatchTestQueue, 0, gen, maxTS) + wantPrefix := sqsPartitionedMsgVisPrefixForQueue(dispatchTestQueue, 0, gen) + require.True(t, bytes.HasPrefix(gotStart, wantPrefix), + "perQueue + PartitionCount=4 must scan the partitioned "+ + "prefix at partition 0 (where send writes), not the "+ + "legacy prefix — without this the fanout misses everything") + require.True(t, bytes.HasPrefix(gotEnd, wantPrefix)) + + // And the legacy bounds for this same (queue, gen, maxTS) must + // be byte-distinct from the partitioned partition-0 bounds, so + // a future refactor that accidentally read the legacy prefix + // here would be caught by direct byte comparison. + legacyStart, _ := sqsMsgVisScanBounds(dispatchTestQueue, gen, maxTS) + require.NotEqual(t, legacyStart, gotStart, + "perQueue + PartitionCount=4 must NOT collapse to the "+ + "legacy keyspace — that would silently strand send writes") +} + +// TestEncodeReceiptHandleDispatch_PerQueueUsesV2 pins that perQueue +// + PartitionCount=4 still produces v2 handles (because the +// keyspace is partitioned). 
The handle records the partition the +// message was actually stored under — partitionFor returns 0 in +// perQueue mode, so every handle carries Partition=0, and the +// later DeleteMessage / ChangeMessageVisibility routes back to +// partition 0 of the partitioned keyspace. +func TestEncodeReceiptHandleDispatch_PerQueueUsesV2(t *testing.T) { + t.Parallel() + token := make([]byte, sqsReceiptTokenBytes) + meta := &sqsQueueMeta{ + PartitionCount: 4, + FifoThroughputLimit: htfifoThroughputPerQueue, + } + h, err := encodeReceiptHandleDispatch(meta, 0, 7, dispatchTestMsgIDHex, token) + require.NoError(t, err) + parsed, err := decodeReceiptHandle(h) + require.NoError(t, err) + require.Equal(t, sqsReceiptHandleVersion2, parsed.Version, + "perQueue + PartitionCount=4 must use the v2 wire format — "+ + "the partition the send wrote to is meaningful even "+ + "when partitionFor collapses every group to partition 0") + require.Equal(t, uint32(0), parsed.Partition) +} + +// TestValidateReceiptHandleVersion_RejectsV2OnNonPartitioned is a +// named regression for the dormancy guarantee under the new +// contract. PR 5a rejected v2 at the public API; PR 5b-2 moves +// that to validateReceiptHandleVersion against the loaded meta. +// In PR 5b-2 the §11 PR 2 dormancy gate still rejects +// PartitionCount > 1 at CreateQueue, so every queue is +// non-partitioned, so every v2 handle still surfaces as +// ReceiptHandleIsInvalid downstream — exactly the PR 5a +// observable behaviour. 
+func TestValidateReceiptHandleVersion_RejectsV2OnNonPartitioned(t *testing.T) { + t.Parallel() + v2 := &decodedReceiptHandle{Version: sqsReceiptHandleVersion2, Partition: 3} + for _, meta := range []*sqsQueueMeta{nil, {PartitionCount: 0}, {PartitionCount: 1}} { + err := validateReceiptHandleVersion(meta, v2) + require.Error(t, err, + "v2 handle on a non-partitioned queue must be rejected") + require.True(t, strings.Contains(err.Error(), "version"), + "error message should reference version mismatch") + } +} diff --git a/adapter/sqs_keys_test.go b/adapter/sqs_keys_test.go index b1b8f0c6..103f64db 100644 --- a/adapter/sqs_keys_test.go +++ b/adapter/sqs_keys_test.go @@ -46,7 +46,7 @@ func TestSqsPartitionedMsgKeys_DistinctFromLegacy(t *testing.T) { { name: "dedup", legacy: sqsMsgDedupKey(queue, gen, dedupID), - partitioned: sqsPartitionedMsgDedupKey(queue, partition, gen, dedupID), + partitioned: sqsPartitionedMsgDedupKey(queue, partition, gen, groupID, dedupID), }, { name: "group", diff --git a/adapter/sqs_messages.go b/adapter/sqs_messages.go index 4f3ee6d0..b3994fc5 100644 --- a/adapter/sqs_messages.go +++ b/adapter/sqs_messages.go @@ -12,6 +12,7 @@ import ( "sort" "strconv" "strings" + "sync/atomic" "time" "github.com/bootjp/elastickv/kv" @@ -369,45 +370,28 @@ func decodeReceiptHandleV1(b []byte) (*decodedReceiptHandle, error) { } // decodeClientReceiptHandle is the public-API entry point for -// decoding a client-supplied receipt handle. It wraps -// decodeReceiptHandle with the dormancy gate that keeps the v2 -// codec inert until Phase 3.D PR 5b wires the partitioned-FIFO -// data plane. +// decoding a client-supplied receipt handle. 
It returns the parsed +// shape without validating the version against the target queue — +// PR 5a's blanket v2 rejection has been replaced with the queue- +// aware check in validateReceiptHandleVersion, which the meta- +// loading callers (loadMessageForDelete / loadAndVerifyMessage) +// invoke once they have the queue's PartitionCount in scope. // -// # Why the gate +// Splitting decode from version validation lets DeleteMessage / +// ChangeMessageVisibility produce a single ReceiptHandleIsInvalid +// shape for both "garbled bytes" and "wrong version for this +// queue" without leaking which one happened — preserving the +// dormancy promise of PR 724 round 3 (no v2 wire-format +// probability from the public API) under the new contract. // -// PR 5a adds the v2 codec to the binary but does NOT yet wire any -// production path that produces v2 handles — SendMessage on a -// partitioned queue is rejected by the §11 PR 2 dormancy gate -// (PartitionCount > 1 → InvalidAttributeValue). Without this -// helper, a client could craft a v2 handle (re-encoding a -// legitimately-issued v1 handle's queue_gen / message_id / -// receipt_token under the v2 layout) and DeleteMessage / -// ChangeMessageVisibility would accept it, since the downstream -// validation only checks queue_gen + receipt_token. The behaviour -// is technically correct (the v1 keyspace lookup still finds the -// message) but it leaks the new wire format before PR 5b lands — -// breaking the "no behavior change yet" guarantee of this PR -// (codex/coderabbit major on PR #724). -// -// PR 5b lifts this gate together with the rest of the data-plane -// fanout: it replaces the != v1 check with a queue-aware version -// (v1 required on non-partitioned queues, v2 required on -// partitioned ones), so neither version leaks into the wrong -// keyspace. Until then, any v2 handle on the public API surfaces -// as ReceiptHandleIsInvalid. 
+// On partitioned queues (PartitionCount > 1) the §11 PR 2 +// dormancy gate is still in force in PR 5b-2 — CreateQueue +// rejects PartitionCount > 1, so no production queue can be in +// the partitioned branch yet, and validateReceiptHandleVersion +// against a non-partitioned meta still rejects every v2 handle. +// PR 5b-3 lifts the gate together with the capability check. func decodeClientReceiptHandle(raw string) (*decodedReceiptHandle, error) { - handle, err := decodeReceiptHandle(raw) - if err != nil { - return nil, err - } - if handle.Version != sqsReceiptHandleVersion1 { - // v2 codec is added but dormant until PR 5b. Reject any - // non-v1 handle on the public API so the wire format - // does not leak. - return nil, errors.New("receipt handle version is not yet enabled on the public API") - } - return handle, nil + return decodeReceiptHandle(raw) } func decodeReceiptHandleV2(b []byte) (*decodedReceiptHandle, error) { @@ -533,9 +517,14 @@ func (s *SQSServer) sendMessage(w http.ResponseWriter, r *http.Request) { return } - dataKey := sqsMsgDataKey(queueName, meta.Generation, rec.MessageID) - visKey := sqsMsgVisKey(queueName, meta.Generation, rec.AvailableAtMillis, rec.MessageID) - byAgeKey := sqsMsgByAgeKey(queueName, meta.Generation, rec.SendTimestampMillis, rec.MessageID) + // Standard queues never have PartitionCount > 1 (the cross- + // attribute validator rejects it), so partition is always 0 + // here and the dispatch helpers route to the legacy keyspace + // — byte-identical to the pre-PR-5b output. 
+ const partition uint32 = 0 + dataKey := sqsMsgDataKeyDispatch(meta, queueName, partition, meta.Generation, rec.MessageID) + visKey := sqsMsgVisKeyDispatch(meta, queueName, partition, meta.Generation, rec.AvailableAtMillis, rec.MessageID) + byAgeKey := sqsMsgByAgeKeyDispatch(meta, queueName, partition, meta.Generation, rec.SendTimestampMillis, rec.MessageID) metaKey := sqsQueueMetaKey(queueName) genKey := sqsQueueGenKey(queueName) // StartTS + ReadKeys fence against a concurrent DeleteQueue / @@ -867,19 +856,106 @@ func (s *SQSServer) scanAndDeliverOnce(ctx context.Context, queueName string, op if !exists { return nil, newSQSAPIError(http.StatusBadRequest, sqsErrQueueDoesNotExist, "queue does not exist") } - now := time.Now().UnixMilli() - start, end := sqsMsgVisScanBounds(queueName, meta.Generation, now) pageSize := opts.Max * sqsReceiveScanOverfetchFactor if pageSize > sqsVisScanPageLimit { pageSize = sqsVisScanPageLimit } deadline := time.Now().Add(sqsVisScanWallClockBudget) delivered := make([]map[string]any, 0, opts.Max) + // Per-partition fanout. effectivePartitionCount returns 1 on + // legacy / non-partitioned queues (and on perQueue throughput + // mode, where every group hashes to partition 0 — see + // effectivePartitionCount doc), so this loop is byte-identical + // to the pre-fanout path on those queues. Partitions are + // scanned sequentially so the wall-clock + opts.Max budgets are + // shared across the whole receive call instead of being + // multiplied by N. + // + // Rotate the starting partition by readTS so a sustained backlog + // on a single partition cannot permanently starve the others: a + // fixed start at 0 lets opts.Max fill from partition 0 on every + // call, so messages in higher-index partitions are never observed + // while the head queue stays hot. readTS is HLC-derived and + // advances per call, giving a uniform-enough rotation without + // any per-server state. 
FIFO ordering is unaffected because a + // MessageGroupId hashes to exactly one partition (partitionFor is + // deterministic), so cross-partition iteration order does not + // reorder messages within any group. + partitions := effectivePartitionCount(meta) + startOffset := s.nextReceiveFanoutStart(queueName, partitions) + for i := uint32(0); i < partitions; i++ { + if len(delivered) >= opts.Max { + break + } + if time.Now().After(deadline) { + return delivered, nil + } + partition := (startOffset + i) % partitions + fresh, err := s.scanAndDeliverPartition(ctx, queueName, meta, partition, readTS, deadline, pageSize, sqsReceiveOptions{ + Max: opts.Max - len(delivered), + VisibilityTimeout: opts.VisibilityTimeout, + WaitSeconds: opts.WaitSeconds, + MessageAttributeNames: opts.MessageAttributeNames, + }) + if err != nil { + return delivered, err + } + delivered = append(delivered, fresh...) + } + return delivered, nil +} + +// nextReceiveFanoutStart returns the starting partition index for a +// partitioned ReceiveMessage call. The counter is keyed by queueName +// so each queue's rotation depends only on its own receive cadence +// — a server-wide counter (round 3) aliases when other queues +// interleave with strides that share a factor with PartitionCount +// (Codex P1 round 4). +// +// PartitionCount is power-of-two (validator), so mask-AND is modulo; +// consecutive receives on the same queue walk every partition in +// strict round-robin. Returns 0 on legacy / non-partitioned queues +// (partitions <= 1) without touching the map so unconfigured queues +// pay no per-call allocation. 
+func (s *SQSServer) nextReceiveFanoutStart(queueName string, partitions uint32) uint32 { + if partitions <= 1 { + return 0 + } + v, ok := s.receiveFanoutCounters.Load(queueName) + if !ok { + v, _ = s.receiveFanoutCounters.LoadOrStore(queueName, &atomic.Uint32{}) + } + counter, _ := v.(*atomic.Uint32) + return counter.Add(1) & (partitions - 1) +} + +// dropReceiveFanoutCounter removes any per-queue fanout-rotation +// counter for queueName. Mirrors throttle.invalidateQueue: called +// from the same DeleteQueue and genuine-CreateQueue paths so the +// sync.Map does not retain a counter for every queue name the +// process has ever served. Without this, repeated create/delete of +// unique queue names (multi-tenant or high-churn workloads) leaks +// one entry per name for the lifetime of the process. No-op when +// the queue never produced a partitioned receive (entry absent). +func (s *SQSServer) dropReceiveFanoutCounter(queueName string) { + s.receiveFanoutCounters.Delete(queueName) +} + +// scanAndDeliverPartition pages the visibility index for one +// partition under the shared wall-clock + per-call max budget. On +// legacy / non-partitioned queues the caller invokes this exactly +// once with partition=0 and the bounds come from +// sqsMsgVisScanBoundsDispatch's legacy branch — byte-identical to +// the pre-fanout scanAndDeliverOnce body. 
+func (s *SQSServer) scanAndDeliverPartition(ctx context.Context, queueName string, meta *sqsQueueMeta, partition uint32, readTS uint64, deadline time.Time, pageSize int, opts sqsReceiveOptions) ([]map[string]any, error) { + now := time.Now().UnixMilli() + start, end := sqsMsgVisScanBoundsDispatch(meta, queueName, partition, meta.Generation, now) + delivered := make([]map[string]any, 0, opts.Max) for len(delivered) < opts.Max { if time.Now().After(deadline) { return delivered, nil } - page, next, done, scanErr := s.scanOneVisibleMessagePage(ctx, start, end, pageSize, readTS) + page, next, done, scanErr := s.scanOneVisibleMessagePage(ctx, start, end, pageSize, readTS, partition) if scanErr != nil { return nil, scanErr } @@ -923,11 +999,15 @@ func resolveReceiveMaxMessages(requested *int) (int, error) { // scanVisibleMessageCandidates returns vis-index entries with // visible_at <= now, up to limit. Each entry carries the key (needed -// for the delete-old-vis step) and the message_id pointed at by its -// value. +// for the delete-old-vis step), the message_id pointed at by its +// value, and the partition the vis-index entry was found under (so +// downstream rotate / delete / expire helpers can look up data / +// byage / group-lock keys in the same partition the message was +// originally stored under). On legacy queues partition is always 0. type sqsMsgCandidate struct { visKey []byte messageID string + partition uint32 } // sqsVisScanWallClockBudget caps how long the scan + deliver loop may @@ -941,8 +1021,11 @@ const sqsVisScanWallClockBudget = 100 * time.Millisecond // scanOneVisibleMessagePage reads a single page of the visibility // index starting at `start`, returning the parsed candidates plus the // cursor for the next page. `done=true` means the scan range is -// drained and the caller should stop paging. 
-func (s *SQSServer) scanOneVisibleMessagePage(ctx context.Context, start, end []byte, pageSize int, readTS uint64) ([]sqsMsgCandidate, []byte, bool, error) { +// drained and the caller should stop paging. `partition` is stamped +// onto every candidate so downstream rotate / delete / expire +// helpers route to the same partition the vis-index entry was found +// under (legacy queues always pass 0). +func (s *SQSServer) scanOneVisibleMessagePage(ctx context.Context, start, end []byte, pageSize int, readTS uint64, partition uint32) ([]sqsMsgCandidate, []byte, bool, error) { kvs, err := s.store.ScanAt(ctx, start, end, pageSize, readTS) if err != nil { return nil, start, true, errors.WithStack(err) @@ -952,7 +1035,7 @@ func (s *SQSServer) scanOneVisibleMessagePage(ctx context.Context, start, end [] } out := make([]sqsMsgCandidate, 0, len(kvs)) for _, kvp := range kvs { - out = append(out, sqsMsgCandidate{visKey: bytes.Clone(kvp.Key), messageID: string(kvp.Value)}) + out = append(out, sqsMsgCandidate{visKey: bytes.Clone(kvp.Key), messageID: string(kvp.Value), partition: partition}) } if len(kvs) < pageSize { return out, start, true, nil @@ -979,8 +1062,8 @@ func (s *SQSServer) scanOneVisibleMessagePage(ctx context.Context, start, end [] // - skip=true, err=nil : ErrKeyNotFound race; caller skips this one. // - skip=false, err!=nil : non-retryable; propagate. // - skip=false, err=nil : record loaded. 
-func (s *SQSServer) loadCandidateRecord(ctx context.Context, queueName string, gen uint64, cand sqsMsgCandidate, readTS uint64) (*sqsMessageRecord, []byte, bool, error) { - dataKey := sqsMsgDataKey(queueName, gen, cand.messageID) +func (s *SQSServer) loadCandidateRecord(ctx context.Context, queueName string, meta *sqsQueueMeta, gen uint64, cand sqsMsgCandidate, readTS uint64) (*sqsMessageRecord, []byte, bool, error) { + dataKey := sqsMsgDataKeyDispatch(meta, queueName, cand.partition, gen, cand.messageID) raw, err := s.store.GetAt(ctx, dataKey, readTS) if err != nil { if errors.Is(err, store.ErrKeyNotFound) { @@ -1002,8 +1085,8 @@ func (s *SQSServer) loadCandidateRecord(ctx context.Context, queueName string, g // no longer our responsibility either way. Any other error propagates // so a coordinator / storage failure does not silently fall through // to "delivered empty", matching the receive-error policy. -func (s *SQSServer) expireMessage(ctx context.Context, queueName string, gen uint64, visKey, dataKey []byte, rec *sqsMessageRecord, readTS uint64) error { - byAgeKey := sqsMsgByAgeKey(queueName, gen, rec.SendTimestampMillis, rec.MessageID) +func (s *SQSServer) expireMessage(ctx context.Context, queueName string, meta *sqsQueueMeta, partition uint32, gen uint64, visKey, dataKey []byte, rec *sqsMessageRecord, readTS uint64) error { + byAgeKey := sqsMsgByAgeKeyDispatch(meta, queueName, partition, gen, rec.SendTimestampMillis, rec.MessageID) readKeys := [][]byte{visKey, dataKey, sqsQueueMetaKey(queueName), sqsQueueGenKey(queueName)} elems := []*kv.Elem[kv.OP]{ {Op: kv.Del, Key: visKey}, @@ -1014,8 +1097,8 @@ func (s *SQSServer) expireMessage(ctx context.Context, queueName string, gen uin // in the same group can become deliverable. This mirrors the delete // and redrive paths. 
if rec.MessageGroupId != "" { - lockKey := sqsMsgGroupKey(queueName, gen, rec.MessageGroupId) - lock, err := s.loadFifoGroupLock(ctx, queueName, gen, rec.MessageGroupId, readTS) + lockKey := sqsMsgGroupKeyDispatch(meta, queueName, partition, gen, rec.MessageGroupId) + lock, err := s.loadFifoGroupLock(ctx, queueName, meta, partition, gen, rec.MessageGroupId, readTS) if err != nil { return err } @@ -1061,7 +1144,7 @@ func (s *SQSServer) rotateMessagesForDelivery( if len(delivered) >= opts.Max { break } - msg, skip, err := s.tryDeliverCandidate(ctx, queueName, meta.Generation, cand, meta.MessageRetentionSeconds, readTS, opts, redrive) + msg, skip, err := s.tryDeliverCandidate(ctx, queueName, meta, cand, readTS, opts, redrive) if err != nil { return delivered, err } @@ -1087,18 +1170,18 @@ func (s *SQSServer) rotateMessagesForDelivery( func (s *SQSServer) tryDeliverCandidate( ctx context.Context, queueName string, - gen uint64, + meta *sqsQueueMeta, cand sqsMsgCandidate, - retentionSeconds int64, readTS uint64, opts sqsReceiveOptions, redrive *parsedRedrivePolicy, ) (map[string]any, bool, error) { - rec, dataKey, skip, err := s.loadCandidateRecord(ctx, queueName, gen, cand, readTS) + gen := meta.Generation + rec, dataKey, skip, err := s.loadCandidateRecord(ctx, queueName, meta, gen, cand, readTS) if skip || err != nil { return nil, skip, err } - if expired, err := s.handleRetentionExpiry(ctx, queueName, gen, cand, dataKey, rec, retentionSeconds, readTS); expired || err != nil { + if expired, err := s.handleRetentionExpiry(ctx, queueName, meta, cand, dataKey, rec, meta.MessageRetentionSeconds, readTS); expired || err != nil { return nil, expired, err } if shouldRedrive(rec, redrive) { @@ -1107,7 +1190,7 @@ func (s *SQSServer) tryDeliverCandidate( // receive response intentionally omits redriven messages — // AWS does the same and consumers polling the source queue // must not observe a poison message past the limit. 
- moved, err := s.redriveCandidateToDLQ(ctx, queueName, gen, cand, dataKey, rec, redrive, s.queueArn(queueName), readTS) + moved, err := s.redriveCandidateToDLQ(ctx, queueName, meta, cand, dataKey, rec, redrive, s.queueArn(queueName), readTS) if err != nil { return nil, false, err } @@ -1119,7 +1202,7 @@ func (s *SQSServer) tryDeliverCandidate( lockState := fifoLockAcquire var lockKey []byte if rec.MessageGroupId != "" { - state, key, err := s.classifyFifoGroupLock(ctx, queueName, gen, rec, readTS) + state, key, err := s.classifyFifoGroupLock(ctx, queueName, meta, cand.partition, gen, rec, readTS) if err != nil { return nil, false, err } @@ -1129,7 +1212,7 @@ func (s *SQSServer) tryDeliverCandidate( lockState = state lockKey = key } - return s.commitReceiveRotation(ctx, queueName, gen, cand, dataKey, rec, readTS, opts, lockKey, lockState) + return s.commitReceiveRotation(ctx, queueName, meta, cand, dataKey, rec, readTS, opts, lockKey, lockState) } // handleRetentionExpiry deletes the candidate inline when its @@ -1137,7 +1220,7 @@ func (s *SQSServer) tryDeliverCandidate( // does not keep re-finding it. Returns (expired, err): expired=true // means the candidate has been (or is being) reaped and the caller // must skip. 
-func (s *SQSServer) handleRetentionExpiry(ctx context.Context, queueName string, gen uint64, cand sqsMsgCandidate, dataKey []byte, rec *sqsMessageRecord, retentionSeconds int64, readTS uint64) (bool, error) { +func (s *SQSServer) handleRetentionExpiry(ctx context.Context, queueName string, meta *sqsQueueMeta, cand sqsMsgCandidate, dataKey []byte, rec *sqsMessageRecord, retentionSeconds int64, readTS uint64) (bool, error) { if retentionSeconds <= 0 { return false, nil } @@ -1145,7 +1228,7 @@ func (s *SQSServer) handleRetentionExpiry(ctx context.Context, queueName string, if now-rec.SendTimestampMillis <= retentionSeconds*sqsMillisPerSecond { return false, nil } - if err := s.expireMessage(ctx, queueName, gen, cand.visKey, dataKey, rec, readTS); err != nil { + if err := s.expireMessage(ctx, queueName, meta, cand.partition, meta.Generation, cand.visKey, dataKey, rec, readTS); err != nil { return false, err } return true, nil @@ -1156,7 +1239,8 @@ func (s *SQSServer) handleRetentionExpiry(ctx context.Context, queueName string, // the candidate carries a MessageGroupId the transaction also // installs (or refreshes) the per-group lock so a later message in // the same group cannot overtake it on the next receive. 
-func (s *SQSServer) commitReceiveRotation(ctx context.Context, queueName string, gen uint64, cand sqsMsgCandidate, dataKey []byte, rec *sqsMessageRecord, readTS uint64, opts sqsReceiveOptions, lockKey []byte, lockState fifoCandidateLockState) (map[string]any, bool, error) { +func (s *SQSServer) commitReceiveRotation(ctx context.Context, queueName string, meta *sqsQueueMeta, cand sqsMsgCandidate, dataKey []byte, rec *sqsMessageRecord, readTS uint64, opts sqsReceiveOptions, lockKey []byte, lockState fifoCandidateLockState) (map[string]any, bool, error) { + gen := meta.Generation newToken, err := newReceiptToken() if err != nil { return nil, false, err @@ -1173,7 +1257,7 @@ func (s *SQSServer) commitReceiveRotation(ctx context.Context, queueName string, if err != nil { return nil, false, err } - newVisKey := sqsMsgVisKey(queueName, gen, newVisibleAt, cand.messageID) + newVisKey := sqsMsgVisKeyDispatch(meta, queueName, cand.partition, gen, newVisibleAt, cand.messageID) req, err := buildReceiveRotationOps(queueName, cand, dataKey, recordBytes, newVisKey, lockKey, lockState, newVisibleAt, readTS) if err != nil { return nil, false, err @@ -1185,7 +1269,7 @@ func (s *SQSServer) commitReceiveRotation(ctx context.Context, queueName string, return nil, false, errors.WithStack(err) } - handle, err := encodeReceiptHandle(gen, cand.messageID, newToken) + handle, err := encodeReceiptHandleDispatch(meta, cand.partition, gen, cand.messageID, newToken) if err != nil { return nil, false, err } @@ -1335,14 +1419,14 @@ func (s *SQSServer) deleteMessageWithRetry(ctx context.Context, queueName string backoff := transactRetryInitialBackoff deadline := time.Now().Add(transactRetryMaxDuration) for range transactRetryMaxAttempts { - rec, dataKey, readTS, outcome, err := s.loadMessageForDelete(ctx, queueName, handle) + meta, rec, dataKey, readTS, outcome, err := s.loadMessageForDelete(ctx, queueName, handle) if err != nil { return err } if outcome == sqsDeleteNoOp { return nil } - req, err 
:= s.buildDeleteOps(ctx, queueName, handle, rec, dataKey, readTS) + req, err := s.buildDeleteOps(ctx, queueName, meta, handle, rec, dataKey, readTS) if err != nil { return err } @@ -1362,9 +1446,9 @@ func (s *SQSServer) deleteMessageWithRetry(ctx context.Context, queueName string // buildDeleteOps assembles the OCC OperationGroup for a DeleteMessage // commit. The FIFO group-lock release branch lives here so the // retry-loop wrapper stays readable and within the cyclomatic budget. -func (s *SQSServer) buildDeleteOps(ctx context.Context, queueName string, handle *decodedReceiptHandle, rec *sqsMessageRecord, dataKey []byte, readTS uint64) (*kv.OperationGroup[kv.OP], error) { - visKey := sqsMsgVisKey(queueName, handle.QueueGeneration, rec.VisibleAtMillis, rec.MessageID) - byAgeKey := sqsMsgByAgeKey(queueName, handle.QueueGeneration, rec.SendTimestampMillis, rec.MessageID) +func (s *SQSServer) buildDeleteOps(ctx context.Context, queueName string, meta *sqsQueueMeta, handle *decodedReceiptHandle, rec *sqsMessageRecord, dataKey []byte, readTS uint64) (*kv.OperationGroup[kv.OP], error) { + visKey := sqsMsgVisKeyDispatch(meta, queueName, handle.Partition, handle.QueueGeneration, rec.VisibleAtMillis, rec.MessageID) + byAgeKey := sqsMsgByAgeKeyDispatch(meta, queueName, handle.Partition, handle.QueueGeneration, rec.SendTimestampMillis, rec.MessageID) readKeys := [][]byte{dataKey, visKey, sqsQueueMetaKey(queueName), sqsQueueGenKey(queueName)} elems := []*kv.Elem[kv.OP]{ {Op: kv.Del, Key: dataKey}, @@ -1372,8 +1456,8 @@ func (s *SQSServer) buildDeleteOps(ctx context.Context, queueName string, handle {Op: kv.Del, Key: byAgeKey}, } if rec.MessageGroupId != "" { - lockKey := sqsMsgGroupKey(queueName, handle.QueueGeneration, rec.MessageGroupId) - lock, err := s.loadFifoGroupLock(ctx, queueName, handle.QueueGeneration, rec.MessageGroupId, readTS) + lockKey := sqsMsgGroupKeyDispatch(meta, queueName, handle.Partition, handle.QueueGeneration, rec.MessageGroupId) + lock, err := 
s.loadFifoGroupLock(ctx, queueName, meta, handle.Partition, handle.QueueGeneration, rec.MessageGroupId, readTS) if err != nil { return nil, err } @@ -1414,34 +1498,37 @@ const ( // to a different (or recreated) queue and we reject it as a structural // error — silently succeeding would let misrouted deletes ack messages // that cannot possibly be deleted on this queue. -func (s *SQSServer) loadMessageForDelete(ctx context.Context, queueName string, handle *decodedReceiptHandle) (*sqsMessageRecord, []byte, uint64, sqsDeleteOutcome, error) { +func (s *SQSServer) loadMessageForDelete(ctx context.Context, queueName string, handle *decodedReceiptHandle) (*sqsQueueMeta, *sqsMessageRecord, []byte, uint64, sqsDeleteOutcome, error) { readTS := s.nextTxnReadTS(ctx) meta, exists, err := s.loadQueueMetaAt(ctx, queueName, readTS) if err != nil { - return nil, nil, readTS, sqsDeleteProceed, errors.WithStack(err) + return nil, nil, nil, readTS, sqsDeleteProceed, errors.WithStack(err) } if !exists { - return nil, nil, readTS, sqsDeleteProceed, newSQSAPIError(http.StatusBadRequest, sqsErrQueueDoesNotExist, "queue does not exist") + return nil, nil, nil, readTS, sqsDeleteProceed, newSQSAPIError(http.StatusBadRequest, sqsErrQueueDoesNotExist, "queue does not exist") } if meta.Generation != handle.QueueGeneration { - return nil, nil, readTS, sqsDeleteProceed, newSQSAPIError(http.StatusBadRequest, sqsErrReceiptHandleInvalid, "receipt handle does not belong to this queue") + return nil, nil, nil, readTS, sqsDeleteProceed, newSQSAPIError(http.StatusBadRequest, sqsErrReceiptHandleInvalid, "receipt handle does not belong to this queue") } - dataKey := sqsMsgDataKey(queueName, handle.QueueGeneration, handle.MessageIDHex) + if err := validateReceiptHandleVersion(meta, handle); err != nil { + return nil, nil, nil, readTS, sqsDeleteProceed, newSQSAPIError(http.StatusBadRequest, sqsErrReceiptHandleInvalid, "receipt handle is not valid for this queue") + } + dataKey := 
sqsMsgDataKeyDispatch(meta, queueName, handle.Partition, handle.QueueGeneration, handle.MessageIDHex) raw, err := s.store.GetAt(ctx, dataKey, readTS) if err != nil { if errors.Is(err, store.ErrKeyNotFound) { - return nil, nil, readTS, sqsDeleteNoOp, nil + return meta, nil, nil, readTS, sqsDeleteNoOp, nil } - return nil, nil, readTS, sqsDeleteProceed, errors.WithStack(err) + return nil, nil, nil, readTS, sqsDeleteProceed, errors.WithStack(err) } rec, err := decodeSQSMessageRecord(raw) if err != nil { - return nil, nil, readTS, sqsDeleteProceed, errors.WithStack(err) + return nil, nil, nil, readTS, sqsDeleteProceed, errors.WithStack(err) } if !bytes.Equal(rec.CurrentReceiptToken, handle.ReceiptToken) { - return nil, nil, readTS, sqsDeleteNoOp, nil + return meta, nil, nil, readTS, sqsDeleteNoOp, nil } - return rec, dataKey, readTS, sqsDeleteProceed, nil + return meta, rec, dataKey, readTS, sqsDeleteProceed, nil } func (s *SQSServer) changeMessageVisibility(w http.ResponseWriter, r *http.Request) { @@ -1485,7 +1572,7 @@ func (s *SQSServer) changeVisibilityWithRetry(ctx context.Context, queueName str backoff := transactRetryInitialBackoff deadline := time.Now().Add(transactRetryMaxDuration) for range transactRetryMaxAttempts { - rec, dataKey, readTS, apiErr := s.loadAndVerifyMessage(ctx, queueName, handle) + meta, rec, dataKey, readTS, apiErr := s.loadAndVerifyMessage(ctx, queueName, handle) if apiErr != nil { return apiErr } @@ -1493,13 +1580,13 @@ func (s *SQSServer) changeVisibilityWithRetry(ctx context.Context, queueName str if rec.VisibleAtMillis <= now { return newSQSAPIError(http.StatusBadRequest, sqsErrMessageNotInflight, "message is not currently in flight") } - oldVisKey := sqsMsgVisKey(queueName, handle.QueueGeneration, rec.VisibleAtMillis, rec.MessageID) + oldVisKey := sqsMsgVisKeyDispatch(meta, queueName, handle.Partition, handle.QueueGeneration, rec.VisibleAtMillis, rec.MessageID) rec.VisibleAtMillis = now + newTimeout*sqsMillisPerSecond recordBytes, err 
:= encodeSQSMessageRecord(rec) if err != nil { return errors.WithStack(err) } - newVisKey := sqsMsgVisKey(queueName, handle.QueueGeneration, rec.VisibleAtMillis, rec.MessageID) + newVisKey := sqsMsgVisKeyDispatch(meta, queueName, handle.Partition, handle.QueueGeneration, rec.VisibleAtMillis, rec.MessageID) // StartTS pins OCC to the snapshot; without it the coordinator // would auto-assign a newer StartTS and a concurrent receive / // delete that commits between our load and dispatch could slip @@ -1555,34 +1642,37 @@ func (s *SQSServer) parseQueueAndReceipt(queueUrl, receiptHandle string) (string // cleans them up, so a handle from a deleted / recreated queue must // be rejected with ReceiptHandleIsInvalid instead of silently // mutating the orphan record. -func (s *SQSServer) loadAndVerifyMessage(ctx context.Context, queueName string, handle *decodedReceiptHandle) (*sqsMessageRecord, []byte, uint64, error) { +func (s *SQSServer) loadAndVerifyMessage(ctx context.Context, queueName string, handle *decodedReceiptHandle) (*sqsQueueMeta, *sqsMessageRecord, []byte, uint64, error) { readTS := s.nextTxnReadTS(ctx) meta, exists, err := s.loadQueueMetaAt(ctx, queueName, readTS) if err != nil { - return nil, nil, readTS, errors.WithStack(err) + return nil, nil, nil, readTS, errors.WithStack(err) } if !exists { - return nil, nil, readTS, newSQSAPIError(http.StatusBadRequest, sqsErrQueueDoesNotExist, "queue does not exist") + return nil, nil, nil, readTS, newSQSAPIError(http.StatusBadRequest, sqsErrQueueDoesNotExist, "queue does not exist") } if meta.Generation != handle.QueueGeneration { - return nil, nil, readTS, newSQSAPIError(http.StatusBadRequest, sqsErrReceiptHandleInvalid, "receipt handle does not belong to this queue") + return nil, nil, nil, readTS, newSQSAPIError(http.StatusBadRequest, sqsErrReceiptHandleInvalid, "receipt handle does not belong to this queue") + } + if err := validateReceiptHandleVersion(meta, handle); err != nil { + return nil, nil, nil, readTS, 
newSQSAPIError(http.StatusBadRequest, sqsErrReceiptHandleInvalid, "receipt handle is not valid for this queue") } - dataKey := sqsMsgDataKey(queueName, handle.QueueGeneration, handle.MessageIDHex) + dataKey := sqsMsgDataKeyDispatch(meta, queueName, handle.Partition, handle.QueueGeneration, handle.MessageIDHex) raw, err := s.store.GetAt(ctx, dataKey, readTS) if err != nil { if errors.Is(err, store.ErrKeyNotFound) { - return nil, nil, readTS, newSQSAPIError(http.StatusBadRequest, sqsErrReceiptHandleInvalid, "message not found") + return nil, nil, nil, readTS, newSQSAPIError(http.StatusBadRequest, sqsErrReceiptHandleInvalid, "message not found") } - return nil, nil, readTS, errors.WithStack(err) + return nil, nil, nil, readTS, errors.WithStack(err) } rec, err := decodeSQSMessageRecord(raw) if err != nil { - return nil, nil, readTS, errors.WithStack(err) + return nil, nil, nil, readTS, errors.WithStack(err) } if !bytes.Equal(rec.CurrentReceiptToken, handle.ReceiptToken) { - return nil, nil, readTS, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidReceiptHandle, "receipt handle token does not match") + return nil, nil, nil, readTS, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidReceiptHandle, "receipt handle token does not match") } - return rec, dataKey, readTS, nil + return meta, rec, dataKey, readTS, nil } // ------------------------ small helpers ------------------------ diff --git a/adapter/sqs_messages_batch.go b/adapter/sqs_messages_batch.go index e837750c..6a9c61ce 100644 --- a/adapter/sqs_messages_batch.go +++ b/adapter/sqs_messages_batch.go @@ -193,9 +193,14 @@ func (s *SQSServer) sendBatchStandardOnce( failed = append(failed, batchErrorEntryFromAPIErr(entry.Id, apiErr)) continue } - dataKey := sqsMsgDataKey(queueName, meta.Generation, rec.MessageID) - visKey := sqsMsgVisKey(queueName, meta.Generation, rec.AvailableAtMillis, rec.MessageID) - byAgeKey := sqsMsgByAgeKey(queueName, meta.Generation, rec.SendTimestampMillis, rec.MessageID) + // Standard batch 
send: PartitionCount > 1 is rejected on + // non-FIFO queues by the cross-attribute validator, so + // partition is always 0 and dispatch routes to legacy + // keys — byte-identical to the pre-PR-5b output. + const partition uint32 = 0 + dataKey := sqsMsgDataKeyDispatch(meta, queueName, partition, meta.Generation, rec.MessageID) + visKey := sqsMsgVisKeyDispatch(meta, queueName, partition, meta.Generation, rec.AvailableAtMillis, rec.MessageID) + byAgeKey := sqsMsgByAgeKeyDispatch(meta, queueName, partition, meta.Generation, rec.SendTimestampMillis, rec.MessageID) elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Put, Key: dataKey, Value: recordBytes}, &kv.Elem[kv.OP]{Op: kv.Put, Key: visKey, Value: []byte(rec.MessageID)}, diff --git a/adapter/sqs_partition_resolver_test.go b/adapter/sqs_partition_resolver_test.go index d9cb9fe2..c8dc4fb9 100644 --- a/adapter/sqs_partition_resolver_test.go +++ b/adapter/sqs_partition_resolver_test.go @@ -69,7 +69,7 @@ func TestSQSPartitionResolver_ResolveByPartition(t *testing.T) { for familyName, key := range map[string][]byte{ "data": sqsPartitionedMsgDataKey(tc.queue, tc.partition, 1, "msg-id"), "vis": sqsPartitionedMsgVisKey(tc.queue, tc.partition, 1, 1700000000000, "msg-id"), - "dedup": sqsPartitionedMsgDedupKey(tc.queue, tc.partition, 1, "dedup-id"), + "dedup": sqsPartitionedMsgDedupKey(tc.queue, tc.partition, 1, "group-id", "dedup-id"), "group": sqsPartitionedMsgGroupKey(tc.queue, tc.partition, 1, "group-id"), "byage": sqsPartitionedMsgByAgeKey(tc.queue, tc.partition, 1, 1700000000000, "msg-id"), } { diff --git a/adapter/sqs_partitioned_dispatch_test.go b/adapter/sqs_partitioned_dispatch_test.go new file mode 100644 index 00000000..3b04324f --- /dev/null +++ b/adapter/sqs_partitioned_dispatch_test.go @@ -0,0 +1,516 @@ +package adapter + +import ( + "context" + "fmt" + "net/http" + "strings" + "sync" + "testing" + "time" + + "github.com/bootjp/elastickv/kv" + "github.com/stretchr/testify/require" +) + +// Integration tests for the 
PR 5b-2 partitioned-FIFO data plane +// wiring. The §11 PR 2 dormancy gate still rejects PartitionCount +// > 1 at CreateQueue (lifted atomically with the capability check +// in PR 5b-3), so these tests reach below the public CreateQueue +// surface to install a partitioned meta record directly. That +// short-circuits the dormancy gate for the duration of the test +// without disabling it for production CreateQueue calls — which +// is the exact split the design doc envisaged for "data plane +// landed but not user-creatable yet." + +// installPartitionedMetaForTest overwrites the queue's meta record +// with a partitioned shape (PartitionCount > 1) by dispatching a +// raw OCC put against the meta key. The queue is created via the +// normal CreateQueue path first (so generation / incarnation +// counters and the catalog index are populated correctly); only +// the partition-shape attributes are mutated. +// +// The dormancy gate intercepts CreateQueue, not the data plane, +// so once the meta record carries PartitionCount > 1 every +// SendMessage / ReceiveMessage / DeleteMessage call routes +// through the partitioned dispatch helpers exactly as it would +// after PR 5b-3 lifts the gate. 
+func installPartitionedMetaForTest(t *testing.T, node Node, queueName string, partitionCount uint32, throughputLimit string) { + t.Helper() + s := node.sqsServer + require.NotNil(t, s, "test must run on a node with sqsServer wired") + + ctx := context.Background() + readTS := s.nextTxnReadTS(ctx) + meta, exists, err := s.loadQueueMetaAt(ctx, queueName, readTS) + require.NoError(t, err) + require.True(t, exists, "queue %q must exist before partition meta override", queueName) + + meta.PartitionCount = partitionCount + meta.FifoThroughputLimit = throughputLimit + meta.DeduplicationScope = htfifoDedupeScopeMessageGroup + + body, err := encodeSQSQueueMeta(meta) + require.NoError(t, err) + + req := &kv.OperationGroup[kv.OP]{ + IsTxn: true, + StartTS: readTS, + ReadKeys: [][]byte{sqsQueueMetaKey(queueName)}, + Elems: []*kv.Elem[kv.OP]{ + {Op: kv.Put, Key: sqsQueueMetaKey(queueName), Value: body}, + }, + } + _, err = s.coordinator.Dispatch(ctx, req) + require.NoError(t, err) +} + +// TestSQSServer_PartitionedFIFO_SendReceiveDeleteRoundTrip is the +// end-to-end smoke test for PR 5b-2's wiring: SendMessage on a +// partitioned FIFO queue stores the message under the partitioned +// keyspace, ReceiveMessage's fanout finds it via the partitioned +// vis prefix, the returned ReceiptHandle is in v2 wire format, and +// DeleteMessage routes back to the same partition via +// handle.Partition. Failure of any step is a wiring break. 
+func TestSQSServer_PartitionedFIFO_SendReceiveDeleteRoundTrip(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + const queueName = "orders.fifo" + status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{ + "QueueName": queueName, + "Attributes": map[string]string{"FifoQueue": "true"}, + }) + require.Equal(t, http.StatusOK, status, "create FIFO queue: %v", out) + queueURL, _ := out["QueueUrl"].(string) + require.NotEmpty(t, queueURL) + + installPartitionedMetaForTest(t, node, queueName, 4, htfifoThroughputPerMessageGroupID) + + // Load meta once so the test can assert each handle lands on the + // partition partitionFor would compute for its group — a loose + // "Partition < 4" check would still pass if every group landed + // on partition 0, masking the dispatch regression this test is + // meant to catch. + ctx := context.Background() + readTS := node.sqsServer.nextTxnReadTS(ctx) + meta, exists, err := node.sqsServer.loadQueueMetaAt(ctx, queueName, readTS) + require.NoError(t, err) + require.True(t, exists) + require.Equal(t, uint32(4), meta.PartitionCount, + "meta override must have installed PartitionCount=4") + + // Send a few messages spread across distinct group ids so + // partitionFor gets to actually pick different partitions. 
+ groups := []string{"alpha", "beta", "gamma", "delta", "epsilon", "zeta"} + sent := make(map[string]string, len(groups)) // group -> messageID + byMessageID := make(map[string]string, len(groups)) // messageID -> group + for i, g := range groups { + body := "body-" + g + dedup := "dedup-" + g + status, out := callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MessageBody": body, + "MessageGroupId": g, + "MessageDeduplicationId": dedup, + }) + require.Equal(t, http.StatusOK, status, + "send #%d (group=%s): %v", i, g, out) + msgID, _ := out["MessageId"].(string) + require.NotEmpty(t, msgID, "send #%d: empty MessageId", i) + sent[g] = msgID + byMessageID[msgID] = g + } + + // Receive them all; the fanout walks all 4 partitions to find + // every message. Receive enough times to drain the queue; + // each receive bumps a per-message visibility deadline so a + // second receive is a no-op for already-rotated messages + // inside the same call. + collected := make(map[string]string, len(groups)) // messageID -> receiptHandle + for range len(groups) * 4 { // generous budget + if len(collected) == len(groups) { + break + } + status, out := callSQS(t, node, sqsReceiveMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MaxNumberOfMessages": 10, + "VisibilityTimeout": 60, + }) + require.Equal(t, http.StatusOK, status, "receive: %v", out) + msgs, _ := out["Messages"].([]any) + for _, m := range msgs { + mm, _ := m.(map[string]any) + id, _ := mm["MessageId"].(string) + handle, _ := mm["ReceiptHandle"].(string) + + // The handle must be v2 since the queue is partitioned — + // this is the wiring guarantee. 
+ parsed, err := decodeReceiptHandle(handle) + require.NoError(t, err, "decode receipt handle: %v", err) + require.Equal(t, sqsReceiptHandleVersion2, parsed.Version, + "partitioned queue must produce v2 handles, got version=0x%02x for message=%s", + parsed.Version, id) + group, ok := byMessageID[id] + require.True(t, ok, "received unknown messageId %s", id) + require.Equal(t, + partitionFor(meta, group), + parsed.Partition, + "message %s (group=%s) routed to wrong partition", id, group) + + collected[id] = handle + } + } + require.Len(t, collected, len(groups), + "fanout receive must surface every message on the partitioned queue") + + // Delete each one via its v2 handle. The DeleteMessage path + // uses handle.Partition to dispatch the data-key lookup; if + // that wire is broken these calls return ReceiptHandleIsInvalid + // (no record found at the legacy data key). + for id, handle := range collected { + status, out := callSQS(t, node, sqsDeleteMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "ReceiptHandle": handle, + }) + require.Equal(t, http.StatusOK, status, + "delete (msg=%s): %v", id, out) + } + + // Queue must now be empty. Probe the partitioned data keyspace + // directly so a regression that turned DeleteMessage into a + // "leave the record invisible but not removed" no-op cannot + // false-pass under the still-active 60s visibility window. + for p := uint32(0); p < meta.PartitionCount; p++ { + dataPrefix := sqsMsgDataKeyDispatch(meta, queueName, p, meta.Generation, "") + end := append([]byte(nil), dataPrefix...) 
+ end = append(end, 0xFF, 0xFF, 0xFF, 0xFF) + page, err := node.sqsServer.store.ScanAt(ctx, dataPrefix, end, 32, node.sqsServer.nextTxnReadTS(ctx)) + require.NoError(t, err, "post-delete scan partition %d: %v", p, err) + for _, kvp := range page { + require.False(t, + strings.HasPrefix(string(kvp.Key), string(dataPrefix)), + "partition %d still holds key %q after delete — DeleteMessage left a tombstone-less record", + p, string(kvp.Key)) + } + } + + // And the public API must agree: a fresh receive with a short + // visibility timeout (after sleeping past it, so any in-flight + // invisible record would re-expose) returns no messages. + time.Sleep(1100 * time.Millisecond) + status, out = callSQS(t, node, sqsReceiveMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MaxNumberOfMessages": 10, + "VisibilityTimeout": 1, + }) + require.Equal(t, http.StatusOK, status, "post-delete receive: %v", out) + if msgs, _ := out["Messages"].([]any); len(msgs) > 0 { + t.Fatalf("expected empty queue after delete; got %d messages", len(msgs)) + } + + // Sanity: the legacy keyspace must be empty (every send on + // this queue went to the partitioned keyspace, never the + // legacy one). We probe the legacy data prefix at this + // queue's generation: it must yield zero entries. + readTS = node.sqsServer.nextTxnReadTS(ctx) + legacyDataPrefix := sqsMsgDataKey(queueName, meta.Generation, "") + // Cap the prefix scan at the generation byte so we do not + // drag in unrelated queues. + end := append([]byte(nil), legacyDataPrefix...) 
+ end = append(end, 0xFF, 0xFF, 0xFF, 0xFF) + page, err := node.sqsServer.store.ScanAt(ctx, legacyDataPrefix, end, 32, readTS) + require.NoError(t, err) + for _, kvp := range page { + require.False(t, strings.HasPrefix(string(kvp.Key), string(legacyDataPrefix)), + "legacy data key found on a partitioned queue (key=%q) — wiring leaked into the wrong keyspace", + string(kvp.Key)) + } +} + +// TestSQSServer_PartitionedFIFO_RejectsV1Handle pins the queue- +// aware version validation: a v1 handle on a partitioned queue +// surfaces as ReceiptHandleIsInvalid. PR 5a's blanket v2 rejection +// at the API boundary moved into validateReceiptHandleVersion, +// which is invoked once meta is loaded. +func TestSQSServer_PartitionedFIFO_RejectsV1Handle(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + const queueName = "rejectv1.fifo" + status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{ + "QueueName": queueName, + "Attributes": map[string]string{"FifoQueue": "true"}, + }) + require.Equal(t, http.StatusOK, status, "create: %v", out) + queueURL, _ := out["QueueUrl"].(string) + + // Send one message under legacy meta so we know the queue + // generation. Then upgrade meta to partitioned. + status, _ = callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MessageBody": "x", + "MessageGroupId": "g", + "MessageDeduplicationId": "d", + }) + require.Equal(t, http.StatusOK, status) + + installPartitionedMetaForTest(t, node, queueName, 4, htfifoThroughputPerMessageGroupID) + + // Forge a v1 handle that would have been valid before the + // upgrade. The partitioned-queue receipt-version check must + // reject it as ReceiptHandleIsInvalid (not pass it through to + // dispatch where it would route to the legacy keyspace and + // silently miss). 
+ token := make([]byte, sqsReceiptTokenBytes) + v1Handle, err := encodeReceiptHandle(1, "deadbeefdeadbeefdeadbeefdeadbeef", token) + require.NoError(t, err) + status, out = callSQS(t, node, sqsDeleteMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "ReceiptHandle": v1Handle, + }) + require.Equal(t, http.StatusBadRequest, status, + "v1 handle on partitioned queue must surface as ReceiptHandleIsInvalid: %v", out) + require.Equal(t, sqsErrReceiptHandleInvalid, out["__type"]) + + // ChangeMessageVisibility must reject the same v1 handle. + status, out = callSQS(t, node, sqsChangeMessageVisibilityTarget, map[string]any{ + "QueueUrl": queueURL, + "ReceiptHandle": v1Handle, + "VisibilityTimeout": int64(30), + }) + require.Equal(t, http.StatusBadRequest, status, "%v", out) + require.Equal(t, sqsErrReceiptHandleInvalid, out["__type"]) +} + +// TestSQSServer_PartitionedFIFO_PerQueueCollapsesToPartitionZero +// is the named regression for the PR 731 round 2 forward note: a +// queue with PartitionCount=4 and FifoThroughputLimit=perQueue +// has every group hashed to partition 0 by partitionFor, but the +// keyspace is still partitioned. The receive fanout must collapse +// to 1 iteration (effectivePartitionCount=1) AND that iteration +// must scan the partitioned vis prefix — the perQueue +// short-circuit narrows the iteration count, not the keyspace. 
+func TestSQSServer_PartitionedFIFO_PerQueueCollapsesToPartitionZero(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + const queueName = "perqueue.fifo" + status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{ + "QueueName": queueName, + "Attributes": map[string]string{"FifoQueue": "true"}, + }) + require.Equal(t, http.StatusOK, status, "create: %v", out) + queueURL, _ := out["QueueUrl"].(string) + + installPartitionedMetaForTest(t, node, queueName, 4, htfifoThroughputPerQueue) + + // Send messages from many distinct groups; in perQueue mode + // they all land in partition 0. + groups := []string{"alpha", "beta", "gamma", "delta", "epsilon"} + for _, g := range groups { + status, out := callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MessageBody": "body-" + g, + "MessageGroupId": g, + "MessageDeduplicationId": "d-" + g, + }) + require.Equal(t, http.StatusOK, status, "send group=%s: %v", g, out) + } + + // One receive call must surface every message — the fanout + // only iterates partition 0 (effectivePartitionCount=1) but + // that single iteration must use the partitioned vis prefix. 
+ collected := make(map[string]bool, len(groups)) + for range 4 { + if len(collected) == len(groups) { + break + } + status, out := callSQS(t, node, sqsReceiveMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MaxNumberOfMessages": 10, + "VisibilityTimeout": 60, + }) + require.Equal(t, http.StatusOK, status, "receive: %v", out) + msgs, _ := out["Messages"].([]any) + for _, m := range msgs { + mm, _ := m.(map[string]any) + body, _ := mm["Body"].(string) + collected[body] = true + handle, _ := mm["ReceiptHandle"].(string) + parsed, err := decodeReceiptHandle(handle) + require.NoError(t, err) + require.Equal(t, sqsReceiptHandleVersion2, parsed.Version, + "perQueue + PartitionCount=4 must still produce v2 handles") + require.Equal(t, uint32(0), parsed.Partition, + "perQueue mode pins every group to partition 0, so every handle must record Partition=0") + } + } + require.Len(t, collected, len(groups), + "perQueue receive must surface every message in one fanout pass over partition 0") +} + +// TestNextReceiveFanoutStart_PerQueueRoundRobin pins the round-4 +// regression: the round-3 fix used a server-wide atomic counter, +// which Codex P1 round 4 flagged because it aliases across queues +// — when other queues' receives interleave with strides that share +// a factor with PartitionCount, the queue's observed counter +// subsequence cycles through only a subset of partitions, which can +// starve the rest. Per-queue counters make each queue's rotation +// depend only on its own receive cadence. +// +// This test pins: +// - legacy / non-partitioned queues collapse to 0 without +// perturbing any counter, +// - a single queue's consecutive calls walk every partition in +// strict round-robin, +// - interleaved calls across two queues do NOT cross-pollute each +// other's counter (the round-4 isolation contract), +// - concurrent receives across many queues remain atomic and +// in-range under -race. 
+func TestNextReceiveFanoutStart_PerQueueRoundRobin(t *testing.T) { + t.Parallel() + + s := &SQSServer{} + + // Legacy / non-partitioned / perQueue queues collapse to 0 and + // must not perturb any counter. + require.Equal(t, uint32(0), s.nextReceiveFanoutStart("legacy", 0)) + require.Equal(t, uint32(0), s.nextReceiveFanoutStart("legacy", 1)) + + // Single-queue strict round-robin: 16 consecutive calls over 4 + // partitions must produce exactly 4 hits per partition. + const partitions uint32 = 4 + seen := make(map[uint32]int, partitions) + for range 16 { + seen[s.nextReceiveFanoutStart("queueA", partitions)]++ + } + require.Len(t, seen, int(partitions), + "counter-driven rotation must cover every partition; got %v", seen) + for p := uint32(0); p < partitions; p++ { + require.Equal(t, 4, seen[p], + "partition %d hit %d times in 16 calls; round-robin must be exact", + p, seen[p]) + } + + // Per-queue isolation — the round-4 contract. Interleaving calls + // across queueB and queueC must not cross-pollute their counters: + // each queue must still see strict round-robin in the order ITS + // calls were issued. With a server-wide counter (round 3), B's + // consecutive calls would observe values 1, 3, 5, 7, … (every + // other tick because C is calling in between) — and 1,3,5,7 mod 4 + // is 1,3,1,3 → partitions 0 and 2 never appear as B's start. + bSeen := make(map[uint32]int, partitions) + cSeen := make(map[uint32]int, partitions) + for range 16 { + bSeen[s.nextReceiveFanoutStart("queueB", partitions)]++ + cSeen[s.nextReceiveFanoutStart("queueC", partitions)]++ + } + for p := uint32(0); p < partitions; p++ { + require.Equal(t, 4, bSeen[p], + "queueB partition %d hit %d times; round-4 per-queue isolation broken (cross-queue stride aliasing)", + p, bSeen[p]) + require.Equal(t, 4, cSeen[p], + "queueC partition %d hit %d times; round-4 per-queue isolation broken", + p, cSeen[p]) + } + + // The output must always fall inside [0, partitions). 
Validates + // the mask-AND contract — partitions is power-of-two. + for range 1024 { + off := s.nextReceiveFanoutStart("queueA", partitions) + require.Less(t, off, partitions, + "offset %d out of range [0, %d)", off, partitions) + } + + // Concurrent receivers across many queues must each get a valid + // offset (no torn reads, no out-of-range values, no nil-counter + // races on the LoadOrStore path). The race detector will catch + // any missed synchronisation on the sync.Map insert. + var wg sync.WaitGroup + const goroutines = 8 + const perGoroutine = 256 + results := make([][]uint32, goroutines) + for i := 0; i < goroutines; i++ { + results[i] = make([]uint32, perGoroutine) + wg.Add(1) + go func(idx int) { + defer wg.Done() + queueName := fmt.Sprintf("concurrent-%d", idx%3) + for j := 0; j < perGoroutine; j++ { + results[idx][j] = s.nextReceiveFanoutStart(queueName, partitions) + } + }(i) + } + wg.Wait() + for _, batch := range results { + for _, off := range batch { + require.Less(t, off, partitions, + "concurrent offset %d out of range", off) + } + } +} + +// TestDropReceiveFanoutCounter_ClearsEntry pins the round-5 +// cleanup contract: a queue's per-queue fanout counter must be +// removed from receiveFanoutCounters once dropReceiveFanoutCounter +// is called, so repeated create/delete of unique queue names +// cannot leak one sync.Map entry per name for the process +// lifetime (Codex P2 on PR #732). The test also asserts the +// no-op shape on absent / never-partitioned queues so the same +// helper is safe to call from the catalog paths regardless of +// whether the queue ever issued a partitioned receive. +func TestDropReceiveFanoutCounter_ClearsEntry(t *testing.T) { + t.Parallel() + + s := &SQSServer{} + + // Drop on an absent queue must be a no-op (no panic, no + // allocation in the map). Calling it before any receive is + // the common path on legacy / non-partitioned queues that + // never populate the counter. 
+ s.dropReceiveFanoutCounter("never-touched") + _, ok := s.receiveFanoutCounters.Load("never-touched") + require.False(t, ok, "no entry must exist for an unused queue name") + + // Populate a counter via the public path, then verify it is + // observable in the underlying sync.Map. + const partitions uint32 = 4 + _ = s.nextReceiveFanoutStart("queueA", partitions) + _, ok = s.receiveFanoutCounters.Load("queueA") + require.True(t, ok, "nextReceiveFanoutStart must populate the counter") + + // Drop and verify removal. A stale entry surviving here is the + // leak Codex flagged. + s.dropReceiveFanoutCounter("queueA") + _, ok = s.receiveFanoutCounters.Load("queueA") + require.False(t, ok, "dropReceiveFanoutCounter must remove the queue's entry") + + // Dropping an unrelated queue must not affect a different + // queue's counter (per-queue isolation contract from round 4 + // must hold across cleanup). + _ = s.nextReceiveFanoutStart("queueB", partitions) + _ = s.nextReceiveFanoutStart("queueC", partitions) + s.dropReceiveFanoutCounter("queueB") + _, ok = s.receiveFanoutCounters.Load("queueB") + require.False(t, ok, "queueB entry must be removed") + _, ok = s.receiveFanoutCounters.Load("queueC") + require.True(t, ok, "queueC entry must survive an unrelated drop") + + // After drop, a new receive on the same queue must allocate a + // fresh counter starting from 0+1 (the Add is unconditional), + // not pick up where the old counter left off. The first call's + // returned offset must be 1 (counter = 1, mask 3 → 1) on a + // fresh atomic.Uint32 — pinning this catches any regression + // that recycled the old pointer. 
+ first := s.nextReceiveFanoutStart("queueA", partitions) + require.Equal(t, uint32(1), first, + "a queue recreated after drop must start from a zero counter (got first offset %d)", + first) +} diff --git a/adapter/sqs_reaper.go b/adapter/sqs_reaper.go index c2a078a4..c870917a 100644 --- a/adapter/sqs_reaper.go +++ b/adapter/sqs_reaper.go @@ -370,7 +370,13 @@ func (s *SQSServer) reapPage(ctx context.Context, queueName string, currentGen u // success — the message has just been touched (received, deleted, // redriven) by another path and is no longer ours to reap. func (s *SQSServer) reapOneRecord(ctx context.Context, queueName string, gen uint64, byAgeKey []byte, messageID string, readTS uint64) error { - dataKey := sqsMsgDataKey(queueName, gen, messageID) + // Reaper iterates legacy byAge entries only in PR 5b-2; the + // partitioned-byAge enumeration ships in a later PR. nil meta + // + partition 0 routes the dispatch helper to the legacy + // constructor so the data-key matches the pre-PR-5b layout + // byte-for-byte. + const partition uint32 = 0 + dataKey := sqsMsgDataKeyDispatch(nil, queueName, partition, gen, messageID) parsed, found, err := s.loadDataForReaper(ctx, dataKey, readTS) if err != nil { return err @@ -513,7 +519,15 @@ func (s *SQSServer) dispatchDedupDelete(ctx context.Context, key []byte, readTS } func (s *SQSServer) buildReapOps(ctx context.Context, queueName string, gen uint64, byAgeKey, dataKey []byte, parsed *sqsMessageRecord, readTS uint64) (*kv.OperationGroup[kv.OP], error) { - visKey := sqsMsgVisKey(queueName, gen, parsed.VisibleAtMillis, parsed.MessageID) + // Reaper currently iterates the legacy byAge keyspace only — the + // partitioned-byAge enumeration is wired in a later PR (Phase 3.D + // PR 6, partition-iterating reaper). Dispatch helpers receive + // nil meta + partition 0 so they deterministically route to the + // legacy constructor and produce byte-identical keys to the + // pre-PR-5b reaper. 
When PR 6 lands the caller switches to the + // real meta + parsed partition. + const partition uint32 = 0 + visKey := sqsMsgVisKeyDispatch(nil, queueName, partition, gen, parsed.VisibleAtMillis, parsed.MessageID) readKeys := [][]byte{byAgeKey, dataKey, visKey, sqsQueueMetaKey(queueName), sqsQueueGenKey(queueName)} elems := []*kv.Elem[kv.OP]{ {Op: kv.Del, Key: byAgeKey}, @@ -521,8 +535,8 @@ func (s *SQSServer) buildReapOps(ctx context.Context, queueName string, gen uint {Op: kv.Del, Key: visKey}, } if parsed.MessageGroupId != "" { - lockKey := sqsMsgGroupKey(queueName, gen, parsed.MessageGroupId) - lock, err := s.loadFifoGroupLock(ctx, queueName, gen, parsed.MessageGroupId, readTS) + lockKey := sqsMsgGroupKeyDispatch(nil, queueName, partition, gen, parsed.MessageGroupId) + lock, err := s.loadFifoGroupLock(ctx, queueName, nil, partition, gen, parsed.MessageGroupId, readTS) if err != nil { return nil, err } diff --git a/adapter/sqs_receipt_handle_v2_test.go b/adapter/sqs_receipt_handle_v2_test.go index aa02da4a..9ca18a05 100644 --- a/adapter/sqs_receipt_handle_v2_test.go +++ b/adapter/sqs_receipt_handle_v2_test.go @@ -278,53 +278,39 @@ func TestDecodeClientReceiptHandle_AcceptsV1(t *testing.T) { require.Equal(t, uint64(7), back.QueueGeneration) } -// TestDecodeClientReceiptHandle_RejectsV2 pins the codex/coderabbit -// major fix on PR #724 round 1: the v2 codec is added but -// dormant. A client-supplied v2 handle MUST be rejected at the -// public API boundary so the wire format does not leak before -// PR 5b wires the partitioned-FIFO data plane. -// -// Without this gate, a malicious / curious client could re-encode -// a legitimately-issued v1 handle's (queue_gen, message_id, -// receipt_token) under the v2 layout, and DeleteMessage / -// ChangeMessageVisibility would accept it (since downstream -// validation only checks queue_gen + receipt_token). 
PR 5b -// replaces this gate with a queue-aware version (v1 required on -// non-partitioned queues, v2 required on partitioned ones), so -// the gate-removal lands together with the partitioned data plane. -func TestDecodeClientReceiptHandle_RejectsV2(t *testing.T) { +// TestDecodeClientReceiptHandle_AcceptsV2 pins the PR 5b-2 contract +// shift: the public API wrapper no longer enforces the PR 5a +// blanket v2 rejection. Version validation moved into +// validateReceiptHandleVersion, which the meta-loading callers +// (loadMessageForDelete / loadAndVerifyMessage) invoke once they +// have the queue's PartitionCount in scope. The dormancy promise +// is preserved by the queue-aware check downstream — v2 handles +// against a non-partitioned queue still surface as +// ReceiptHandleIsInvalid (see +// TestValidateReceiptHandleVersion_RejectsV2OnNonPartitioned), +// and the §11 PR 2 dormancy gate still rejects PartitionCount > 1 +// at CreateQueue, so no production queue can be in the +// partitioned branch until PR 5b-3 lifts the gate atomically. +func TestDecodeClientReceiptHandle_AcceptsV2(t *testing.T) { t.Parallel() token := make([]byte, sqsReceiptTokenBytes) h, err := encodeReceiptHandleV2(3, 7, "deadbeefdeadbeefdeadbeefdeadbeef", token) - require.NoError(t, err, - "v2 encoder must succeed even though the public API "+ - "wrapper rejects the result — the codec is dormant, "+ - "not absent") - - // Low-level decoder still accepts v2 (it's pure codec). - back, err := decodeReceiptHandle(h) require.NoError(t, err) - require.Equal(t, sqsReceiptHandleVersion2, back.Version, - "low-level decodeReceiptHandle must keep working — the "+ - "gate is at the public API boundary, not in the codec") - // Public API wrapper rejects v2. 
- _, err = decodeClientReceiptHandle(h) - require.Error(t, err, - "v2 handle on the public API must fail until PR 5b lifts "+ - "the dormancy gate") - require.Contains(t, err.Error(), "not yet enabled") + back, err := decodeClientReceiptHandle(h) + require.NoError(t, err, + "v2 handle must decode at the public API; version "+ + "validation moved to validateReceiptHandleVersion") + require.Equal(t, sqsReceiptHandleVersion2, back.Version) + require.Equal(t, uint32(3), back.Partition) + require.Equal(t, uint64(7), back.QueueGeneration) } // TestDecodeClientReceiptHandle_PassesThroughDecodeErrors pins // that decode-error propagation is unchanged — a malformed blob -// still surfaces the underlying base64 / length error rather than -// being masked by the dormancy-gate message. +// still surfaces the underlying base64 / length error. func TestDecodeClientReceiptHandle_PassesThroughDecodeErrors(t *testing.T) { t.Parallel() _, err := decodeClientReceiptHandle("!!!" + strings.Repeat("?", 50)) require.Error(t, err) - require.NotContains(t, err.Error(), "not yet enabled", - "a decode-step error must NOT be reported as the "+ - "dormancy-gate message") } diff --git a/adapter/sqs_redrive.go b/adapter/sqs_redrive.go index 4e108486..634c5058 100644 --- a/adapter/sqs_redrive.go +++ b/adapter/sqs_redrive.go @@ -114,7 +114,7 @@ func shouldRedrive(rec *sqsMessageRecord, policy *parsedRedrivePolicy) bool { func (s *SQSServer) redriveCandidateToDLQ( ctx context.Context, srcQueueName string, - srcGen uint64, + srcMeta *sqsQueueMeta, cand sqsMsgCandidate, srcDataKey []byte, srcRec *sqsMessageRecord, @@ -147,7 +147,7 @@ if err != nil { return false, err } - req, err := s.buildRedriveOps(ctx, srcQueueName, srcGen, cand, srcDataKey, srcRec, policy, dlqMeta, dlqRec, dlqRecordBytes, dlqSeq, readTS) + req, err := s.buildRedriveOps(ctx, srcQueueName, srcMeta, cand, srcDataKey, srcRec, policy, dlqMeta, dlqRec, dlqRecordBytes, dlqSeq, readTS) if err != nil { return false, err }
@@ -278,7 +278,7 @@ func buildDLQRecord(srcRec *sqsMessageRecord, dlqMeta *sqsQueueMeta, srcArn stri func (s *SQSServer) buildRedriveOps( ctx context.Context, srcQueueName string, - srcGen uint64, + srcMeta *sqsQueueMeta, cand sqsMsgCandidate, srcDataKey []byte, srcRec *sqsMessageRecord, @@ -289,11 +289,17 @@ func (s *SQSServer) buildRedriveOps( dlqSeq uint64, readTS uint64, ) (*kv.OperationGroup[kv.OP], error) { + srcGen := srcMeta.Generation now := dlqRec.SendTimestampMillis - dlqDataKey := sqsMsgDataKey(policy.DLQName, dlqMeta.Generation, dlqRec.MessageID) - dlqVisKey := sqsMsgVisKey(policy.DLQName, dlqMeta.Generation, now, dlqRec.MessageID) - dlqByAgeKey := sqsMsgByAgeKey(policy.DLQName, dlqMeta.Generation, now, dlqRec.MessageID) - srcByAgeKey := sqsMsgByAgeKey(srcQueueName, srcGen, srcRec.SendTimestampMillis, srcRec.MessageID) + // DLQ partition for FIFO sources: redrive carries the source's + // MessageGroupId forward, so the DLQ partition is the result of + // hashing that group through the DLQ's partitionFor. Standard + // DLQs (or any DLQ with PartitionCount <= 1) collapse this to 0.
+ dlqPartition := partitionFor(dlqMeta, dlqRec.MessageGroupId) + dlqDataKey := sqsMsgDataKeyDispatch(dlqMeta, policy.DLQName, dlqPartition, dlqMeta.Generation, dlqRec.MessageID) + dlqVisKey := sqsMsgVisKeyDispatch(dlqMeta, policy.DLQName, dlqPartition, dlqMeta.Generation, now, dlqRec.MessageID) + dlqByAgeKey := sqsMsgByAgeKeyDispatch(dlqMeta, policy.DLQName, dlqPartition, dlqMeta.Generation, now, dlqRec.MessageID) + srcByAgeKey := sqsMsgByAgeKeyDispatch(srcMeta, srcQueueName, cand.partition, srcGen, srcRec.SendTimestampMillis, srcRec.MessageID) readKeys := [][]byte{ cand.visKey, srcDataKey, sqsQueueMetaKey(srcQueueName), sqsQueueGenKey(srcQueueName), @@ -317,8 +323,8 @@ func (s *SQSServer) buildRedriveOps( }) } if srcRec.MessageGroupId != "" { - lockKey := sqsMsgGroupKey(srcQueueName, srcGen, srcRec.MessageGroupId) - lock, err := s.loadFifoGroupLock(ctx, srcQueueName, srcGen, srcRec.MessageGroupId, readTS) + lockKey := sqsMsgGroupKeyDispatch(srcMeta, srcQueueName, cand.partition, srcGen, srcRec.MessageGroupId) + lock, err := s.loadFifoGroupLock(ctx, srcQueueName, srcMeta, cand.partition, srcGen, srcRec.MessageGroupId, readTS) if err != nil { return nil, err }