From 80f1c86ec5175d1b83700b8f293eb335c9ac7e3c Mon Sep 17 00:00:00 2001
From: "Yoshiaki Ueda (bootjp)"
Date: Tue, 5 May 2026 00:26:40 +0900
Subject: [PATCH 1/2] feat(sqs): live-queue reaper enumerates partitioned
 keyspace (Phase 3.D PR 6b)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Follow-up to PR 6a (#735). The tombstone-driven reaper now sweeps
partitioned data / vis / byage / dedup / group records on DeleteQueue /
PurgeQueue, but the live-queue retention reaper still walked only the
legacy keyspace, so:

- retention-expired messages on partitioned queues leaked their data /
  vis / byage / group rows forever (reapQueue's byage walk used
  sqsMsgByAgePrefixAllGenerations only),
- expired dedup records on partitioned FIFO queues leaked forever
  (reapExpiredDedup's prefix scan used SqsMsgDedupPrefix only — empty
  for partitioned queues, since sqsMsgDedupKeyDispatch routes their
  writes under SqsPartitionedMsgDedupPrefix).

Closes the live-queue half of the Codex P2 from PR #732 round 0 ("Reap
partitioned dedup records to prevent key growth"); PR 6a covered the
tombstoned-cohort half.

Scope (the second slice of §11 PR 6 from the split-queue-FIFO design
doc):

- reapQueue: legacy byage walk extracted as reapQueueLegacy, behaviour
  byte-identical to pre-PR-6b for non-partitioned queues. Adds a new
  reapQueuePartition step that runs once per partition for
  PartitionCount > 1 queues. Each partition gets its own per-partition
  budget per the §6 design ("partitions × budget per cycle"); a
  32-partition queue thus allows up to 32 × sqsReaperPerQueueBudget
  records per tick, comfortably within the 30s reaper interval.
- reapPartitionedPage: partitioned twin of reapPage.
  Same live-vs-orphan classification (parsed.SendTimestampMs > cutoff
  under the live gen, unconditional reap under older gens, defensive
  skip on parsed.Generation > currentGen) but parses each entry with
  parseSqsPartitionedMsgByAgeKey and routes the dispatch through
  reapOneRecordPartitioned.
- classifyPartitionedByAgeEntry: helper extracted from
  reapPartitionedPage so the loop body stays under the cyclop ceiling.
  Returns (parsedKey, reapable bool).
- reapExpiredDedup: now takes *sqsQueueMeta and routes by
  PartitionCount. Legacy meta (PartitionCount <= 1) →
  reapExpiredDedupLegacy (byte-identical to pre-PR-6b walk).
  Partitioned meta (PartitionCount > 1) → reapExpiredDedupPartitioned
  (NEW), which iterates each partition's partitioned dedup prefix
  under its own per-partition budget and uses the existing
  reapDedupPage to apply the value-based ExpiresAtMillis filter.

Caller audit:

- reapQueue: one production caller (reapAllQueues line 85). No
  signature change. Behaviour for non-partitioned queues
  byte-identical; partitioned queues get the additional per-partition
  pass.
- reapExpiredDedup: signature changed to take *sqsQueueMeta. One
  production caller (reapAllQueues line 88), updated. No test files
  invoked this helper directly. Legacy meta routes to the
  byte-identical legacy walk; partitioned routes to the new walk.
- reapQueueLegacy / reapQueuePartition / reapExpiredDedupLegacy /
  reapExpiredDedupPartitioned / classifyPartitionedByAgeEntry: each
  has exactly one caller in the new live-queue reap path.
- reapOneRecordPartitioned: existing helper from PR 6a. Previous
  caller was reapDeadByAgePartitionPage (tombstone path); now also
  called from reapPartitionedPage (live-queue path). Same dispatch
  semantics — synthetic meta carrying PartitionCount > 1 to flip the
  dispatch helper branch.
Tests:

- New TestSQSServer_PartitionedFIFO_LiveQueueDedupReaperPartitions
  (wire-level): create a 4-partition FIFO queue, send across 6
  distinct groups, backdate every partitioned dedup record's
  ExpiresAtMillis, run reapAllQueues, assert every partitioned dedup
  row across [0, 4) is gone. Pre-PR-6b reaper would have left every
  row in place — the test fails on the legacy code path.

Self-review (CLAUDE.md):

1. Data loss — Closes the live-queue dedup leak; closes the
   partitioned retention-expired-message leak. Legacy queues
   unchanged: reapQueue, reapExpiredDedup, byage walks, and dispatch
   helpers all keep their byte-identical pre-PR-6b paths for
   PartitionCount <= 1.
2. Concurrency / distributed failures — Reaper still runs only on the
   leader. Each partition's pass is sequential; per-partition budget
   bounds the pass. Existing OCC dispatch semantics on each per-record
   reap unchanged.
3. Performance — Per-tick partitioned-queue cost grows from O(1 walk)
   to O(partition_count walks) on byage AND dedup. Each partition
   bounded by sqsReaperPerQueueBudget. 30s tick interval comfortably
   absorbs 32-partition × per-queue budget per design.
4. Data consistency — Live-vs-orphan classification on partitioned
   byage mirrors the legacy branch exactly (reapPage /
   reapPartitionedPage share the rules through
   classifyPartitionedByAgeEntry). PartitionCount immutability means
   the meta-driven iteration bound matches the on-disk keys for any
   cohort.
5. Test coverage — One new wire-level integration test for the
   partitioned dedup walk; the partitioned byage walk reuses parsing /
   dispatch helpers already tested by PR 6a's tombstone-reap
   integration test.
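A minimal standalone sketch of the "partitions × budget per cycle" contract described above. The names here are illustrative stand-ins (perPartitionBudget for sqsReaperPerQueueBudget, sweepQueue for the reaper's per-queue pass), not the adapter's real API; it only demonstrates the budgeting arithmetic the design relies on.

```go
package main

import "fmt"

// perPartitionBudget stands in for sqsReaperPerQueueBudget; the real
// constant lives in adapter/sqs_reaper.go and may differ.
const perPartitionBudget = 128

// sweepQueue bounds each partition's pass independently, so one
// queue's per-tick ceiling is partitionCount * perPartitionBudget
// rather than a single shared per-queue budget.
func sweepQueue(partitionCount uint32, reapPartition func(partition uint32, budget int) int) int {
	total := 0
	for p := uint32(0); p < partitionCount; p++ {
		total += reapPartition(p, perPartitionBudget)
	}
	return total
}

func main() {
	// A fully saturated 32-partition queue hits the documented
	// ceiling of partitions × budget records per tick.
	total := sweepQueue(32, func(_ uint32, budget int) int { return budget })
	fmt.Println(total) // 32 * 128 = 4096
}
```

The point of the sketch: a partition that reaps less than its budget does not donate the remainder to other partitions, so the ceiling is a hard bound per tick, not a shared pool.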
---
 adapter/sqs_partitioned_dispatch_test.go | 104 ++++++++++++
 adapter/sqs_reaper.go                    | 208 ++++++++++++++++++++++-
 2 files changed, 309 insertions(+), 3 deletions(-)

diff --git a/adapter/sqs_partitioned_dispatch_test.go b/adapter/sqs_partitioned_dispatch_test.go
index abb03106..5e650f30 100644
--- a/adapter/sqs_partitioned_dispatch_test.go
+++ b/adapter/sqs_partitioned_dispatch_test.go
@@ -1,6 +1,7 @@
 package adapter
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 	"net/http"
@@ -648,3 +649,106 @@ func countPartitionedRows(t *testing.T, node Node, queueName string, gen uint64)
 	}
 	return total
 }
+
+// TestSQSServer_PartitionedFIFO_LiveQueueDedupReaperPartitions
+// pins the PR 6b contract for live queues: reapExpiredDedup must
+// walk the partitioned dedup keyspace too, otherwise expired
+// dedup records on partitioned FIFO queues leak forever (the
+// pre-PR-6b reaper only scanned SqsMsgDedupPrefix, which is empty
+// for partitioned queues). Closes the live-queue half of the
+// Codex P2 deferred from PR 5b-2 round 0.
+func TestSQSServer_PartitionedFIFO_LiveQueueDedupReaperPartitions(t *testing.T) {
+	t.Parallel()
+	nodes, _, _ := createNode(t, 1)
+	defer shutdown(nodes)
+	node := sqsLeaderNode(t, nodes)
+
+	const queueName = "live-dedup-reap.fifo"
+	status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{
+		"QueueName":  queueName,
+		"Attributes": map[string]string{"FifoQueue": "true"},
+	})
+	require.Equal(t, http.StatusOK, status, "create queue: %v", out)
+	queueURL, _ := out["QueueUrl"].(string)
+	require.NotEmpty(t, queueURL)
+
+	const partitions uint32 = 4
+	installPartitionedMetaForTest(t, node, queueName, partitions, htfifoThroughputPerMessageGroupID)
+
+	// Send across distinct groups so dedup records land on more
+	// than one partition; partitionFor (FNV-1a) gives reasonable
+	// spread for these inputs.
+	groups := []string{"alpha", "beta", "gamma", "delta", "epsilon", "zeta"}
+	for _, g := range groups {
+		status, out := callSQS(t, node, sqsSendMessageTarget, map[string]any{
+			"QueueUrl":               queueURL,
+			"MessageBody":            "body-" + g,
+			"MessageGroupId":         g,
+			"MessageDeduplicationId": "dedup-" + g,
+		})
+		require.Equal(t, http.StatusOK, status, "send (group=%s): %v", g, out)
+	}
+
+	srv := node.sqsServer
+	ctx := t.Context()
+
+	// Sanity: at least one partitioned dedup row exists pre-reap.
+	preReap := countPartitionedDedupRowsAcrossPartitions(t, node, queueName, partitions)
+	require.Positive(t, preReap, "expected partitioned dedup rows after sends")
+
+	// Backdate every partitioned dedup record's ExpiresAtMillis
+	// so the reaper's value-based expiry filter trips on every
+	// row. Same approach as TestSQSServer_RetentionReaperDropsExpiredFifoDedup
+	// for the legacy keyspace; here applied per-partition prefix.
+	readTS := srv.nextTxnReadTS(ctx)
+	now := time.Now().UnixMilli()
+	for partition := uint32(0); partition < partitions; partition++ {
+		prefix := sqsPartitionedMsgDedupKeyPrefix(queueName, partition, 1)
+		rows, err := srv.store.ScanAt(ctx, prefix, prefixScanEnd(prefix), 1024, readTS)
+		require.NoError(t, err)
+		for _, row := range rows {
+			rec, err := decodeFifoDedupRecord(row.Value)
+			require.NoError(t, err)
+			rec.ExpiresAtMillis = now - 1000
+			body, err := encodeFifoDedupRecord(rec)
+			require.NoError(t, err)
+			req := &kv.OperationGroup[kv.OP]{
+				IsTxn:   true,
+				StartTS: readTS,
+				Elems: []*kv.Elem[kv.OP]{
+					{Op: kv.Put, Key: bytes.Clone(row.Key), Value: body},
+				},
+			}
+			_, err = srv.coordinator.Dispatch(ctx, req)
+			require.NoError(t, err)
+		}
+	}
+
+	// Drive one reaper pass — the live-queue dedup walk now
+	// dispatches reapExpiredDedupPartitioned for partitioned
+	// queues (PR 6b).
+	require.NoError(t, srv.reapAllQueues(ctx), "reapAllQueues")
+
+	postReap := countPartitionedDedupRowsAcrossPartitions(t, node, queueName, partitions)
+	require.Zero(t, postReap,
+		"expired partitioned dedup records must be reaped (got %d remaining)",
+		postReap)
+}
+
+// countPartitionedDedupRowsAcrossPartitions sums the rows across
+// every partition's partitioned dedup prefix at gen=1 (the
+// generation a freshly-created queue lands on). Test-helper
+// twin of countPartitionedRows scoped to dedup.
+func countPartitionedDedupRowsAcrossPartitions(t *testing.T, node Node, queueName string, partitions uint32) int {
+	t.Helper()
+	ctx := context.Background()
+	readTS := node.sqsServer.nextTxnReadTS(ctx)
+	total := 0
+	for partition := uint32(0); partition < partitions; partition++ {
+		prefix := sqsPartitionedMsgDedupKeyPrefix(queueName, partition, 1)
+		rows, err := node.sqsServer.store.ScanAt(ctx, prefix, prefixScanEnd(prefix), 1024, readTS)
+		require.NoError(t, err)
+		total += len(rows)
+	}
+	return total
+}
diff --git a/adapter/sqs_reaper.go b/adapter/sqs_reaper.go
index 5a3f743a..6ca36766 100644
--- a/adapter/sqs_reaper.go
+++ b/adapter/sqs_reaper.go
@@ -85,7 +85,7 @@ func (s *SQSServer) reapAllQueues(ctx context.Context) error {
 		if err := s.reapQueue(ctx, name, meta, readTS); err != nil {
 			slog.Warn("sqs reaper queue pass failed", "queue", name, "err", err)
 		}
-		if err := s.reapExpiredDedup(ctx, name, readTS); err != nil {
+		if err := s.reapExpiredDedup(ctx, name, meta, readTS); err != nil {
 			slog.Warn("sqs dedup reaper pass failed", "queue", name, "err", err)
 		}
 	}
@@ -425,6 +425,39 @@ func (s *SQSServer) reapQueue(ctx context.Context, queueName string, meta *sqsQu
 		// for the live generation so we never delete live records.
 		cutoff = 0
 	}
+	// Legacy byage scan — always runs.
+	// For non-partitioned queues
+	// this is the only path; for partitioned queues this also
+	// catches any defensive legacy entry that might have leaked
+	// in (the data plane never writes here on partitioned queues
+	// today, but the sweep is idempotent and cheap).
+	if err := s.reapQueueLegacy(ctx, queueName, meta.Generation, cutoff, readTS); err != nil {
+		return err
+	}
+	// Partitioned byage scan — one per partition under its own
+	// per-partition budget. Per the §6 split-queue-FIFO design,
+	// the per-queue budget becomes a per-partition budget so a
+	// 32-partition queue cannot starve other queues; instead its
+	// reap completes in partitions × budget time per cycle, which
+	// at the 30s reaper interval is well within budget.
+	if meta.PartitionCount > 1 {
+		for partition := uint32(0); partition < meta.PartitionCount; partition++ {
+			if err := ctx.Err(); err != nil {
+				return errors.WithStack(err)
+			}
+			if err := s.reapQueuePartition(ctx, queueName, partition, meta.Generation, cutoff, readTS); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+// reapQueueLegacy is the existing legacy-keyspace byage walk for
+// one queue, factored out of reapQueue so the partitioned twin
+// (reapQueuePartition) can sit alongside it under the same
+// per-budget contract. Behaviour for non-partitioned queues is
+// byte-identical to pre-PR-6b reapQueue.
+func (s *SQSServer) reapQueueLegacy(ctx context.Context, queueName string, currentGen uint64, cutoff int64, readTS uint64) error {
 	prefix := sqsMsgByAgePrefixAllGenerations(queueName)
 	upper := prefixScanEnd(prefix)
 	start := bytes.Clone(prefix)
@@ -438,7 +471,58 @@ func (s *SQSServer) reapQueue(ctx context.Context, queueName string, meta *sqsQu
 		if len(page) == 0 {
 			return nil
 		}
-		done, newProcessed, err := s.reapPage(ctx, queueName, meta.Generation, cutoff, page, readTS, processed)
+		done, newProcessed, err := s.reapPage(ctx, queueName, currentGen, cutoff, page, readTS, processed)
+		if err != nil {
+			return err
+		}
+		processed = newProcessed
+		if done {
+			return nil
+		}
+		start = nextScanCursorAfter(page[len(page)-1].Key)
+		if bytes.Compare(start, upper) >= 0 {
+			return nil
+		}
+	}
+	return nil
+}
+
+// reapQueuePartition is the partitioned-keyspace twin of
+// reapQueueLegacy. Walks one partition's byage prefix family
+// across all generations the partitioned-byage prefix matches
+// (parseSqsPartitionedMsgByAgeKey returns the gen embedded in
+// each key) and reaps each entry past the retention cutoff (live
+// gen) or unconditionally (any older gen — orphan from a prior
+// PurgeQueue).
+//
+// Per-partition budget rather than per-queue: a 32-partition
+// queue therefore allows up to 32 × sqsReaperPerQueueBudget
+// records per tick, which the 30s tick interval comfortably
+// absorbs (§6 design contract).
+func (s *SQSServer) reapQueuePartition(ctx context.Context, queueName string, partition uint32, currentGen uint64, cutoff int64, readTS uint64) error {
+	// Note: the partitioned-byage prefix embeds (queue, partition)
+	// but not the generation, so this scan walks every generation
+	// for that partition. reapPartitionedPage filters per-entry by
+	// the (currentGen, cutoff) live-vs-orphan rules, mirroring
+	// reapPage on the legacy path.
+	prefix := []byte{}
+	prefix = append(prefix, SqsPartitionedMsgByAgePrefix...)
+	prefix = append(prefix, encodeSQSSegment(queueName)...)
+	prefix = append(prefix, sqsPartitionedQueueTerminator)
+	prefix = appendU32(prefix, partition)
+	upper := prefixScanEnd(prefix)
+	start := bytes.Clone(prefix)
+
+	processed := 0
+	for processed < sqsReaperPerQueueBudget {
+		page, err := s.store.ScanAt(ctx, start, upper, sqsReaperPageLimit, readTS)
+		if err != nil {
+			return errors.WithStack(err)
+		}
+		if len(page) == 0 {
+			return nil
+		}
+		done, newProcessed, err := s.reapPartitionedPage(ctx, queueName, partition, currentGen, cutoff, page, readTS, processed)
 		if err != nil {
 			return err
 		}
@@ -454,6 +538,57 @@ func (s *SQSServer) reapQueue(ctx context.Context, queueName string, meta *sqsQu
 	return nil
 }
 
+// reapPartitionedPage is the partitioned twin of reapPage. Same
+// live-vs-orphan classification as reapPage, but parses each entry
+// as a partitioned byage key and routes the dispatch through
+// reapOneRecordPartitioned so the dispatch helpers build
+// partitioned data / vis / group keys instead of legacy ones.
+func (s *SQSServer) reapPartitionedPage(ctx context.Context, queueName string, partition uint32, currentGen uint64, cutoff int64, page []*store.KVPair, readTS uint64, processed int) (bool, int, error) {
+	for _, kvp := range page {
+		if err := ctx.Err(); err != nil {
+			return true, processed, errors.WithStack(err)
+		}
+		parsed, reapable := classifyPartitionedByAgeEntry(kvp.Key, queueName, partition, currentGen, cutoff)
+		if !reapable {
+			continue
+		}
+		if err := s.reapOneRecordPartitioned(ctx, queueName, partition, parsed.Generation, kvp.Key, parsed.MessageID, readTS); err != nil {
+			return true, processed, err
+		}
+		processed++
+		if processed >= sqsReaperPerQueueBudget {
+			return true, processed, nil
+		}
+	}
+	if len(page) < sqsReaperPageLimit {
+		return true, processed, nil
+	}
+	return false, processed, nil
+}
+
+// classifyPartitionedByAgeEntry parses a candidate partitioned
+// byage key and decides whether it should be reaped this pass.
+// Returns reapable=false for entries that do not match the
+// partition (page bleed across partitions, defensive), live
+// entries inside their retention window, or future-generation
+// rows from a meta read that hasn't caught up yet. Pulled out of
+// reapPartitionedPage so the loop body stays under the cyclop
+// ceiling.
+func classifyPartitionedByAgeEntry(key []byte, queueName string, partition uint32, currentGen uint64, cutoff int64) (sqsPartitionedMsgByAgeRecord, bool) {
+	parsed, ok := parseSqsPartitionedMsgByAgeKey(key, queueName)
+	if !ok || parsed.Partition != partition {
+		return sqsPartitionedMsgByAgeRecord{}, false
+	}
+	if parsed.Generation == currentGen && parsed.SendTimestampMs > cutoff {
+		return parsed, false
+	}
+	if parsed.Generation > currentGen {
+		// Defensive against gen-counter races; mirrors reapPage.
+		return parsed, false
+	}
+	return parsed, true
+}
+
 // reapPage walks one ScanAt page, dispatching a per-record reap
 // transaction. currentGen is the queue's *live* generation; entries
 // under any earlier generation are unconditionally reaped, while
@@ -593,7 +728,24 @@ func (s *SQSServer) dispatchOrphanByAgeDrop(ctx context.Context, byAgeKey []byte
 // unique MessageDeduplicationIds would accumulate permanent
 // dedup-row leaks because the send path treats expired records as
 // misses but never removes them.
-func (s *SQSServer) reapExpiredDedup(ctx context.Context, queueName string, readTS uint64) error {
+//
+// On partitioned queues (meta.PartitionCount > 1), the dedup
+// records live under SqsPartitionedMsgDedupPrefix instead of
+// SqsMsgDedupPrefix, so the legacy scan would find zero records
+// and the leak would persist; reapExpiredDedupPartitioned takes
+// over for that case (PR 6b).
+func (s *SQSServer) reapExpiredDedup(ctx context.Context, queueName string, meta *sqsQueueMeta, readTS uint64) error {
+	if meta != nil && meta.PartitionCount > 1 {
+		return s.reapExpiredDedupPartitioned(ctx, queueName, meta.PartitionCount, readTS)
+	}
+	return s.reapExpiredDedupLegacy(ctx, queueName, readTS)
+}
+
+// reapExpiredDedupLegacy is the legacy-keyspace dedup expiry walk,
+// factored out of reapExpiredDedup so the partitioned twin can sit
+// alongside it. Behaviour for non-partitioned queues is byte-
+// identical to pre-PR-6b reapExpiredDedup.
+func (s *SQSServer) reapExpiredDedupLegacy(ctx context.Context, queueName string, readTS uint64) error {
 	prefix := []byte(SqsMsgDedupPrefix)
 	prefix = append(prefix, []byte(encodeSQSSegment(queueName))...)
 	upper := prefixScanEnd(prefix)
@@ -625,6 +777,56 @@ func (s *SQSServer) reapExpiredDedup(ctx context.Context, queueName string, read
 	return nil
 }
 
+// reapExpiredDedupPartitioned is the partitioned-keyspace twin of
+// reapExpiredDedupLegacy. Walks every partition's dedup prefix
+// across all generations and removes records whose
+// ExpiresAtMillis has passed. Each partition gets its own
+// per-partition budget (per the §6 design contract — same as
+// reapQueuePartition).
+func (s *SQSServer) reapExpiredDedupPartitioned(ctx context.Context, queueName string, partitionCount uint32, readTS uint64) error {
+	now := time.Now().UnixMilli()
+	for partition := uint32(0); partition < partitionCount; partition++ {
+		if err := ctx.Err(); err != nil {
+			return errors.WithStack(err)
+		}
+		// Per-partition prefix (across all gens) — the partitioned
+		// dedup key embeds gen after partition, so a partition-only
+		// prefix walks every gen for that partition. The per-entry
+		// expiry check is unchanged from the legacy walk.
+		prefix := []byte{}
+		prefix = append(prefix, SqsPartitionedMsgDedupPrefix...)
+		prefix = append(prefix, encodeSQSSegment(queueName)...)
+		prefix = append(prefix, sqsPartitionedQueueTerminator)
+		prefix = appendU32(prefix, partition)
+		upper := prefixScanEnd(prefix)
+		start := bytes.Clone(prefix)
+
+		processed := 0
+		for processed < sqsReaperPerQueueBudget {
+			page, err := s.store.ScanAt(ctx, start, upper, sqsReaperPageLimit, readTS)
+			if err != nil {
+				return errors.WithStack(err)
+			}
+			if len(page) == 0 {
+				break
+			}
+			done, newProcessed, err := s.reapDedupPage(ctx, page, now, readTS, processed)
+			if err != nil {
+				return err
+			}
+			processed = newProcessed
+			if done {
+				break
+			}
+			start = nextScanCursorAfter(page[len(page)-1].Key)
+			if bytes.Compare(start, upper) >= 0 {
+				break
+			}
+		}
+	}
+	return nil
+}
+
 // reapDedupPage walks one ScanAt page of dedup records and removes
 // any whose ExpiresAtMillis is in the past. Returns done=true when
 // the per-queue budget runs out or the page was short.

From 996df499cc0fd9ba9d2073ca6f9dbc89289ccb10 Mon Sep 17 00:00:00 2001
From: "Yoshiaki Ueda (bootjp)"
Date: Tue, 5 May 2026 01:04:40 +0900
Subject: [PATCH 2/2] sqs(reaper): polish per Claude review (PR #736, round 1)

Two findings from Claude on PR #736 plus a medium-severity Gemini
suggestion in the same review pass:

1. (Claude must-fix) Stale forward-reference comment in reapPage was
   written in PR 6a pointing at PR 6b. PR 6b is now this branch, so
   the "is a follow-up to PR 6a" wording was misleading. Trimmed to a
   one-line note that reapPage covers the legacy keyspace and
   reapPartitionedPage is the partitioned twin.
2. (Claude should-fix) classifyPartitionedByAgeEntry had no direct
   coverage of its retention-cutoff branch (live gen, sendTs <= cutoff
   -> reapable) or its future-gen guard (parsed.gen > currentGen ->
   not reapable). The integration tests exercise the orphan-cohort and
   dedup-expiry paths only.
   Added TestClassifyPartitionedByAgeEntry: a table-driven unit test
   that pins every branch (within retention, past retention,
   exact-cutoff boundary, orphan generation, future generation, wrong
   partition, wrong queue prefix, legacy key) against deterministic
   key inputs built via sqsPartitionedMsgByAgeKey. No store /
   coordinator dependency, so the test is fast and immune to
   retention timing.
3. (Gemini medium) reapExpiredDedup used an if/else to pick between
   legacy and partitioned dedup reaping for partitioned queues, while
   reapQueue runs both for symmetry and defensive coverage of leaked
   legacy entries. Mirrored that policy: legacy scan now always runs
   (cheap on an empty prefix today), and the partitioned scan
   additionally runs for partitioned queues.

Caller audit: reapExpiredDedup has a single caller (reapAllQueues) and
the error contract is unchanged, so no semantic ripple.

Refs: PR #736 review thread; Claude review at run 25327783108.
---
 adapter/sqs_partitioned_dispatch_test.go | 88 ++++++++++++++++++++++++
 adapter/sqs_reaper.go                    | 23 ++++---
 2 files changed, 103 insertions(+), 8 deletions(-)

diff --git a/adapter/sqs_partitioned_dispatch_test.go b/adapter/sqs_partitioned_dispatch_test.go
index 5e650f30..76916b85 100644
--- a/adapter/sqs_partitioned_dispatch_test.go
+++ b/adapter/sqs_partitioned_dispatch_test.go
@@ -752,3 +752,91 @@ func countPartitionedDedupRowsAcrossPartitions(t *testing.T, node Node, queueNam
 	}
 	return total
 }
+
+// TestClassifyPartitionedByAgeEntry pins every branch of the
+// classify helper that decides which partitioned byage rows the
+// live-queue reaper sweeps.
+// The integration tests above cover
+// the orphan-gen path (tombstoned cohort) and the dedup-expiry
+// path; this unit test fills in the two live-queue paths that
+// have no integration coverage today: the retention-cutoff branch
+// (live gen, sendTs <= cutoff → reapable) and the future-gen
+// guard (parsed.gen > currentGen → not reapable, defensive
+// against a meta-vs-byage race).
+func TestClassifyPartitionedByAgeEntry(t *testing.T) {
+	t.Parallel()
+	const (
+		queueName  = "live-byage.fifo"
+		partition  = uint32(2)
+		currentGen = uint64(5)
+		cutoff     = int64(1_000_000)
+		messageID  = "msg-classify"
+	)
+
+	type want struct {
+		reapable        bool
+		expectedGen     uint64
+		expectedPartn   uint32
+		expectedSendTs  int64
+		expectedMsgID   string
+		assertParsedSet bool // expectedGen / Partition / SendTs / MsgID checked only when true
+	}
+	cases := []struct {
+		name string
+		key  []byte
+		want want
+	}{
+		{
+			name: "live gen within retention window — not reapable",
+			key:  sqsPartitionedMsgByAgeKey(queueName, partition, currentGen, cutoff+1, messageID),
+			want: want{reapable: false, expectedGen: currentGen, expectedPartn: partition, expectedSendTs: cutoff + 1, expectedMsgID: messageID, assertParsedSet: true},
+		},
+		{
+			name: "live gen past retention window — reapable (cutoff branch)",
+			key:  sqsPartitionedMsgByAgeKey(queueName, partition, currentGen, cutoff-1, messageID),
+			want: want{reapable: true, expectedGen: currentGen, expectedPartn: partition, expectedSendTs: cutoff - 1, expectedMsgID: messageID, assertParsedSet: true},
+		},
+		{
+			name: "live gen exactly at cutoff — reapable (>, not >=)",
+			key:  sqsPartitionedMsgByAgeKey(queueName, partition, currentGen, cutoff, messageID),
+			want: want{reapable: true, expectedGen: currentGen, expectedPartn: partition, expectedSendTs: cutoff, expectedMsgID: messageID, assertParsedSet: true},
+		},
+		{
+			name: "orphan generation (gen < currentGen) — reapable unconditionally",
+			key: sqsPartitionedMsgByAgeKey(queueName, partition,
+				currentGen-1, cutoff+1_000_000, messageID),
+			want: want{reapable: true, expectedGen: currentGen - 1, expectedPartn: partition, expectedSendTs: cutoff + 1_000_000, expectedMsgID: messageID, assertParsedSet: true},
+		},
+		{
+			name: "future generation (gen > currentGen) — not reapable (gen-race guard)",
+			key:  sqsPartitionedMsgByAgeKey(queueName, partition, currentGen+1, cutoff-1, messageID),
+			want: want{reapable: false, expectedGen: currentGen + 1, expectedPartn: partition, expectedSendTs: cutoff - 1, expectedMsgID: messageID, assertParsedSet: true},
+		},
+		{
+			name: "wrong partition — not reapable (page bleed defense)",
+			key:  sqsPartitionedMsgByAgeKey(queueName, partition+1, currentGen, cutoff-1, messageID),
+			want: want{reapable: false, assertParsedSet: false},
+		},
+		{
+			name: "wrong queue prefix — not reapable (parse fails)",
+			key:  sqsPartitionedMsgByAgeKey("different.fifo", partition, currentGen, cutoff-1, messageID),
+			want: want{reapable: false, assertParsedSet: false},
+		},
+		{
+			name: "legacy byage key — not reapable (parse fails)",
+			key:  sqsMsgByAgeKey(queueName, currentGen, cutoff-1, messageID),
+			want: want{reapable: false, assertParsedSet: false},
+		},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+			parsed, reapable := classifyPartitionedByAgeEntry(tc.key, queueName, partition, currentGen, cutoff)
+			require.Equal(t, tc.want.reapable, reapable, "reapable")
+			if tc.want.assertParsedSet {
+				require.Equal(t, tc.want.expectedGen, parsed.Generation, "Generation")
+				require.Equal(t, tc.want.expectedPartn, parsed.Partition, "Partition")
+				require.Equal(t, tc.want.expectedSendTs, parsed.SendTimestampMs, "SendTimestampMs")
+				require.Equal(t, tc.want.expectedMsgID, parsed.MessageID, "MessageID")
+			}
+		})
+	}
+}
diff --git a/adapter/sqs_reaper.go b/adapter/sqs_reaper.go
index 6ca36766..ae7f6c20 100644
--- a/adapter/sqs_reaper.go
+++ b/adapter/sqs_reaper.go
@@ -619,10 +619,8 @@ func (s *SQSServer) reapPage(ctx
context.Context, queueName string, currentGen u
 			// see meta caught up.
 			continue
 		}
-		// Live-queue retention reap currently iterates only the
-		// legacy byage keyspace; partitioned-byage live-queue
-		// retention is a follow-up to PR 6a (the tombstoned-cohort
-		// path is what this PR addresses).
+		// reapPage covers the legacy byage keyspace only; the
+		// partitioned twin is reapPartitionedPage.
 		if err := s.reapOneRecord(ctx, queueName, nil, 0, parsed.Generation, kvp.Key, parsed.MessageID, readTS); err != nil {
 			return true, processed, err
 		}
@@ -731,14 +729,23 @@ func (s *SQSServer) dispatchOrphanByAgeDrop(ctx context.Context, byAgeKey []byte
 //
 // On partitioned queues (meta.PartitionCount > 1), the dedup
 // records live under SqsPartitionedMsgDedupPrefix instead of
-// SqsMsgDedupPrefix, so the legacy scan would find zero records
-// and the leak would persist; reapExpiredDedupPartitioned takes
-// over for that case (PR 6b).
+// SqsMsgDedupPrefix, so the legacy scan alone would miss them;
+// reapExpiredDedupPartitioned covers that case (PR 6b).
+//
+// Mirrors reapQueue's both-scan policy: legacy always runs, and
+// the partitioned scan additionally runs for partitioned queues.
+// The data plane never writes legacy dedup on partitioned queues
+// today, but the legacy scan over an empty prefix is cheap, and
+// running it unconditionally keeps the two reaper paths symmetric
+// and defends against an unforeseen legacy-prefix leak.
 func (s *SQSServer) reapExpiredDedup(ctx context.Context, queueName string, meta *sqsQueueMeta, readTS uint64) error {
+	if err := s.reapExpiredDedupLegacy(ctx, queueName, readTS); err != nil {
+		return err
+	}
 	if meta != nil && meta.PartitionCount > 1 {
 		return s.reapExpiredDedupPartitioned(ctx, queueName, meta.PartitionCount, readTS)
 	}
-	return s.reapExpiredDedupLegacy(ctx, queueName, readTS)
+	return nil
 }
 
 // reapExpiredDedupLegacy is the legacy-keyspace dedup expiry walk,
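As background for the group-spread comment in the new integration test ("partitionFor (FNV-1a) gives reasonable spread"), a minimal standalone sketch of FNV-1a group-to-partition routing. partitionForSketch is a hypothetical stand-in; the adapter's real partitionFor may reduce the hash differently, so this only illustrates the general technique.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionForSketch is a hypothetical stand-in for the adapter's
// partitionFor: FNV-1a over the MessageGroupId, reduced mod the
// queue's partition count. Deterministic, so one group always lands
// on the same partition, preserving per-group FIFO ordering.
func partitionForSketch(groupID string, partitions uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(groupID))
	return h.Sum32() % partitions
}

func main() {
	seen := map[uint32]bool{}
	for _, g := range []string{"alpha", "beta", "gamma", "delta", "epsilon", "zeta"} {
		p := partitionForSketch(g, 4)
		seen[p] = true
		fmt.Printf("%-8s -> partition %d\n", g, p)
	}
	fmt.Println("distinct partitions used:", len(seen))
}
```

Determinism is the property the reaper relies on: a group's dedup and byage rows always sit under one partition's prefix, so a per-partition sweep covers the whole keyspace.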