17 changes: 17 additions & 0 deletions adapter/sqs.go
@@ -7,6 +7,7 @@ import (
"net/http"
"strconv"
"strings"
"sync"
"time"

"github.com/bootjp/elastickv/kv"
@@ -167,6 +168,22 @@ type SQSServer struct {
// throttle config so unconfigured queues pay one nil-check per
// request and nothing else (see sqs_throttle.go).
throttle *bucketStore
// receiveFanoutCounters maps queueName → *atomic.Uint32 so each
// partitioned queue gets its own round-robin starting partition.
// Codex P1 round 4 flagged that a server-wide counter aliases
// across queues: when other queues' receives interleave with a
// stride that shares a factor with PartitionCount, the queue's
// observed counter subsequence cycles through only a subset of
// partitions, which can starve the rest under MaxNumberOfMessages
// pressure on the early-scanned ones. Per-queue isolation makes
// each queue's rotation depend solely on its own receive cadence.
//
// sync.Map is the right shape: lookups are read-mostly (the same
// queue keeps getting the same counter), and the keyset grows
// only with the number of distinct queues this server has handled
// receives for in-process — bounded by the operator-controlled
// CreateQueue rate.
receiveFanoutCounters sync.Map
}

// WithSQSLeaderMap configures the Raft-address-to-SQS-address mapping used to
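For orientation, a minimal sketch (not the PR's exact code) of how the per-queue rotation lookup and the dropReceiveFanoutCounter teardown called from sqs_catalog.go could be built on this sync.Map. nextReceiveFanoutStart is a hypothetical name, and the sketch assumes "sync/atomic" is imported and partitionCount >= 1:

```go
// nextReceiveFanoutStart returns the 0-based partition this receive should
// start scanning from, rotating independently per queue.
// (Hypothetical sketch; the PR's real helper may differ.)
func (s *SQSServer) nextReceiveFanoutStart(queueName string, partitionCount uint32) uint32 {
	// LoadOrStore keeps the read-mostly path cheap: after the first
	// receive for a queue, lookups return the existing counter.
	v, _ := s.receiveFanoutCounters.LoadOrStore(queueName, new(atomic.Uint32))
	ctr := v.(*atomic.Uint32)
	// Add(1)-1 is the pre-increment value, so successive receives on the
	// same queue start at partitions 0, 1, 2, ... independent of other queues.
	return (ctr.Add(1) - 1) % partitionCount
}

// dropReceiveFanoutCounter forgets a queue's counter so a re-created
// queue does not inherit the prior incarnation's rotation offset.
func (s *SQSServer) dropReceiveFanoutCounter(queueName string) {
	s.receiveFanoutCounters.Delete(queueName)
}
```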
12 changes: 12 additions & 0 deletions adapter/sqs_catalog.go
@@ -1012,6 +1012,12 @@ func (s *SQSServer) tryCreateQueueOnce(ctx context.Context, requested *sqsQueueM
// the new queue starts with a fresh full-capacity bucket
// regardless of in-flight traffic to the prior incarnation.
s.throttle.invalidateQueue(requested.Name)
// Mirror the throttle invalidate for the per-queue fanout-rotation
// counter. A delete-then-create race could otherwise leave the
// new queue starting partitioned receives at the previous
// incarnation's counter offset (harmless for correctness, but
// ties new-queue routing to old-queue history).
s.dropReceiveFanoutCounter(requested.Name)
return true, nil
}

@@ -1038,6 +1044,12 @@ func (s *SQSServer) deleteQueue(w http.ResponseWriter, r *http.Request) {
// surprising operators who use DeleteQueue+CreateQueue to reset
// queue state.
s.throttle.invalidateQueue(name)
// Drop the per-queue fanout-rotation counter as well. Without
// this, repeated DeleteQueue of unique queue names retains one
// receiveFanoutCounters entry per name for the process lifetime
// — a leak in multi-tenant / high-churn deployments. Codex P2,
// PR #732 round 5.
s.dropReceiveFanoutCounter(name)
// SQS DeleteQueue returns 200 with an empty body.
writeSQSJSON(w, map[string]any{})
}
39 changes: 25 additions & 14 deletions adapter/sqs_fifo.go
@@ -91,8 +91,12 @@ func resolveFifoDedupID(meta *sqsQueueMeta, in sqsSendMessageInput) string {
// or (nil, nil) when there is no live record for this dedup-id.
// Expired records are surfaced as nil so a stale entry does not block
// a fresh send within the same FIFO queue.
func (s *SQSServer) loadFifoDedupRecord(ctx context.Context, queueName string, gen uint64, dedupID string, readTS uint64) (*sqsFifoDedupRecord, []byte, error) {
key := sqsMsgDedupKey(queueName, gen, dedupID)
//
// groupID is the MessageGroupId; it participates in the partitioned
// dedup key so that two groups colliding onto the same partition keep
// disjoint dedup namespaces (the AWS messageGroup-scope contract).
func (s *SQSServer) loadFifoDedupRecord(ctx context.Context, queueName string, meta *sqsQueueMeta, partition uint32, gen uint64, groupID, dedupID string, readTS uint64) (*sqsFifoDedupRecord, []byte, error) {
key := sqsMsgDedupKeyDispatch(meta, queueName, partition, gen, groupID, dedupID)
P2: Reap partitioned dedup records to prevent key growth

loadFifoDedupRecord now resolves dedup keys via sqsMsgDedupKeyDispatch, which routes PartitionCount > 1 queues to the partitioned dedup prefix, but the dedup reaper still scans only the legacy SqsMsgDedupPrefix + encodeSQSSegment(queueName) range in adapter/sqs_reaper.go. Once partitioned FIFO is enabled, expired dedup entries for those queues will never be swept, so high-cardinality MessageDeduplicationId traffic causes unbounded dedup-key accumulation and steadily worse storage/scan overhead.
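A hedged sketch of the direction this comment points at: enumerate every dedup keyspace the reaper must sweep for a queue, legacy and partitioned. The helper name and the idea of feeding raw prefixes into the reaper's existing range scan are assumptions, not adapter/sqs_reaper.go's actual API:

```go
// dedupReapPrefixes lists the dedup keyspaces the reaper should sweep for a
// queue. (Hypothetical helper; the real reaper loop may be shaped differently.)
func dedupReapPrefixes(queueName string, meta *sqsQueueMeta) [][]byte {
	legacy := append([]byte{}, SqsMsgDedupPrefix...)
	legacy = append(legacy, encodeSQSSegment(queueName)...)
	prefixes := [][]byte{legacy}
	if meta != nil && meta.PartitionCount > 1 {
		// Partitioned queues also write dedup records under the partitioned
		// prefix (see sqsPartitionedMsgDedupKey), so expired entries
		// accumulate there unless swept as well.
		partitioned := append([]byte{}, SqsPartitionedMsgDedupPrefix...)
		partitioned = append(partitioned, encodeSQSSegment(queueName)...)
		partitioned = append(partitioned, sqsPartitionedQueueTerminator)
		prefixes = append(prefixes, partitioned)
	}
	return prefixes
}
```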

raw, err := s.store.GetAt(ctx, key, readTS)
if err != nil {
if errors.Is(err, store.ErrKeyNotFound) {
@@ -130,11 +134,11 @@ func (s *SQSServer) loadFifoSequence(ctx context.Context, queueName string, read

// loadFifoGroupLock fetches the in-flight lock for a group, if any.
// Returns nil when no lock is held. Callers that also need the key
// can recompute it via sqsMsgGroupKey — the helper used to return it
// alongside the lock, but every caller already had the key in scope
// from a different code path.
func (s *SQSServer) loadFifoGroupLock(ctx context.Context, queueName string, gen uint64, groupID string, readTS uint64) (*sqsFifoGroupLock, error) {
key := sqsMsgGroupKey(queueName, gen, groupID)
// can recompute it via sqsMsgGroupKeyDispatch — the helper used to
// return it alongside the lock, but every caller already had the
// key in scope from a different code path.
func (s *SQSServer) loadFifoGroupLock(ctx context.Context, queueName string, meta *sqsQueueMeta, partition uint32, gen uint64, groupID string, readTS uint64) (*sqsFifoGroupLock, error) {
key := sqsMsgGroupKeyDispatch(meta, queueName, partition, gen, groupID)
raw, err := s.store.GetAt(ctx, key, readTS)
if err != nil {
if errors.Is(err, store.ErrKeyNotFound) {
@@ -172,7 +176,14 @@ func (s *SQSServer) sendFifoMessage(
delay int64,
readTS uint64,
) (map[string]string, bool, error) {
dedup, dedupKey, err := s.loadFifoDedupRecord(ctx, queueName, meta.Generation, dedupID, readTS)
// HT-FIFO: hash the MessageGroupId once at the entry point so
// every key built in this transaction (data, vis, byage, dedup,
// group-lock, sequence) lands in the same partition. partitionFor
// returns 0 on legacy / non-partitioned queues and on the
// perQueue throughput short-circuit, so the dispatch helpers
// round-trip to legacy output for those cases.
partition := partitionFor(meta, in.MessageGroupId)
dedup, dedupKey, err := s.loadFifoDedupRecord(ctx, queueName, meta, partition, meta.Generation, in.MessageGroupId, dedupID, readTS)
if err != nil {
return nil, false, err
}
@@ -216,9 +227,9 @@ func (s *SQSServer) sendFifoMessage(
return nil, false, errors.WithStack(err)
}

dataKey := sqsMsgDataKey(queueName, meta.Generation, rec.MessageID)
visKey := sqsMsgVisKey(queueName, meta.Generation, rec.AvailableAtMillis, rec.MessageID)
byAgeKey := sqsMsgByAgeKey(queueName, meta.Generation, rec.SendTimestampMillis, rec.MessageID)
dataKey := sqsMsgDataKeyDispatch(meta, queueName, partition, meta.Generation, rec.MessageID)
visKey := sqsMsgVisKeyDispatch(meta, queueName, partition, meta.Generation, rec.AvailableAtMillis, rec.MessageID)
byAgeKey := sqsMsgByAgeKeyDispatch(meta, queueName, partition, meta.Generation, rec.SendTimestampMillis, rec.MessageID)
seqKey := sqsQueueSeqKey(queueName)
metaKey := sqsQueueMetaKey(queueName)
genKey := sqsQueueGenKey(queueName)
@@ -270,9 +281,9 @@ const (

// classifyFifoGroupLock decides whether a FIFO candidate is eligible
// for delivery. Standard queues bypass the function entirely.
func (s *SQSServer) classifyFifoGroupLock(ctx context.Context, queueName string, gen uint64, rec *sqsMessageRecord, readTS uint64) (fifoCandidateLockState, []byte, error) {
lockKey := sqsMsgGroupKey(queueName, gen, rec.MessageGroupId)
lock, err := s.loadFifoGroupLock(ctx, queueName, gen, rec.MessageGroupId, readTS)
func (s *SQSServer) classifyFifoGroupLock(ctx context.Context, queueName string, meta *sqsQueueMeta, partition uint32, gen uint64, rec *sqsMessageRecord, readTS uint64) (fifoCandidateLockState, []byte, error) {
lockKey := sqsMsgGroupKeyDispatch(meta, queueName, partition, gen, rec.MessageGroupId)
lock, err := s.loadFifoGroupLock(ctx, queueName, meta, partition, gen, rec.MessageGroupId, readTS)
if err != nil {
return fifoLockSkip, lockKey, err
}
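For context on the partitionFor call used in sendFifoMessage above, a minimal sketch of a deterministic group-to-partition map consistent with the FNV-collision wording in sqs_keys.go. The 32-bit FNV-1a choice and the body are assumptions; the PR's real partitionFor also handles the perQueue-throughput short-circuit mentioned in the send-path comment:

```go
import "hash/fnv"

// partitionForSketch maps a MessageGroupId to a partition index: 0 for
// legacy / non-partitioned queues, a deterministic hash mod PartitionCount
// otherwise. (Illustrative only; not the PR's partitionFor.)
func partitionForSketch(meta *sqsQueueMeta, groupID string) uint32 {
	if meta == nil || meta.PartitionCount <= 1 {
		return 0 // non-partitioned queues keep everything on partition 0
	}
	h := fnv.New32a()
	_, _ = h.Write([]byte(groupID)) // fnv writes never fail
	return h.Sum32() % meta.PartitionCount
}
```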
29 changes: 23 additions & 6 deletions adapter/sqs_keys.go
@@ -312,18 +312,35 @@ func sqsPartitionedMsgVisPrefixForQueue(queueName string, partition uint32, gen
}

// sqsPartitionedMsgDedupKey builds the FIFO dedup key for a
// partitioned queue. The dedup window is per-partition by design
// (DeduplicationScope=messageGroup with PartitionCount>1) — the
// validator in adapter/sqs_partitioning.go rejects the queue-scoped
// scope on partitioned queues, so this key shape is always reachable
// from the same partition that ran the dedup check.
func sqsPartitionedMsgDedupKey(queueName string, partition uint32, gen uint64, dedupID string) []byte {
// partitioned queue. DeduplicationScope=messageGroup (the only
// scope reachable on partitioned queues — the validator in
// adapter/sqs_partitioning.go rejects queue-scoped dedup) requires
// the dedup window to be per (queue, group, dedupID): two distinct
// MessageGroupIds that happen to FNV-collide onto the same partition
// must NOT share a dedup namespace, otherwise a fresh send in group B
// would be silently acked with group A's MessageId. Per design doc
// §4.1 line 200, the dedup key keys on (queue, partition, group,
// dedupID); partition is redundant given the deterministic group→
// partition map but is kept for keyspace organization (every key
// belonging to a group lives under the same partition prefix).
func sqsPartitionedMsgDedupKey(queueName string, partition uint32, gen uint64, groupID, dedupID string) []byte {
buf := make([]byte, 0, len(SqsPartitionedMsgDedupPrefix)+sqsKeyCapLarge)
buf = append(buf, SqsPartitionedMsgDedupPrefix...)
buf = append(buf, encodeSQSSegment(queueName)...)
buf = append(buf, sqsPartitionedQueueTerminator)
buf = appendU32(buf, partition)
buf = appendU64(buf, gen)
buf = append(buf, encodeSQSSegment(groupID)...)
// Terminator between the variable-length groupID and dedupID
// segments. encodeSQSSegment uses base64.RawURLEncoding (no
// padding), so back-to-back segments are not unambiguously
// splittable and distinct (groupID, dedupID) pairs can collapse
// onto the same key — reintroducing the cross-group false-
// duplicate class the round-3 dedup-scoping fix closed. The
// terminator '|' is safe because RawURLEncoding never emits it
// (alphabet is A-Z a-z 0-9 - _; see sqsPartitionedQueueTerminator
// docs). CodeRabbit major, PR #732 round 6.
buf = append(buf, sqsPartitionedQueueTerminator)
buf = append(buf, encodeSQSSegment(dedupID)...)
return buf
}
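Spelled out, the key built above has the following layout (a descriptive sketch, reading the appends top to bottom):

```go
// Layout of sqsPartitionedMsgDedupKey output:
//
//	SqsPartitionedMsgDedupPrefix
//	|| encodeSQSSegment(queueName) || '|'   // queue segment, then terminator
//	|| u32(partition) || u64(gen)           // fixed-width, self-delimiting
//	|| encodeSQSSegment(groupID)   || '|'   // terminator splits group from dedup
//	|| encodeSQSSegment(dedupID)
```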
100 changes: 95 additions & 5 deletions adapter/sqs_keys_dispatch.go
@@ -1,5 +1,11 @@
package adapter

import (
"bytes"

"github.com/cockroachdb/errors"
)

// Per-key dispatch helpers that route to the legacy single-partition
// constructor or the partitioned-FIFO constructor based on
// meta.PartitionCount. Phase 3.D PR 5b's central abstraction:
@@ -47,12 +53,17 @@ func sqsMsgVisKeyDispatch(meta *sqsQueueMeta, queueName string, partition uint32
}

// sqsMsgDedupKeyDispatch builds the FIFO dedup key for either
// keyspace. Dedup scope is per-partition on partitioned queues
// (DeduplicationScope = messageGroup is enforced by the validator
// on PartitionCount > 1).
func sqsMsgDedupKeyDispatch(meta *sqsQueueMeta, queueName string, partition uint32, gen uint64, dedupID string) []byte {
// keyspace. On a partitioned queue (DeduplicationScope = messageGroup
// — enforced by the validator on PartitionCount > 1) the dedup window
// must be per (queue, group, dedupID): two distinct MessageGroupIds
// that FNV-collide onto the same partition must NOT share a dedup
// namespace, otherwise a fresh send in group B is silently acked
// with group A's MessageId. The legacy (non-partitioned) branch
// keeps the legacy (queue, gen, dedupID) shape — there is only one
// implicit "group" so no collision is possible there.
func sqsMsgDedupKeyDispatch(meta *sqsQueueMeta, queueName string, partition uint32, gen uint64, groupID, dedupID string) []byte {
if meta != nil && meta.PartitionCount > 1 {
return sqsPartitionedMsgDedupKey(queueName, partition, gen, dedupID)
return sqsPartitionedMsgDedupKey(queueName, partition, gen, groupID, dedupID)
}
return sqsMsgDedupKey(queueName, gen, dedupID)
}
@@ -116,3 +127,82 @@ func effectivePartitionCount(meta *sqsQueueMeta) uint32 {
}
return meta.PartitionCount
}

// sqsMsgVisScanBoundsDispatch returns the start/end byte ranges that
// ReceiveMessage's per-partition visibility-index scan iterates.
// Mirrors sqsMsgVisScanBounds (legacy keyspace) but parametrises the
// prefix on partition when the queue is partitioned. The bounds are
// always [prefix||u64(0), prefix||u64(maxVisibleAtMillis+1)) so
// messages with visible_at == maxVisibleAtMillis are included.
func sqsMsgVisScanBoundsDispatch(meta *sqsQueueMeta, queueName string, partition uint32, gen uint64, maxVisibleAtMillis int64) (start, end []byte) {
prefix := sqsMsgVisPrefixForQueueDispatch(meta, queueName, partition, gen)
start = append(bytes.Clone(prefix), zeroU64()...)
upper := uint64MaxZero(maxVisibleAtMillis)
if upper < ^uint64(0) {
upper++
}
end = append(bytes.Clone(prefix), encodedU64(upper)...)
return start, end
}

// encodeReceiptHandleDispatch picks the receipt-handle wire format
// based on meta.PartitionCount: v1 on legacy / non-partitioned
// queues, v2 on partitioned ones. The partition argument is only
// consulted on the v2 branch — callers may pass 0 on the legacy
// branch.
//
// This is the single point where a fresh receipt handle commits to
// a wire version. Pairing the choice with the same meta.PartitionCount
// the dispatch helpers used to build keys keeps the handle's
// recorded partition consistent with the partition the message was
// stored under, so a later DeleteMessage / ChangeMessageVisibility
// routes to the right keyspace.
func encodeReceiptHandleDispatch(meta *sqsQueueMeta, partition uint32, queueGen uint64, messageIDHex string, receiptToken []byte) (string, error) {
if meta != nil && meta.PartitionCount > 1 {
return encodeReceiptHandleV2(partition, queueGen, messageIDHex, receiptToken)
}
return encodeReceiptHandle(queueGen, messageIDHex, receiptToken)
}

// validateReceiptHandleVersion enforces the queue-aware version
// rule that replaced the dormancy gate from PR 5a:
//
// - meta.PartitionCount <= 1 (legacy / non-partitioned queue):
// handle MUST be v1. A v2 handle on a non-partitioned queue
// is structurally impossible (SendMessage would never have
// produced one) and accepting it would let a malicious caller
// re-encode a v1 handle as v2 to probe / corrupt the v2 layout
// before any partitioned queue exists.
// - meta.PartitionCount > 1 (partitioned queue): handle MUST be
// v2. A v1 handle on a partitioned queue carries no partition
// index, so dispatch would default to partition 0 and the
// delete / change-visibility would silently miss messages on
// other partitions.
//
// Mismatches surface as ReceiptHandleIsInvalid (the same AWS error
// shape used for malformed handles), so a misrouted client cannot
// distinguish "wrong version" from "garbled bytes" — preserving the
// PR 5a / PR 724 round 3 dormancy guarantee that the v2 wire format
// is not probeable from the public API.
func validateReceiptHandleVersion(meta *sqsQueueMeta, handle *decodedReceiptHandle) error {
if handle == nil {
return errors.New("receipt handle is nil")
}
if meta != nil && meta.PartitionCount > 1 {
if handle.Version != sqsReceiptHandleVersion2 {
return errors.New("receipt handle version mismatch for partitioned queue")
}
// handle.Partition is client-controlled once decodeClientReceiptHandle
// accepts v2. Without this bound check, an out-of-range partition
// falls through to sqsMsg*KeyDispatch and depends on downstream
// routing failure semantics instead of returning ReceiptHandleIsInvalid.
if handle.Partition >= meta.PartitionCount {
return errors.New("receipt handle partition out of range for queue")
}
return nil
}
if handle.Version != sqsReceiptHandleVersion1 {
return errors.New("receipt handle version mismatch for non-partitioned queue")
}
return nil
}
P2: Reject out-of-range v2 receipt-handle partitions

The queue-aware version check now accepts any v2 handle on partitioned queues but never validates handle.Partition < meta.PartitionCount. A forged v2 handle with an out-of-range partition is then used to build partitioned keys in delete/change-visibility paths; the partition resolver fails closed for unknown partitions, which bubbles up as a generic internal error instead of ReceiptHandleIsInvalid. This is reachable once partitioned queues are enabled.
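Since validateReceiptHandleVersion above now carries both the version rule and the partition bound, a hypothetical table-driven test (not part of the PR) sketching the cases it is meant to cover; it assumes "testing" is imported and that sqsQueueMeta and decodedReceiptHandle can be constructed with only the fields shown:

```go
func TestValidateReceiptHandleVersionSketch(t *testing.T) {
	partitioned := &sqsQueueMeta{PartitionCount: 4}
	legacy := &sqsQueueMeta{PartitionCount: 1}

	cases := []struct {
		name    string
		meta    *sqsQueueMeta
		handle  *decodedReceiptHandle
		wantErr bool
	}{
		{"v2 in-range on partitioned queue", partitioned,
			&decodedReceiptHandle{Version: sqsReceiptHandleVersion2, Partition: 3}, false},
		{"v2 out-of-range partition rejected", partitioned,
			&decodedReceiptHandle{Version: sqsReceiptHandleVersion2, Partition: 4}, true},
		{"v1 rejected on partitioned queue", partitioned,
			&decodedReceiptHandle{Version: sqsReceiptHandleVersion1}, true},
		{"v2 rejected on legacy queue", legacy,
			&decodedReceiptHandle{Version: sqsReceiptHandleVersion2}, true},
		{"v1 accepted on legacy queue", legacy,
			&decodedReceiptHandle{Version: sqsReceiptHandleVersion1}, false},
	}
	for _, tc := range cases {
		if err := validateReceiptHandleVersion(tc.meta, tc.handle); (err != nil) != tc.wantErr {
			t.Errorf("%s: err = %v, wantErr = %v", tc.name, err, tc.wantErr)
		}
	}
}
```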
