Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 192 additions & 0 deletions adapter/sqs_partitioned_dispatch_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package adapter

import (
"bytes"
"context"
"fmt"
"net/http"
Expand Down Expand Up @@ -648,3 +649,194 @@ func countPartitionedRows(t *testing.T, node Node, queueName string, gen uint64)
}
return total
}

// TestSQSServer_PartitionedFIFO_LiveQueueDedupReaperPartitions
// pins the PR 6b contract for live queues: reapExpiredDedup must
// also walk the partitioned dedup keyspace, since the pre-PR-6b
// reaper only scanned SqsMsgDedupPrefix (empty for partitioned
// queues) and expired dedup records on partitioned FIFO queues
// would otherwise leak forever. Closes the live-queue half of the
// Codex P2 deferred from PR 5b-2 round 0.
func TestSQSServer_PartitionedFIFO_LiveQueueDedupReaperPartitions(t *testing.T) {
	t.Parallel()
	nodes, _, _ := createNode(t, 1)
	defer shutdown(nodes)
	node := sqsLeaderNode(t, nodes)

	const queueName = "live-dedup-reap.fifo"
	code, resp := callSQS(t, node, sqsCreateQueueTarget, map[string]any{
		"QueueName":  queueName,
		"Attributes": map[string]string{"FifoQueue": "true"},
	})
	require.Equal(t, http.StatusOK, code, "create queue: %v", resp)
	queueURL, _ := resp["QueueUrl"].(string)
	require.NotEmpty(t, queueURL)

	const partitions uint32 = 4
	installPartitionedMetaForTest(t, node, queueName, partitions, htfifoThroughputPerMessageGroupID)

	// Spread sends over several message groups so the dedup records
	// land on more than one partition; partitionFor (FNV-1a) hashes
	// these inputs reasonably evenly.
	for _, group := range []string{"alpha", "beta", "gamma", "delta", "epsilon", "zeta"} {
		sendCode, sendResp := callSQS(t, node, sqsSendMessageTarget, map[string]any{
			"QueueUrl":               queueURL,
			"MessageBody":            "body-" + group,
			"MessageGroupId":         group,
			"MessageDeduplicationId": "dedup-" + group,
		})
		require.Equal(t, http.StatusOK, sendCode, "send (group=%s): %v", group, sendResp)
	}

	srv := node.sqsServer
	ctx := t.Context()

	// Sanity: the sends must have produced at least one partitioned
	// dedup row before we try to reap anything.
	require.Positive(t,
		countPartitionedDedupRowsAcrossPartitions(t, node, queueName, partitions),
		"expected partitioned dedup rows after sends")

	// Backdate ExpiresAtMillis on every partitioned dedup record so
	// the reaper's value-based expiry filter trips on each row. Same
	// approach as TestSQSServer_RetentionReaperDropsExpiredFifoDedup
	// takes for the legacy keyspace; here applied per-partition prefix.
	readTS := srv.nextTxnReadTS(ctx)
	expired := time.Now().UnixMilli() - 1000
	for p := uint32(0); p < partitions; p++ {
		prefix := sqsPartitionedMsgDedupKeyPrefix(queueName, p, 1)
		rows, err := srv.store.ScanAt(ctx, prefix, prefixScanEnd(prefix), 1024, readTS)
		require.NoError(t, err)
		for _, row := range rows {
			record, err := decodeFifoDedupRecord(row.Value)
			require.NoError(t, err)
			record.ExpiresAtMillis = expired
			encoded, err := encodeFifoDedupRecord(record)
			require.NoError(t, err)
			txn := &kv.OperationGroup[kv.OP]{
				IsTxn:   true,
				StartTS: readTS,
				Elems: []*kv.Elem[kv.OP]{
					{Op: kv.Put, Key: bytes.Clone(row.Key), Value: encoded},
				},
			}
			_, err = srv.coordinator.Dispatch(ctx, txn)
			require.NoError(t, err)
		}
	}

	// One reaper pass: the live-queue dedup walk now dispatches
	// reapExpiredDedupPartitioned for partitioned queues (PR 6b).
	require.NoError(t, srv.reapAllQueues(ctx), "reapAllQueues")

	remaining := countPartitionedDedupRowsAcrossPartitions(t, node, queueName, partitions)
	require.Zero(t, remaining,
		"expired partitioned dedup records must be reaped (got %d remaining)",
		remaining)
}

