diff --git a/adapter/sqs_partitioned_dispatch_test.go b/adapter/sqs_partitioned_dispatch_test.go index abb03106..76916b85 100644 --- a/adapter/sqs_partitioned_dispatch_test.go +++ b/adapter/sqs_partitioned_dispatch_test.go @@ -1,6 +1,7 @@ package adapter import ( + "bytes" "context" "fmt" "net/http" @@ -648,3 +649,194 @@ func countPartitionedRows(t *testing.T, node Node, queueName string, gen uint64) } return total } + +// TestSQSServer_PartitionedFIFO_LiveQueueDedupReaperPartitions +// pins the PR 6b contract for live queues: reapExpiredDedup must +// walk the partitioned dedup keyspace too, otherwise expired +// dedup records on partitioned FIFO queues leak forever (the +// pre-PR-6b reaper only scanned SqsMsgDedupPrefix, which is empty +// for partitioned queues). Closes the live-queue half of the +// Codex P2 deferred from PR 5b-2 round 0. +func TestSQSServer_PartitionedFIFO_LiveQueueDedupReaperPartitions(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + const queueName = "live-dedup-reap.fifo" + status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{ + "QueueName": queueName, + "Attributes": map[string]string{"FifoQueue": "true"}, + }) + require.Equal(t, http.StatusOK, status, "create queue: %v", out) + queueURL, _ := out["QueueUrl"].(string) + require.NotEmpty(t, queueURL) + + const partitions uint32 = 4 + installPartitionedMetaForTest(t, node, queueName, partitions, htfifoThroughputPerMessageGroupID) + + // Send across distinct groups so dedup records land on more + // than one partition; partitionFor (FNV-1a) gives reasonable + // spread for these inputs. + groups := []string{"alpha", "beta", "gamma", "delta", "epsilon", "zeta"} + for _, g := range groups { + status, out := callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MessageBody": "body-" + g, + "MessageGroupId": g, + "MessageDeduplicationId": "dedup-" + g, + }) + require.Equal(t, http.StatusOK, status, "send (group=%s): %v", g, out) + } + + srv := node.sqsServer + ctx := t.Context() + + // Sanity: at least one partitioned dedup row exists pre-reap. + preReap := countPartitionedDedupRowsAcrossPartitions(t, node, queueName, partitions) + require.Positive(t, preReap, "expected partitioned dedup rows after sends") + + // Backdate every partitioned dedup record's ExpiresAtMillis + // so the reaper's value-based expiry filter trips on every + // row. Same approach as TestSQSServer_RetentionReaperDropsExpiredFifoDedup + // for the legacy keyspace; here applied per-partition prefix. + readTS := srv.nextTxnReadTS(ctx) + now := time.Now().UnixMilli() + for partition := uint32(0); partition < partitions; partition++ { + prefix := sqsPartitionedMsgDedupKeyPrefix(queueName, partition, 1) + rows, err := srv.store.ScanAt(ctx, prefix, prefixScanEnd(prefix), 1024, readTS) + require.NoError(t, err) + for _, row := range rows { + rec, err := decodeFifoDedupRecord(row.Value) + require.NoError(t, err) + rec.ExpiresAtMillis = now - 1000 + body, err := encodeFifoDedupRecord(rec) + require.NoError(t, err) + req := &kv.OperationGroup[kv.OP]{ + IsTxn: true, + StartTS: readTS, + Elems: []*kv.Elem[kv.OP]{ + {Op: kv.Put, Key: bytes.Clone(row.Key), Value: body}, + }, + } + _, err = srv.coordinator.Dispatch(ctx, req) + require.NoError(t, err) + } + } + + // Drive one reaper pass — the live-queue dedup walk now + // dispatches reapExpiredDedupPartitioned for partitioned + // queues (PR 6b). + require.NoError(t, srv.reapAllQueues(ctx), "reapAllQueues") + + postReap := countPartitionedDedupRowsAcrossPartitions(t, node, queueName, partitions) + require.Zero(t, postReap, + "expired partitioned dedup records must be reaped (got %d remaining)", + postReap) +} + +// countPartitionedDedupRowsAcrossPartitions sums the rows across +// every partition's partitioned dedup prefix at gen=1 (the +// generation a freshly-created queue lands on). Test-helper +// twin of countPartitionedRows scoped to dedup. +func countPartitionedDedupRowsAcrossPartitions(t *testing.T, node Node, queueName string, partitions uint32) int { + t.Helper() + ctx := context.Background() + readTS := node.sqsServer.nextTxnReadTS(ctx) + total := 0 + for partition := uint32(0); partition < partitions; partition++ { + prefix := sqsPartitionedMsgDedupKeyPrefix(queueName, partition, 1) + rows, err := node.sqsServer.store.ScanAt(ctx, prefix, prefixScanEnd(prefix), 1024, readTS) + require.NoError(t, err) + total += len(rows) + } + return total +} + +// TestClassifyPartitionedByAgeEntry pins every branch of the +// classify helper that decides which partitioned byage rows the +// live-queue reaper sweeps. The integration tests above cover +// the orphan-gen path (tombstoned cohort) and the dedup-expiry +// path; this unit test fills in the two live-queue paths that +// have no integration coverage today: the retention-cutoff branch +// (live gen, sendTs <= cutoff → reapable) and the future-gen +// guard (parsed.gen > currentGen → not reapable, defensive +// against a meta-vs-byage race). +func TestClassifyPartitionedByAgeEntry(t *testing.T) { + t.Parallel() + const ( + queueName = "live-byage.fifo" + partition = uint32(2) + currentGen = uint64(5) + cutoff = int64(1_000_000) + messageID = "msg-classify" + ) + + type want struct { + reapable bool + expectedGen uint64 + expectedPartn uint32 + expectedSendTs int64 + expectedMsgID string + assertParsedSet bool // expectedGen / Partition / SendTs / MsgID checked only when true + } + cases := []struct { + name string + key []byte + want want + }{ + { + name: "live gen within retention window — not reapable", + key: sqsPartitionedMsgByAgeKey(queueName, partition, currentGen, cutoff+1, messageID), + want: want{reapable: false, expectedGen: currentGen, expectedPartn: partition, expectedSendTs: cutoff + 1, expectedMsgID: messageID, assertParsedSet: true}, + }, + { + name: "live gen past retention window — reapable (cutoff branch)", + key: sqsPartitionedMsgByAgeKey(queueName, partition, currentGen, cutoff-1, messageID), + want: want{reapable: true, expectedGen: currentGen, expectedPartn: partition, expectedSendTs: cutoff - 1, expectedMsgID: messageID, assertParsedSet: true}, + }, + { + name: "live gen exactly at cutoff — reapable (>, not >=)", + key: sqsPartitionedMsgByAgeKey(queueName, partition, currentGen, cutoff, messageID), + want: want{reapable: true, expectedGen: currentGen, expectedPartn: partition, expectedSendTs: cutoff, expectedMsgID: messageID, assertParsedSet: true}, + }, + { + name: "orphan generation (gen < currentGen) — reapable unconditionally", + key: sqsPartitionedMsgByAgeKey(queueName, partition, currentGen-1, cutoff+1_000_000, messageID), + want: want{reapable: true, expectedGen: currentGen - 1, expectedPartn: partition, expectedSendTs: cutoff + 1_000_000, expectedMsgID: messageID, assertParsedSet: true}, + }, + { + name: "future generation (gen > currentGen) — not reapable (gen-race guard)", + key: sqsPartitionedMsgByAgeKey(queueName, partition, currentGen+1, cutoff-1, messageID), + want: want{reapable: false, expectedGen: currentGen + 1, expectedPartn: partition, expectedSendTs: cutoff - 1, expectedMsgID: messageID, assertParsedSet: true}, + }, + { + name: "wrong partition — not reapable (page bleed defense)", + key: sqsPartitionedMsgByAgeKey(queueName, partition+1, currentGen, cutoff-1, messageID), + want: want{reapable: false, assertParsedSet: false}, + }, + { + name: "wrong queue prefix — not reapable (parse fails)", + key: sqsPartitionedMsgByAgeKey("different.fifo", partition, currentGen, cutoff-1, messageID), + want: want{reapable: false, assertParsedSet: false}, + }, + { + name: "legacy byage key — not reapable (parse fails)", + key: sqsMsgByAgeKey(queueName, currentGen, cutoff-1, messageID), + want: want{reapable: false, assertParsedSet: false}, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + parsed, reapable := classifyPartitionedByAgeEntry(tc.key, queueName, partition, currentGen, cutoff) + require.Equal(t, tc.want.reapable, reapable, "reapable") + if tc.want.assertParsedSet { + require.Equal(t, tc.want.expectedGen, parsed.Generation, "Generation") + require.Equal(t, tc.want.expectedPartn, parsed.Partition, "Partition") + require.Equal(t, tc.want.expectedSendTs, parsed.SendTimestampMs, "SendTimestampMs") + require.Equal(t, tc.want.expectedMsgID, parsed.MessageID, "MessageID") + } + }) + } +} diff --git a/adapter/sqs_reaper.go b/adapter/sqs_reaper.go index 5a3f743a..ae7f6c20 100644 --- a/adapter/sqs_reaper.go +++ b/adapter/sqs_reaper.go @@ -85,7 +85,7 @@ func (s *SQSServer) reapAllQueues(ctx context.Context) error { if err := s.reapQueue(ctx, name, meta, readTS); err != nil { slog.Warn("sqs reaper queue pass failed", "queue", name, "err", err) } - if err := s.reapExpiredDedup(ctx, name, readTS); err != nil { + if err := s.reapExpiredDedup(ctx, name, meta, readTS); err != nil { slog.Warn("sqs dedup reaper pass failed", "queue", name, "err", err) } } @@ -425,6 +425,39 @@ func (s *SQSServer) reapQueue(ctx context.Context, queueName string, meta *sqsQu // for the live generation so we never delete live records. cutoff = 0 } + // Legacy byage scan — always runs. For non-partitioned queues + // this is the only path; for partitioned queues this also + // catches any defensive legacy entry that might have leaked + // in (the data plane never writes here on partitioned queues + // today, but the sweep is idempotent and cheap). + if err := s.reapQueueLegacy(ctx, queueName, meta.Generation, cutoff, readTS); err != nil { + return err + } + // Partitioned byage scan — one per partition under its own + // per-partition budget. Per the §6 split-queue-FIFO design, + // the per-queue budget becomes a per-partition budget so a + // 32-partition queue cannot starve other queues; instead its + // reap completes in partitions × budget time per cycle, which + // at the 30s reaper interval is well within budget. + if meta.PartitionCount > 1 { + for partition := uint32(0); partition < meta.PartitionCount; partition++ { + if err := ctx.Err(); err != nil { + return errors.WithStack(err) + } + if err := s.reapQueuePartition(ctx, queueName, partition, meta.Generation, cutoff, readTS); err != nil { + return err + } + } + } + return nil +} + +// reapQueueLegacy is the existing legacy-keyspace byage walk for +// one queue, factored out of reapQueue so the partitioned twin +// (reapQueuePartition) can sit alongside it under the same +// per-budget contract. Behaviour for non-partitioned queues is +// byte-identical to pre-PR-6b reapQueue. +func (s *SQSServer) reapQueueLegacy(ctx context.Context, queueName string, currentGen uint64, cutoff int64, readTS uint64) error { prefix := sqsMsgByAgePrefixAllGenerations(queueName) upper := prefixScanEnd(prefix) start := bytes.Clone(prefix) @@ -438,7 +471,7 @@ func (s *SQSServer) reapQueue(ctx context.Context, queueName string, meta *sqsQu if len(page) == 0 { return nil } - done, newProcessed, err := s.reapPage(ctx, queueName, meta.Generation, cutoff, page, readTS, processed) + done, newProcessed, err := s.reapPage(ctx, queueName, currentGen, cutoff, page, readTS, processed) if err != nil { return err } @@ -454,6 +487,108 @@ func (s *SQSServer) reapQueue(ctx context.Context, queueName string, meta *sqsQu return nil } +// reapQueuePartition is the partitioned-keyspace twin of +// reapQueueLegacy. Walks one partition's byage prefix family +// across all generations the partitioned-byage prefix matches +// (parseSqsPartitionedMsgByAgeKey returns the gen embedded in +// each key) and reaps each entry past the retention cutoff (live +// gen) or unconditionally (any older gen — orphan from a prior +// PurgeQueue). +// +// Per-partition budget rather than per-queue: a 32-partition +// queue therefore allows up to 32 × sqsReaperPerQueueBudget +// records per tick, which the 30s tick interval comfortably +// absorbs (§6 design contract). +func (s *SQSServer) reapQueuePartition(ctx context.Context, queueName string, partition uint32, currentGen uint64, cutoff int64, readTS uint64) error { + // Note: the partitioned-byage prefix embeds (queue, partition) + // but not the generation, so this scan walks every generation + // for that partition. reapPartitionedPage filters per-entry by + // the (currentGen, cutoff) live-vs-orphan rules, mirroring + // reapPage on the legacy path. + prefix := []byte{} + prefix = append(prefix, SqsPartitionedMsgByAgePrefix...) + prefix = append(prefix, encodeSQSSegment(queueName)...) + prefix = append(prefix, sqsPartitionedQueueTerminator) + prefix = appendU32(prefix, partition) + upper := prefixScanEnd(prefix) + start := bytes.Clone(prefix) + + processed := 0 + for processed < sqsReaperPerQueueBudget { + page, err := s.store.ScanAt(ctx, start, upper, sqsReaperPageLimit, readTS) + if err != nil { + return errors.WithStack(err) + } + if len(page) == 0 { + return nil + } + done, newProcessed, err := s.reapPartitionedPage(ctx, queueName, partition, currentGen, cutoff, page, readTS, processed) + if err != nil { + return err + } + processed = newProcessed + if done { + return nil + } + start = nextScanCursorAfter(page[len(page)-1].Key) + if bytes.Compare(start, upper) >= 0 { + return nil + } + } + return nil +} + +// reapPartitionedPage is the partitioned twin of reapPage. Same +// live-vs-orphan classification as reapPage, but parses each entry +// as a partitioned byage key and routes the dispatch through +// reapOneRecordPartitioned so the dispatch helpers build +// partitioned data / vis / group keys instead of legacy ones. +func (s *SQSServer) reapPartitionedPage(ctx context.Context, queueName string, partition uint32, currentGen uint64, cutoff int64, page []*store.KVPair, readTS uint64, processed int) (bool, int, error) { + for _, kvp := range page { + if err := ctx.Err(); err != nil { + return true, processed, errors.WithStack(err) + } + parsed, reapable := classifyPartitionedByAgeEntry(kvp.Key, queueName, partition, currentGen, cutoff) + if !reapable { + continue + } + if err := s.reapOneRecordPartitioned(ctx, queueName, partition, parsed.Generation, kvp.Key, parsed.MessageID, readTS); err != nil { + return true, processed, err + } + processed++ + if processed >= sqsReaperPerQueueBudget { + return true, processed, nil + } + } + if len(page) < sqsReaperPageLimit { + return true, processed, nil + } + return false, processed, nil +} + +// classifyPartitionedByAgeEntry parses a candidate partitioned +// byage key and decides whether it should be reaped this pass. +// Returns reapable=false for entries that do not match the +// partition (page bleed across partitions, defensive), live +// entries inside their retention window, or future-generation +// rows from a meta read that hasn't caught up yet. Pulled out of +// reapPartitionedPage so the loop body stays under the cyclop +// ceiling. +func classifyPartitionedByAgeEntry(key []byte, queueName string, partition uint32, currentGen uint64, cutoff int64) (sqsPartitionedMsgByAgeRecord, bool) { + parsed, ok := parseSqsPartitionedMsgByAgeKey(key, queueName) + if !ok || parsed.Partition != partition { + return sqsPartitionedMsgByAgeRecord{}, false + } + if parsed.Generation == currentGen && parsed.SendTimestampMs > cutoff { + return parsed, false + } + if parsed.Generation > currentGen { + // Defensive against gen-counter races; mirrors reapPage. + return parsed, false + } + return parsed, true +} + // reapPage walks one ScanAt page, dispatching a per-record reap // transaction. currentGen is the queue's *live* generation; entries // under any earlier generation are unconditionally reaped, while @@ -484,10 +619,8 @@ func (s *SQSServer) reapPage(ctx context.Context, queueName string, currentGen u // see meta caught up. continue } - // Live-queue retention reap currently iterates only the - // legacy byage keyspace; partitioned-byage live-queue - // retention is a follow-up to PR 6a (the tombstoned-cohort - // path is what this PR addresses). + // reapPage covers the legacy byage keyspace only; the + // partitioned twin is reapPartitionedPage. if err := s.reapOneRecord(ctx, queueName, nil, 0, parsed.Generation, kvp.Key, parsed.MessageID, readTS); err != nil { return true, processed, err } @@ -593,7 +726,33 @@ func (s *SQSServer) dispatchOrphanByAgeDrop(ctx context.Context, byAgeKey []byte // unique MessageDeduplicationIds would accumulate permanent // dedup-row leaks because the send path treats expired records as // misses but never removes them. -func (s *SQSServer) reapExpiredDedup(ctx context.Context, queueName string, readTS uint64) error { +// +// On partitioned queues (meta.PartitionCount > 1), the dedup +// records live under SqsPartitionedMsgDedupPrefix instead of +// SqsMsgDedupPrefix, so the legacy scan alone would miss them; +// reapExpiredDedupPartitioned covers that case (PR 6b). +// +// Mirrors reapQueue's both-scan policy: legacy always runs, and +// the partitioned scan additionally runs for partitioned queues. +// The data plane never writes legacy dedup on partitioned queues +// today, but the legacy scan over an empty prefix is cheap, and +// running it unconditionally keeps the two reaper paths symmetric +// and defends against an unforeseen legacy-prefix leak. +func (s *SQSServer) reapExpiredDedup(ctx context.Context, queueName string, meta *sqsQueueMeta, readTS uint64) error { + if err := s.reapExpiredDedupLegacy(ctx, queueName, readTS); err != nil { + return err + } + if meta != nil && meta.PartitionCount > 1 { + return s.reapExpiredDedupPartitioned(ctx, queueName, meta.PartitionCount, readTS) + } + return nil +} + +// reapExpiredDedupLegacy is the legacy-keyspace dedup expiry walk, +// factored out of reapExpiredDedup so the partitioned twin can sit +// alongside it. Behaviour for non-partitioned queues is byte- +// identical to pre-PR-6b reapExpiredDedup. +func (s *SQSServer) reapExpiredDedupLegacy(ctx context.Context, queueName string, readTS uint64) error { prefix := []byte(SqsMsgDedupPrefix) prefix = append(prefix, []byte(encodeSQSSegment(queueName))...) upper := prefixScanEnd(prefix) @@ -625,6 +784,56 @@ func (s *SQSServer) reapExpiredDedup(ctx context.Context, queueName string, read return nil } +// reapExpiredDedupPartitioned is the partitioned-keyspace twin of +// reapExpiredDedupLegacy. Walks every partition's dedup prefix +// across all generations and removes records whose +// ExpiresAtMillis has passed. Each partition gets its own +// per-partition budget (per the §6 design contract — same as +// reapQueuePartition). +func (s *SQSServer) reapExpiredDedupPartitioned(ctx context.Context, queueName string, partitionCount uint32, readTS uint64) error { + now := time.Now().UnixMilli() + for partition := uint32(0); partition < partitionCount; partition++ { + if err := ctx.Err(); err != nil { + return errors.WithStack(err) + } + // Per-partition prefix (across all gens) — the partitioned + // dedup key embeds gen after partition, so a partition-only + // prefix walks every gen for that partition. The per-entry + // expiry check is unchanged from the legacy walk. + prefix := []byte{} + prefix = append(prefix, SqsPartitionedMsgDedupPrefix...) + prefix = append(prefix, encodeSQSSegment(queueName)...) + prefix = append(prefix, sqsPartitionedQueueTerminator) + prefix = appendU32(prefix, partition) + upper := prefixScanEnd(prefix) + start := bytes.Clone(prefix) + + processed := 0 + for processed < sqsReaperPerQueueBudget { + page, err := s.store.ScanAt(ctx, start, upper, sqsReaperPageLimit, readTS) + if err != nil { + return errors.WithStack(err) + } + if len(page) == 0 { + break + } + done, newProcessed, err := s.reapDedupPage(ctx, page, now, readTS, processed) + if err != nil { + return err + } + processed = newProcessed + if done { + break + } + start = nextScanCursorAfter(page[len(page)-1].Key) + if bytes.Compare(start, upper) >= 0 { + break + } + } + } + return nil +} + // reapDedupPage walks one ScanAt page of dedup records and removes // any whose ExpiresAtMillis is in the past. Returns done=true when // the per-queue budget runs out or the page was short.