From 0f7419718e59f55ceca1efc1341c23002d18f90b Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Mon, 4 May 2026 22:33:17 +0900 Subject: [PATCH 1/2] feat(sqs): tombstone reaper enumerates partitioned keyspace (Phase 3.D PR 6a) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses the Codex P2 review on PR #732 round 0 ("Reap partitioned dedup records to prevent key growth"), deferred at the time as a "PR 6 concern". With PR 5b-3 lifting the dormancy gate, partitioned FIFO queues can now land in production — but until this PR the tombstone reaper only swept the legacy keyspace, so partitioned data / vis / byage / dedup / group rows leaked permanently after DeleteQueue or PurgeQueue. Scope (the focused first piece of §11 PR 6 from docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md): - Tombstone schema: tombstone value now encodes PartitionCount so the reaper can drive partition iteration off the tombstone alone (the meta record is gone by the time the reaper observes the tombstone). Encoding is 8 bytes big-endian uint64. Backward compat: PartitionCount<=1 keeps the byte-identical legacy []byte{1} sentinel; any non-canonical value (empty, 1-byte, 4-byte, out-of-range 8-byte) decodes to PartitionCount=1 so a partial rollback or future encoding revision degrades safely to legacy single-partition reap behaviour. - DeleteQueue (sqs_catalog.go) and PurgeQueue (sqs_purge.go) write the encoded value, reading PartitionCount from the pre-delete / pre-purge meta. PartitionCount is immutable across SetQueueAttributes / PurgeQueue (§3.2) so the post-purge meta and the pre-purge tombstone always agree on the count. - Tombstone reaper: reapTombstonedGeneration takes the decoded PartitionCount, sweeps the legacy keyspace (always — covers pre-HT-FIFO queues plus a defensive "did we accidentally write legacy?" 
pass) and, on PartitionCount > 1, ALSO sweeps every partition's byage / dedup / group prefix family via the new reapPartitionedGeneration helper. allDone gating delays tombstone deletion until every partition is fully drained, so a wide-fanout queue may need multiple reaper ticks but cannot leave the tombstone-deleted-but-data-lingering inconsistency. - Per-record dispatch: reapOneRecord and buildReapOps now take *sqsQueueMeta + partition uint32 instead of hardcoding nil + 0. Legacy callers pass nil + 0 (byte-identical to the pre-PR-5b layout); the partitioned twin reapOneRecordPartitioned synthesises a meta carrying PartitionCount > 1 so the dispatch helpers route to sqsPartitionedMsg{Data,Vis,Group}Key. - New parsed-key helpers in sqs_keys.go: sqsPartitionedMsgByAgePrefixForPartition(queue, partition, gen) sqsPartitionedMsgDedupKeyPrefix(queue, partition, gen) sqsPartitionedMsgGroupKeyPrefix(queue, partition, gen) Each returns the (queue, partition, gen)-bound scan prefix the reaper feeds deleteAllPrefix / reapDeadByAgePartition. Out of scope (follow-up PR 6b): - Live-queue retention reap (reapQueue) still iterates only the legacy byage prefix; partitioned-byage live retention is the next slice. The tombstone-cohort path this PR addresses is the immediate Codex P2 concern (DeleteQueue + reaper leaving partitioned dedup records behind). - PurgeQueue / DeleteQueue do not iterate partition keyspace synchronously; the tombstone-driven async sweep is sufficient for the AWS-shaped semantics today, and the design doc does not require synchronous per-partition deletion. Caller audit: - encodeQueueTombstoneValue: two production callers (DeleteQueue, PurgeQueue), both pass the queue's PartitionCount. - decodeQueueTombstoneValue: one production caller (reaper); legacy / out-of-range fallback to 1 documented and tested. 
- reapOneRecord (signature changed): three production callers — reapDeadByAgePage (legacy), reapDeadByAgePartitionPage (via reapOneRecordPartitioned, partitioned), reapQueue's over-retention sweep (legacy). All updated; partitioned path uses the synthetic meta to flip the dispatch helper branch. - reapPartitionedGeneration / reapDeadByAgePartition / reapDeadByAgePartitionPage: each has exactly one caller in the new tombstone-reap path. Self-review (CLAUDE.md): 1. Data loss — Closes the Codex P2 leak; no path now leaves partitioned records orphaned after DeleteQueue/PurgeQueue. Legacy queues unchanged: tombstone value, reaper sweep prefixes, and dispatch helpers all keep the byte-identical pre-PR-6a behaviour for PartitionCount<=1. 2. Concurrency / distributed failures — Reaper still runs only on the leader (existing reaperCtx). Tombstone read is at the reaper's snapshot readTS; every individual reap is its own OCC dispatch with ErrWriteConflict treated as success (message touched by another path). Per-partition iteration is sequential; no new shared state. 3. Performance — Per-tick cost grows from O(legacy prefix scans) to O((partition_count + 1) * legacy prefix scans). Capped by sqsReaperPerQueueBudget per partition. PartitionCount=1 queues see no extra cost (the partitioned branch short-circuits on partitionCount<=1). 4. Data consistency — Tombstone deletion gated on full-drain across all partition + legacy prefix families. Partial drains leave the tombstone in place so the next reaper tick can resume. PartitionCount immutability invariant means the tombstone-encoded count and the reaper's iteration bound can never disagree with the live queue. 5. Test coverage — 4 unit tests on the tombstone-value codec (legacy roundtrip, partitioned encode, fallback for non-canonical values, out-of-range clamp).
1 wire-level integration test that creates a partitioned queue, sends to 6 distinct groups, calls DeleteQueue, hand-runs the reaper, and asserts the partitioned byage / dedup / group prefixes are empty across all partitions (each byage reap deletes the message's data / vis rows in the same dispatch) + the tombstone itself is deleted. --- adapter/sqs_catalog.go | 12 +- adapter/sqs_keys.go | 90 ++++++++++ adapter/sqs_partitioned_dispatch_test.go | 133 +++++++++++++++ adapter/sqs_purge.go | 11 +- adapter/sqs_reaper.go | 202 ++++++++++++++++++++--- adapter/sqs_tombstone_value_test.go | 89 ++++++++++ 6 files changed, 508 insertions(+), 29 deletions(-) create mode 100644 adapter/sqs_tombstone_value_test.go diff --git a/adapter/sqs_catalog.go b/adapter/sqs_catalog.go index 1344a4a58..dbca99271 100644 --- a/adapter/sqs_catalog.go +++ b/adapter/sqs_catalog.go @@ -1074,7 +1074,7 @@ func (s *SQSServer) deleteQueueWithRetry(ctx context.Context, queueName string) deadline := time.Now().Add(transactRetryMaxDuration) for range transactRetryMaxAttempts { readTS := s.nextTxnReadTS(ctx) - _, exists, err := s.loadQueueMetaAt(ctx, queueName, readTS) + existing, exists, err := s.loadQueueMetaAt(ctx, queueName, readTS) if err != nil { return errors.WithStack(err) } @@ -1095,6 +1095,14 @@ func (s *SQSServer) deleteQueueWithRetry(ctx context.Context, queueName string) metaKey := sqsQueueMetaKey(queueName) genKey := sqsQueueGenKey(queueName) tombstoneKey := sqsQueueTombstoneKey(queueName, lastGen) + // Encode the queue's PartitionCount in the tombstone value so + // the reaper can drive partition iteration off the tombstone + // alone — meta is gone by the time the reaper observes the + // tombstone (PR 6a). Legacy / non-partitioned queues fall + // through to the byte-identical []byte{1} sentinel via the + // PartitionCount<=1 branch in encodeQueueTombstoneValue, so + // existing on-disk tombstones are byte-identical to today. 
+ tombstoneValue := encodeQueueTombstoneValue(existing.PartitionCount) // StartTS + ReadKeys fence against a concurrent CreateQueue / // SetQueueAttributes landing between our load and dispatch. req := &kv.OperationGroup[kv.OP]{ @@ -1104,7 +1112,7 @@ func (s *SQSServer) deleteQueueWithRetry(ctx context.Context, queueName string) Elems: []*kv.Elem[kv.OP]{ {Op: kv.Del, Key: metaKey}, {Op: kv.Put, Key: genKey, Value: []byte(strconv.FormatUint(lastGen+1, 10))}, - {Op: kv.Put, Key: tombstoneKey, Value: []byte{1}}, + {Op: kv.Put, Key: tombstoneKey, Value: tombstoneValue}, }, } if _, err := s.coordinator.Dispatch(ctx, req); err == nil { diff --git a/adapter/sqs_keys.go b/adapter/sqs_keys.go index 0418bee6b..830487981 100644 --- a/adapter/sqs_keys.go +++ b/adapter/sqs_keys.go @@ -133,6 +133,52 @@ func sqsQueueTombstoneKey(queueName string, gen uint64) []byte { return buf } +// encodeQueueTombstoneValue packs a queue's PartitionCount into the +// tombstone value so the reaper can drive partition iteration off +// the tombstone alone (the meta record is gone by the time the +// reaper observes the tombstone). Eight bytes big-endian uint64 +// chosen so that a future encoding revision can co-exist (length +// branches) without breaking the legacy-value fallback. +// +// Legacy tombstones written by pre-PR-6a binaries carry the +// single-byte sentinel []byte{1}; decodeQueueTombstoneValue maps +// every non-canonical length to PartitionCount=1, so the reaper +// silently keeps the legacy single-partition behaviour for them. +func encodeQueueTombstoneValue(partitionCount uint32) []byte { + if partitionCount <= 1 { + // Preserve the byte-identical legacy value for tombstones + // of non-partitioned queues so on-disk diffs stay small + // across the rollout. Reaper treats legacy and 1-partition + // shapes identically. 
+ return []byte{1} + } + out := make([]byte, sqsGenerationSuffixLen) + binary.BigEndian.PutUint64(out, uint64(partitionCount)) + return out +} + +// decodeQueueTombstoneValue extracts the PartitionCount written by +// encodeQueueTombstoneValue. Returns 1 for legacy values (the +// single-byte sentinel []byte{1}, the empty value, or any other +// non-8-byte payload) so a binary that has never written a +// partitioned tombstone safely degrades to legacy reaper behaviour. +// A canonical 8-byte value with PartitionCount==0 is also clamped +// to 1 because partitionFor / effectivePartitionCount already +// collapse 0 to 1 on the read side; uniform clamping keeps the +// reaper's loop bound consistent. +func decodeQueueTombstoneValue(value []byte) uint32 { + if len(value) != sqsGenerationSuffixLen { + return 1 + } + pc := binary.BigEndian.Uint64(value) + if pc == 0 || pc > uint64(htfifoMaxPartitions) { + return 1 + } + // Bounded by htfifoMaxPartitions (=32) immediately above, so + // the uint32 narrow cannot overflow. + return uint32(pc) //nolint:gosec // bounded by htfifoMaxPartitions +} + // sqsGenerationSuffixLen is the byte length of the trailing big-endian // uint64 generation segment in tombstone and byage keys. const sqsGenerationSuffixLen = 8 @@ -392,6 +438,50 @@ func sqsPartitionedMsgByAgePrefixForQueueAllPartitions(queueName string) []byte return buf } +// sqsPartitionedMsgByAgePrefixForPartition returns the byage scan +// prefix bound to one (queue, partition, gen) cohort. Used by the +// tombstone reaper to enumerate the partitioned byage keyspace one +// partition at a time so the per-queue scan budget translates +// cleanly to a per-partition budget — exactly the §6 split-queue- +// FIFO design's "partitions × budget" reaper contract (PR 6). 
+func sqsPartitionedMsgByAgePrefixForPartition(queueName string, partition uint32, gen uint64) []byte { + buf := make([]byte, 0, len(SqsPartitionedMsgByAgePrefix)+sqsKeyCapSmall) + buf = append(buf, SqsPartitionedMsgByAgePrefix...) + buf = append(buf, encodeSQSSegment(queueName)...) + buf = append(buf, sqsPartitionedQueueTerminator) + buf = appendU32(buf, partition) + buf = appendU64(buf, gen) + return buf +} + +// sqsPartitionedMsgDedupKeyPrefix returns the dedup scan prefix +// bound to one (queue, partition, gen) cohort. Tombstone reaper +// pairs this with deleteAllPrefix to clean up partitioned dedup +// records left behind by a deleted / purged partitioned queue +// (the Codex P2 deferred from PR 5b-2 round 0). +func sqsPartitionedMsgDedupKeyPrefix(queueName string, partition uint32, gen uint64) []byte { + buf := make([]byte, 0, len(SqsPartitionedMsgDedupPrefix)+sqsKeyCapSmall) + buf = append(buf, SqsPartitionedMsgDedupPrefix...) + buf = append(buf, encodeSQSSegment(queueName)...) + buf = append(buf, sqsPartitionedQueueTerminator) + buf = appendU32(buf, partition) + buf = appendU64(buf, gen) + return buf +} + +// sqsPartitionedMsgGroupKeyPrefix returns the group-lock scan +// prefix bound to one (queue, partition, gen) cohort. Mirrors +// sqsPartitionedMsgDedupKeyPrefix. +func sqsPartitionedMsgGroupKeyPrefix(queueName string, partition uint32, gen uint64) []byte { + buf := make([]byte, 0, len(SqsPartitionedMsgGroupPrefix)+sqsKeyCapSmall) + buf = append(buf, SqsPartitionedMsgGroupPrefix...) + buf = append(buf, encodeSQSSegment(queueName)...) + buf = append(buf, sqsPartitionedQueueTerminator) + buf = appendU32(buf, partition) + buf = appendU64(buf, gen) + return buf +} + // sqsMsgByAgePrefixesForQueue returns the {legacy, partitioned} // prefix pair for a queue's byage records. 
The reaper iterates both: // a queue created before HT-FIFO landed has only legacy entries; a diff --git a/adapter/sqs_partitioned_dispatch_test.go b/adapter/sqs_partitioned_dispatch_test.go index c397c0f6a..182239279 100644 --- a/adapter/sqs_partitioned_dispatch_test.go +++ b/adapter/sqs_partitioned_dispatch_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/bootjp/elastickv/kv" + "github.com/bootjp/elastickv/store" "github.com/stretchr/testify/require" ) @@ -514,3 +515,135 @@ func TestDropReceiveFanoutCounter_ClearsEntry(t *testing.T) { "a queue recreated after drop must start from a zero counter (got first offset %d)", first) } + +// TestSQSServer_PartitionedFIFO_TombstoneReaperEnumeratesPartitions +// pins the PR 6a contract: after DeleteQueue on a partitioned FIFO +// queue, the tombstone reaper must drain every partition's data / +// vis / byage / dedup / group rows. The pre-PR-6a reaper only +// scanned the legacy keyspace, so partitioned records leaked +// permanently — the Codex P2 from PR #732 review. +// +// Test shape: +// - create an 8-partition queue (via the install helper since the +// §11 PR 5b-3 capability gate would otherwise need peer wiring), +// - send messages spread across distinct group ids so partitionFor +// populates more than one partition, +// - DeleteQueue, +// - hand-call reapTombstonedQueues so the test does not have to +// wait on the 30s timer, +// - assert the partitioned byage / dedup / group prefixes (across +// all partitions) are empty (each byage reap also deletes the +// message's data / vis rows in the same dispatch), +// - assert the tombstone itself is gone (the cohort is fully +// drained → reaper deletes the tombstone). 
+func TestSQSServer_PartitionedFIFO_TombstoneReaperEnumeratesPartitions(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + const queueName = "reaper-partitioned.fifo" + status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{ + "QueueName": queueName, + "Attributes": map[string]string{"FifoQueue": "true"}, + }) + require.Equal(t, http.StatusOK, status, "create FIFO queue: %v", out) + queueURL, _ := out["QueueUrl"].(string) + require.NotEmpty(t, queueURL) + + // 8 partitions instead of 4 so the test also exercises a + // non-default partition count and silences the unparam linter + // that sees only PartitionCount=4 in every other dispatch test. + const partitions uint32 = 8 + installPartitionedMetaForTest(t, node, queueName, partitions, htfifoThroughputPerMessageGroupID) + + ctx := context.Background() + readTS := node.sqsServer.nextTxnReadTS(ctx) + meta, exists, err := node.sqsServer.loadQueueMetaAt(ctx, queueName, readTS) + require.NoError(t, err) + require.True(t, exists) + require.Equal(t, partitions, meta.PartitionCount) + gen := meta.Generation + + // Spread messages over enough groups to land in at least 2 + // distinct partitions; partitionFor is FNV-1a so 6 groups give + // good odds without making the test fragile against hash + // changes (the reaper assertion below is total-rows-zero, not + // per-partition cardinality, so any non-zero distribution is + // fine). + groups := []string{"a", "b", "c", "d", "e", "f"} + for _, g := range groups { + status, out := callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MessageBody": "body-" + g, + "MessageGroupId": g, + "MessageDeduplicationId": "dedup-" + g, + }) + require.Equal(t, http.StatusOK, status, "send (group=%s): %v", g, out) + } + + // Sanity: at least one partitioned data row exists pre-delete. 
+ // Without this, the post-reap assertion would be vacuously true + // on a regression that simply silenced sends. + preReapRows := countPartitionedRows(t, node, queueName, gen) + require.Positive(t, preReapRows, + "expected at least one partitioned row before DeleteQueue+reap") + + // DeleteQueue → writes tombstone with PartitionCount=8 in the + // value (PR 6a). The pre-PR-6a DeleteQueue would write []byte{1} + // here, and the reaper would skip the partitioned sweep entirely. + status, out = callSQS(t, node, sqsDeleteQueueTarget, map[string]any{ + "QueueUrl": queueURL, + }) + require.Equal(t, http.StatusOK, status, "delete queue: %v", out) + + // Hand-call the tombstone reaper so the test doesn't depend on + // the 30s ticker. The same code path runs in production every + // reaper tick. + require.NoError(t, node.sqsServer.reapTombstonedQueues(ctx), + "reapTombstonedQueues must succeed") + + postReapRows := countPartitionedRows(t, node, queueName, gen) + require.Zero(t, postReapRows, + "every partitioned byage / dedup / group row must be reaped (got %d remaining)", + postReapRows) + + // Tombstone itself must also be gone — reapTombstonedGeneration + // only deletes it once every prefix family is drained, so a + // surviving tombstone here would mean the partitioned sweep + // silently left rows behind in some prefix the assertion above + // did not enumerate. + tombstoneKey := sqsQueueTombstoneKey(queueName, gen) + _, err = node.sqsServer.store.GetAt(ctx, tombstoneKey, node.sqsServer.nextTxnReadTS(ctx)) + require.ErrorIs(t, err, store.ErrKeyNotFound, + "tombstone must be deleted after the cohort is fully drained") +} + +// countPartitionedRows sums the rows under the partitioned +// byage / dedup / group prefixes for a (queue, gen) cohort; a +// drained byage prefix implies the data / vis rows are gone too, +// since each reap deletes them in the same dispatch. Used by the +// tombstone-reaper integration test to assert the cohort is fully +// drained without enumerating individual keys. 
+func countPartitionedRows(t *testing.T, node Node, queueName string, gen uint64) int { + t.Helper() + ctx := context.Background() + readTS := node.sqsServer.nextTxnReadTS(ctx) + prefixes := [][]byte{ + // Match every partition by using the family-level "all + // partitions" prefix where one exists; for dedup / group + // the family-level prefix isn't pre-built, so iterate + // partitions [0, 8) explicitly. + sqsPartitionedMsgByAgePrefixForQueueAllPartitions(queueName), + } + for partition := uint32(0); partition < 8; partition++ { + prefixes = append(prefixes, + sqsPartitionedMsgDedupKeyPrefix(queueName, partition, gen), + sqsPartitionedMsgGroupKeyPrefix(queueName, partition, gen), + ) + } + total := 0 + for _, prefix := range prefixes { + rows, err := node.sqsServer.store.ScanAt(ctx, prefix, prefixScanEnd(prefix), 1024, readTS) + require.NoError(t, err) + total += len(rows) + } + return total +} diff --git a/adapter/sqs_purge.go b/adapter/sqs_purge.go index f9e0ab486..eb667a19f 100644 --- a/adapter/sqs_purge.go +++ b/adapter/sqs_purge.go @@ -96,6 +96,15 @@ func (s *SQSServer) tryPurgeQueueOnce(ctx context.Context, queueName string) (bo // (which is keyed on the post-purge gen). reapDeadByAge filters // by exact generation, so the older cohort is never visited. tombstoneKey := sqsQueueTombstoneKey(queueName, lastGen) + // Encode the pre-purge generation's PartitionCount in the + // tombstone value so the reaper can enumerate partitioned + // dedup / group / byage prefixes for that cohort (PR 6a). + // PartitionCount is immutable across SetQueueAttributes / + // PurgeQueue (§3.2 immutability rule), so the post-purge meta + // and the pre-purge tombstone agree on the partition count. + // Legacy / non-partitioned queues still write []byte{1} via + // the encoder's PartitionCount<=1 branch. 
+ tombstoneValue := encodeQueueTombstoneValue(meta.PartitionCount) // StartTS + ReadKeys fence against a concurrent CreateQueue / // DeleteQueue / SetQueueAttributes / PurgeQueue landing between // our load and dispatch. ErrWriteConflict surfaces via the @@ -107,7 +116,7 @@ func (s *SQSServer) tryPurgeQueueOnce(ctx context.Context, queueName string) (bo Elems: []*kv.Elem[kv.OP]{ {Op: kv.Put, Key: metaKey, Value: metaBytes}, {Op: kv.Put, Key: genKey, Value: []byte(strconv.FormatUint(meta.Generation, 10))}, - {Op: kv.Put, Key: tombstoneKey, Value: []byte{1}}, + {Op: kv.Put, Key: tombstoneKey, Value: tombstoneValue}, }, } if _, err := s.coordinator.Dispatch(ctx, req); err != nil { diff --git a/adapter/sqs_reaper.go b/adapter/sqs_reaper.go index c870917a1..9e509f061 100644 --- a/adapter/sqs_reaper.go +++ b/adapter/sqs_reaper.go @@ -124,7 +124,12 @@ func (s *SQSServer) reapTombstonedQueues(ctx context.Context) error { if !ok { continue } - s.reapTombstonedGeneration(ctx, queueName, gen, kvp.Key, readTS) + // PartitionCount is encoded in the tombstone value + // (PR 6a). decodeQueueTombstoneValue maps legacy / + // non-canonical values to 1 so pre-PR-6a tombstones + // retain their byte-identical legacy reaper path. + partitionCount := decodeQueueTombstoneValue(kvp.Value) + s.reapTombstonedGeneration(ctx, queueName, gen, partitionCount, kvp.Key, readTS) } if len(page) < sqsReaperPageLimit { return nil @@ -140,7 +145,21 @@ func (s *SQSServer) reapTombstonedQueues(ctx context.Context) error { // its own per-queue budget. Once every prefix the cohort can occupy // is empty, the tombstone itself is deleted; otherwise it stays so // the next tick can finish what was left. 
-func (s *SQSServer) reapTombstonedGeneration(ctx context.Context, queueName string, gen uint64, tombstoneKey []byte, readTS uint64) { +// +// partitionCount drives partition-iterative cleanup: 1 (legacy / +// non-partitioned queue, or pre-PR-6a tombstone whose value +// decoded to the default) takes the byte-identical legacy path — +// one byage / dedup / group sweep — and leaves the partitioned +// keyspace untouched. Greater than 1 ALSO sweeps the partitioned +// byage / dedup / group prefix family for each partition in +// [0, partitionCount), which is the §6 "partitions × budget" +// reaper contract from the split-queue-FIFO design. +func (s *SQSServer) reapTombstonedGeneration(ctx context.Context, queueName string, gen uint64, partitionCount uint32, tombstoneKey []byte, readTS uint64) { + // Legacy keyspace is always swept — covers all pre-HT-FIFO + // queues plus any partitioned queue that briefly carried legacy + // records (defensive: data is nominally never written to the + // legacy keyspace for partitioned queues, but the sweep is + // idempotent and cheap). dataDone, err := s.reapDeadByAge(ctx, queueName, gen, readTS) if err != nil { slog.Warn("sqs tombstone byage reap failed", "queue", queueName, "gen", gen, "err", err) @@ -156,11 +175,59 @@ func (s *SQSServer) reapTombstonedGeneration(ctx context.Context, queueName stri slog.Warn("sqs tombstone group reap failed", "queue", queueName, "gen", gen, "err", err) return } - if dataDone && dedupDone && groupDone { + allDone := dataDone && dedupDone && groupDone + // Partitioned sweep: one (byage, dedup, group) triple per + // partition. Each triple runs under its own + // sqsReaperPerQueueBudget cap, so a wide-fanout queue may need + // multiple reaper ticks to fully drain — same contract as the + // live-queue reap. 
+ if partitionCount > 1 { + partDone, err := s.reapPartitionedGeneration(ctx, queueName, gen, partitionCount, readTS) + if err != nil { + slog.Warn("sqs tombstone partitioned reap failed", + "queue", queueName, "gen", gen, "partitionCount", partitionCount, "err", err) + return + } + allDone = allDone && partDone + } + if allDone { _ = s.dispatchDedupDelete(ctx, tombstoneKey, readTS) } } +// reapPartitionedGeneration sweeps the partitioned byage, dedup, +// and group prefix family for every partition of one tombstoned +// (queue, gen) cohort. Returns done=true only when EVERY partition +// AND every prefix family is fully drained — short-circuiting on +// the first unfinished partition would leave the tombstone in +// place but skip later partitions on this tick, starving them +// under churn. +func (s *SQSServer) reapPartitionedGeneration(ctx context.Context, queueName string, gen uint64, partitionCount uint32, readTS uint64) (bool, error) { + allDone := true + for partition := uint32(0); partition < partitionCount; partition++ { + if err := ctx.Err(); err != nil { + return false, errors.WithStack(err) + } + byageDone, err := s.reapDeadByAgePartition(ctx, queueName, gen, partition, readTS) + if err != nil { + return false, err + } + dedupDone, err := s.deleteAllPrefix(ctx, + sqsPartitionedMsgDedupKeyPrefix(queueName, partition, gen), readTS) + if err != nil { + return false, err + } + groupDone, err := s.deleteAllPrefix(ctx, + sqsPartitionedMsgGroupKeyPrefix(queueName, partition, gen), readTS) + if err != nil { + return false, err + } + if !byageDone || !dedupDone || !groupDone { + allDone = false + } + } + return allDone, nil +} + // reapDeadByAge walks the byage prefix for one (queue, gen) cohort // and reaps each record found, regardless of retention age — every // row under a tombstoned generation is by definition orphaned. 
@@ -203,7 +270,73 @@ func (s *SQSServer) reapDeadByAgePage(ctx context.Context, queueName string, gen if !ok || parsed.Generation != gen { continue } - if err := s.reapOneRecord(ctx, queueName, gen, kvp.Key, parsed.MessageID, readTS); err != nil { + // Legacy byage path: nil meta + partition 0 keeps the + // dispatch helpers on the legacy constructors (byte- + // identical to the pre-PR-5b reaper). The partitioned + // twin (reapDeadByAgePartitionPage) takes the meta-aware + // branch via reapOneRecordPartitioned. + if err := s.reapOneRecord(ctx, queueName, nil, 0, gen, kvp.Key, parsed.MessageID, readTS); err != nil { + return true, processed, err + } + processed++ + if processed >= sqsReaperPerQueueBudget { + return true, processed, nil + } + } + if len(page) < sqsReaperPageLimit { + return true, processed, nil + } + return false, processed, nil +} + +// reapDeadByAgePartition is the partitioned-keyspace twin of +// reapDeadByAge. Each iteration scans one partition's byage prefix +// for one (queue, gen) cohort, parses the partitioned byage key, +// and dispatches the (data, vis, byage, optional group-lock) +// quartet delete for the message. Threads partition through +// reapOneRecord so the dispatch helpers route to the partitioned +// data / vis keys, not the legacy ones. 
+func (s *SQSServer) reapDeadByAgePartition(ctx context.Context, queueName string, gen uint64, partition uint32, readTS uint64) (bool, error) { + prefix := sqsPartitionedMsgByAgePrefixForPartition(queueName, partition, gen) + upper := prefixScanEnd(prefix) + start := bytes.Clone(prefix) + processed := 0 + for processed < sqsReaperPerQueueBudget { + page, err := s.store.ScanAt(ctx, start, upper, sqsReaperPageLimit, readTS) + if err != nil { + return false, errors.WithStack(err) + } + if len(page) == 0 { + return true, nil + } + done, newProcessed, err := s.reapDeadByAgePartitionPage(ctx, queueName, gen, partition, page, readTS, processed) + if err != nil { + return false, err + } + processed = newProcessed + if done { + return processed < sqsReaperPerQueueBudget, nil + } + start = nextScanCursorAfter(page[len(page)-1].Key) + } + return false, nil +} + +// reapDeadByAgePartitionPage is the partitioned twin of +// reapDeadByAgePage. Parses each entry as a partitioned byage key +// (verifying the partition matches — defensive against page +// boundaries that span partitions, which the prefix scan should +// already prevent) and feeds the partition-aware reapOneRecord. +func (s *SQSServer) reapDeadByAgePartitionPage(ctx context.Context, queueName string, gen uint64, partition uint32, page []*store.KVPair, readTS uint64, processed int) (bool, int, error) { + for _, kvp := range page { + if err := ctx.Err(); err != nil { + return true, processed, errors.WithStack(err) + } + parsed, ok := parseSqsPartitionedMsgByAgeKey(kvp.Key, queueName) + if !ok || parsed.Partition != partition || parsed.Generation != gen { + continue + } + if err := s.reapOneRecordPartitioned(ctx, queueName, partition, gen, kvp.Key, parsed.MessageID, readTS); err != nil { return true, processed, err } processed++ @@ -351,7 +484,11 @@ func (s *SQSServer) reapPage(ctx context.Context, queueName string, currentGen u // see meta caught up. 
continue } - if err := s.reapOneRecord(ctx, queueName, parsed.Generation, kvp.Key, parsed.MessageID, readTS); err != nil { + // Live-queue retention reap currently iterates only the + // legacy byage keyspace; partitioned-byage live-queue + // retention is a follow-up to PR 6a (the tombstoned-cohort + // path is what this PR addresses). + if err := s.reapOneRecord(ctx, queueName, nil, 0, parsed.Generation, kvp.Key, parsed.MessageID, readTS); err != nil { return true, processed, err } processed++ @@ -369,14 +506,15 @@ func (s *SQSServer) reapPage(ctx context.Context, queueName string, currentGen u // quartet under a single OCC dispatch. ErrWriteConflict is treated as // success — the message has just been touched (received, deleted, // redriven) by another path and is no longer ours to reap. -func (s *SQSServer) reapOneRecord(ctx context.Context, queueName string, gen uint64, byAgeKey []byte, messageID string, readTS uint64) error { - // Reaper iterates legacy byAge entries only in PR 5b-2; the - // partitioned-byAge enumeration ships in a later PR. nil meta - // + partition 0 routes the dispatch helper to the legacy - // constructor so the data-key matches the pre-PR-5b layout - // byte-for-byte. - const partition uint32 = 0 - dataKey := sqsMsgDataKeyDispatch(nil, queueName, partition, gen, messageID) +// +// Legacy reaper callers pass nil meta + partition 0 so the dispatch +// helpers route to the legacy constructors (byte-identical to the +// pre-PR-5b layout). Partitioned reaper callers (PR 6a) pass a +// synthetic *sqsQueueMeta carrying the tombstone-encoded +// PartitionCount so the dispatch helpers route to the partitioned +// constructors. 
+func (s *SQSServer) reapOneRecord(ctx context.Context, queueName string, meta *sqsQueueMeta, partition uint32, gen uint64, byAgeKey []byte, messageID string, readTS uint64) error { + dataKey := sqsMsgDataKeyDispatch(meta, queueName, partition, gen, messageID) parsed, found, err := s.loadDataForReaper(ctx, dataKey, readTS) if err != nil { return err @@ -388,7 +526,7 @@ func (s *SQSServer) reapOneRecord(ctx context.Context, queueName string, gen uin s.dispatchOrphanByAgeDrop(ctx, byAgeKey, readTS) return nil } - req, err := s.buildReapOps(ctx, queueName, gen, byAgeKey, dataKey, parsed, readTS) + req, err := s.buildReapOps(ctx, queueName, meta, partition, gen, byAgeKey, dataKey, parsed, readTS) if err != nil { return err } @@ -401,6 +539,21 @@ func (s *SQSServer) reapOneRecord(ctx context.Context, queueName string, gen uin return nil } +// reapOneRecordPartitioned is a thin convenience wrapper around +// reapOneRecord for the partitioned-byage enumeration: synthesises +// a meta carrying the tombstone-encoded PartitionCount so the +// dispatch helpers route to the partitioned key family. Existing +// fields on the synthetic meta beyond PartitionCount are +// irrelevant — the dispatch helpers only branch on meta.PartitionCount. +func (s *SQSServer) reapOneRecordPartitioned(ctx context.Context, queueName string, partition uint32, gen uint64, byAgeKey []byte, messageID string, readTS uint64) error { + // PartitionCount > 1 forces the dispatch helpers down the + // partitioned branch; the actual count value is read by + // effectivePartitionCount on the read path but the reaper's + // per-key dispatch only needs the legacy/partitioned bit. + syntheticMeta := &sqsQueueMeta{PartitionCount: htfifoMaxPartitions} + return s.reapOneRecord(ctx, queueName, syntheticMeta, partition, gen, byAgeKey, messageID, readTS) +} + // loadDataForReaper fetches and decodes the data record for a byage // entry. 
found=false signals "byage points at a missing record — drop // the byage entry" to the caller. Read errors other than ErrKeyNotFound @@ -518,16 +671,13 @@ func (s *SQSServer) dispatchDedupDelete(ctx context.Context, key []byte, readTS return nil } -func (s *SQSServer) buildReapOps(ctx context.Context, queueName string, gen uint64, byAgeKey, dataKey []byte, parsed *sqsMessageRecord, readTS uint64) (*kv.OperationGroup[kv.OP], error) { - // Reaper currently iterates the legacy byAge keyspace only — the - // partitioned-byAge enumeration is wired in a later PR (Phase 3.D - // PR 6, partition-iterating reaper). Dispatch helpers receive - // nil meta + partition 0 so they deterministically route to the - // legacy constructor and produce byte-identical keys to the - // pre-PR-5b reaper. When PR 6 lands the caller switches to the - // real meta + parsed partition. - const partition uint32 = 0 - visKey := sqsMsgVisKeyDispatch(nil, queueName, partition, gen, parsed.VisibleAtMillis, parsed.MessageID) +func (s *SQSServer) buildReapOps(ctx context.Context, queueName string, meta *sqsQueueMeta, partition uint32, gen uint64, byAgeKey, dataKey []byte, parsed *sqsMessageRecord, readTS uint64) (*kv.OperationGroup[kv.OP], error) { + // meta + partition route the dispatch helpers to the right key + // family: nil meta + partition 0 is the legacy reaper path + // (byte-identical to pre-PR-5b layout); a synthetic meta with + // PartitionCount>1 + a real partition is the partitioned reaper + // path landed in PR 6a. 
+ visKey := sqsMsgVisKeyDispatch(meta, queueName, partition, gen, parsed.VisibleAtMillis, parsed.MessageID) readKeys := [][]byte{byAgeKey, dataKey, visKey, sqsQueueMetaKey(queueName), sqsQueueGenKey(queueName)} elems := []*kv.Elem[kv.OP]{ {Op: kv.Del, Key: byAgeKey}, @@ -535,8 +685,8 @@ func (s *SQSServer) buildReapOps(ctx context.Context, queueName string, gen uint {Op: kv.Del, Key: visKey}, } if parsed.MessageGroupId != "" { - lockKey := sqsMsgGroupKeyDispatch(nil, queueName, partition, gen, parsed.MessageGroupId) - lock, err := s.loadFifoGroupLock(ctx, queueName, nil, partition, gen, parsed.MessageGroupId, readTS) + lockKey := sqsMsgGroupKeyDispatch(meta, queueName, partition, gen, parsed.MessageGroupId) + lock, err := s.loadFifoGroupLock(ctx, queueName, meta, partition, gen, parsed.MessageGroupId, readTS) if err != nil { return nil, err } diff --git a/adapter/sqs_tombstone_value_test.go b/adapter/sqs_tombstone_value_test.go new file mode 100644 index 000000000..d07ffb39b --- /dev/null +++ b/adapter/sqs_tombstone_value_test.go @@ -0,0 +1,89 @@ +package adapter + +import ( + "encoding/binary" + "testing" + + "github.com/stretchr/testify/require" +) + +// TestEncodeQueueTombstoneValue_LegacyByteIdenticalForUnpartitioned +// pins that pre-PR-6a tombstone values are byte-identical for +// PartitionCount==0 and PartitionCount==1: a rolling upgrade that +// flips a node from pre-PR-6a to PR 6a sees no on-disk change for +// existing non-partitioned queues. Without this, every DeleteQueue +// during the rollout window would emit a new tombstone value shape +// that the pre-PR-6a binary doesn't expect (it ignores the value +// today, but a future encoding-aware check could trip on it). 
+func TestEncodeQueueTombstoneValue_LegacyByteIdenticalForUnpartitioned(t *testing.T) { + t.Parallel() + require.Equal(t, []byte{1}, encodeQueueTombstoneValue(0), + "PartitionCount=0 must encode to the legacy single-byte sentinel") + require.Equal(t, []byte{1}, encodeQueueTombstoneValue(1), + "PartitionCount=1 must encode to the legacy single-byte sentinel") +} + +// TestEncodeQueueTombstoneValue_PartitionedEncodesUint64BE pins the +// new encoding for PartitionCount > 1: an 8-byte big-endian uint64 +// the reaper can safely Decode. Roundtrips through +// decodeQueueTombstoneValue. +func TestEncodeQueueTombstoneValue_PartitionedEncodesUint64BE(t *testing.T) { + t.Parallel() + for _, pc := range []uint32{2, 4, 8, 16, 32} { + got := encodeQueueTombstoneValue(pc) + require.Len(t, got, 8, "PartitionCount=%d must encode to 8 bytes", pc) + require.Equal(t, uint64(pc), binary.BigEndian.Uint64(got), + "PartitionCount=%d must encode big-endian", pc) + require.Equal(t, pc, decodeQueueTombstoneValue(got), + "encode/decode round-trip must preserve PartitionCount") + } +} + +// TestDecodeQueueTombstoneValue_LegacyValuesFallBackToOne pins the +// fail-safe for any value the reaper observes that doesn't match +// the canonical 8-byte shape: empty value (theoretically impossible +// since tombstones always have a non-empty marker), the legacy +// []byte{1} sentinel from pre-PR-6a writers, and any other length +// (a future encoding revision that this binary doesn't recognise) +// all degrade to PartitionCount=1 — the legacy reaper path. Without +// this, a partial rollback or mid-flight encoding change would +// silently flip the reaper to scan zero partitions and leak the +// queue's data. 
+func TestDecodeQueueTombstoneValue_LegacyValuesFallBackToOne(t *testing.T) { + t.Parallel() + require.Equal(t, uint32(1), decodeQueueTombstoneValue(nil), + "nil value must fall back to PartitionCount=1") + require.Equal(t, uint32(1), decodeQueueTombstoneValue([]byte{}), + "empty value must fall back to PartitionCount=1") + require.Equal(t, uint32(1), decodeQueueTombstoneValue([]byte{1}), + "legacy single-byte sentinel must decode to PartitionCount=1") + require.Equal(t, uint32(1), decodeQueueTombstoneValue([]byte{0, 0, 0, 1}), + "unrecognised 4-byte value must fall back to PartitionCount=1") +} + +// TestDecodeQueueTombstoneValue_OutOfRangeFallsBackToOne pins the +// defensive clamp on canonical 8-byte values whose decoded count is +// outside the [1, htfifoMaxPartitions] range. PartitionCount=0 is +// canonical for "no HT-FIFO" on the read path (effectivePartitionCount +// collapses 0→1) so the reaper treats it the same way; values above +// htfifoMaxPartitions are a corruption / future-format signal — the +// reaper can't know how to iterate them safely, so it falls back to +// the legacy single-partition sweep and surfaces the corruption to +// the operator via the slow leak (better than the alternative of +// iterating bogus partitions). 
+func TestDecodeQueueTombstoneValue_OutOfRangeFallsBackToOne(t *testing.T) { + t.Parallel() + zero := make([]byte, 8) + require.Equal(t, uint32(1), decodeQueueTombstoneValue(zero), + "canonical encoding of 0 must decode to PartitionCount=1 (matches effectivePartitionCount)") + + tooBig := make([]byte, 8) + binary.BigEndian.PutUint64(tooBig, uint64(htfifoMaxPartitions)+1) + require.Equal(t, uint32(1), decodeQueueTombstoneValue(tooBig), + "PartitionCount above htfifoMaxPartitions must fall back to 1") + + maxOK := make([]byte, 8) + binary.BigEndian.PutUint64(maxOK, uint64(htfifoMaxPartitions)) + require.Equal(t, uint32(htfifoMaxPartitions), decodeQueueTombstoneValue(maxOK), + "PartitionCount=htfifoMaxPartitions is in-range and must decode unchanged") +} From 59dd72438ae4c1cba559af3d25602acb157c5dbf Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Mon, 4 May 2026 22:50:51 +0900 Subject: [PATCH 2/2] sqs(reaper,keys): polish per Claude review (PR #735, round 1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses Claude's 4 review items on the round-0 commit. None behaviour-changing; all narrow code-quality / clarity fixes. 1. Remove the //nolint:gosec on the uint64→uint32 narrow in decodeQueueTombstoneValue (Claude Issue 1). CLAUDE.md is explicit: "Avoid //nolint — refactor instead." Refactored to read the 8-byte tombstone value as two big-endian uint32s instead of a single uint64, then check the high half is zero (must be — encoder only writes PartitionCount<=htfifoMaxPartitions into the low 32 bits) and the low half is in [1, htfifoMaxPartitions]. No narrowing conversion → no gosec flag → no nolint annotation. 2. reapOneRecordPartitioned's synthetic meta uses PartitionCount=2 instead of htfifoMaxPartitions (=32) as the dispatch sentinel (Claude Issue 2). The dispatch helpers only branch on PartitionCount>1; using the maximum implies the synthetic meta carries the queue's real count, which it does not.
A future caller that read meta.PartitionCount for any other purpose would silently get the wrong answer. 3. Stale comment fix in countPartitionedRows test helper (Claude Issue 3): "[0, 4)" → "[0, 8)" to match the actual loop bound and the test queue's PartitionCount=8. 4. New sqsTombstoneValueLen=8 constant for the tombstone value size (Claude Issue 4). Was reusing sqsGenerationSuffixLen which is a key-layout constant — incidentally also 8 today but conceptually unrelated. Dedicated constant makes the intent self-documenting and decouples the two: a future tombstone-value encoding revision can change the value size without touching key parsers. Below threshold, intentionally not addressed: - Gemini medium "shared budget" comment vs. per-partition budget implementation (sqs_reaper.go:182). The PR description explicitly states "O((partition_count + 1) × legacy prefix scans)" — the per-partition budget IS the design. Comment was already correct in describing the intent. This is a wording nit at most; not P0/P1/high/critical/major. - Gemini medium "use generic Dispatch instead of dispatchDedupDelete for tombstone deletion" (sqs_reaper.go:193). Style suggestion; the current call works correctly. Below threshold. Verified: build clean, golangci-lint 0 issues, targeted test sweep (-race) all green. --- adapter/sqs_keys.go | 33 ++++++++++++++++++------ adapter/sqs_partitioned_dispatch_test.go | 3 ++- adapter/sqs_reaper.go | 19 +++++++------- 3 files changed, 37 insertions(+), 18 deletions(-) diff --git a/adapter/sqs_keys.go b/adapter/sqs_keys.go index 830487981..8614fe668 100644 --- a/adapter/sqs_keys.go +++ b/adapter/sqs_keys.go @@ -133,6 +133,15 @@ func sqsQueueTombstoneKey(queueName string, gen uint64) []byte { return buf } +// sqsTombstoneValueLen is the canonical length of the tombstone +// value emitted by encodeQueueTombstoneValue when PartitionCount +// > 1. 
Kept distinct from sqsGenerationSuffixLen (which describes +// a key-layout constant) even though they both happen to equal 8 +// today — Claude review on PR #735 flagged the borrow as +// confusing. A future encoding revision (e.g. a length-prefixed +// version field) can change this without touching key parsers. +const sqsTombstoneValueLen = 8 + // encodeQueueTombstoneValue packs a queue's PartitionCount into the // tombstone value so the reaper can drive partition iteration off // the tombstone alone (the meta record is gone by the time the @@ -152,7 +161,7 @@ func encodeQueueTombstoneValue(partitionCount uint32) []byte { // shapes identically. return []byte{1} } - out := make([]byte, sqsGenerationSuffixLen) + out := make([]byte, sqsTombstoneValueLen) binary.BigEndian.PutUint64(out, uint64(partitionCount)) return out } @@ -160,23 +169,31 @@ func encodeQueueTombstoneValue(partitionCount uint32) []byte { // decodeQueueTombstoneValue extracts the PartitionCount written by // encodeQueueTombstoneValue. Returns 1 for legacy values (the // single-byte sentinel []byte{1}, the empty value, or any other -// non-8-byte payload) so a binary that has never written a +// non-canonical payload) so a binary that has never written a // partitioned tombstone safely degrades to legacy reaper behaviour. // A canonical 8-byte value with PartitionCount==0 is also clamped // to 1 because partitionFor / effectivePartitionCount already // collapse 0 to 1 on the read side; uniform clamping keeps the // reaper's loop bound consistent. +// +// Reads the value as two big-endian uint32s rather than a single +// uint64 so the function never narrows uint64→uint32 — gosec's +// G115 flags every uint64→uint32 conversion regardless of bound, +// and CLAUDE.md forbids //nolint annotations. 
The high half must +// be zero because the encoder only ever writes +// PartitionCount<=htfifoMaxPartitions (=32) into the low 32 bits; +// a non-zero high half is therefore a corruption / future-format +// signal and decodes to PartitionCount=1. func decodeQueueTombstoneValue(value []byte) uint32 { - if len(value) != sqsGenerationSuffixLen { + if len(value) != sqsTombstoneValueLen { return 1 } - pc := binary.BigEndian.Uint64(value) - if pc == 0 || pc > uint64(htfifoMaxPartitions) { + highHalf := binary.BigEndian.Uint32(value[:4]) + lowHalf := binary.BigEndian.Uint32(value[4:]) + if highHalf != 0 || lowHalf == 0 || lowHalf > htfifoMaxPartitions { return 1 } - // Bounded by htfifoMaxPartitions (=32) immediately above, so - // the uint32 narrow cannot overflow. - return uint32(pc) //nolint:gosec // bounded by htfifoMaxPartitions + return lowHalf } // sqsGenerationSuffixLen is the byte length of the trailing big-endian diff --git a/adapter/sqs_partitioned_dispatch_test.go b/adapter/sqs_partitioned_dispatch_test.go index 182239279..abb031069 100644 --- a/adapter/sqs_partitioned_dispatch_test.go +++ b/adapter/sqs_partitioned_dispatch_test.go @@ -630,7 +630,8 @@ func countPartitionedRows(t *testing.T, node Node, queueName string, gen uint64) // Match every partition by using the family-level "all // partitions" prefix where one exists; for dedup / group // the family-level prefix isn't pre-built, so iterate - // partitions [0, 4) explicitly. + // partitions [0, 8) explicitly to match the test queue's + // PartitionCount=8. 
sqsPartitionedMsgByAgePrefixForQueueAllPartitions(queueName), } for partition := uint32(0); partition < 8; partition++ { diff --git a/adapter/sqs_reaper.go b/adapter/sqs_reaper.go index 9e509f061..5a3f743a4 100644 --- a/adapter/sqs_reaper.go +++ b/adapter/sqs_reaper.go @@ -541,16 +541,17 @@ func (s *SQSServer) reapOneRecord(ctx context.Context, queueName string, meta *s // reapOneRecordPartitioned is a thin convenience wrapper around // reapOneRecord for the partitioned-byage enumeration: synthesises -// a meta carrying the tombstone-encoded PartitionCount so the -// dispatch helpers route to the partitioned key family. Existing -// fields on the synthetic meta beyond PartitionCount are -// irrelevant — the dispatch helpers only branch on meta.PartitionCount. +// a meta carrying any value of PartitionCount > 1 so the dispatch +// helpers route to the partitioned key family. The exact value is +// not consulted by the reaper's per-key dispatch path — the +// helpers only branch on the legacy-vs-partitioned bit +// (PartitionCount > 1) — so we use the minimum legal partitioned +// value as a sentinel rather than the queue's real count, which +// would imply the synthetic meta carries information it actually +// does not (Claude review on PR #735). func (s *SQSServer) reapOneRecordPartitioned(ctx context.Context, queueName string, partition uint32, gen uint64, byAgeKey []byte, messageID string, readTS uint64) error { - // PartitionCount > 1 forces the dispatch helpers down the - // partitioned branch; the actual count value is read by - // effectivePartitionCount on the read path but the reaper's - // per-key dispatch only needs the legacy/partitioned bit. - syntheticMeta := &sqsQueueMeta{PartitionCount: htfifoMaxPartitions} + const partitionedDispatchSentinel uint32 = 2 + syntheticMeta := &sqsQueueMeta{PartitionCount: partitionedDispatchSentinel} return s.reapOneRecord(ctx, queueName, syntheticMeta, partition, gen, byAgeKey, messageID, readTS) }