From df11d2b9a4e8de1c2d8efe6d67a7ff7dcb6b0177 Mon Sep 17 00:00:00 2001
From: "Yoshiaki Ueda (bootjp)"
Date: Mon, 4 May 2026 17:02:27 +0900
Subject: [PATCH 1/4] feat(sqs): atomic dormancy gate-lift + cluster-wide
 htfifo capability gate (Phase 3.D PR 5b-3)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The §11 PR 2 dormancy gate (PartitionCount > 1 hard-rejected at
CreateQueue) was a placeholder while the data plane caught up. The data
plane is now in place (PR #731 + #732), so this PR atomically lifts the
dormancy gate and replaces it with the §8.5 capability gate that polls
every cluster peer's /sqs_health for the htfifo capability.

What changes:

- Remove validatePartitionDormancyGate and the
  htfifoTemporaryGateMessage constant from sqs_partitioning.go. Both
  were marked "Removed in PR 5 in the same commit that wires the data
  plane so the gate-and-lift land atomically" — that PR is this one.

- Add (*SQSServer).validateHTFIFOCapability in
  adapter/sqs_capability_gate.go, called from createQueueCore.
  Two-stage fail-closed check on PartitionCount > 1:

  1. Local: this binary must advertise htfifo
     (htfifoCapabilityAdvertised). Refuses the create with
     InvalidAttributeValue if not.
  2. Peers: every entry in s.leaderSQS must report htfifo via
     /sqs_health within the poller's per-peer timeout. Any timeout,
     HTTP error, malformed body, or missing capability blocks the
     create.

  Vacuous on PartitionCount <= 1 and on empty leaderSQS (single-node
  cluster — the local check is the whole cluster).

- collectSQSPeers helper returns leaderSQS values in deterministic
  sorted order with empty/duplicate addresses filtered, so the poller
  and operator-facing error messages never depend on Go map iteration
  order.
- buildHTFIFOCapabilityRejection composes the rejection message with
  each failing peer's address + reason (per-peer Error or "missing
  capability") so an operator triaging a partial-rolling-upgrade
  cluster does not need to re-run the poll out-of-band.

CreateQueue control flow on PartitionCount > 1:

  schema validators (validatePartitionConfig, etc.)
  → validateHTFIFOCapability
    → htfifoCapabilityAdvertised check (local)
    → PollSQSHTFIFOCapability(ctx, collectSQSPeers(), …)
    → reject with InvalidAttributeValue on any failure
  → createQueueWithRetry

Caller audit: validateHTFIFOCapability has exactly one production
caller (createQueueCore in sqs_catalog.go); both the JSON handler and
the future query-protocol handler reach it through that one path.
SetQueueAttributes is unaffected because PartitionCount is immutable
post-create (validatePartitionImmutability).

Test changes:

- Delete TestValidatePartitionDormancyGate_RejectsAboveOne (the
  function it tested is gone).
- Convert TestSQSServer_HTFIFO_DormancyGate_RejectsPartitionedCreate
  into TestSQSServer_HTFIFO_CapabilityGate_AcceptsOnSingleNode — the
  same wire payloads now SUCCEED because the local node advertises
  htfifo and there are no peers to poll. Renamed
  TestSQSServer_HTFIFO_DormancyGate_AllowsPartitionCountOne →
  TestSQSServer_HTFIFO_CapabilityGate_AllowsPartitionCountOne for
  consistency.
- Update comments on
  TestSQSServer_HTFIFO_RejectsQueueScopedDedupOnPartitioned,
  TestSQSServer_HTFIFO_RejectsNonPowerOfTwoPartitionCount,
  TestSQSServer_HTFIFO_ImmutabilitySetQueueAttributesRejects,
  mustCreateFIFOWithThroughputLimit, and the
  installPartitionedMetaForTest helper to describe the new
  capability-gate world.

New unit tests in sqs_capability_gate_test.go:

- TestValidateHTFIFOCapability_ShortCircuitsOnLegacyMeta:
  PartitionCount in {0, 1} skips the poll entirely (proven by wiring a
  peer that would FAIL the gate and verifying the short-circuit path
  bypasses it).
- TestValidateHTFIFOCapability_AcceptsWhenAllPeersAdvertise: happy
  path with two fake peers.
- TestValidateHTFIFOCapability_AcceptsOnEmptyPeerList: vacuous case
  (single-node cluster).
- TestValidateHTFIFOCapability_RejectsWhenOnePeerLacksCapability:
  rolling-upgrade fail-closed; offending peer's address surfaces in
  the InvalidAttributeValue message.
- TestValidateHTFIFOCapability_RejectsWhenPeerUnreachable:
  transient-network fail-closed.
- TestCollectSQSPeers_Deterministic: sort + dedup + empty-skip.
- TestBuildHTFIFOCapabilityRejection_ShapesOperatorMessage:
  rejection-message shape pinned (advertising peers absent, failing
  peers contribute "(reason)" suffix, defensive paths).

Self-review (CLAUDE.md):

1. Data loss — None. The gate strictly tightens CreateQueue acceptance
   vs. the previous dormancy reject; no path now accepts a write that
   would have been rejected before. The dormancy gate's invariant
   ("partitioned-shape meta cannot land on a binary that does not
   handle the partitioned keyspace") is preserved by the local
   htfifoCapabilityAdvertised check and strengthened by the
   cluster-wide poll.
2. Concurrency / distributed failures — Poll runs concurrently across
   peers via the existing PollSQSHTFIFOCapability helper (covered by
   its own race tests). collectSQSPeers + sort are pure /
   deterministic. The leaderSQS map is only mutated at SQSServer
   construction (WithSQSLeaderMap), not at request time, so no
   read/write races. Leader transitions during the poll are handled by
   the existing proxyToLeader path that gates createQueue before
   validateHTFIFOCapability runs.
3. Performance — Poll cost is O(peers) and only paid on
   PartitionCount > 1 creates (rare control-plane operation). Legacy /
   single-partition CreateQueue calls pay one short-circuit branch.
   collectSQSPeers' sort is O(N log N) on a small N (cluster size).
   No hot-path impact.
4. Data consistency — Schema validators (PartitionCount shape,
   dedup-scope rule, perMessageGroupId rule) still run BEFORE the
   capability gate inside parseAttributesIntoMeta, so an invalid shape
   rejects with the schema's reason rather than the gate's.
   SetQueueAttributes immutability remains the guard for post-create
   partition-shape changes.
5. Test coverage — Gate function: 5 unit tests covering the
   short-circuit, happy path, vacuous empty, rolling-upgrade, and
   unreachable-peer classes. Helpers: 2 unit tests pinning
   deterministic order and message shape. Wire-level: existing HT-FIFO
   integration tests carry forward, with the dormancy-reject test
   converted to the new accepts-on-single-node happy path.
---
 adapter/sqs_capability_gate.go               | 127 ++++++++++++
 adapter/sqs_capability_gate_test.go          | 203 +++++++++++++++++++
 adapter/sqs_catalog.go                       |  16 +-
 adapter/sqs_partitioned_dispatch_test.go     |  26 +--
 adapter/sqs_partitioning.go                  |  30 +--
 adapter/sqs_partitioning_integration_test.go |  68 +++----
 adapter/sqs_partitioning_test.go             |  18 --
 7 files changed, 387 insertions(+), 101 deletions(-)
 create mode 100644 adapter/sqs_capability_gate.go
 create mode 100644 adapter/sqs_capability_gate_test.go

diff --git a/adapter/sqs_capability_gate.go b/adapter/sqs_capability_gate.go
new file mode 100644
index 00000000..1895e9d1
--- /dev/null
+++ b/adapter/sqs_capability_gate.go
@@ -0,0 +1,127 @@
+package adapter
+
+import (
+	"context"
+	"net/http"
+	"sort"
+	"strings"
+)
+
+// validateHTFIFOCapability is the §11 PR 5b-3 gate that replaced the
+// PR 2 dormancy reject. CreateQueue calls this on every request; it
+// is a no-op for legacy / single-partition meta and the full
+// cluster-wide capability check for partitioned FIFO meta.
+//
+// Two-stage check, both fail-closed:
+//
+//  1. Local: this binary must advertise the htfifo capability
+//     (htfifoCapabilityAdvertised).
+//     If false, no amount of peer
+//     polling can make the create safe — the leader handling the
+//     request will write the partitioned-shape meta but its own
+//     data plane does not understand the partitioned keyspace.
+//
+//  2. Peers: every entry in s.leaderSQS must report htfifo via
+//     /sqs_health within the poller's per-peer timeout. Any
+//     timeout, HTTP error, malformed body, or missing capability
+//     blocks the create. This catches mid-rolling-upgrade clusters
+//     where the leader is on a new binary but a follower is still
+//     on the old one — the follower would silently store a
+//     partitioned record under the legacy keyspace if it ever won
+//     leadership, so we refuse the create until everyone is on a
+//     binary that handles the new layout.
+//
+// The vacuous case (single-node cluster, leaderSQS empty) is
+// allowed: the local check covers the only node that will ever
+// host the queue. proxyToLeader has already steered the request to
+// the leader, and the leadership-refusal hook (PR 4-B-3b) keeps
+// non-htfifo binaries from acquiring leadership over partitioned-
+// queue Raft groups, so the gate's fail-closed default holds even
+// after the create succeeds.
+func (s *SQSServer) validateHTFIFOCapability(ctx context.Context, requested *sqsQueueMeta) error {
+	if requested == nil || requested.PartitionCount <= 1 {
+		return nil
+	}
+	if !htfifoCapabilityAdvertised {
+		return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
+			"PartitionCount > 1 requires the htfifo capability, which this node does not advertise")
+	}
+	peers := s.collectSQSPeers()
+	if len(peers) == 0 {
+		// Single-node deployment: the local check above is the
+		// whole cluster. Vacuously true on the peer side.
+		return nil
+	}
+	report := PollSQSHTFIFOCapability(ctx, peers, PollerConfig{})
+	if report == nil || !report.AllAdvertise {
+		return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
+			buildHTFIFOCapabilityRejection(report))
+	}
+	return nil
+}
+
+// collectSQSPeers returns every distinct, non-empty SQS-side address
+// from s.leaderSQS in deterministic (sorted) order. Used by the
+// CreateQueue capability gate so error messages and tests pin a
+// stable peer order. The map may legitimately contain self (the
+// proxy-to-leader path uses the same map to find the leader's SQS
+// address by Raft addr); polling self over loopback is cheap and
+// keeps the "every peer reports htfifo" invariant uniform.
+func (s *SQSServer) collectSQSPeers() []string {
+	if len(s.leaderSQS) == 0 {
+		return nil
+	}
+	peers := make([]string, 0, len(s.leaderSQS))
+	seen := make(map[string]struct{}, len(s.leaderSQS))
+	for _, addr := range s.leaderSQS {
+		if addr == "" {
+			continue
+		}
+		if _, ok := seen[addr]; ok {
+			continue
+		}
+		seen[addr] = struct{}{}
+		peers = append(peers, addr)
+	}
+	sort.Strings(peers)
+	return peers
+}
+
+// buildHTFIFOCapabilityRejection composes the operator-facing message
+// for a failed capability poll. Lists the peers that did not
+// advertise htfifo (with the per-peer Error or "missing capability"
+// reason) so the operator can fix the rolling-upgrade lag without
+// rerunning the poll out-of-band. Order matches report.Peers, which
+// matches collectSQSPeers' sorted input order — deterministic.
+func buildHTFIFOCapabilityRejection(report *HTFIFOCapabilityReport) string {
+	var b strings.Builder
+	b.WriteString("PartitionCount > 1 requires every cluster peer to advertise the htfifo capability via /sqs_health; the following peers did not: ")
+	if report == nil {
+		b.WriteString("(no report)")
+		return b.String()
+	}
+	first := true
+	for _, p := range report.Peers {
+		if p.HasHTFIFO {
+			continue
+		}
+		if !first {
+			b.WriteString(", ")
+		}
+		first = false
+		b.WriteString(p.Address)
+		b.WriteString(" (")
+		if p.Error != "" {
+			b.WriteString(p.Error)
+		} else {
+			b.WriteString("missing capability")
+		}
+		b.WriteString(")")
+	}
+	if first {
+		// Defensive: AllAdvertise was false but no peer surfaced a
+		// reason. Should never happen, but emit a non-empty hint
+		// rather than a truncated message ending in a colon.
+		b.WriteString("(unknown peer)")
+	}
+	return b.String()
+}
diff --git a/adapter/sqs_capability_gate_test.go b/adapter/sqs_capability_gate_test.go
new file mode 100644
index 00000000..40932003
--- /dev/null
+++ b/adapter/sqs_capability_gate_test.go
@@ -0,0 +1,203 @@
+package adapter
+
+import (
+	"context"
+	"errors"
+	"net/http"
+	"net/http/httptest"
+	"strings"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+// htfifoCapabilityServer spins up a minimal /sqs_health responder
+// returning HTTP 200 with the given capabilities array. Stand-in
+// for a peer node during the capability-gate tests.
+func htfifoCapabilityServer(t *testing.T, capabilities []string) *httptest.Server {
+	t.Helper()
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.URL.Path != sqsHealthPath {
+			http.NotFound(w, r)
+			return
+		}
+		writeSQSHealthJSONBody(w, r, http.StatusOK, sqsHealthBody{
+			Status:       "ok",
+			Capabilities: capabilities,
+		})
+	}))
+	t.Cleanup(srv.Close)
+	return srv
+}
+
+// TestValidateHTFIFOCapability_ShortCircuitsOnLegacyMeta pins the
+// no-op path: PartitionCount <= 1 must skip the capability poll
+// entirely so legacy and single-partition CreateQueue calls do not
+// pay the cluster-wide poll cost.
+func TestValidateHTFIFOCapability_ShortCircuitsOnLegacyMeta(t *testing.T) {
+	t.Parallel()
+
+	// Wire a peer whose /sqs_health would FAIL the gate, then
+	// verify validateHTFIFOCapability does NOT poll it (the call
+	// would otherwise reject). leaderSQS is non-empty so the
+	// "vacuous empty list" path isn't what's saving us.
+	bad := htfifoCapabilityServer(t, nil)
+	s := &SQSServer{leaderSQS: map[string]string{"raft1": strings.TrimPrefix(bad.URL, "http://")}}
+
+	for _, pc := range []uint32{0, 1} {
+		err := s.validateHTFIFOCapability(context.Background(), &sqsQueueMeta{PartitionCount: pc})
+		require.NoErrorf(t, err, "PartitionCount=%d must skip the poll entirely (gate is HT-FIFO-only)", pc)
+	}
+
+	// Defensive: nil meta also short-circuits — never reach the
+	// poll for an unset/zero meta. Pinned so a future refactor
+	// that added a poll on the nil path would fail loudly here.
+	require.NoError(t, s.validateHTFIFOCapability(context.Background(), nil))
+}
+
+// TestValidateHTFIFOCapability_AcceptsWhenAllPeersAdvertise pins
+// the happy path: every peer in leaderSQS reports the htfifo
+// capability via /sqs_health → the gate passes for
+// PartitionCount > 1.
+func TestValidateHTFIFOCapability_AcceptsWhenAllPeersAdvertise(t *testing.T) {
+	t.Parallel()
+
+	caps := []string{sqsCapabilityHTFIFO}
+	good1 := htfifoCapabilityServer(t, caps)
+	good2 := htfifoCapabilityServer(t, caps)
+	s := &SQSServer{leaderSQS: map[string]string{
+		"raft1": strings.TrimPrefix(good1.URL, "http://"),
+		"raft2": strings.TrimPrefix(good2.URL, "http://"),
+	}}
+
+	require.NoError(t, s.validateHTFIFOCapability(context.Background(), &sqsQueueMeta{PartitionCount: 4}))
+}
+
+// TestValidateHTFIFOCapability_AcceptsOnEmptyPeerList pins the
+// vacuous case: a single-node cluster (no peers) with the local
+// htfifo capability advertised must allow PartitionCount > 1.
+// htfifoCapabilityAdvertised is a build-time const = true so the
+// local check passes; the empty peer list short-circuits the
+// poll. This is the path the wire-level
+// TestSQSServer_HTFIFO_CapabilityGate_AcceptsOnSingleNode test
+// exercises end-to-end.
+func TestValidateHTFIFOCapability_AcceptsOnEmptyPeerList(t *testing.T) {
+	t.Parallel()
+	s := &SQSServer{}
+	require.NoError(t, s.validateHTFIFOCapability(context.Background(), &sqsQueueMeta{PartitionCount: 4}))
+}
+
+// TestValidateHTFIFOCapability_RejectsWhenOnePeerLacksCapability
+// pins the rolling-upgrade fail-closed: one peer advertises
+// htfifo, the other doesn't — the gate must reject the create
+// with InvalidAttributeValue and surface the offending peer in
+// the error message so the operator can fix the cluster without
+// re-running the poll out-of-band.
+func TestValidateHTFIFOCapability_RejectsWhenOnePeerLacksCapability(t *testing.T) {
+	t.Parallel()
+
+	good := htfifoCapabilityServer(t, []string{sqsCapabilityHTFIFO})
+	old := htfifoCapabilityServer(t, []string{}) // pre-htfifo binary
+	oldAddr := strings.TrimPrefix(old.URL, "http://")
+	s := &SQSServer{leaderSQS: map[string]string{
+		"raft1": strings.TrimPrefix(good.URL, "http://"),
+		"raft2": oldAddr,
+	}}
+
+	err := s.validateHTFIFOCapability(context.Background(), &sqsQueueMeta{PartitionCount: 8})
+	require.Error(t, err)
+
+	var apiErr *sqsAPIError
+	ok := errors.As(err, &apiErr)
+	require.True(t, ok, "must surface as sqsAPIError so the wire layer maps to InvalidAttributeValue, got %T", err)
+	require.Equal(t, http.StatusBadRequest, apiErr.status)
+	require.Equal(t, sqsErrInvalidAttributeValue, apiErr.errorType)
+	require.Contains(t, apiErr.message, "every cluster peer to advertise the htfifo capability",
+		"message must explain the gate so the operator knows what to fix")
+	require.Contains(t, apiErr.message, oldAddr,
+		"the offending peer must appear in the message, got %q", apiErr.message)
+}
+
+// TestValidateHTFIFOCapability_RejectsWhenPeerUnreachable pins
+// the network-failure fail-closed: a peer whose /sqs_health is
+// unreachable (closed listener) must block the create. A
+// transient network blip during a CreateQueue is exactly the
+// class of partial-cluster state the gate is designed to catch —
+// silently accepting the create here would let a partitioned
+// queue land while a peer is offline and silently drop messages
+// the moment that peer comes back as a non-htfifo binary.
+func TestValidateHTFIFOCapability_RejectsWhenPeerUnreachable(t *testing.T) {
+	t.Parallel()
+
+	// Bind a port, capture its address, then close the listener
+	// so dials fail immediately rather than waiting on the
+	// per-peer timeout.
+	srv := htfifoCapabilityServer(t, []string{sqsCapabilityHTFIFO})
+	deadAddr := strings.TrimPrefix(srv.URL, "http://")
+	srv.Close()
+
+	s := &SQSServer{leaderSQS: map[string]string{"raft1": deadAddr}}
+
+	err := s.validateHTFIFOCapability(context.Background(), &sqsQueueMeta{PartitionCount: 2})
+	require.Error(t, err)
+	var apiErr *sqsAPIError
+	ok := errors.As(err, &apiErr)
+	require.True(t, ok)
+	require.Equal(t, http.StatusBadRequest, apiErr.status)
+	require.Equal(t, sqsErrInvalidAttributeValue, apiErr.errorType)
+	require.Contains(t, apiErr.message, deadAddr,
+		"unreachable peer must be named in the rejection message")
+}
+
+// TestCollectSQSPeers_Deterministic pins the helper's order +
+// dedup contract: leaderSQS is a map (random Go iteration order),
+// but the gate's error message and the poller's per-peer index
+// must be deterministic so test assertions, log lines, and
+// operator triage are stable across runs.
+func TestCollectSQSPeers_Deterministic(t *testing.T) {
+	t.Parallel()
+
+	s := &SQSServer{leaderSQS: map[string]string{
+		"r1": "node3:9000",
+		"r2": "node1:9000",
+		"r3": "node2:9000",
+		"r4": "node1:9000", // duplicate (two Raft nodes pointing at one SQS endpoint)
+		"r5": "",           // empty string must be skipped
+	}}
+
+	got := s.collectSQSPeers()
+	require.Equal(t, []string{"node1:9000", "node2:9000", "node3:9000"}, got,
+		"peers must be sorted, deduped, and free of empty strings")
+
+	// Empty leaderSQS: caller relies on len()==0 to skip the
+	// poll on single-node deployments.
+	require.Empty(t, (&SQSServer{}).collectSQSPeers())
+}
+
+// TestBuildHTFIFOCapabilityRejection_ShapesOperatorMessage pins the
+// rejection-message shape so a future refactor cannot accidentally
+// truncate the per-peer detail. Each failing peer must contribute
+// a "(reason)" suffix; peers that pass do not appear at all.
+func TestBuildHTFIFOCapabilityRejection_ShapesOperatorMessage(t *testing.T) {
+	t.Parallel()
+
+	report := &HTFIFOCapabilityReport{
+		Peers: []HTFIFOCapabilityPeerStatus{
+			{Address: "ok:9000", HasHTFIFO: true},
+			{Address: "old:9000", HasHTFIFO: false, Capabilities: []string{}},
+			{Address: "down:9000", HasHTFIFO: false, Error: "dial tcp: refused"},
+		},
+	}
+
+	msg := buildHTFIFOCapabilityRejection(report)
+	require.Contains(t, msg, "every cluster peer to advertise the htfifo capability")
+	require.NotContains(t, msg, "ok:9000", "advertising peers must NOT appear in the rejection")
+	require.Contains(t, msg, "old:9000 (missing capability)")
+	require.Contains(t, msg, "down:9000 (dial tcp: refused)")
+
+	// Defensive: nil report and "all-passing-but-AllAdvertise-false" path.
+	require.Contains(t, buildHTFIFOCapabilityRejection(nil), "no report")
+	allPass := &HTFIFOCapabilityReport{Peers: []HTFIFOCapabilityPeerStatus{{Address: "x", HasHTFIFO: true}}}
+	require.Contains(t, buildHTFIFOCapabilityRejection(allPass), "unknown peer",
+		"never emit a truncated 'did not: ' tail when no peer details surface")
+}
diff --git a/adapter/sqs_catalog.go b/adapter/sqs_catalog.go
index abb179a2..9521978c 100644
--- a/adapter/sqs_catalog.go
+++ b/adapter/sqs_catalog.go
@@ -903,13 +903,15 @@ func (s *SQSServer) createQueueCore(ctx context.Context, in *sqsCreateQueueInput
 	if err != nil {
 		return "", err
 	}
-	// Temporary dormancy gate (Phase 3.D §11 PR 2). PartitionCount > 1
-	// must reject until PR 5 wires the data plane atomically with the
-	// gate-lift. Without this, accepting a partitioned-queue create
-	// would let SendMessage write under the legacy single-partition
-	// prefix; the PR 5 reader would never find those messages and the
-	// reaper would not enumerate them — silent message loss.
-	if err := validatePartitionDormancyGate(requested); err != nil {
+	// Cluster-wide htfifo capability gate (Phase 3.D §11 PR 5b-3,
+	// replaces the PR 2 dormancy reject). PartitionCount > 1 is
+	// rejected unless this binary AND every peer in s.leaderSQS
+	// advertise the htfifo capability via /sqs_health. The gate
+	// fails closed on any peer timeout, HTTP error, malformed body,
+	// or missing capability so a partitioned queue cannot land in a
+	// partially-upgraded cluster where some peer would silently
+	// store its records under the legacy single-partition keyspace.
+	if err := s.validateHTFIFOCapability(ctx, requested); err != nil {
 		return "", err
 	}
 	if len(in.Tags) > sqsMaxTagsPerQueue {
diff --git a/adapter/sqs_partitioned_dispatch_test.go b/adapter/sqs_partitioned_dispatch_test.go
index 3b04324f..c397c0f6 100644
--- a/adapter/sqs_partitioned_dispatch_test.go
+++ b/adapter/sqs_partitioned_dispatch_test.go
@@ -14,14 +14,12 @@ import (
 )
 
 // Integration tests for the PR 5b-2 partitioned-FIFO data plane
-// wiring. The §11 PR 2 dormancy gate still rejects PartitionCount
-// > 1 at CreateQueue (lifted atomically with the capability check
-// in PR 5b-3), so these tests reach below the public CreateQueue
-// surface to install a partitioned meta record directly. That
-// short-circuits the dormancy gate for the duration of the test
-// without disabling it for production CreateQueue calls — which
-// is the exact split the design doc envisaged for "data plane
-// landed but not user-creatable yet."
+// wiring. PR 5b-3 has since lifted the §11 PR 2 dormancy gate, so
+// CreateQueue with PartitionCount > 1 succeeds on a single-node
+// cluster. The helper below predates that gate-lift and overrides
+// the meta record directly to keep these tests independent of the
+// CreateQueue capability path; new partitioned-FIFO tests should
+// prefer the public CreateQueue surface.
 
 // installPartitionedMetaForTest overwrites the queue's meta record
 // with a partitioned shape (PartitionCount > 1) by dispatching a
@@ -30,11 +28,13 @@
 // counters and the catalog index are populated correctly); only
 // the partition-shape attributes are mutated.
 //
-// The dormancy gate intercepts CreateQueue, not the data plane,
-// so once the meta record carries PartitionCount > 1 every
-// SendMessage / ReceiveMessage / DeleteMessage call routes
-// through the partitioned dispatch helpers exactly as it would
-// after PR 5b-3 lifts the gate.
+// Predates the PR 5b-3 gate-lift; still useful for tests that want
+// to bypass the capability poll (e.g. when CreateQueue would have
+// to negotiate over a fake peer list). Once the meta record
+// carries PartitionCount > 1, every SendMessage / ReceiveMessage /
+// DeleteMessage call routes through the partitioned dispatch
+// helpers exactly as it would after a normal partitioned-queue
+// CreateQueue.
 func installPartitionedMetaForTest(t *testing.T, node Node, queueName string, partitionCount uint32, throughputLimit string) {
 	t.Helper()
 	s := node.sqsServer
diff --git a/adapter/sqs_partitioning.go b/adapter/sqs_partitioning.go
index 39c0f7b7..bce51e21 100644
--- a/adapter/sqs_partitioning.go
+++ b/adapter/sqs_partitioning.go
@@ -48,11 +48,6 @@ const (
 	htfifoDedupeScopeQueue = "queue"
 )
 
-// htfifoTemporaryGateMessage is the operator-facing reason the
-// CreateQueue gate uses while PR 2-4 are in production. Removed in
-// PR 5 in the same commit that wires the data-plane fanout.
-const htfifoTemporaryGateMessage = "PartitionCount > 1 requires HT-FIFO data plane — not yet enabled"
-
 // partitionFor maps a (queue meta, MessageGroupId) pair to a
 // partition index in [0, PartitionCount). Edge cases:
 //
@@ -131,11 +126,9 @@ func isPowerOfTwo(n uint32) bool {
 // with multi-partition FIFO because the dedup key cannot be
 // globally unique across partitions without a cross-partition
 // OCC transaction.
-// - The §11 PR 2 dormancy gate (PartitionCount > 1 rejected at
-//   CreateQueue) lives in validatePartitionDormancyGate so the
-//   dormancy check can be turned off in unit tests that want to
-//   exercise the full schema path. Production CreateQueue calls
-//   both validators.
+// - The §11 PR 2 dormancy gate has been lifted (Phase 3.D PR 5b-3);
+//   CreateQueue now gates PartitionCount > 1 on the cluster-wide
+//   htfifo capability via validateHTFIFOCapability instead.
 func validatePartitionConfig(meta *sqsQueueMeta) error {
 	if err := validatePartitionShape(meta); err != nil {
 		return err
@@ -212,23 +205,6 @@ func validateStandardQueueRejectsHTFIFO(meta *sqsQueueMeta) error {
 	return nil
 }
 
-// validatePartitionDormancyGate is the temporary §11 PR 2 gate. As
-// long as the data-plane fanout (PR 5) has not landed, accepting a
-// partitioned-queue CreateQueue would let SendMessage write under
-// the legacy single-partition prefix — the PR 5 reader would never
-// find those messages and the reaper would not enumerate them. This
-// gate makes the wrong-layout-data class of bug impossible.
-//
-// Removed in PR 5 in the same commit that wires the data plane so
-// the gate-and-lift land atomically.
-func validatePartitionDormancyGate(meta *sqsQueueMeta) error {
-	if meta.PartitionCount > 1 {
-		return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
-			htfifoTemporaryGateMessage)
-	}
-	return nil
-}
-
 // validatePartitionImmutability enforces the §3.2 rule that
 // PartitionCount, FifoThroughputLimit, and DeduplicationScope are
 // all immutable from CreateQueue onward. Called from
diff --git a/adapter/sqs_partitioning_integration_test.go b/adapter/sqs_partitioning_integration_test.go
index b0c50178..57f204fc 100644
--- a/adapter/sqs_partitioning_integration_test.go
+++ b/adapter/sqs_partitioning_integration_test.go
@@ -6,13 +6,16 @@ import (
 	"testing"
 )
 
-// TestSQSServer_HTFIFO_DormancyGate_RejectsPartitionedCreate pins
-// the §11 PR 2 dormancy gate at the wire layer: CreateQueue with
-// PartitionCount > 1 rejects with InvalidAttributeValue and the
-// gate's reason ("not yet enabled") makes it into the operator-
-// visible message. Removed in PR 5 in the same commit that wires
-// the data plane.
-func TestSQSServer_HTFIFO_DormancyGate_RejectsPartitionedCreate(t *testing.T) {
+// TestSQSServer_HTFIFO_CapabilityGate_AcceptsOnSingleNode pins the
+// §11 PR 5b-3 gate-lift at the wire layer on a single-node cluster:
+// CreateQueue with PartitionCount > 1 now succeeds because the local
+// binary advertises htfifo and there are no peers to poll. This test
+// replaces the prior PR 2 dormancy-reject test (which expected a 400
+// with "not yet enabled" — the dormancy gate has been removed). The
+// peer-rejection class is exercised in sqs_capability_gate_test.go
+// where a fake peer can be wired in without a multi-node cluster.
+func TestSQSServer_HTFIFO_CapabilityGate_AcceptsOnSingleNode(t *testing.T) {
 	t.Parallel()
 	nodes, _, _ := createNode(t, 1)
 	defer shutdown(nodes)
@@ -26,24 +29,18 @@ func TestSQSServer_HTFIFO_DormancyGate_RejectsPartitionedCreate(t *testing.T) {
 				"PartitionCount": n,
 			},
 		})
-		if status != http.StatusBadRequest {
-			t.Fatalf("PartitionCount=%s: status %d (expected 400 from dormancy gate); body=%v", n, status, out)
-		}
-		if got, _ := out["__type"].(string); got != sqsErrInvalidAttributeValue {
-			t.Fatalf("PartitionCount=%s: __type=%q (expected InvalidAttributeValue)", n, got)
-		}
-		msg, _ := out["message"].(string)
-		if msg == "" || !strings.Contains(msg, "not yet enabled") {
-			t.Fatalf("PartitionCount=%s: message %q must mention the gate reason", n, msg)
+		if status != http.StatusOK {
+			t.Fatalf("PartitionCount=%s on single-node cluster: status %d (expected 200 after gate-lift); body=%v", n, status, out)
 		}
 	}
 }
 
-// TestSQSServer_HTFIFO_DormancyGate_AllowsPartitionCountOne pins
+// TestSQSServer_HTFIFO_CapabilityGate_AllowsPartitionCountOne pins
 // the no-op-partition-count path: PartitionCount=1 is the legacy
-// single-partition layout and must pass the dormancy gate even on
-// FIFO queues that explicitly set the field.
-func TestSQSServer_HTFIFO_DormancyGate_AllowsPartitionCountOne(t *testing.T) {
+// single-partition layout and bypasses the capability gate
+// entirely (validateHTFIFOCapability short-circuits on
+// PartitionCount <= 1).
+func TestSQSServer_HTFIFO_CapabilityGate_AllowsPartitionCountOne(t *testing.T) {
 	t.Parallel()
 	nodes, _, _ := createNode(t, 1)
 	defer shutdown(nodes)
@@ -62,9 +59,9 @@ func TestSQSServer_HTFIFO_DormancyGate_AllowsPartitionCountOne(t *testing.T) {
 }
 
 // TestSQSServer_HTFIFO_RejectsNonPowerOfTwoPartitionCount pins the
-// validator's power-of-two rule. The validator runs before the
-// dormancy gate so an invalid count (3) reports the validator's
-// reason, not the gate's.
+// validator's power-of-two rule. The schema validator runs before
+// the capability gate so an invalid count (3) reports the
+// validator's reason, not the gate's.
 func TestSQSServer_HTFIFO_RejectsNonPowerOfTwoPartitionCount(t *testing.T) {
 	t.Parallel()
 	nodes, _, _ := createNode(t, 1)
@@ -112,16 +109,13 @@ func TestSQSServer_HTFIFO_RejectsHTFIFOAttrsOnStandardQueue(t *testing.T) {
 // the §3.2 cross-attribute control-plane gate at the wire layer.
 // {PartitionCount > 1, DeduplicationScope = "queue"} is rejected by
 // validatePartitionConfig (the schema validator) which runs inside
-// parseAttributesIntoMeta — that is, BEFORE validatePartitionDormancyGate
-// runs in createQueue. So the cross-attr rejection is what the wire
-// layer sees today, even though the dormancy gate would also reject
-// the same input on its own. After PR 5 lifts the dormancy gate the
-// cross-attr rule remains the sole rejection path.
+// parseAttributesIntoMeta — that is, BEFORE the capability gate
+// (validateHTFIFOCapability) ever runs. The schema rejection is the
+// sole rejection path after the PR 5b-3 dormancy gate-lift.
 //
 // The test only checks the 400 status to stay agnostic about which
-// validator fires first — both are correct behaviour, and a future
-// reordering of the createQueue control flow does not need to break
-// this test.
+// validator fires first — a future reordering of the createQueue
+// control flow does not need to break this test.
 func TestSQSServer_HTFIFO_RejectsQueueScopedDedupOnPartitioned(t *testing.T) {
 	t.Parallel()
 	nodes, _, _ := createNode(t, 1)
@@ -145,8 +139,9 @@ func TestSQSServer_HTFIFO_RejectsQueueScopedDedupOnPartitioned(t *testing.T) {
 // the §3.2 immutability rule at the wire layer: SetQueueAttributes
 // attempts to change PartitionCount / FifoThroughputLimit /
 // DeduplicationScope reject with InvalidAttributeValue. Test creates
-// a single-partition FIFO queue (allowed by dormancy) with
-// FifoThroughputLimit set, then tries to change it.
+// a single-partition FIFO queue (which bypasses the capability
+// gate entirely) with FifoThroughputLimit set, then tries to
+// change it.
 func TestSQSServer_HTFIFO_ImmutabilitySetQueueAttributesRejects(t *testing.T) {
 	t.Parallel()
 	nodes, _, _ := createNode(t, 1)
@@ -252,9 +247,10 @@ func TestSQSServer_HTFIFO_GetQueueAttributesRoundTrip(t *testing.T) {
 
 // --- helpers ---
 
 // mustCreateFIFOWithThroughputLimit creates a single-partition FIFO
-// queue (allowed by the §11 PR 2 dormancy gate) with the requested
-// FifoThroughputLimit set. Used by the immutability tests so they
-// have a non-empty FifoThroughputLimit to attempt to change.
+// queue (PartitionCount=1 bypasses the §11 PR 5b-3 capability
+// gate) with the requested FifoThroughputLimit set. Used by the
+// immutability tests so they have a non-empty FifoThroughputLimit
+// to attempt to change.
 func mustCreateFIFOWithThroughputLimit(t *testing.T, node Node, name, limit string) string {
 	t.Helper()
 	status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{
diff --git a/adapter/sqs_partitioning_test.go b/adapter/sqs_partitioning_test.go
index 0183674a..7c98907e 100644
--- a/adapter/sqs_partitioning_test.go
+++ b/adapter/sqs_partitioning_test.go
@@ -253,24 +253,6 @@ func TestValidatePartitionConfig_PerMessageGroupIDRequiresExplicitPartitionCount
 	}))
 }
 
-// --- validatePartitionDormancyGate unit tests ---
-
-// TestValidatePartitionDormancyGate_RejectsAboveOne pins the §11
-// PR 2 dormancy gate: PartitionCount > 1 must reject until PR 5
-// lifts the gate. PartitionCount 0 or 1 must pass (both are the
-// legacy single-partition layout).
-func TestValidatePartitionDormancyGate_RejectsAboveOne(t *testing.T) { - t.Parallel() - require.NoError(t, validatePartitionDormancyGate(&sqsQueueMeta{PartitionCount: 0})) - require.NoError(t, validatePartitionDormancyGate(&sqsQueueMeta{PartitionCount: 1})) - for _, n := range []uint32{2, 4, 8, 16, 32} { - err := validatePartitionDormancyGate(&sqsQueueMeta{PartitionCount: n}) - require.Error(t, err, "PartitionCount=%d must reject under the dormancy gate", n) - require.Contains(t, err.Error(), "not yet enabled", - "the gate's reason must surface to the operator") - } -} - // --- validatePartitionImmutability unit tests --- // TestValidatePartitionImmutability_RejectsAnyChange pins the §3.2 From 166e079abba11634bb83a62463b9a891182e0b78 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Mon, 4 May 2026 18:28:17 +0900 Subject: [PATCH 2/4] sqs(catalog,capability): idempotent gate placement + sanitized public message (PR #734, round 1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two PR #734 review findings addressed in one commit because they touch the same code path: 1. Codex P1 — Check existing queue before enforcing HTFIFO peer gate The previous placement of validateHTFIFOCapability inside createQueueCore (before createQueueWithRetry) ran the gate on EVERY CreateQueue call with PartitionCount > 1, including idempotent retries on an already-existing queue with identical attributes. A transient peer poll failure (timeout / unreachable / malformed health) during such a retry would then return InvalidAttributeValue instead of the AWS-correct 200 OK, breaking create-or-get clients during partial outages or rolling upgrades. Fix: move the gate INTO tryCreateQueueOnce after the existence check and BEFORE the OCC dispatch. The order in tryCreateQueueOnce is now: 1. loadQueueMetaAt — check if queue exists at the snapshot 2. exists + matching attrs → return (true, nil) idempotent OK 3. 
exists + different attrs → return QueueNameExists 4. validateHTFIFOCapability — runs ONLY on the genuine create path 5. loadQueueGenerationAt + dispatch the create The gate may run more than once across OCC retries (each retry that gets to "queue still missing" re-polls), but every retry that hits an existing-queue match short-circuits before the gate runs — so idempotent CreateQueue under a partial cluster outage stays AWS-correct. Caller audit: validateHTFIFOCapability has exactly one production caller (now tryCreateQueueOnce); both the JSON handler (createQueue → createQueueCore → createQueueWithRetry → tryCreateQueueOnce) and the query-protocol handler (sqs_query_protocol.go: 182 → createQueueCore → …) reach it through that one path. Move is symmetric — no caller observes a semantic change for a queue that genuinely needs to be created; only the existing-queue path stops paying the gate cost. 2. CodeRabbit major — Don't send raw peer poll details back to caller buildHTFIFOCapabilityRejection's output (peer addresses + raw poller error text) was returned to the wire layer as the InvalidAttributeValue message, leaking cluster topology to any authenticated CreateQueue caller. This conflicts with the stricter error-redaction policy already used elsewhere in sqs_catalog.go. Fix: the wire-level rejection is now the sanitized constant htfifoCapabilityRejectionPublic ("PartitionCount > 1 requires every cluster peer to advertise the htfifo capability via /sqs_health; one or more peers did not — see server logs for details"). The full per-peer detail goes to slog.Warn with structured fields (queueName, partitionCount, peerCount, detail) so an operator triaging the rolling upgrade can read the failing peer addresses + reasons from the server logs without rerunning the poll out-of-band. Renamed buildHTFIFOCapabilityRejection → formatHTFIFOCapabilityReportForLog to make its server-side-only contract obvious at the call site. 
Test changes: - New TestSQSServer_HTFIFO_CapabilityGate_IsIdempotentOnExistingQueue (wire-level): creates a partitioned queue on a single-node cluster (gate vacuously passes), poisons leaderSQS with an unreachable address, then re-creates the same queue with identical attrs and expects 200; finally creates a NEW queue with the poisoned peer map and expects the 400 (proves the gate is still in effect for genuine creates). - New TestValidateHTFIFOCapability_PublicMessageDoesNotLeakPeerDetails: pins the sanitization contract — the wire-level message must equal htfifoCapabilityRejectionPublic exactly, never contain a peer host:port. - Updated TestValidateHTFIFOCapability_RejectsWhenOnePeerLacksCapability and TestValidateHTFIFOCapability_RejectsWhenPeerUnreachable to assert the sanitized constant + NotContains on the peer address. - Renamed TestBuildHTFIFOCapabilityRejection_ShapesOperatorMessage → TestFormatHTFIFOCapabilityReportForLog_ShapesServerSideDetail to match the renamed helper; assertion that the helper output is server-side-only (no client wire surface assertion here). Below threshold (intentionally not addressed in this round): - Gemini medium on collectSQSPeers concurrency: leaderSQS is only mutated at SQSServer construction (WithSQSLeaderMap), not at request time. Gemini's own comment acknowledges this. - Gemini medium on caching the capability status: CreateQueue is a rare control-plane operation; caching adds a stale-window failure mode (a cluster that already lost a peer would still accept a partitioned queue while the cache is warm). Pure performance suggestion, not correctness. 
--- adapter/sqs_capability_gate.go | 42 +++++++++--- adapter/sqs_capability_gate_test.go | 70 ++++++++++++++------ adapter/sqs_catalog.go | 41 ++++++++---- adapter/sqs_partitioning_integration_test.go | 70 ++++++++++++++++++++ 4 files changed, 180 insertions(+), 43 deletions(-) diff --git a/adapter/sqs_capability_gate.go b/adapter/sqs_capability_gate.go index 1895e9d1..0b353d30 100644 --- a/adapter/sqs_capability_gate.go +++ b/adapter/sqs_capability_gate.go @@ -2,11 +2,21 @@ package adapter import ( "context" + "log/slog" "net/http" "sort" "strings" ) +// htfifoCapabilityRejectionPublic is the sanitized client-facing +// reason returned from validateHTFIFOCapability when the cluster +// poll fails. Per CodeRabbit major review: peer addresses and raw +// poller error text MUST NOT leak to authenticated clients (the +// CreateQueue surface is part of the public AWS-shaped API), so +// the wire-level message is intentionally generic and the per-peer +// detail goes to slog.Warn for operator triage. +const htfifoCapabilityRejectionPublic = "PartitionCount > 1 requires every cluster peer to advertise the htfifo capability via /sqs_health; one or more peers did not — see server logs for details" + // validateHTFIFOCapability is the §11 PR 5b-3 gate that replaced the // PR 2 dormancy reject. CreateQueue calls this on every request; it // is a no-op for legacy / single-partition meta and the full @@ -53,8 +63,18 @@ func (s *SQSServer) validateHTFIFOCapability(ctx context.Context, requested *sqs } report := PollSQSHTFIFOCapability(ctx, peers, PollerConfig{}) if report == nil || !report.AllAdvertise { + // Log the full per-peer detail for operator triage. The + // client-visible message stays generic (no peer addresses, + // no raw poller error text) so the CreateQueue surface + // does not leak cluster topology to authenticated callers + // — CodeRabbit major review on PR #734. 
+ slog.Warn("sqs: htfifo capability gate rejected partitioned CreateQueue", + "queueName", requested.Name, + "partitionCount", requested.PartitionCount, + "peerCount", len(peers), + "detail", formatHTFIFOCapabilityReportForLog(report)) return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, - buildHTFIFOCapabilityRejection(report)) + htfifoCapabilityRejectionPublic) } return nil } @@ -86,15 +106,17 @@ func (s *SQSServer) collectSQSPeers() []string { return peers } -// buildHTFIFOCapabilityRejection composes the operator-facing message -// for a failed capability poll. Lists the peers that did not -// advertise htfifo (with the per-peer Error or "missing capability" -// reason) so the operator can fix the rolling-upgrade lag without -// rerunning the poll out-of-band. Order matches report.Peers, which -// matches collectSQSPeers' sorted input order — deterministic. -func buildHTFIFOCapabilityRejection(report *HTFIFOCapabilityReport) string { +// formatHTFIFOCapabilityReportForLog composes the per-peer detail +// surfaced to slog.Warn when the gate rejects. Lists each failing +// peer's address and reason (per-peer Error or "missing capability") +// so an operator triaging a partial-rolling-upgrade cluster can fix +// the lag from the server logs without rerunning the poll +// out-of-band. NEVER returned to the client — that path uses the +// sanitized htfifoCapabilityRejectionPublic constant. Order matches +// report.Peers, which matches collectSQSPeers' sorted input order — +// deterministic so log lines diff cleanly across runs. 
+func formatHTFIFOCapabilityReportForLog(report *HTFIFOCapabilityReport) string { var b strings.Builder - b.WriteString("PartitionCount > 1 requires every cluster peer to advertise the htfifo capability via /sqs_health; the following peers did not: ") if report == nil { b.WriteString("(no report)") return b.String() @@ -120,7 +142,7 @@ func buildHTFIFOCapabilityRejection(report *HTFIFOCapabilityReport) string { if first { // Defensive: AllAdvertise was false but no peer surfaced a // reason. Should never happen, but emit a non-empty hint - // rather than a truncated message ending in a colon. + // rather than a truncated empty string. b.WriteString("(unknown peer)") } return b.String() diff --git a/adapter/sqs_capability_gate_test.go b/adapter/sqs_capability_gate_test.go index 40932003..3de9fd98 100644 --- a/adapter/sqs_capability_gate_test.go +++ b/adapter/sqs_capability_gate_test.go @@ -112,10 +112,10 @@ func TestValidateHTFIFOCapability_RejectsWhenOnePeerLacksCapability(t *testing.T require.True(t, ok, "must surface as sqsAPIError so the wire layer maps to InvalidAttributeValue, got %T", err) require.Equal(t, http.StatusBadRequest, apiErr.status) require.Equal(t, sqsErrInvalidAttributeValue, apiErr.errorType) - require.Contains(t, apiErr.message, "every cluster peer to advertise the htfifo capability", - "message must explain the gate so the operator knows what to fix") - require.Contains(t, apiErr.message, oldAddr, - "the offending peer must appear in the message, got %q", apiErr.message) + require.Equal(t, htfifoCapabilityRejectionPublic, apiErr.message, + "client message must be the sanitized constant — peer addresses live in server logs (CodeRabbit major)") + require.NotContains(t, apiErr.message, oldAddr, + "peer host:port MUST NOT leak through the wire-level rejection") } // TestValidateHTFIFOCapability_RejectsWhenPeerUnreachable pins @@ -145,8 +145,10 @@ func TestValidateHTFIFOCapability_RejectsWhenPeerUnreachable(t *testing.T) { require.True(t, ok) 
require.Equal(t, http.StatusBadRequest, apiErr.status) require.Equal(t, sqsErrInvalidAttributeValue, apiErr.errorType) - require.Contains(t, apiErr.message, deadAddr, - "unreachable peer must be named in the rejection message") + require.Equal(t, htfifoCapabilityRejectionPublic, apiErr.message, + "client message must be the sanitized constant — transport error text lives in server logs (CodeRabbit major)") + require.NotContains(t, apiErr.message, deadAddr, + "peer host:port MUST NOT leak through the wire-level rejection") } // TestCollectSQSPeers_Deterministic pins the helper's order + @@ -174,11 +176,15 @@ func TestCollectSQSPeers_Deterministic(t *testing.T) { require.Empty(t, (&SQSServer{}).collectSQSPeers()) } -// TestBuildHTFIFOCapabilityRejection_ShapesOperatorMessage pins the -// rejection-message shape so a future refactor cannot accidentally -// truncate the per-peer detail. Each failing peer must contribute -// a "(reason)" suffix; peers that pass do not appear at all. -func TestBuildHTFIFOCapabilityRejection_ShapesOperatorMessage(t *testing.T) { +// TestFormatHTFIFOCapabilityReportForLog_ShapesServerSideDetail +// pins the SERVER-SIDE log helper's shape — never returned to the +// client (CodeRabbit major review on PR #734: peer addresses + raw +// poller errors leak cluster topology to authenticated callers). +// Each failing peer must contribute a "(reason)" suffix so an +// operator triaging a rolling-upgrade cluster can fix the lag from +// the log lines without rerunning the poll out-of-band; peers that +// pass do not appear. 
+func TestFormatHTFIFOCapabilityReportForLog_ShapesServerSideDetail(t *testing.T) { t.Parallel() report := &HTFIFOCapabilityReport{ @@ -189,15 +195,41 @@ func TestBuildHTFIFOCapabilityRejection_ShapesOperatorMessage(t *testing.T) { }, } - msg := buildHTFIFOCapabilityRejection(report) - require.Contains(t, msg, "every cluster peer to advertise the htfifo capability") - require.NotContains(t, msg, "ok:9000", "advertising peers must NOT appear in the rejection") - require.Contains(t, msg, "old:9000 (missing capability)") - require.Contains(t, msg, "down:9000 (dial tcp: refused)") + detail := formatHTFIFOCapabilityReportForLog(report) + require.NotContains(t, detail, "ok:9000", "advertising peers must NOT appear in the log detail") + require.Contains(t, detail, "old:9000 (missing capability)") + require.Contains(t, detail, "down:9000 (dial tcp: refused)") // Defensive: nil report and "all-passing-but-AllAdvertise-false" path. - require.Contains(t, buildHTFIFOCapabilityRejection(nil), "no report") + require.Contains(t, formatHTFIFOCapabilityReportForLog(nil), "no report") allPass := &HTFIFOCapabilityReport{Peers: []HTFIFOCapabilityPeerStatus{{Address: "x", HasHTFIFO: true}}} - require.Contains(t, buildHTFIFOCapabilityRejection(allPass), "unknown peer", - "never emit a truncated 'did not: ' tail when no peer details surface") + require.Contains(t, formatHTFIFOCapabilityReportForLog(allPass), "unknown peer", + "never emit an empty detail when no peer reasons surface") +} + +// TestValidateHTFIFOCapability_PublicMessageDoesNotLeakPeerDetails +// pins the CodeRabbit major review's redaction contract: the +// client-visible InvalidAttributeValue message MUST NOT include +// peer addresses or raw poller error text. The two failure-path +// tests above check that the gate REJECTS; this test specifically +// checks that the rejection message is the sanitized +// htfifoCapabilityRejectionPublic constant — no host:port, no raw +// transport error. 
+func TestValidateHTFIFOCapability_PublicMessageDoesNotLeakPeerDetails(t *testing.T) { + t.Parallel() + + old := htfifoCapabilityServer(t, []string{}) // no htfifo + oldAddr := strings.TrimPrefix(old.URL, "http://") + s := &SQSServer{leaderSQS: map[string]string{"raft1": oldAddr}} + + err := s.validateHTFIFOCapability(context.Background(), &sqsQueueMeta{Name: "q.fifo", PartitionCount: 4}) + require.Error(t, err) + var apiErr *sqsAPIError + require.True(t, errors.As(err, &apiErr)) + require.Equal(t, htfifoCapabilityRejectionPublic, apiErr.message, + "client message must be the sanitized constant, never the per-peer detail") + require.NotContains(t, apiErr.message, oldAddr, + "peer host:port MUST NOT appear in the wire-level rejection — operator detail is server-side only") + require.Contains(t, apiErr.message, "see server logs for details", + "public message must point operators at the server-side detail") } diff --git a/adapter/sqs_catalog.go b/adapter/sqs_catalog.go index 9521978c..50086505 100644 --- a/adapter/sqs_catalog.go +++ b/adapter/sqs_catalog.go @@ -481,9 +481,10 @@ var sqsAttributeAppliers = map[string]attributeApplier{ // docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md). Set // at CreateQueue time; SetQueueAttributes attempts to change it // reject via the immutability check in trySetQueueAttributesOnce. - // PR 2 of the rollout introduces the field but the temporary - // dormancy gate in tryCreateQueueOnce rejects PartitionCount > 1 - // until PR 5 lifts the gate atomically with the data plane. + // PartitionCount > 1 is gated by validateHTFIFOCapability (the + // cluster-wide htfifo capability poll, PR 5b-3) which runs + // inside tryCreateQueueOnce after the existence check so + // idempotent retries do not pay the network cost. "PartitionCount": func(m *sqsQueueMeta, v string) error { // Parse at uint64 width and bound-check explicitly so the // uint32 narrowing below is gosec-clean. 
@@ -903,17 +904,12 @@ func (s *SQSServer) createQueueCore(ctx context.Context, in *sqsCreateQueueInput if err != nil { return "", err } - // Cluster-wide htfifo capability gate (Phase 3.D §11 PR 5b-3, - // replaces the PR 2 dormancy reject). PartitionCount > 1 is - // rejected unless this binary AND every peer in s.leaderSQS - // advertise the htfifo capability via /sqs_health. The gate - // fails closed on any peer timeout, HTTP error, malformed body, - // or missing capability so a partitioned queue cannot land in a - // partially-upgraded cluster where some peer would silently - // store its records under the legacy single-partition keyspace. - if err := s.validateHTFIFOCapability(ctx, requested); err != nil { - return "", err - } + // The cluster-wide htfifo capability gate (Phase 3.D §11 PR 5b-3) + // runs INSIDE tryCreateQueueOnce after the existence check, not + // here. Idempotent CreateQueue retries on an already-existing + // partitioned queue must NOT pay the network poll cost or risk a + // transient peer-poll failure flipping a 200-OK retry into a 400 + // (Codex P1 review on PR #734). if len(in.Tags) > sqsMaxTagsPerQueue { // AWS caps tags per queue at 50. CreateQueue must reject // over-cap tag bundles up front; a silent slice-and-store @@ -963,6 +959,23 @@ func (s *SQSServer) tryCreateQueueOnce(ctx context.Context, requested *sqsQueueM } return false, newSQSAPIError(http.StatusBadRequest, sqsErrQueueNameExists, "queue already exists with different attributes") } + // Cluster-wide htfifo capability gate (Phase 3.D §11 PR 5b-3, + // replaces the PR 2 dormancy reject). PartitionCount > 1 is + // rejected unless this binary AND every peer in s.leaderSQS + // advertise the htfifo capability via /sqs_health. Fails closed + // on any peer timeout, HTTP error, malformed body, or missing + // capability so a partitioned queue cannot land in a partially- + // upgraded cluster. 
+ // + // Runs AFTER the existence check (so idempotent CreateQueue + // retries on an already-existing partitioned queue do not pay + // the network poll cost or risk a transient peer-poll failure + // flipping a 200-OK retry into a 400 — Codex P1 review on PR + // #734) and BEFORE the dispatch (so a rejected create does not + // burn an OCC commit). + if err := s.validateHTFIFOCapability(ctx, requested); err != nil { + return false, err + } lastGen, err := s.loadQueueGenerationAt(ctx, requested.Name, readTS) if err != nil { return false, errors.WithStack(err) diff --git a/adapter/sqs_partitioning_integration_test.go b/adapter/sqs_partitioning_integration_test.go index 57f204fc..153547f3 100644 --- a/adapter/sqs_partitioning_integration_test.go +++ b/adapter/sqs_partitioning_integration_test.go @@ -4,6 +4,8 @@ import ( "net/http" "strings" "testing" + + "github.com/stretchr/testify/require" ) // TestSQSServer_HTFIFO_CapabilityGate_AcceptsOnSingleNode pins the @@ -35,6 +37,74 @@ func TestSQSServer_HTFIFO_CapabilityGate_AcceptsOnSingleNode(t *testing.T) { } } +// TestSQSServer_HTFIFO_CapabilityGate_IsIdempotentOnExistingQueue +// pins the Codex P1 review fix on PR #734: a CreateQueue retry on +// an already-existing partitioned queue with identical attributes +// MUST return 200 (idempotent) even when the cluster-wide +// capability poll would now fail. Before the fix, the gate ran +// before the existence check, so a transient peer outage during +// a CreateQueue retry would flip a 200-OK idempotent response +// into a 400. Now the gate runs INSIDE tryCreateQueueOnce after +// the existence check; an existing queue with matching attrs +// short-circuits to 200 and never touches the network. +// +// The test creates a partitioned queue on a single-node cluster +// (gate passes vacuously), then poisons the SQSServer's peer +// map with an unreachable address so any subsequent gate +// invocation would reject. 
The second CreateQueue with identical +// attrs must still succeed; only an actually-new partitioned +// queue would now fail the gate. +func TestSQSServer_HTFIFO_CapabilityGate_IsIdempotentOnExistingQueue(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + attrs := map[string]string{ + "FifoQueue": "true", + "PartitionCount": "4", + } + + // First create — succeeds because the leaderSQS map is empty + // and the local binary advertises htfifo (vacuous gate). + status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{ + "QueueName": "htfifo-idempotent.fifo", + "Attributes": attrs, + }) + require.Equal(t, http.StatusOK, status, + "first create on single-node cluster must succeed: body=%v", out) + + // Poison the peer map with an unreachable address. Any gate + // invocation from this point would call PollSQSHTFIFOCapability + // against this dead peer and fail closed → 400. The + // post-fix code path skips the gate on an existing-queue + // match, so the next create must STILL succeed. + require.NotNil(t, node.sqsServer) + node.sqsServer.leaderSQS = map[string]string{ + "raft-fake": "127.0.0.1:1", // unreachable: connection refused + } + + status, out = callSQS(t, node, sqsCreateQueueTarget, map[string]any{ + "QueueName": "htfifo-idempotent.fifo", + "Attributes": attrs, + }) + require.Equal(t, http.StatusOK, status, + "second create with identical attrs must be idempotent (gate must NOT run on existing-queue match) — Codex P1 PR #734; body=%v", out) + + // Sanity: the gate IS still in effect for genuinely new + // partitioned queues — a different name with the poisoned + // peer map must fail the gate. 
+ status, out = callSQS(t, node, sqsCreateQueueTarget, map[string]any{ + "QueueName": "htfifo-newqueue.fifo", + "Attributes": attrs, + }) + require.Equal(t, http.StatusBadRequest, status, + "new partitioned queue must hit the gate when a peer is unreachable: body=%v", out) + if got, _ := out["__type"].(string); got != sqsErrInvalidAttributeValue { + t.Fatalf("new queue: __type=%q (expected InvalidAttributeValue)", got) + } +} + // TestSQSServer_HTFIFO_CapabilityGate_AllowsPartitionCountOne pins // the no-op-partition-count path: PartitionCount=1 is the legacy // single-partition layout and bypasses the capability gate From dccac18e0c8183b20fdabc9639542453279a4a48 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Mon, 4 May 2026 18:42:35 +0900 Subject: [PATCH 3/4] sqs: refresh stale dormancy-gate doc references after PR 5b-3 (PR #734, round 2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Claude review on round 1 flagged that several doc comments still described the §11 PR 2 dormancy gate as if it were live, even though PR 5b-3 has lifted it and replaced it with the cluster-wide htfifo capability check. Documentation-only sweep — no behavior change. Updated: - adapter/sqs_catalog.go (sqsQueueMeta.PartitionCount field doc): was "PR 2 of the rollout introduces this field but a temporary CreateQueue gate rejects PartitionCount > 1 until PR 5 lifts the gate atomically with the data-plane fanout — so the schema exists but no partitioned data can land before the data plane is wired". Now describes validateHTFIFOCapability in tryCreateQueueOnce as the live guard. - adapter/sqs_catalog.go (parseAttributesIntoMeta comment): swapped the dormancy-gate reference for the capability gate. - adapter/sqs.go (sqsCapabilityHTFIFO and htfifoCapabilityAdvertised doc blocks): three stale references updated. 
The "PR 5 will use this for the CreateQueue capability gate" forward-reference is now "merged via #721 (PR 4-B-3a) and now consumed by validateHTFIFOCapability in the CreateQueue gate (PR 5b-3)". The block describing PR 5 lifting the dormancy gate is now factual past-tense. - adapter/sqs_keys.go (partitioned key family doc): swapped the "dormancy gate currently rejects" sentence for the validateHTFIFOCapability gate. - adapter/sqs_messages.go (decodeClientReceiptHandle doc): the "PR 5b-3 lifts the gate together with the capability check" was itself stale — PR 5b-3 has shipped. Updated to describe the current routing through validateReceiptHandleVersion. - adapter/sqs_partitioning_test.go (two test docstrings): "after PR 5 lifts the dormancy gate" / "the dormancy gate runs separately on CreateQueue and rejects this at the wire today" updated to refer to the capability gate. Verified by build + golangci-lint (0 issues) + targeted SQS test sweep (-race) all clean. --- adapter/sqs.go | 18 +++++++++--------- adapter/sqs_catalog.go | 18 +++++++++--------- adapter/sqs_keys.go | 6 +++--- adapter/sqs_messages.go | 13 +++++++------ adapter/sqs_partitioning_test.go | 14 ++++++++------ 5 files changed, 36 insertions(+), 33 deletions(-) diff --git a/adapter/sqs.go b/adapter/sqs.go index 670f440f..23a22afb 100644 --- a/adapter/sqs.go +++ b/adapter/sqs.go @@ -53,8 +53,8 @@ const ( // group that hosts a partitioned queue when the binary itself does // not advertise this string. // -// CreateQueue's catalog-polling gate (Phase 3.D PR 5 lifts the -// dormancy and starts checking) reads this list off /sqs_health on +// CreateQueue's cluster-wide capability gate (validateHTFIFOCapability, +// landed in Phase 3.D PR 5b-3) reads this list off /sqs_health on // every peer; a CreateQueue with PartitionCount > 1 is rejected // unless every peer reports "htfifo" — fail-closed against rolling // upgrades that have not yet finished. 
@@ -75,8 +75,8 @@ const sqsCapabilityHTFIFO = "htfifo" // keys fail closed for recognised-but-unresolved partitioned // keys. // - Capability poller: PollSQSHTFIFOCapability, merged via -// #721 (Phase 3.D PR 4-B-3a). PR 5 will use this for the -// CreateQueue capability gate. +// #721 (Phase 3.D PR 4-B-3a) and now consumed by +// validateHTFIFOCapability in the CreateQueue gate (PR 5b-3). // - Leadership-refusal hook: // raftengine.RegisterLeaderAcquiredCallback + // main_sqs_leadership_refusal.go (Phase 3.D PR 4-B-3b, this @@ -84,11 +84,11 @@ const sqsCapabilityHTFIFO = "htfifo" // the hook refuses leadership of any Raft group hosting a // partitioned queue when the binary lacks htfifo. // -// Both pieces are now in the binary, so the flag flips to true. -// PR 5 lifts the PartitionCount > 1 dormancy gate AND wires the -// CreateQueue capability poll in the same commit, at which point -// a partitioned queue can land in production and every node in -// the cluster must report htfifo for the gate to allow it. +// Both pieces are in the binary and PR 5b-3 has lifted the +// dormancy gate, so a partitioned queue can land in production — +// CreateQueue's validateHTFIFOCapability gate refuses +// PartitionCount > 1 unless every node in the cluster reports +// htfifo on /sqs_health. // // Stays a const (not a var) because the flag is build-time. A // future runtime override (env var, --no-htfifo flag for diff --git a/adapter/sqs_catalog.go b/adapter/sqs_catalog.go index 50086505..1344a4a5 100644 --- a/adapter/sqs_catalog.go +++ b/adapter/sqs_catalog.go @@ -122,11 +122,10 @@ type sqsQueueMeta struct { // Zero or 1 means the legacy single-partition layout — no schema // change. Greater than 1 enables HT-FIFO. Set at CreateQueue time // and immutable thereafter (SetQueueAttributes rejects any change). - // Power-of-two values only (validator rejects others). 
PR 2 of the - // rollout introduces this field but a temporary CreateQueue gate - // rejects PartitionCount > 1 until PR 5 lifts the gate atomically - // with the data-plane fanout — so the schema exists but no - // partitioned data can land before the data plane is wired. + // Power-of-two values only (validator rejects others). CreateQueue + // gates PartitionCount > 1 on the cluster-wide htfifo capability + // (validateHTFIFOCapability in tryCreateQueueOnce) so a + // partitioned queue cannot land in a partially-upgraded cluster. PartitionCount uint32 `json:"partition_count,omitempty"` // FifoThroughputLimit mirrors the AWS attribute. "perMessageGroupId" // (default for HT-FIFO) keeps the §3.3 hash-by-MessageGroupId @@ -383,10 +382,11 @@ func parseAttributesIntoMeta(name string, attrs map[string]string) (*sqsQueueMet return nil, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, "ContentBasedDeduplication is only valid on FIFO queues") } // HT-FIFO validation runs after resolveFifoQueueFlag so the - // IsFIFO-only checks see the post-resolution flag. The temporary - // dormancy gate (§11 PR 2) runs separately in createQueue so - // SetQueueAttributes paths share the schema validator without - // re-rejecting on the gate. + // IsFIFO-only checks see the post-resolution flag. The + // cluster-wide capability gate (validateHTFIFOCapability) runs + // separately in tryCreateQueueOnce so SetQueueAttributes paths + // share the schema validator without paying the network poll + // cost (immutability also blocks PartitionCount changes there). if err := validatePartitionConfig(meta); err != nil { return nil, err } diff --git a/adapter/sqs_keys.go b/adapter/sqs_keys.go index 44105c0e..0418bee6 100644 --- a/adapter/sqs_keys.go +++ b/adapter/sqs_keys.go @@ -262,9 +262,9 @@ func queueNameFromMetaKey(key []byte) (string, bool) { // partition uint32 inserted between the queue segment and the // generation. 
The legacy keyspace is unchanged on disk, so existing // queues and Standard queues stay byte-identical — these helpers are -// reachable only when meta.PartitionCount > 1, and the §11 PR 2 -// dormancy gate currently rejects that at CreateQueue. The data plane -// dispatch lands together with the gate-lift in PR 5. +// reachable only when meta.PartitionCount > 1, gated at CreateQueue +// time by the cluster-wide htfifo capability check +// (validateHTFIFOCapability, PR 5b-3). // // Each helper appends the partition as a fixed-width big-endian // uint32 so prefix scans `!sqs|msg||p|||` diff --git a/adapter/sqs_messages.go b/adapter/sqs_messages.go index b3994fc5..8dafa893 100644 --- a/adapter/sqs_messages.go +++ b/adapter/sqs_messages.go @@ -384,12 +384,13 @@ func decodeReceiptHandleV1(b []byte) (*decodedReceiptHandle, error) { // dormancy promise of PR 724 round 3 (no v2 wire-format // probability from the public API) under the new contract. // -// On partitioned queues (PartitionCount > 1) the §11 PR 2 -// dormancy gate is still in force in PR 5b-2 — CreateQueue -// rejects PartitionCount > 1, so no production queue can be in -// the partitioned branch yet, and validateReceiptHandleVersion -// against a non-partitioned meta still rejects every v2 handle. -// PR 5b-3 lifts the gate together with the capability check. +// On partitioned queues (PartitionCount > 1) the cluster-wide +// htfifo capability gate (PR 5b-3) admits CreateQueue; v2 handles +// then reach validateReceiptHandleVersion through the meta load +// performed inside loadMessageForDelete / loadAndVerifyMessage, +// which checks the version against the queue's PartitionCount and +// rejects the v2-on-legacy and v1-on-partitioned mismatches with +// ReceiptHandleIsInvalid. 
func decodeClientReceiptHandle(raw string) (*decodedReceiptHandle, error) { return decodeReceiptHandle(raw) } diff --git a/adapter/sqs_partitioning_test.go b/adapter/sqs_partitioning_test.go index 7c98907e..db8ce746 100644 --- a/adapter/sqs_partitioning_test.go +++ b/adapter/sqs_partitioning_test.go @@ -166,9 +166,10 @@ func TestValidatePartitionConfig_RejectsAboveMax(t *testing.T) { // // PartitionCount > 1 is also FIFO-only — without the guard a // Standard queue with PartitionCount=2 would slip past the validator -// after PR 5 lifts the dormancy gate. PartitionCount 0/1 are still -// accepted -// on Standard queues because both mean "single-partition layout". +// (the capability gate that gates partitioned creates only fires on +// FIFO queues, so the schema rule has to catch the Standard case). +// PartitionCount 0/1 are still accepted on Standard queues because +// both mean "single-partition layout". func TestValidatePartitionConfig_StandardQueueRejectsHTFIFOAttrs(t *testing.T) { t.Parallel() require.Error(t, validatePartitionConfig(&sqsQueueMeta{IsFIFO: false, FifoThroughputLimit: htfifoThroughputPerQueue})) @@ -232,9 +233,10 @@ func TestValidatePartitionConfig_PerMessageGroupIDRequiresExplicitPartitionCount require.Equal(t, sqsErrInvalidAttributeValue, apiErr.errorType) } // FIFO + perMessageGroupId + PartitionCount=8: accept (the - // dormancy gate runs separately on CreateQueue and rejects this - // at the wire today, but the cross-attribute validator on its - // own does not). + // cluster-wide capability gate runs separately on CreateQueue + // and may still reject this at the wire on a partially-upgraded + // cluster, but the cross-attribute validator on its own does + // not). 
require.NoError(t, validatePartitionConfig(&sqsQueueMeta{ IsFIFO: true, FifoThroughputLimit: htfifoThroughputPerMessageGroupID, From 63193ad095d722d7100b225e3e2a57cb72cddfed Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Mon, 4 May 2026 19:05:17 +0900 Subject: [PATCH 4/4] sqs(capability,routing): gate partitioned CreateQueue on routing-map coverage (PR #734, round 2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex P1 review: validateHTFIFOCapability admitted PartitionCount > 1 queues after only checking peer /sqs_health, but never verified that the queue was actually routable. In a cluster where --sqsFifoPartitionMap is configured but missing the new queue (or configured with fewer partitions than requested), CreateQueue succeeded and then SendMessage / ReceiveMessage / DeleteMessage failed closed at kv.ShardRouter ("no route for key") on first use, surfacing as InternalFailure to the client. Operators ended up with unusable queues that the AWS CreateQueue contract's idempotency rules would not let them transparently re-create either. Fix: extend validateHTFIFOCapability with a routing-coverage check that runs INDEPENDENTLY of the peer poll (and BEFORE it, so the empty-peer-list short-circuit doesn't bypass coverage). Three branches: - resolver==nil → skip (single-shard / no --sqsFifoPartitionMap deployment; partitioned keys fall through to the engine's default group, this case has worked since PR 5b-3 and stays that way for backward compatibility) - resolver!=nil + queue not in map → reject - resolver!=nil + RoutedPartitionCount(queue) < PartitionCount → reject The wire-level rejection is the new sanitized constant htfifoRoutingCoverageRejectionPublic — same redaction principle as htfifoCapabilityRejectionPublic from round 1: the operator detail (queue name, requested vs.
actual partition count) goes to slog.Warn so an authenticated CreateQueue caller cannot probe the operator's --sqsFifoPartitionMap shape via error messages. What changed: - adapter/sqs_partition_resolver.go: new RoutedPartitionCount(queueName string) int method on *SQSPartitionResolver. Nil-receiver returns 0 so the gate's resolver==nil branch + empty-queue branch both treat unconfigured routes as "no coverage", which routes naturally to the rejection path or short-circuit. - adapter/sqs_capability_gate.go: new htfifoRoutingCoverageRejectionPublic constant and validateHTFIFORoutingCoverage helper. validateHTFIFOCapability now calls the routing-coverage check before the peer poll. - adapter/sqs.go: new partitionResolver field on SQSServer + WithSQSPartitionResolver option. - main_sqs.go: startSQSServer takes the resolver as a parameter and threads it through WithSQSPartitionResolver. - main.go: split buildSQSPartitionResolver into a wrapper + buildSQSPartitionResolverConcrete (returns the concrete pointer the SQS server needs without re-introducing the typed-nil interface trap that the wrapper guards against); serversInput populates the runtime's sqsPartitionResolver from the same partition map the coordinator consumes. Tests (sqs_capability_gate_test.go): - TestValidateHTFIFOCapability_RejectsWhenRoutingMapMissingQueue - TestValidateHTFIFOCapability_RejectsWhenRoutingMapPartiallyCoversQueue - TestValidateHTFIFOCapability_AcceptsWhenRoutingMapFullyCoversQueue - TestValidateHTFIFOCapability_AcceptsWhenResolverNil (backward compat) - TestRoutedPartitionCount_NilReceiver - TestRoutedPartitionCount_KnownAndUnknownQueue Caller audit: - validateHTFIFOCapability: one production caller (tryCreateQueueOnce in sqs_catalog.go), tests aside. Returns nil / *sqsAPIError as before; new failure mode joins the existing InvalidAttributeValue bucket. No caller observes a semantic change. - RoutedPartitionCount: only validateHTFIFORoutingCoverage consumes it in production. 
Nil-receiver semantics documented and tested. - WithSQSPartitionResolver: only main_sqs.go uses it. Default-nil short-circuit preserves existing test fixtures that build &SQSServer{} directly. - buildSQSPartitionResolverConcrete: new in this commit; one caller (serversInput populate). The typed-nil-safe wrapper buildSQSPartitionResolver still exists and is the only path the coordinator's WithPartitionResolver consumes. --- adapter/sqs.go | 32 ++++++++ adapter/sqs_capability_gate.go | 62 ++++++++++++++ adapter/sqs_capability_gate_test.go | 123 ++++++++++++++++++++++++++++ adapter/sqs_partition_resolver.go | 22 +++++ main.go | 66 +++++++++++---- main_sqs.go | 2 + 6 files changed, 290 insertions(+), 17 deletions(-) diff --git a/adapter/sqs.go b/adapter/sqs.go index 23a22afb..14f96179 100644 --- a/adapter/sqs.go +++ b/adapter/sqs.go @@ -184,6 +184,22 @@ type SQSServer struct { // receives for in-process — bounded by the operator-controlled // CreateQueue rate. receiveFanoutCounters sync.Map + // partitionResolver, when non-nil, is the per-cluster resolver + // that maps (queue, partition) keys to operator-chosen Raft + // groups (built from --sqsFifoPartitionMap, see main.go). The + // CreateQueue capability gate (validateHTFIFOCapability) uses + // it to verify routing coverage on partitioned creates BEFORE + // the meta record commits — without that check, a queue could + // land with PartitionCount=N but only K<N routable. + partitionResolver *SQSPartitionResolver … "PartitionCount > 1 requires the htfifo capability, which this node does not advertise") } + // Routing-coverage check runs INDEPENDENTLY of the peer poll + // (and BEFORE it) so the empty-peer-list short-circuit below + // does not bypass coverage. A single-node cluster with a + // partition resolver installed (rare but possible: operator + // pre-configures --sqsFifoPartitionMap before adding peers) + // must still reject a partitioned create whose partitions + // aren't all routable.
+ if err := s.validateHTFIFORoutingCoverage(requested); err != nil { + return err + } peers := s.collectSQSPeers() if len(peers) == 0 { // Single-node deployment: the local check above is the @@ -79,6 +98,49 @@ func (s *SQSServer) validateHTFIFOCapability(ctx context.Context, requested *sqs return nil } +// validateHTFIFORoutingCoverage rejects a partitioned CreateQueue +// when the cluster's --sqsFifoPartitionMap does not cover every +// requested partition. Without this, a queue could land with +// PartitionCount=N but only K<N routable. +func (s *SQSServer) validateHTFIFORoutingCoverage(requested *sqsQueueMeta) *sqsAPIError { + if s.partitionResolver == nil { + return nil + } + routed := int64(s.partitionResolver.RoutedPartitionCount(requested.Name)) + if routed >= int64(requested.PartitionCount) { + return nil + } + slog.Warn("sqs: htfifo capability gate rejected partitioned CreateQueue — incomplete routing map", + "queueName", requested.Name, + "partitionCount", requested.PartitionCount, + "routedPartitionCount", routed) + return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, + htfifoRoutingCoverageRejectionPublic) +} + // collectSQSPeers returns every distinct, non-empty SQS-side address // from s.leaderSQS in deterministic (sorted) order.
Used by the // CreateQueue capability gate so error messages and tests pin a diff --git a/adapter/sqs_capability_gate_test.go b/adapter/sqs_capability_gate_test.go index 3de9fd98..d6668fbe 100644 --- a/adapter/sqs_capability_gate_test.go +++ b/adapter/sqs_capability_gate_test.go @@ -207,6 +207,129 @@ func TestFormatHTFIFOCapabilityReportForLog_ShapesServerSideDetail(t *testing.T) "never emit an empty detail when no peer reasons surface") } +// TestValidateHTFIFOCapability_RejectsWhenRoutingMapMissingQueue +// pins the Codex P1 review fix on PR #734 round 2: when a partition +// resolver is installed (--sqsFifoPartitionMap is configured) but +// the requested queue is NOT in the map, CreateQueue must reject +// before the meta record commits — silently accepting it would +// land a queue whose SendMessage / ReceiveMessage calls fail +// closed at the kv.ShardRouter "no route for key" path on first +// use, surfacing as InternalFailure to the client. +func TestValidateHTFIFOCapability_RejectsWhenRoutingMapMissingQueue(t *testing.T) { + t.Parallel() + + resolver := NewSQSPartitionResolver(map[string][]uint64{ + "other-queue.fifo": {100, 101, 102, 103}, + }) + require.NotNil(t, resolver) + s := &SQSServer{partitionResolver: resolver} + + err := s.validateHTFIFOCapability(context.Background(), &sqsQueueMeta{ + Name: "newqueue.fifo", + PartitionCount: 4, + }) + require.Error(t, err) + var apiErr *sqsAPIError + require.True(t, errors.As(err, &apiErr)) + require.Equal(t, http.StatusBadRequest, apiErr.status) + require.Equal(t, sqsErrInvalidAttributeValue, apiErr.errorType) + require.Equal(t, htfifoRoutingCoverageRejectionPublic, apiErr.message, + "client message must be the sanitized routing-coverage constant — operator detail is in the slog.Warn line") +} + +// TestValidateHTFIFOCapability_RejectsWhenRoutingMapPartiallyCoversQueue +// is the partial-coverage variant of the routing-map gate: the +// queue is in the map but with FEWER routes than the requested +// 
PartitionCount. SendMessage on the missing partitions would fail +// closed at the router; the gate must reject before the create +// commits. +func TestValidateHTFIFOCapability_RejectsWhenRoutingMapPartiallyCoversQueue(t *testing.T) { + t.Parallel() + + resolver := NewSQSPartitionResolver(map[string][]uint64{ + "queue.fifo": {100, 101}, // only 2 routes + }) + s := &SQSServer{partitionResolver: resolver} + + err := s.validateHTFIFOCapability(context.Background(), &sqsQueueMeta{ + Name: "queue.fifo", + PartitionCount: 4, // requesting 4 partitions + }) + require.Error(t, err) + var apiErr *sqsAPIError + require.True(t, errors.As(err, &apiErr)) + require.Equal(t, sqsErrInvalidAttributeValue, apiErr.errorType) + require.Equal(t, htfifoRoutingCoverageRejectionPublic, apiErr.message) +} + +// TestValidateHTFIFOCapability_AcceptsWhenRoutingMapFullyCoversQueue +// pins the happy path: queue is in the map with at least +// PartitionCount entries → gate passes. ">=" rather than "==" +// because over-allocating routes (operator preparing for a future +// expansion) is harmless: the create only uses indices [0, +// PartitionCount). +func TestValidateHTFIFOCapability_AcceptsWhenRoutingMapFullyCoversQueue(t *testing.T) { + t.Parallel() + + resolver := NewSQSPartitionResolver(map[string][]uint64{ + "queue.fifo": {100, 101, 102, 103, 104, 105}, // 6 routes + }) + s := &SQSServer{partitionResolver: resolver} + + require.NoError(t, s.validateHTFIFOCapability(context.Background(), &sqsQueueMeta{ + Name: "queue.fifo", + PartitionCount: 4, // requesting 4, under the 6 configured + })) +} + +// TestValidateHTFIFOCapability_AcceptsWhenResolverNil pins the +// backward-compat path: a single-shard / no-flag deployment has +// no partition resolver wired (s.partitionResolver == nil), so +// the gate's coverage check is skipped and the create proceeds +// (provided the local + peer-poll checks above also pass). 
This +// is what TestSQSServer_HTFIFO_CapabilityGate_AcceptsOnSingleNode +// exercises end-to-end at the wire level; this unit test is the +// narrower assertion that the partitionResolver==nil branch is +// the one that bypasses the coverage check. +func TestValidateHTFIFOCapability_AcceptsWhenResolverNil(t *testing.T) { + t.Parallel() + s := &SQSServer{} // partitionResolver implicitly nil + require.NoError(t, s.validateHTFIFOCapability(context.Background(), &sqsQueueMeta{ + Name: "queue.fifo", + PartitionCount: 4, + })) +} + +// TestRoutedPartitionCount_NilReceiver pins the typed-nil safety +// of the resolver method: a nil *SQSPartitionResolver returns 0 +// (no routes) so the gate's "len(routes) < PartitionCount" check +// rejects naturally rather than panicking. Used by the +// validateHTFIFORoutingCoverage short-circuit on resolver==nil +// (the gate skips the coverage check entirely) but also documents +// the safe-by-default behaviour for any other future caller. +func TestRoutedPartitionCount_NilReceiver(t *testing.T) { + t.Parallel() + var r *SQSPartitionResolver + require.Equal(t, 0, r.RoutedPartitionCount("anything")) +} + +// TestRoutedPartitionCount_KnownAndUnknownQueue pins the basic +// API: the count for a configured queue equals len(routes), and +// 0 for any queue not in the map. 
+func TestRoutedPartitionCount_KnownAndUnknownQueue(t *testing.T) { + t.Parallel() + r := NewSQSPartitionResolver(map[string][]uint64{ + "a.fifo": {1, 2}, + "b.fifo": {3, 4, 5, 6, 7, 8, 9, 10}, + }) + require.Equal(t, 2, r.RoutedPartitionCount("a.fifo")) + require.Equal(t, 8, r.RoutedPartitionCount("b.fifo")) + require.Equal(t, 0, r.RoutedPartitionCount("c.fifo"), + "unknown queue must return 0, not panic — gate routes that case to InvalidAttributeValue") + require.Equal(t, 0, r.RoutedPartitionCount(""), + "empty queue name must also return 0") +} + // TestValidateHTFIFOCapability_PublicMessageDoesNotLeakPeerDetails // pins the CodeRabbit major review's redaction contract: the // client-visible InvalidAttributeValue message MUST NOT include diff --git a/adapter/sqs_partition_resolver.go b/adapter/sqs_partition_resolver.go index 9e7ac02c..6c27834c 100644 --- a/adapter/sqs_partition_resolver.go +++ b/adapter/sqs_partition_resolver.go @@ -119,6 +119,28 @@ func (r *SQSPartitionResolver) RecognisesPartitionedKey(key []byte) bool { return ok } +// RoutedPartitionCount returns the number of partition routes +// configured for queueName, or 0 if the queue is not in the +// routing map. Used by the CreateQueue capability gate +// (validateHTFIFOCapability) to verify that EVERY partition of a +// requested partitioned queue is routable BEFORE the create +// commits — without this, a queue could land with PartitionCount=N +// but only K