// countPartitionedDedupRowsAcrossPartitions returns the total number
// of rows found under every partition's partitioned dedup prefix at
// gen=1 — the generation a freshly-created queue starts on. It is the
// dedup-scoped sibling of the countPartitionedRows test helper.
func countPartitionedDedupRowsAcrossPartitions(t *testing.T, node Node, queueName string, partitions uint32) int {
	t.Helper()
	ctx := context.Background()
	readTS := node.sqsServer.nextTxnReadTS(ctx)
	count := 0
	for p := uint32(0); p < partitions; p++ {
		start := sqsPartitionedMsgDedupKeyPrefix(queueName, p, 1)
		rows, err := node.sqsServer.store.ScanAt(ctx, start, prefixScanEnd(start), 1024, readTS)
		require.NoError(t, err)
		count += len(rows)
	}
	return count
}

// TestClassifyPartitionedByAgeEntry exercises every branch of the
// classify helper that decides which partitioned byage rows the
// live-queue reaper sweeps. The integration tests above already
// cover the orphan-gen path (tombstoned cohort) and the dedup-expiry
// path; this unit test adds the two live-queue paths with no
// integration coverage today: the retention-cutoff branch (live gen,
// sendTs <= cutoff → reapable) and the future-gen guard
// (parsed.gen > currentGen → not reapable, a defensive check against
// a meta-vs-byage race).
func TestClassifyPartitionedByAgeEntry(t *testing.T) {
	t.Parallel()
	const (
		queueName  = "live-byage.fifo"
		partition  = uint32(2)
		currentGen = uint64(5)
		cutoff     = int64(1_000_000)
		messageID  = "msg-classify"
	)

	cases := []struct {
		name        string
		key         []byte
		reapable    bool
		checkParsed bool // gen / partn / sendTs / msgID asserted only when set
		gen         uint64
		partn       uint32
		sendTs      int64
		msgID       string
	}{
		{
			name:        "live gen within retention window — not reapable",
			key:         sqsPartitionedMsgByAgeKey(queueName, partition, currentGen, cutoff+1, messageID),
			reapable:    false,
			checkParsed: true,
			gen:         currentGen,
			partn:       partition,
			sendTs:      cutoff + 1,
			msgID:       messageID,
		},
		{
			name:        "live gen past retention window — reapable (cutoff branch)",
			key:         sqsPartitionedMsgByAgeKey(queueName, partition, currentGen, cutoff-1, messageID),
			reapable:    true,
			checkParsed: true,
			gen:         currentGen,
			partn:       partition,
			sendTs:      cutoff - 1,
			msgID:       messageID,
		},
		{
			name:        "live gen exactly at cutoff — reapable (>, not >=)",
			key:         sqsPartitionedMsgByAgeKey(queueName, partition, currentGen, cutoff, messageID),
			reapable:    true,
			checkParsed: true,
			gen:         currentGen,
			partn:       partition,
			sendTs:      cutoff,
			msgID:       messageID,
		},
		{
			name:        "orphan generation (gen < currentGen) — reapable unconditionally",
			key:         sqsPartitionedMsgByAgeKey(queueName, partition, currentGen-1, cutoff+1_000_000, messageID),
			reapable:    true,
			checkParsed: true,
			gen:         currentGen - 1,
			partn:       partition,
			sendTs:      cutoff + 1_000_000,
			msgID:       messageID,
		},
		{
			name:        "future generation (gen > currentGen) — not reapable (gen-race guard)",
			key:         sqsPartitionedMsgByAgeKey(queueName, partition, currentGen+1, cutoff-1, messageID),
			reapable:    false,
			checkParsed: true,
			gen:         currentGen + 1,
			partn:       partition,
			sendTs:      cutoff - 1,
			msgID:       messageID,
		},
		{
			name:     "wrong partition — not reapable (page bleed defense)",
			key:      sqsPartitionedMsgByAgeKey(queueName, partition+1, currentGen, cutoff-1, messageID),
			reapable: false,
		},
		{
			name:     "wrong queue prefix — not reapable (parse fails)",
			key:      sqsPartitionedMsgByAgeKey("different.fifo", partition, currentGen, cutoff-1, messageID),
			reapable: false,
		},
		{
			name:     "legacy byage key — not reapable (parse fails)",
			key:      sqsMsgByAgeKey(queueName, currentGen, cutoff-1, messageID),
			reapable: false,
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()
			parsed, reapable := classifyPartitionedByAgeEntry(tc.key, queueName, partition, currentGen, cutoff)
			require.Equal(t, tc.reapable, reapable, "reapable")
			if !tc.checkParsed {
				return
			}
			require.Equal(t, tc.gen, parsed.Generation, "Generation")
			require.Equal(t, tc.partn, parsed.Partition, "Partition")
			require.Equal(t, tc.sendTs, parsed.SendTimestampMs, "SendTimestampMs")
			require.Equal(t, tc.msgID, parsed.MessageID, "MessageID")
		})
	}
}
Loading
Loading