diff --git a/adapter/sqs.go b/adapter/sqs.go
index 670f440f..14f96179 100644
--- a/adapter/sqs.go
+++ b/adapter/sqs.go
@@ -53,8 +53,8 @@ const (
 // group that hosts a partitioned queue when the binary itself does
 // not advertise this string.
 //
-// CreateQueue's catalog-polling gate (Phase 3.D PR 5 lifts the
-// dormancy and starts checking) reads this list off /sqs_health on
+// CreateQueue's cluster-wide capability gate (validateHTFIFOCapability,
+// landed in Phase 3.D PR 5b-3) reads this list off /sqs_health on
 // every peer; a CreateQueue with PartitionCount > 1 is rejected
 // unless every peer reports "htfifo" — fail-closed against rolling
 // upgrades that have not yet finished.
@@ -75,8 +75,8 @@ const sqsCapabilityHTFIFO = "htfifo"
 //   keys fail closed for recognised-but-unresolved partitioned
 //   keys.
 // - Capability poller: PollSQSHTFIFOCapability, merged via
-//   #721 (Phase 3.D PR 4-B-3a). PR 5 will use this for the
-//   CreateQueue capability gate.
+//   #721 (Phase 3.D PR 4-B-3a) and now consumed by
+//   validateHTFIFOCapability in the CreateQueue gate (PR 5b-3).
 // - Leadership-refusal hook:
 //   raftengine.RegisterLeaderAcquiredCallback +
 //   main_sqs_leadership_refusal.go (Phase 3.D PR 4-B-3b, this
@@ -84,11 +84,11 @@ const sqsCapabilityHTFIFO = "htfifo"
 //   the hook refuses leadership of any Raft group hosting a
 //   partitioned queue when the binary lacks htfifo.
 //
-// Both pieces are now in the binary, so the flag flips to true.
-// PR 5 lifts the PartitionCount > 1 dormancy gate AND wires the
-// CreateQueue capability poll in the same commit, at which point
-// a partitioned queue can land in production and every node in
-// the cluster must report htfifo for the gate to allow it.
+// Both pieces are in the binary and PR 5b-3 has lifted the
+// dormancy gate, so a partitioned queue can land in production —
+// CreateQueue's validateHTFIFOCapability gate refuses
+// PartitionCount > 1 unless every node in the cluster reports
+// htfifo on /sqs_health.
 //
 // Stays a const (not a var) because the flag is build-time. A
 // future runtime override (env var, --no-htfifo flag for
@@ -184,6 +184,22 @@ type SQSServer struct {
 	// receives for in-process — bounded by the operator-controlled
 	// CreateQueue rate.
 	receiveFanoutCounters sync.Map
+	// partitionResolver, when non-nil, is the per-cluster resolver
+	// that maps (queue, partition) keys to operator-chosen Raft
+	// groups (built from --sqsFifoPartitionMap, see main.go). The
+	// CreateQueue capability gate (validateHTFIFOCapability) uses
+	// it to verify routing coverage on partitioned creates BEFORE
+	// the meta record commits — without that check, a queue could
+	// land with PartitionCount=N but only K<N routable partitions.
+	partitionResolver *SQSPartitionResolver
@@ … @@
+// validateHTFIFOCapability is the cluster-wide CreateQueue gate for
+// partitioned (HT-FIFO) queues. PartitionCount > 1 is rejected
+// unless this binary advertises htfifo, the routing map covers
+// every requested partition, and every peer reports htfifo on
+// /sqs_health. Fails closed on any doubt.
+func (s *SQSServer) validateHTFIFOCapability(ctx context.Context, requested *sqsQueueMeta) error {
+	if requested == nil || requested.PartitionCount <= 1 {
+		// Legacy / single-partition layout — the gate is
+		// HT-FIFO-only, so skip the poll entirely.
+		return nil
+	}
+	if !htfifoCapabilityAdvertised {
+		return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
+			"PartitionCount > 1 requires the htfifo capability, which this node does not advertise")
+	}
+	// Routing-coverage check runs INDEPENDENTLY of the peer poll
+	// (and BEFORE it) so the empty-peer-list short-circuit below
+	// does not bypass coverage. A single-node cluster with a
+	// partition resolver installed (rare but possible: operator
+	// pre-configures --sqsFifoPartitionMap before adding peers)
+	// must still reject a partitioned create whose partitions
+	// aren't all routable.
+	if err := s.validateHTFIFORoutingCoverage(requested); err != nil {
+		return err
+	}
+	peers := s.collectSQSPeers()
+	if len(peers) == 0 {
+		// Single-node deployment: the local check above is the
+		// whole cluster. Vacuously true on the peer side.
+		return nil
+	}
+	report := PollSQSHTFIFOCapability(ctx, peers, PollerConfig{})
+	if report == nil || !report.AllAdvertise {
+		// Log the full per-peer detail for operator triage. The
+		// client-visible message stays generic (no peer addresses,
+		// no raw poller error text) so the CreateQueue surface
+		// does not leak cluster topology to authenticated callers
+		// — CodeRabbit major review on PR #734.
+		slog.Warn("sqs: htfifo capability gate rejected partitioned CreateQueue",
+			"queueName", requested.Name,
+			"partitionCount", requested.PartitionCount,
+			"peerCount", len(peers),
+			"detail", formatHTFIFOCapabilityReportForLog(report))
+		return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
+			htfifoCapabilityRejectionPublic)
+	}
+	return nil
+}
+
+// validateHTFIFORoutingCoverage rejects a partitioned CreateQueue
+// when the cluster's --sqsFifoPartitionMap does not cover every
+// requested partition. Without this, a queue could land with
+// PartitionCount=N but only K<N routable partitions — SendMessage
+// on the unroutable partitions would fail closed at the
+// kv.ShardRouter "no route for key" path on first use.
+func (s *SQSServer) validateHTFIFORoutingCoverage(requested *sqsQueueMeta) error {
+	if s.partitionResolver == nil {
+		// Single-shard / no-flag deployment: no routing map to
+		// check coverage against.
+		return nil
+	}
+	routed := int64(s.partitionResolver.RoutedPartitionCount(requested.Name))
+	if routed >= int64(requested.PartitionCount) {
+		return nil
+	}
+	slog.Warn("sqs: htfifo capability gate rejected partitioned CreateQueue — incomplete routing map",
+		"queueName", requested.Name,
+		"partitionCount", requested.PartitionCount,
+		"routedPartitionCount", routed)
+	return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
+		htfifoRoutingCoverageRejectionPublic)
+}
+
+// collectSQSPeers returns every distinct, non-empty SQS-side address
+// from s.leaderSQS in deterministic (sorted) order. Used by the
+// CreateQueue capability gate so error messages and tests pin a
+// stable peer order. The map may legitimately contain self (the
+// proxy-to-leader path uses the same map to find the leader's SQS
+// address by Raft addr); polling self over loopback is cheap and
+// keeps the "every peer reports htfifo" invariant uniform.
+func (s *SQSServer) collectSQSPeers() []string {
+	if len(s.leaderSQS) == 0 {
+		return nil
+	}
+	peers := make([]string, 0, len(s.leaderSQS))
+	seen := make(map[string]struct{}, len(s.leaderSQS))
+	for _, addr := range s.leaderSQS {
+		if addr == "" {
+			continue
+		}
+		if _, ok := seen[addr]; ok {
+			continue
+		}
+		seen[addr] = struct{}{}
+		peers = append(peers, addr)
+	}
+	sort.Strings(peers)
+	return peers
+}
+
+// formatHTFIFOCapabilityReportForLog composes the per-peer detail
+// surfaced to slog.Warn when the gate rejects. Lists each failing
+// peer's address and reason (per-peer Error or "missing capability")
+// so an operator triaging a partial-rolling-upgrade cluster can fix
+// the lag from the server logs without rerunning the poll
+// out-of-band. NEVER returned to the client — that path uses the
+// sanitized htfifoCapabilityRejectionPublic constant. Order matches
+// report.Peers, which matches collectSQSPeers' sorted input order —
+// deterministic so log lines diff cleanly across runs.
+func formatHTFIFOCapabilityReportForLog(report *HTFIFOCapabilityReport) string {
+	var b strings.Builder
+	if report == nil {
+		b.WriteString("(no report)")
+		return b.String()
+	}
+	first := true
+	for _, p := range report.Peers {
+		if p.HasHTFIFO {
+			continue
+		}
+		if !first {
+			b.WriteString(", ")
+		}
+		first = false
+		b.WriteString(p.Address)
+		b.WriteString(" (")
+		if p.Error != "" {
+			b.WriteString(p.Error)
+		} else {
+			b.WriteString("missing capability")
+		}
+		b.WriteString(")")
+	}
+	if first {
+		// Defensive: AllAdvertise was false but no peer surfaced a
+		// reason. Should never happen, but emit a non-empty hint
+		// rather than a truncated empty string.
+ b.WriteString("(unknown peer)") + } + return b.String() +} diff --git a/adapter/sqs_capability_gate_test.go b/adapter/sqs_capability_gate_test.go new file mode 100644 index 00000000..d6668fbe --- /dev/null +++ b/adapter/sqs_capability_gate_test.go @@ -0,0 +1,358 @@ +package adapter + +import ( + "context" + "errors" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +// htfifoCapabilityServer spins up a minimal /sqs_health responder +// returning HTTP 200 with the given capabilities array. Stand-in +// for a peer node during the capability-gate tests. +func htfifoCapabilityServer(t *testing.T, capabilities []string) *httptest.Server { + t.Helper() + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != sqsHealthPath { + http.NotFound(w, r) + return + } + writeSQSHealthJSONBody(w, r, http.StatusOK, sqsHealthBody{ + Status: "ok", + Capabilities: capabilities, + }) + })) + t.Cleanup(srv.Close) + return srv +} + +// TestValidateHTFIFOCapability_ShortCircuitsOnLegacyMeta pins the +// no-op path: PartitionCount <= 1 must skip the capability poll +// entirely so legacy and single-partition CreateQueue calls do not +// pay the cluster-wide poll cost. +func TestValidateHTFIFOCapability_ShortCircuitsOnLegacyMeta(t *testing.T) { + t.Parallel() + + // Wire a peer whose /sqs_health would FAIL the gate, then + // verify validateHTFIFOCapability does NOT poll it (the call + // would otherwise reject). leaderSQS is non-empty so the + // "vacuous empty list" path isn't what's saving us. + bad := htfifoCapabilityServer(t, nil) + s := &SQSServer{leaderSQS: map[string]string{"raft1": strings.TrimPrefix(bad.URL, "http://")}} + + for _, pc := range []uint32{0, 1} { + err := s.validateHTFIFOCapability(context.Background(), &sqsQueueMeta{PartitionCount: pc}) + require.NoErrorf(t, err, "PartitionCount=%d must skip the poll entirely (gate is HT-FIFO-only)", pc) + } + + // Defensive: nil meta also short-circuits — never reach the + // poll for an unset/zero meta. Pinned so a future refactor + // that added a poll on the nil path would fail loudly here. + require.NoError(t, s.validateHTFIFOCapability(context.Background(), nil)) +} + +// TestValidateHTFIFOCapability_AcceptsWhenAllPeersAdvertise pins +// the happy path: every peer in leaderSQS reports the htfifo +// capability via /sqs_health → the gate passes for +// PartitionCount > 1. +func TestValidateHTFIFOCapability_AcceptsWhenAllPeersAdvertise(t *testing.T) { + t.Parallel() + + caps := []string{sqsCapabilityHTFIFO} + good1 := htfifoCapabilityServer(t, caps) + good2 := htfifoCapabilityServer(t, caps) + s := &SQSServer{leaderSQS: map[string]string{ + "raft1": strings.TrimPrefix(good1.URL, "http://"), + "raft2": strings.TrimPrefix(good2.URL, "http://"), + }} + + require.NoError(t, s.validateHTFIFOCapability(context.Background(), &sqsQueueMeta{PartitionCount: 4})) +} + +// TestValidateHTFIFOCapability_AcceptsOnEmptyPeerList pins the +// vacuous case: a single-node cluster (no peers) with the local +// htfifo capability advertised must allow PartitionCount > 1. +// htfifoCapabilityAdvertised is a build-time const = true so the +// local check passes; the empty peer list short-circuits the +// poll. This is the path the wire-level +// TestSQSServer_HTFIFO_CapabilityGate_AcceptsOnSingleNode test +// exercises end-to-end. 
diff --git a/adapter/sqs_capability_gate_test.go b/adapter/sqs_capability_gate_test.go
new file mode 100644
index 00000000..d6668fbe
--- /dev/null
+++ b/adapter/sqs_capability_gate_test.go
@@ -0,0 +1,358 @@
+package adapter
+
+import (
+	"context"
+	"errors"
+	"net/http"
+	"net/http/httptest"
+	"strings"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+// htfifoCapabilityServer spins up a minimal /sqs_health responder
+// returning HTTP 200 with the given capabilities array. Stand-in
+// for a peer node during the capability-gate tests.
+func htfifoCapabilityServer(t *testing.T, capabilities []string) *httptest.Server {
+	t.Helper()
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.URL.Path != sqsHealthPath {
+			http.NotFound(w, r)
+			return
+		}
+		writeSQSHealthJSONBody(w, r, http.StatusOK, sqsHealthBody{
+			Status:       "ok",
+			Capabilities: capabilities,
+		})
+	}))
+	t.Cleanup(srv.Close)
+	return srv
+}
+
+// TestValidateHTFIFOCapability_ShortCircuitsOnLegacyMeta pins the
+// no-op path: PartitionCount <= 1 must skip the capability poll
+// entirely so legacy and single-partition CreateQueue calls do not
+// pay the cluster-wide poll cost.
+func TestValidateHTFIFOCapability_ShortCircuitsOnLegacyMeta(t *testing.T) {
+	t.Parallel()
+
+	// Wire a peer whose /sqs_health would FAIL the gate, then
+	// verify validateHTFIFOCapability does NOT poll it (the call
+	// would otherwise reject). leaderSQS is non-empty so the
+	// "vacuous empty list" path isn't what's saving us.
+	bad := htfifoCapabilityServer(t, nil)
+	s := &SQSServer{leaderSQS: map[string]string{"raft1": strings.TrimPrefix(bad.URL, "http://")}}
+
+	for _, pc := range []uint32{0, 1} {
+		err := s.validateHTFIFOCapability(context.Background(), &sqsQueueMeta{PartitionCount: pc})
+		require.NoErrorf(t, err, "PartitionCount=%d must skip the poll entirely (gate is HT-FIFO-only)", pc)
+	}
+
+	// Defensive: nil meta also short-circuits — never reach the
+	// poll for an unset/zero meta. Pinned so a future refactor
+	// that added a poll on the nil path would fail loudly here.
+	require.NoError(t, s.validateHTFIFOCapability(context.Background(), nil))
+}
+
+// TestValidateHTFIFOCapability_AcceptsWhenAllPeersAdvertise pins
+// the happy path: every peer in leaderSQS reports the htfifo
+// capability via /sqs_health → the gate passes for
+// PartitionCount > 1.
+func TestValidateHTFIFOCapability_AcceptsWhenAllPeersAdvertise(t *testing.T) {
+	t.Parallel()
+
+	caps := []string{sqsCapabilityHTFIFO}
+	good1 := htfifoCapabilityServer(t, caps)
+	good2 := htfifoCapabilityServer(t, caps)
+	s := &SQSServer{leaderSQS: map[string]string{
+		"raft1": strings.TrimPrefix(good1.URL, "http://"),
+		"raft2": strings.TrimPrefix(good2.URL, "http://"),
+	}}
+
+	require.NoError(t, s.validateHTFIFOCapability(context.Background(), &sqsQueueMeta{PartitionCount: 4}))
+}
+
+// TestValidateHTFIFOCapability_AcceptsOnEmptyPeerList pins the
+// vacuous case: a single-node cluster (no peers) with the local
+// htfifo capability advertised must allow PartitionCount > 1.
+// htfifoCapabilityAdvertised is a build-time const = true so the
+// local check passes; the empty peer list short-circuits the
+// poll. This is the path the wire-level
+// TestSQSServer_HTFIFO_CapabilityGate_AcceptsOnSingleNode test
+// exercises end-to-end.
+func TestValidateHTFIFOCapability_AcceptsOnEmptyPeerList(t *testing.T) {
+	t.Parallel()
+	s := &SQSServer{}
+	require.NoError(t, s.validateHTFIFOCapability(context.Background(), &sqsQueueMeta{PartitionCount: 4}))
+}
+
+// TestValidateHTFIFOCapability_RejectsWhenOnePeerLacksCapability
+// pins the rolling-upgrade fail-closed: one peer advertises
+// htfifo, the other doesn't — the gate must reject the create
+// with InvalidAttributeValue and surface the offending peer in
+// the server logs so the operator can fix the cluster without
+// re-running the poll out-of-band.
+func TestValidateHTFIFOCapability_RejectsWhenOnePeerLacksCapability(t *testing.T) {
+	t.Parallel()
+
+	good := htfifoCapabilityServer(t, []string{sqsCapabilityHTFIFO})
+	old := htfifoCapabilityServer(t, []string{}) // pre-htfifo binary
+	oldAddr := strings.TrimPrefix(old.URL, "http://")
+	s := &SQSServer{leaderSQS: map[string]string{
+		"raft1": strings.TrimPrefix(good.URL, "http://"),
+		"raft2": oldAddr,
+	}}
+
+	err := s.validateHTFIFOCapability(context.Background(), &sqsQueueMeta{PartitionCount: 8})
+	require.Error(t, err)
+
+	var apiErr *sqsAPIError
+	ok := errors.As(err, &apiErr)
+	require.True(t, ok, "must surface as sqsAPIError so the wire layer maps to InvalidAttributeValue, got %T", err)
+	require.Equal(t, http.StatusBadRequest, apiErr.status)
+	require.Equal(t, sqsErrInvalidAttributeValue, apiErr.errorType)
+	require.Equal(t, htfifoCapabilityRejectionPublic, apiErr.message,
+		"client message must be the sanitized constant — peer addresses live in server logs (CodeRabbit major)")
+	require.NotContains(t, apiErr.message, oldAddr,
+		"peer host:port MUST NOT leak through the wire-level rejection")
+}
+
+// TestValidateHTFIFOCapability_RejectsWhenPeerUnreachable pins
+// the network-failure fail-closed: a peer whose /sqs_health is
+// unreachable (closed listener) must block the create. A
+// transient network blip during a CreateQueue is exactly the
+// class of partial-cluster state the gate is designed to catch —
+// silently accepting the create here would let a partitioned
+// queue land while a peer is offline and silently drop messages
+// the moment that peer comes back as a non-htfifo binary.
+func TestValidateHTFIFOCapability_RejectsWhenPeerUnreachable(t *testing.T) {
+	t.Parallel()
+
+	// Bind a port, capture its address, then close the listener
+	// so dials fail immediately rather than waiting on the
+	// per-peer timeout.
+	srv := htfifoCapabilityServer(t, []string{sqsCapabilityHTFIFO})
+	deadAddr := strings.TrimPrefix(srv.URL, "http://")
+	srv.Close()
+
+	s := &SQSServer{leaderSQS: map[string]string{"raft1": deadAddr}}
+
+	err := s.validateHTFIFOCapability(context.Background(), &sqsQueueMeta{PartitionCount: 2})
+	require.Error(t, err)
+	var apiErr *sqsAPIError
+	ok := errors.As(err, &apiErr)
+	require.True(t, ok)
+	require.Equal(t, http.StatusBadRequest, apiErr.status)
+	require.Equal(t, sqsErrInvalidAttributeValue, apiErr.errorType)
+	require.Equal(t, htfifoCapabilityRejectionPublic, apiErr.message,
+		"client message must be the sanitized constant — transport error text lives in server logs (CodeRabbit major)")
+	require.NotContains(t, apiErr.message, deadAddr,
+		"peer host:port MUST NOT leak through the wire-level rejection")
+}
+
+// TestCollectSQSPeers_Deterministic pins the helper's order +
+// dedup contract: leaderSQS is a map (random Go iteration order),
+// but the gate's error message and the poller's per-peer index
+// must be deterministic so test assertions, log lines, and
+// operator triage are stable across runs.
+func TestCollectSQSPeers_Deterministic(t *testing.T) {
+	t.Parallel()
+
+	s := &SQSServer{leaderSQS: map[string]string{
+		"r1": "node3:9000",
+		"r2": "node1:9000",
+		"r3": "node2:9000",
+		"r4": "node1:9000", // duplicate (two Raft nodes pointing at one SQS endpoint)
+		"r5": "",           // empty string must be skipped
+	}}
+
+	got := s.collectSQSPeers()
+	require.Equal(t, []string{"node1:9000", "node2:9000", "node3:9000"}, got,
+		"peers must be sorted, deduped, and free of empty strings")
+
+	// Empty leaderSQS: caller relies on len()==0 to skip the
+	// poll on single-node deployments.
+	require.Empty(t, (&SQSServer{}).collectSQSPeers())
+}
+
+// TestFormatHTFIFOCapabilityReportForLog_ShapesServerSideDetail
+// pins the SERVER-SIDE log helper's shape — never returned to the
+// client (CodeRabbit major review on PR #734: peer addresses + raw
+// poller errors leak cluster topology to authenticated callers).
+// Each failing peer must contribute a "(reason)" suffix so an
+// operator triaging a rolling-upgrade cluster can fix the lag from
+// the log lines without rerunning the poll out-of-band; peers that
+// pass do not appear.
+func TestFormatHTFIFOCapabilityReportForLog_ShapesServerSideDetail(t *testing.T) {
+	t.Parallel()
+
+	report := &HTFIFOCapabilityReport{
+		Peers: []HTFIFOCapabilityPeerStatus{
+			{Address: "ok:9000", HasHTFIFO: true},
+			{Address: "old:9000", HasHTFIFO: false, Capabilities: []string{}},
+			{Address: "down:9000", HasHTFIFO: false, Error: "dial tcp: refused"},
+		},
+	}
+
+	detail := formatHTFIFOCapabilityReportForLog(report)
+	require.NotContains(t, detail, "ok:9000", "advertising peers must NOT appear in the log detail")
+	require.Contains(t, detail, "old:9000 (missing capability)")
+	require.Contains(t, detail, "down:9000 (dial tcp: refused)")
+
+	// Defensive: nil report and "all-passing-but-AllAdvertise-false" path.
+	require.Contains(t, formatHTFIFOCapabilityReportForLog(nil), "no report")
+	allPass := &HTFIFOCapabilityReport{Peers: []HTFIFOCapabilityPeerStatus{{Address: "x", HasHTFIFO: true}}}
+	require.Contains(t, formatHTFIFOCapabilityReportForLog(allPass), "unknown peer",
+		"never emit an empty detail when no peer reasons surface")
+}
+
+// TestValidateHTFIFOCapability_RejectsWhenRoutingMapMissingQueue
+// pins the Codex P1 review fix on PR #734 round 2: when a partition
+// resolver is installed (--sqsFifoPartitionMap is configured) but
+// the requested queue is NOT in the map, CreateQueue must reject
+// before the meta record commits — silently accepting it would
+// land a queue whose SendMessage / ReceiveMessage calls fail
+// closed at the kv.ShardRouter "no route for key" path on first
+// use, surfacing as InternalFailure to the client.
+func TestValidateHTFIFOCapability_RejectsWhenRoutingMapMissingQueue(t *testing.T) {
+	t.Parallel()
+
+	resolver := NewSQSPartitionResolver(map[string][]uint64{
+		"other-queue.fifo": {100, 101, 102, 103},
+	})
+	require.NotNil(t, resolver)
+	s := &SQSServer{partitionResolver: resolver}
+
+	err := s.validateHTFIFOCapability(context.Background(), &sqsQueueMeta{
+		Name:           "newqueue.fifo",
+		PartitionCount: 4,
+	})
+	require.Error(t, err)
+	var apiErr *sqsAPIError
+	require.True(t, errors.As(err, &apiErr))
+	require.Equal(t, http.StatusBadRequest, apiErr.status)
+	require.Equal(t, sqsErrInvalidAttributeValue, apiErr.errorType)
+	require.Equal(t, htfifoRoutingCoverageRejectionPublic, apiErr.message,
+		"client message must be the sanitized routing-coverage constant — operator detail is in the slog.Warn line")
+}
+
+// TestValidateHTFIFOCapability_RejectsWhenRoutingMapPartiallyCoversQueue
+// is the partial-coverage variant of the routing-map gate: the
+// queue is in the map but with FEWER routes than the requested
+// PartitionCount. SendMessage on the missing partitions would fail
+// closed at the router; the gate must reject before the create
+// commits.
+func TestValidateHTFIFOCapability_RejectsWhenRoutingMapPartiallyCoversQueue(t *testing.T) {
+	t.Parallel()
+
+	resolver := NewSQSPartitionResolver(map[string][]uint64{
+		"queue.fifo": {100, 101}, // only 2 routes
+	})
+	s := &SQSServer{partitionResolver: resolver}
+
+	err := s.validateHTFIFOCapability(context.Background(), &sqsQueueMeta{
+		Name:           "queue.fifo",
+		PartitionCount: 4, // requesting 4 partitions
+	})
+	require.Error(t, err)
+	var apiErr *sqsAPIError
+	require.True(t, errors.As(err, &apiErr))
+	require.Equal(t, sqsErrInvalidAttributeValue, apiErr.errorType)
+	require.Equal(t, htfifoRoutingCoverageRejectionPublic, apiErr.message)
+}
+
+// TestValidateHTFIFOCapability_AcceptsWhenRoutingMapFullyCoversQueue
+// pins the happy path: queue is in the map with at least
+// PartitionCount entries → gate passes. ">=" rather than "=="
+// because over-allocating routes (operator preparing for a future
+// expansion) is harmless: the create only uses indices [0,
+// PartitionCount).
+func TestValidateHTFIFOCapability_AcceptsWhenRoutingMapFullyCoversQueue(t *testing.T) {
+	t.Parallel()
+
+	resolver := NewSQSPartitionResolver(map[string][]uint64{
+		"queue.fifo": {100, 101, 102, 103, 104, 105}, // 6 routes
+	})
+	s := &SQSServer{partitionResolver: resolver}
+
+	require.NoError(t, s.validateHTFIFOCapability(context.Background(), &sqsQueueMeta{
+		Name:           "queue.fifo",
+		PartitionCount: 4, // requesting 4, under the 6 configured
+	}))
+}
+
+// TestValidateHTFIFOCapability_AcceptsWhenResolverNil pins the
+// backward-compat path: a single-shard / no-flag deployment has
+// no partition resolver wired (s.partitionResolver == nil), so
+// the gate's coverage check is skipped and the create proceeds
+// (provided the local + peer-poll checks above also pass). This
+// is what TestSQSServer_HTFIFO_CapabilityGate_AcceptsOnSingleNode
+// exercises end-to-end at the wire level; this unit test is the
+// narrower assertion that the partitionResolver==nil branch is
+// the one that bypasses the coverage check.
+func TestValidateHTFIFOCapability_AcceptsWhenResolverNil(t *testing.T) {
+	t.Parallel()
+	s := &SQSServer{} // partitionResolver implicitly nil
+	require.NoError(t, s.validateHTFIFOCapability(context.Background(), &sqsQueueMeta{
+		Name:           "queue.fifo",
+		PartitionCount: 4,
+	}))
+}
+
+// TestRoutedPartitionCount_NilReceiver pins the typed-nil safety
+// of the resolver method: a nil *SQSPartitionResolver returns 0
+// (no routes) so the gate's "len(routes) < PartitionCount" check
+// rejects naturally rather than panicking. Used by the
+// validateHTFIFORoutingCoverage short-circuit on resolver==nil
+// (the gate skips the coverage check entirely) but also documents
+// the safe-by-default behaviour for any other future caller.
+func TestRoutedPartitionCount_NilReceiver(t *testing.T) {
+	t.Parallel()
+	var r *SQSPartitionResolver
+	require.Equal(t, 0, r.RoutedPartitionCount("anything"))
+}
+
+// TestRoutedPartitionCount_KnownAndUnknownQueue pins the basic
+// API: the count for a configured queue equals len(routes), and
+// 0 for any queue not in the map.
+func TestRoutedPartitionCount_KnownAndUnknownQueue(t *testing.T) {
+	t.Parallel()
+	r := NewSQSPartitionResolver(map[string][]uint64{
+		"a.fifo": {1, 2},
+		"b.fifo": {3, 4, 5, 6, 7, 8, 9, 10},
+	})
+	require.Equal(t, 2, r.RoutedPartitionCount("a.fifo"))
+	require.Equal(t, 8, r.RoutedPartitionCount("b.fifo"))
+	require.Equal(t, 0, r.RoutedPartitionCount("c.fifo"),
+		"unknown queue must return 0, not panic — gate routes that case to InvalidAttributeValue")
+	require.Equal(t, 0, r.RoutedPartitionCount(""),
+		"empty queue name must also return 0")
+}
+
+// TestValidateHTFIFOCapability_PublicMessageDoesNotLeakPeerDetails
+// pins the CodeRabbit major review's redaction contract: the
+// client-visible InvalidAttributeValue message MUST NOT include
+// peer addresses or raw poller error text. The two failure-path
+// tests above check that the gate REJECTS; this test specifically
+// checks that the rejection message is the sanitized
+// htfifoCapabilityRejectionPublic constant — no host:port, no raw
+// transport error.
+func TestValidateHTFIFOCapability_PublicMessageDoesNotLeakPeerDetails(t *testing.T) {
+	t.Parallel()
+
+	old := htfifoCapabilityServer(t, []string{}) // no htfifo
+	oldAddr := strings.TrimPrefix(old.URL, "http://")
+	s := &SQSServer{leaderSQS: map[string]string{"raft1": oldAddr}}
+
+	err := s.validateHTFIFOCapability(context.Background(), &sqsQueueMeta{Name: "q.fifo", PartitionCount: 4})
+	require.Error(t, err)
+	var apiErr *sqsAPIError
+	require.True(t, errors.As(err, &apiErr))
+	require.Equal(t, htfifoCapabilityRejectionPublic, apiErr.message,
+		"client message must be the sanitized constant, never the per-peer detail")
+	require.NotContains(t, apiErr.message, oldAddr,
+		"peer host:port MUST NOT appear in the wire-level rejection — operator detail is server-side only")
+	require.Contains(t, apiErr.message, "see server logs for details",
+		"public message must point operators at the server-side detail")
+}
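The redaction contract these tests pin — full per-peer detail to the server log, one fixed sanitized constant to the client — is a reusable pattern. A minimal sketch under assumed names (publicMsg and rejectPartitionedCreate are illustrative, not identifiers from this PR):

package main

import (
	"errors"
	"fmt"
	"log/slog"
)

// publicMsg is the only text a client ever sees; it carries no
// cluster topology.
const publicMsg = "PartitionCount > 1 requires the htfifo capability on every node; see server logs for details"

// rejectPartitionedCreate logs the operator-facing detail and
// returns only the sanitized error.
func rejectPartitionedCreate(queue string, failingPeers []string) error {
	slog.Warn("capability gate rejected partitioned CreateQueue",
		"queueName", queue,
		"failingPeers", failingPeers) // server-side only
	return errors.New(publicMsg) // client-side: generic, topology-free
}

func main() {
	err := rejectPartitionedCreate("orders.fifo", []string{"node2:9000 (missing capability)"})
	fmt.Println(err) // no peer addresses in the client-visible text
}

The design choice is that authenticated callers learn only that the cluster is not ready, while operators get addresses and reasons where they already look: the logs.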
diff --git a/adapter/sqs_catalog.go b/adapter/sqs_catalog.go
index abb179a2..1344a4a5 100644
--- a/adapter/sqs_catalog.go
+++ b/adapter/sqs_catalog.go
@@ -122,11 +122,10 @@ type sqsQueueMeta struct {
 	// Zero or 1 means the legacy single-partition layout — no schema
 	// change. Greater than 1 enables HT-FIFO. Set at CreateQueue time
 	// and immutable thereafter (SetQueueAttributes rejects any change).
-	// Power-of-two values only (validator rejects others). PR 2 of the
-	// rollout introduces this field but a temporary CreateQueue gate
-	// rejects PartitionCount > 1 until PR 5 lifts the gate atomically
-	// with the data-plane fanout — so the schema exists but no
-	// partitioned data can land before the data plane is wired.
+	// Power-of-two values only (validator rejects others). CreateQueue
+	// gates PartitionCount > 1 on the cluster-wide htfifo capability
+	// (validateHTFIFOCapability in tryCreateQueueOnce) so a
+	// partitioned queue cannot land in a partially-upgraded cluster.
 	PartitionCount uint32 `json:"partition_count,omitempty"`
 	// FifoThroughputLimit mirrors the AWS attribute. "perMessageGroupId"
 	// (default for HT-FIFO) keeps the §3.3 hash-by-MessageGroupId
@@ -383,10 +382,11 @@ func parseAttributesIntoMeta(name string, attrs map[string]string) (*sqsQueueMet
 		return nil, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
 			"ContentBasedDeduplication is only valid on FIFO queues")
 	}
 	// HT-FIFO validation runs after resolveFifoQueueFlag so the
-	// IsFIFO-only checks see the post-resolution flag. The temporary
-	// dormancy gate (§11 PR 2) runs separately in createQueue so
-	// SetQueueAttributes paths share the schema validator without
-	// re-rejecting on the gate.
+	// IsFIFO-only checks see the post-resolution flag. The
+	// cluster-wide capability gate (validateHTFIFOCapability) runs
+	// separately in tryCreateQueueOnce so SetQueueAttributes paths
+	// share the schema validator without paying the network poll
+	// cost (immutability also blocks PartitionCount changes there).
 	if err := validatePartitionConfig(meta); err != nil {
 		return nil, err
 	}
@@ -481,9 +481,10 @@ var sqsAttributeAppliers = map[string]attributeApplier{
 	// docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md). Set
 	// at CreateQueue time; SetQueueAttributes attempts to change it
 	// reject via the immutability check in trySetQueueAttributesOnce.
-	// PR 2 of the rollout introduces the field but the temporary
-	// dormancy gate in tryCreateQueueOnce rejects PartitionCount > 1
-	// until PR 5 lifts the gate atomically with the data plane.
+	// PartitionCount > 1 is gated by validateHTFIFOCapability (the
+	// cluster-wide htfifo capability poll, PR 5b-3), which runs
+	// inside tryCreateQueueOnce after the existence check so
+	// idempotent retries do not pay the network cost.
 	"PartitionCount": func(m *sqsQueueMeta, v string) error {
 		// Parse at uint64 width and bound-check explicitly so the
 		// uint32 narrowing below is gosec-clean.
@@ -903,15 +904,12 @@ func (s *SQSServer) createQueueCore(ctx context.Context, in *sqsCreateQueueInput
 	if err != nil {
 		return "", err
 	}
-	// Temporary dormancy gate (Phase 3.D §11 PR 2). PartitionCount > 1
-	// must reject until PR 5 wires the data plane atomically with the
-	// gate-lift. Without this, accepting a partitioned-queue create
-	// would let SendMessage write under the legacy single-partition
-	// prefix; the PR 5 reader would never find those messages and the
-	// reaper would not enumerate them — silent message loss.
-	if err := validatePartitionDormancyGate(requested); err != nil {
-		return "", err
-	}
+	// The cluster-wide htfifo capability gate (Phase 3.D §11 PR 5b-3)
+	// runs INSIDE tryCreateQueueOnce after the existence check, not
+	// here. Idempotent CreateQueue retries on an already-existing
+	// partitioned queue must NOT pay the network poll cost or risk a
+	// transient peer-poll failure flipping a 200-OK retry into a 400
+	// (Codex P1 review on PR #734).
 	if len(in.Tags) > sqsMaxTagsPerQueue {
 		// AWS caps tags per queue at 50. CreateQueue must reject
 		// over-cap tag bundles up front; a silent slice-and-store
@@ -961,6 +959,23 @@ func (s *SQSServer) tryCreateQueueOnce(ctx context.Context, requested *sqsQueueM
 		}
 		return false, newSQSAPIError(http.StatusBadRequest, sqsErrQueueNameExists,
 			"queue already exists with different attributes")
 	}
+	// Cluster-wide htfifo capability gate (Phase 3.D §11 PR 5b-3,
+	// replaces the PR 2 dormancy reject). PartitionCount > 1 is
+	// rejected unless this binary AND every peer in s.leaderSQS
+	// advertise the htfifo capability via /sqs_health. Fails closed
+	// on any peer timeout, HTTP error, malformed body, or missing
+	// capability so a partitioned queue cannot land in a partially-
+	// upgraded cluster.
+	//
+	// Runs AFTER the existence check (so idempotent CreateQueue
+	// retries on an already-existing partitioned queue do not pay
+	// the network poll cost or risk a transient peer-poll failure
+	// flipping a 200-OK retry into a 400 — Codex P1 review on PR
+	// #734) and BEFORE the dispatch (so a rejected create does not
+	// burn an OCC commit).
+	if err := s.validateHTFIFOCapability(ctx, requested); err != nil {
+		return false, err
+	}
 	lastGen, err := s.loadQueueGenerationAt(ctx, requested.Name, readTS)
 	if err != nil {
 		return false, errors.WithStack(err)
 	}
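The "parse at uint64 width and bound-check explicitly" note in the PartitionCount applier above corresponds to a pattern like the following sketch (parsePartitionCount is an illustrative name, not the PR's helper):

package main

import (
	"fmt"
	"math"
	"strconv"
)

// parsePartitionCount parses at uint64 width and bound-checks before
// narrowing, so the uint32 conversion below can never truncate —
// which is what makes the cast clean under gosec's G115 check.
func parsePartitionCount(v string) (uint32, error) {
	n, err := strconv.ParseUint(v, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("PartitionCount %q is not a non-negative integer: %w", v, err)
	}
	if n > math.MaxUint32 {
		return 0, fmt.Errorf("PartitionCount %d overflows uint32", n)
	}
	return uint32(n), nil // narrowing is provably safe here
}

func main() {
	for _, v := range []string{"4", "4294967296", "-1"} {
		n, err := parsePartitionCount(v)
		fmt.Println(v, "->", n, err)
	}
}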
diff --git a/adapter/sqs_keys.go b/adapter/sqs_keys.go
index 44105c0e..0418bee6 100644
--- a/adapter/sqs_keys.go
+++ b/adapter/sqs_keys.go
@@ -262,9 +262,9 @@ func queueNameFromMetaKey(key []byte) (string, bool) {
 // partition uint32 inserted between the queue segment and the
 // generation. The legacy keyspace is unchanged on disk, so existing
 // queues and Standard queues stay byte-identical — these helpers are
-// reachable only when meta.PartitionCount > 1, and the §11 PR 2
-// dormancy gate currently rejects that at CreateQueue. The data plane
-// dispatch lands together with the gate-lift in PR 5.
+// reachable only when meta.PartitionCount > 1, gated at CreateQueue
+// time by the cluster-wide htfifo capability check
+// (validateHTFIFOCapability, PR 5b-3).
 //
 // Each helper appends the partition as a fixed-width big-endian
 // uint32 so prefix scans `!sqs|msg|<queue>|p|<partition>|<generation>|`
 // stay contiguous per partition.
diff --git a/adapter/sqs_messages.go b/adapter/sqs_messages.go
index b3994fc5..8dafa893 100644
--- a/adapter/sqs_messages.go
+++ b/adapter/sqs_messages.go
@@ -384,12 +384,13 @@ func decodeReceiptHandleV1(b []byte) (*decodedReceiptHandle, error) {
 // dormancy promise of PR 724 round 3 (no v2 wire-format
 // probability from the public API) under the new contract.
 //
-// On partitioned queues (PartitionCount > 1) the §11 PR 2
-// dormancy gate is still in force in PR 5b-2 — CreateQueue
-// rejects PartitionCount > 1, so no production queue can be in
-// the partitioned branch yet, and validateReceiptHandleVersion
-// against a non-partitioned meta still rejects every v2 handle.
-// PR 5b-3 lifts the gate together with the capability check.
+// On partitioned queues (PartitionCount > 1) the cluster-wide
+// htfifo capability gate (PR 5b-3) admits CreateQueue; v2 handles
+// then reach validateReceiptHandleVersion through the meta load
+// performed inside loadMessageForDelete / loadAndVerifyMessage,
+// which checks the version against the queue's PartitionCount and
+// rejects the v2-on-legacy and v1-on-partitioned mismatches with
+// ReceiptHandleIsInvalid.
 func decodeClientReceiptHandle(raw string) (*decodedReceiptHandle, error) {
 	return decodeReceiptHandle(raw)
 }
diff --git a/adapter/sqs_partition_resolver.go b/adapter/sqs_partition_resolver.go
index 9e7ac02c..6c27834c 100644
--- a/adapter/sqs_partition_resolver.go
+++ b/adapter/sqs_partition_resolver.go
@@ -119,6 +119,28 @@ func (r *SQSPartitionResolver) RecognisesPartitionedKey(key []byte) bool {
 	return ok
 }
+
+// RoutedPartitionCount returns the number of partition routes
+// configured for queueName, or 0 if the queue is not in the
+// routing map. Used by the CreateQueue capability gate
+// (validateHTFIFOCapability) to verify that EVERY partition of a
+// requested partitioned queue is routable BEFORE the create
+// commits — without this, a queue could land with PartitionCount=N
+// but only K<N routable partitions.
+func (r *SQSPartitionResolver) RoutedPartitionCount(queueName string) int {
+	// Typed-nil safe: a nil receiver reports zero routes, so the
+	// gate's coverage check rejects naturally instead of panicking.
+	if r == nil {
+		return 0
+	}
+	// A queue missing from the map yields a nil slice — len 0.
+	return len(r.routes[queueName])
+}
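The fixed-width big-endian encoding that sqs_keys.go's comment describes matters because byte-wise key order must match numeric partition order for prefix scans to stay contiguous. A standalone sketch — the key prefix is illustrative, not the adapter's exact on-disk layout:

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"sort"
)

// appendPartitionSegment appends a fixed-width big-endian uint32 so
// byte-wise key ordering matches numeric partition order — a
// variable-width decimal encoding would sort "10" before "2".
func appendPartitionSegment(key []byte, partition uint32) []byte {
	var seg [4]byte
	binary.BigEndian.PutUint32(seg[:], partition)
	return append(append(key, seg[:]...), '|')
}

func main() {
	var keys [][]byte
	for _, p := range []uint32{10, 2, 1} {
		keys = append(keys, appendPartitionSegment([]byte("!sqs|msg|q.fifo|p|"), p))
	}
	sort.Slice(keys, func(i, j int) bool { return bytes.Compare(keys[i], keys[j]) < 0 })
	for _, k := range keys {
		fmt.Printf("%q\n", k) // sorts 1, 2, 10 — numeric order preserved
	}
}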
diff --git a/adapter/… b/adapter/…
@@ … @@
-// … rejects PartitionCount > 1 at CreateQueue (lifted atomically with the capability check
-// in PR 5b-3), so these tests reach below the public CreateQueue
-// surface to install a partitioned meta record directly. That
-// short-circuits the dormancy gate for the duration of the test
-// without disabling it for production CreateQueue calls — which
-// is the exact split the design doc envisaged for "data plane
-// landed but not user-creatable yet."
+// … wiring. PR 5b-3 has since lifted the §11 PR 2 dormancy gate, so
+// CreateQueue with PartitionCount > 1 succeeds on a single-node
+// cluster. The helper below predates that gate-lift and overrides
+// the meta record directly to keep these tests independent of the
+// CreateQueue capability path; new partitioned-FIFO tests should
+// prefer the public CreateQueue surface.
 
 // installPartitionedMetaForTest overwrites the queue's meta record
 // with a partitioned shape (PartitionCount > 1) by dispatching a
@@ -30,11 +28,13 @@ import (
 // counters and the catalog index are populated correctly); only
 // the partition-shape attributes are mutated.
 //
-// The dormancy gate intercepts CreateQueue, not the data plane,
-// so once the meta record carries PartitionCount > 1 every
-// SendMessage / ReceiveMessage / DeleteMessage call routes
-// through the partitioned dispatch helpers exactly as it would
-// after PR 5b-3 lifts the gate.
+// Predates the PR 5b-3 gate-lift; still useful for tests that want
+// to bypass the capability poll (e.g. when CreateQueue would have
+// to negotiate over a fake peer list). Once the meta record
+// carries PartitionCount > 1, every SendMessage / ReceiveMessage /
+// DeleteMessage call routes through the partitioned dispatch
+// helpers exactly as it would after a normal partitioned-queue
+// CreateQueue.
 func installPartitionedMetaForTest(t *testing.T, node Node, queueName string, partitionCount uint32, throughputLimit string) {
 	t.Helper()
 	s := node.sqsServer
diff --git a/adapter/sqs_partitioning.go b/adapter/sqs_partitioning.go
index 39c0f7b7..bce51e21 100644
--- a/adapter/sqs_partitioning.go
+++ b/adapter/sqs_partitioning.go
@@ -48,11 +48,6 @@ const (
 	htfifoDedupeScopeQueue = "queue"
 )
 
-// htfifoTemporaryGateMessage is the operator-facing reason the
-// CreateQueue gate uses while PR 2-4 are in production. Removed in
-// PR 5 in the same commit that wires the data-plane fanout.
-const htfifoTemporaryGateMessage = "PartitionCount > 1 requires HT-FIFO data plane — not yet enabled"
-
 // partitionFor maps a (queue meta, MessageGroupId) pair to a
 // partition index in [0, PartitionCount). Edge cases:
 //
@@ -131,11 +126,9 @@ func isPowerOfTwo(n uint32) bool {
 //   with multi-partition FIFO because the dedup key cannot be
 //   globally unique across partitions without a cross-partition
 //   OCC transaction.
-// - The §11 PR 2 dormancy gate (PartitionCount > 1 rejected at
-//   CreateQueue) lives in validatePartitionDormancyGate so the
-//   dormancy check can be turned off in unit tests that want to
-//   exercise the full schema path. Production CreateQueue calls
-//   both validators.
+// - The §11 PR 2 dormancy gate has been lifted (Phase 3.D PR 5b-3);
+//   CreateQueue now gates PartitionCount > 1 on the cluster-wide
+//   htfifo capability via validateHTFIFOCapability instead.
 func validatePartitionConfig(meta *sqsQueueMeta) error {
 	if err := validatePartitionShape(meta); err != nil {
 		return err
@@ -212,23 +205,6 @@ func validateStandardQueueRejectsHTFIFO(meta *sqsQueueMeta) error {
 	return nil
 }
 
-// validatePartitionDormancyGate is the temporary §11 PR 2 gate. As
-// long as the data-plane fanout (PR 5) has not landed, accepting a
-// partitioned-queue CreateQueue would let SendMessage write under
-// the legacy single-partition prefix — the PR 5 reader would never
-// find those messages and the reaper would not enumerate them. This
-// gate makes the wrong-layout-data class of bug impossible.
-//
-// Removed in PR 5 in the same commit that wires the data plane so
-// the gate-and-lift land atomically.
-func validatePartitionDormancyGate(meta *sqsQueueMeta) error {
-	if meta.PartitionCount > 1 {
-		return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
-			htfifoTemporaryGateMessage)
-	}
-	return nil
-}
-
 // validatePartitionImmutability enforces the §3.2 rule that
 // PartitionCount, FifoThroughputLimit, and DeduplicationScope are
 // all immutable from CreateQueue onward. Called from
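For orientation, the hash-by-MessageGroupId mapping and the power-of-two rule that sqs_partitioning.go enforces compose like the sketch below. The FNV-1a hash is an assumption chosen for illustration — this PR does not show partitionFor's actual hash function:

package main

import (
	"fmt"
	"hash/fnv"
)

// isPowerOfTwo is the standard bit trick: n has exactly one set bit.
func isPowerOfTwo(n uint32) bool { return n != 0 && n&(n-1) == 0 }

// partitionFor sketches the hash-by-MessageGroupId mapping: a stable
// hash masked into [0, partitions). The mask substitutes for a
// modulo only because the validator guarantees partitions is a
// power of two.
func partitionFor(messageGroupID string, partitions uint32) uint32 {
	if partitions <= 1 {
		return 0 // legacy single-partition layout
	}
	h := fnv.New32a()
	h.Write([]byte(messageGroupID))
	return h.Sum32() & (partitions - 1)
}

func main() {
	fmt.Println(isPowerOfTwo(3), isPowerOfTwo(4)) // false true
	// Same group always lands on the same partition — the property
	// that preserves per-group FIFO ordering across the fanout.
	fmt.Println(partitionFor("group-a", 4), partitionFor("group-a", 4))
}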
Called from diff --git a/adapter/sqs_partitioning_integration_test.go b/adapter/sqs_partitioning_integration_test.go index b0c50178..153547f3 100644 --- a/adapter/sqs_partitioning_integration_test.go +++ b/adapter/sqs_partitioning_integration_test.go @@ -4,15 +4,20 @@ import ( "net/http" "strings" "testing" + + "github.com/stretchr/testify/require" ) -// TestSQSServer_HTFIFO_DormancyGate_RejectsPartitionedCreate pins -// the §11 PR 2 dormancy gate at the wire layer: CreateQueue with -// PartitionCount > 1 rejects with InvalidAttributeValue and the -// gate's reason ("not yet enabled") makes it into the operator- -// visible message. Removed in PR 5 in the same commit that wires -// the data plane. -func TestSQSServer_HTFIFO_DormancyGate_RejectsPartitionedCreate(t *testing.T) { +// TestSQSServer_HTFIFO_CapabilityGate_AcceptsOnSingleNode pins the +// §11 PR 5b-3 gate-lift at the wire layer on a single-node cluster: +// CreateQueue with PartitionCount > 1 now succeeds because the local +// binary advertises htfifo and there are no peers to poll. This test +// replaces the prior PR 2 dormancy-reject test (which expected a 400 +// with "not yet enabled" — the dormancy gate has been removed). The +// peer-rejection class is exercised in +// sqs_capability_gate_test.go where a fake peer can be wired in +// without a multi-node cluster. +func TestSQSServer_HTFIFO_CapabilityGate_AcceptsOnSingleNode(t *testing.T) { t.Parallel() nodes, _, _ := createNode(t, 1) defer shutdown(nodes) @@ -26,24 +31,86 @@ func TestSQSServer_HTFIFO_DormancyGate_RejectsPartitionedCreate(t *testing.T) { "PartitionCount": n, }, }) - if status != http.StatusBadRequest { - t.Fatalf("PartitionCount=%s: status %d (expected 400 from dormancy gate); body=%v", n, status, out) - } - if got, _ := out["__type"].(string); got != sqsErrInvalidAttributeValue { - t.Fatalf("PartitionCount=%s: __type=%q (expected InvalidAttributeValue)", n, got) - } - msg, _ := out["message"].(string) - if msg == "" || !strings.Contains(msg, "not yet enabled") { - t.Fatalf("PartitionCount=%s: message %q must mention the gate reason", n, msg) + if status != http.StatusOK { + t.Fatalf("PartitionCount=%s on single-node cluster: status %d (expected 200 after gate-lift); body=%v", n, status, out) } } } -// TestSQSServer_HTFIFO_DormancyGate_AllowsPartitionCountOne pins +// TestSQSServer_HTFIFO_CapabilityGate_IsIdempotentOnExistingQueue +// pins the Codex P1 review fix on PR #734: a CreateQueue retry on +// an already-existing partitioned queue with identical attributes +// MUST return 200 (idempotent) even when the cluster-wide +// capability poll would now fail. Before the fix, the gate ran +// before the existence check, so a transient peer outage during +// a CreateQueue retry would flip a 200-OK idempotent response +// into a 400. Now the gate runs INSIDE tryCreateQueueOnce after +// the existence check; an existing queue with matching attrs +// short-circuits to 200 and never touches the network. +// +// The test creates a partitioned queue on a single-node cluster +// (gate passes vacuously), then poisons the SQSServer's peer +// map with an unreachable address so any subsequent gate +// invocation would reject. The second CreateQueue with identical +// attrs must still succeed; only an actually-new partitioned +// queue would now fail the gate. 
+func TestSQSServer_HTFIFO_CapabilityGate_IsIdempotentOnExistingQueue(t *testing.T) {
+	t.Parallel()
+	nodes, _, _ := createNode(t, 1)
+	defer shutdown(nodes)
+	node := sqsLeaderNode(t, nodes)
+
+	attrs := map[string]string{
+		"FifoQueue":      "true",
+		"PartitionCount": "4",
+	}
+
+	// First create — succeeds because the leaderSQS map is empty
+	// and the local binary advertises htfifo (vacuous gate).
+	status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{
+		"QueueName":  "htfifo-idempotent.fifo",
+		"Attributes": attrs,
+	})
+	require.Equal(t, http.StatusOK, status,
+		"first create on single-node cluster must succeed: body=%v", out)
+
+	// Poison the peer map with an unreachable address. Any gate
+	// invocation from this point would call PollSQSHTFIFOCapability
+	// against this dead peer and fail closed → 400. The
+	// post-fix code path skips the gate on an existing-queue
+	// match, so the next create must STILL succeed.
+	require.NotNil(t, node.sqsServer)
+	node.sqsServer.leaderSQS = map[string]string{
+		"raft-fake": "127.0.0.1:1", // unreachable: connection refused
+	}
+
+	status, out = callSQS(t, node, sqsCreateQueueTarget, map[string]any{
+		"QueueName":  "htfifo-idempotent.fifo",
+		"Attributes": attrs,
+	})
+	require.Equal(t, http.StatusOK, status,
+		"second create with identical attrs must be idempotent (gate must NOT run on existing-queue match) — Codex P1 PR #734; body=%v", out)
+
+	// Sanity: the gate IS still in effect for genuinely new
+	// partitioned queues — a different name with the poisoned
+	// peer map must fail the gate.
+	status, out = callSQS(t, node, sqsCreateQueueTarget, map[string]any{
+		"QueueName":  "htfifo-newqueue.fifo",
+		"Attributes": attrs,
+	})
+	require.Equal(t, http.StatusBadRequest, status,
+		"new partitioned queue must hit the gate when a peer is unreachable: body=%v", out)
+	if got, _ := out["__type"].(string); got != sqsErrInvalidAttributeValue {
+		t.Fatalf("new queue: __type=%q (expected InvalidAttributeValue)", got)
	}
+}
+
+// TestSQSServer_HTFIFO_CapabilityGate_AllowsPartitionCountOne pins
 // the no-op-partition-count path: PartitionCount=1 is the legacy
-// single-partition layout and must pass the dormancy gate even on
-// FIFO queues that explicitly set the field.
-func TestSQSServer_HTFIFO_DormancyGate_AllowsPartitionCountOne(t *testing.T) {
+// single-partition layout and bypasses the capability gate
+// entirely (validateHTFIFOCapability short-circuits on
+// PartitionCount <= 1).
+func TestSQSServer_HTFIFO_CapabilityGate_AllowsPartitionCountOne(t *testing.T) {
 	t.Parallel()
 	nodes, _, _ := createNode(t, 1)
 	defer shutdown(nodes)
@@ -62,9 +129,9 @@ func TestSQSServer_HTFIFO_DormancyGate_AllowsPartitionCountOne(t *testing.T) {
 }
 
 // TestSQSServer_HTFIFO_RejectsNonPowerOfTwoPartitionCount pins the
-// validator's power-of-two rule. The validator runs before the
-// dormancy gate so an invalid count (3) reports the validator's
-// reason, not the gate's.
+// validator's power-of-two rule. The schema validator runs before
+// the capability gate so an invalid count (3) reports the
+// validator's reason, not the gate's.
 func TestSQSServer_HTFIFO_RejectsNonPowerOfTwoPartitionCount(t *testing.T) {
 	t.Parallel()
 	nodes, _, _ := createNode(t, 1)
@@ -112,16 +179,13 @@ func TestSQSServer_HTFIFO_RejectsHTFIFOAttrsOnStandardQueue(t *testing.T) {
 // the §3.2 cross-attribute control-plane gate at the wire layer.
 // {PartitionCount > 1, DeduplicationScope = "queue"} is rejected by
 // validatePartitionConfig (the schema validator) which runs inside
-// parseAttributesIntoMeta — that is, BEFORE validatePartitionDormancyGate
-// runs in createQueue. So the cross-attr rejection is what the wire
-// layer sees today, even though the dormancy gate would also reject
-// the same input on its own. After PR 5 lifts the dormancy gate the
-// cross-attr rule remains the sole rejection path.
+// parseAttributesIntoMeta — that is, BEFORE the capability gate
+// (validateHTFIFOCapability) ever runs. The schema rejection is the
+// sole rejection path after the PR 5b-3 dormancy gate-lift.
 //
 // The test only checks the 400 status to stay agnostic about which
-// validator fires first — both are correct behaviour, and a future
-// reordering of the createQueue control flow does not need to break
-// this test.
+// validator fires first — a future reordering of the createQueue
+// control flow does not need to break this test.
 func TestSQSServer_HTFIFO_RejectsQueueScopedDedupOnPartitioned(t *testing.T) {
 	t.Parallel()
 	nodes, _, _ := createNode(t, 1)
@@ -145,8 +209,9 @@ func TestSQSServer_HTFIFO_RejectsQueueScopedDedupOnPartitioned(t *testing.T) {
 // the §3.2 immutability rule at the wire layer: SetQueueAttributes
 // attempts to change PartitionCount / FifoThroughputLimit /
 // DeduplicationScope reject with InvalidAttributeValue. Test creates
-// a single-partition FIFO queue (allowed by dormancy) with
-// FifoThroughputLimit set, then tries to change it.
+// a single-partition FIFO queue (which bypasses the capability
+// gate entirely) with FifoThroughputLimit set, then tries to
+// change it.
 func TestSQSServer_HTFIFO_ImmutabilitySetQueueAttributesRejects(t *testing.T) {
 	t.Parallel()
 	nodes, _, _ := createNode(t, 1)
@@ -252,9 +317,10 @@ func TestSQSServer_HTFIFO_GetQueueAttributesRoundTrip(t *testing.T) {
 
 // --- helpers ---
 
 // mustCreateFIFOWithThroughputLimit creates a single-partition FIFO
-// queue (allowed by the §11 PR 2 dormancy gate) with the requested
-// FifoThroughputLimit set. Used by the immutability tests so they
-// have a non-empty FifoThroughputLimit to attempt to change.
+// queue (PartitionCount=1 bypasses the §11 PR 5b-3 capability
+// gate) with the requested FifoThroughputLimit set. Used by the
+// immutability tests so they have a non-empty FifoThroughputLimit
+// to attempt to change.
 func mustCreateFIFOWithThroughputLimit(t *testing.T, node Node, name, limit string) string {
 	t.Helper()
 	status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{
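The existence-check-before-gate ordering that the idempotency test above pins can be reduced to a toy model. All names in this sketch are hypothetical — it shows the ordering argument, not this PR's actual control flow:

package main

import (
	"errors"
	"fmt"
)

var errPeerDown = errors.New("peer unreachable")

// tryCreateOnce sketches the ordering: an existing queue with
// matching attributes short-circuits to success BEFORE the
// capability gate, so idempotent retries never pay the peer poll
// or inherit its transient failures.
func tryCreateOnce(existing map[string]uint32, name string, partitions uint32, gate func() error) error {
	if got, ok := existing[name]; ok {
		if got == partitions {
			return nil // idempotent retry: success, no network
		}
		return errors.New("queue already exists with different attributes")
	}
	if partitions > 1 {
		if err := gate(); err != nil {
			return err // only genuinely new partitioned creates hit the gate
		}
	}
	existing[name] = partitions
	return nil
}

func main() {
	queues := map[string]uint32{"orders.fifo": 4}
	failingGate := func() error { return errPeerDown }
	fmt.Println(tryCreateOnce(queues, "orders.fifo", 4, failingGate)) // nil: retry succeeds despite the dead peer
	fmt.Println(tryCreateOnce(queues, "new.fifo", 4, failingGate))    // fails closed: new create hits the gate
}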
diff --git a/adapter/sqs_partitioning_test.go b/adapter/sqs_partitioning_test.go
index 0183674a..db8ce746 100644
--- a/adapter/sqs_partitioning_test.go
+++ b/adapter/sqs_partitioning_test.go
@@ -166,9 +166,10 @@ func TestValidatePartitionConfig_RejectsAboveMax(t *testing.T) {
 //
 // PartitionCount > 1 is also FIFO-only — without the guard a
 // Standard queue with PartitionCount=2 would slip past the validator
-// after PR 5 lifts the dormancy gate. PartitionCount 0/1 are still
-// accepted
-// on Standard queues because both mean "single-partition layout".
+// (the capability gate that gates partitioned creates only fires on
+// FIFO queues, so the schema rule has to catch the Standard case).
+// PartitionCount 0/1 are still accepted on Standard queues because
+// both mean "single-partition layout".
 func TestValidatePartitionConfig_StandardQueueRejectsHTFIFOAttrs(t *testing.T) {
 	t.Parallel()
 	require.Error(t, validatePartitionConfig(&sqsQueueMeta{IsFIFO: false, FifoThroughputLimit: htfifoThroughputPerQueue}))
@@ -232,9 +233,10 @@ func TestValidatePartitionConfig_PerMessageGroupIDRequiresExplicitPartitionCount
 		require.Equal(t, sqsErrInvalidAttributeValue, apiErr.errorType)
 	}
 	// FIFO + perMessageGroupId + PartitionCount=8: accept (the
-	// dormancy gate runs separately on CreateQueue and rejects this
-	// at the wire today, but the cross-attribute validator on its
-	// own does not).
+	// cluster-wide capability gate runs separately on CreateQueue
+	// and may still reject this at the wire on a partially-upgraded
+	// cluster, but the cross-attribute validator on its own does
+	// not).
 	require.NoError(t, validatePartitionConfig(&sqsQueueMeta{
 		IsFIFO:              true,
 		FifoThroughputLimit: htfifoThroughputPerMessageGroupID,
 		PartitionCount:      8,
 	}))
@@ -253,24 +255,6 @@ func TestValidatePartitionConfig_PerMessageGroupIDRequiresExplicitPartitionCount
 	}))
 }
 
-// --- validatePartitionDormancyGate unit tests ---
-
-// TestValidatePartitionDormancyGate_RejectsAboveOne pins the §11
-// PR 2 dormancy gate: PartitionCount > 1 must reject until PR 5
-// lifts the gate. PartitionCount 0 or 1 must pass (both are the
-// legacy single-partition layout).
-func TestValidatePartitionDormancyGate_RejectsAboveOne(t *testing.T) {
-	t.Parallel()
-	require.NoError(t, validatePartitionDormancyGate(&sqsQueueMeta{PartitionCount: 0}))
-	require.NoError(t, validatePartitionDormancyGate(&sqsQueueMeta{PartitionCount: 1}))
-	for _, n := range []uint32{2, 4, 8, 16, 32} {
-		err := validatePartitionDormancyGate(&sqsQueueMeta{PartitionCount: n})
-		require.Error(t, err, "PartitionCount=%d must reject under the dormancy gate", n)
-		require.Contains(t, err.Error(), "not yet enabled",
-			"the gate's reason must surface to the operator")
-	}
-}
-
 // --- validatePartitionImmutability unit tests ---
 
 // TestValidatePartitionImmutability_RejectsAnyChange pins the §3.2
diff --git a/main.go b/main.go
index a821c877..dfccb258 100644
--- a/main.go
+++ b/main.go
@@ -519,6 +519,28 @@ func buildLeaderSQS(groups []groupSpec, sqsAddr string, raftSqsMap string) (map[
 // (e.g. a test seeding the map programmatically) gets a clear panic
 // instead of a silent route-to-group-zero.
 func buildSQSPartitionResolver(partitionMap map[string]sqsFifoQueueRouting) kv.PartitionResolver {
+	r := buildSQSPartitionResolverConcrete(partitionMap)
+	if r == nil {
+		// Defensive typed-nil → untyped-nil interface conversion.
+		// The doc on this function explains the typed-nil hazard:
+		// a non-nil interface wrapping a nil concrete pointer
+		// would defeat kv.ShardRouter's `s.partitionResolver !=
+		// nil` short-circuit and cost every request an extra
+		// nil-receiver ResolveGroup call.
+		return nil
+	}
+	return r
+}
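The typed-nil hazard that buildSQSPartitionResolver guards against, reduced to a runnable sketch — Resolver and concreteResolver are illustrative stand-ins for kv.PartitionResolver and *adapter.SQSPartitionResolver:

package main

import "fmt"

type Resolver interface{ Resolve(key string) uint64 }

type concreteResolver struct{}

func (r *concreteResolver) Resolve(string) uint64 { return 0 }

// buildBad returns the concrete pointer through the interface even
// when it is nil — the interface then carries a (type, nil) pair
// and compares non-nil.
func buildBad() Resolver {
	var r *concreteResolver // nil pointer
	return r
}

// buildGood converts explicitly: a nil concrete pointer becomes a
// true nil interface, so `!= nil` short-circuits work as expected.
func buildGood() Resolver {
	var r *concreteResolver
	if r == nil {
		return nil
	}
	return r
}

func main() {
	fmt.Println(buildBad() != nil)  // true — the trap
	fmt.Println(buildGood() != nil) // false — what callers expect
}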
+
+// buildSQSPartitionResolverConcrete returns the concrete
+// *adapter.SQSPartitionResolver so the SQS server can install it
+// via WithSQSPartitionResolver and reuse the routing map for the
+// CreateQueue capability gate's coverage check (Codex P1 review
+// on PR #734, round 2). Returns nil when partitionMap is empty —
+// callers that need the kv.PartitionResolver interface must go
+// through buildSQSPartitionResolver to avoid the typed-nil
+// interface trap.
+func buildSQSPartitionResolverConcrete(partitionMap map[string]sqsFifoQueueRouting) *adapter.SQSPartitionResolver {
 	if len(partitionMap) == 0 {
 		return nil
 	}
@@ -540,16 +562,7 @@ func buildSQSPartitionResolver(partitionMap map[string]sqsFifoQueueRouting) kv.P
 		}
 		flat[queue] = ids
 	}
-	r := adapter.NewSQSPartitionResolver(flat)
-	if r == nil {
-		// Defensive: NewSQSPartitionResolver returns nil on an
-		// empty input. The len-check above already short-circuits
-		// for empty partitionMap, so reaching this branch means
-		// the canonicalisation collapsed every entry — surface as
-		// a true nil interface, not a typed-nil pointer wrapper.
-		return nil
-	}
-	return r
+	return adapter.NewSQSPartitionResolver(flat)
 }
 
 // buildSQSFifoPartitionMap parses and validates the
@@ -871,12 +884,21 @@ func startServers(in serversInput) error {
 		leaderSQS:    in.cfg.leaderSQS,
 		sqsRegion:    *sqsRegion,
 		sqsCredsFile: *sqsCredsFile,
-		metricsAddress:  *metricsAddr,
-		metricsToken:    *metricsToken,
-		pprofAddress:    *pprofAddr,
-		pprofToken:      *pprofToken,
-		metricsRegistry: in.metricsRegistry,
-		roleStore:       roleStore,
+		// sqsPartitionResolver is rebuilt from the same config map
+		// the coordinator's WithPartitionResolver consumes (line
+		// ~328) so the SQS server's CreateQueue capability gate
+		// sees exactly the routes the coordinator will use to
+		// resolve SendMessage / ReceiveMessage / DeleteMessage
+		// dispatch. Returns nil on a non-partitioned cluster, in
+		// which case validateHTFIFOCapability skips the routing-
+		// coverage check (Codex P1 review on PR #734, round 2).
+		sqsPartitionResolver: buildSQSPartitionResolverConcrete(in.cfg.sqsFifoPartitionMap),
+		metricsAddress:       *metricsAddr,
+		metricsToken:         *metricsToken,
+		pprofAddress:         *pprofAddr,
+		pprofToken:           *pprofToken,
+		metricsRegistry:      in.metricsRegistry,
+		roleStore:            roleStore,
 	}
 	if err := runner.start(); err != nil {
 		return err
@@ -1448,6 +1470,16 @@ type runtimeServerRunner struct {
 	// (the mux 404s those paths).
 	sqsServer *adapter.SQSServer
 
+	// sqsPartitionResolver is the concrete pointer to the same
+	// resolver installed on the coordinator (line ~322). startSQSServer
+	// hands this through WithSQSPartitionResolver so the CreateQueue
+	// capability gate can verify routing coverage on partitioned
+	// creates without re-parsing --sqsFifoPartitionMap (Codex P1
+	// review on PR #734, round 2). Nil on single-shard / no-flag
+	// deployments — the gate's resolver==nil branch then skips
+	// the coverage check.
+	sqsPartitionResolver *adapter.SQSPartitionResolver
+
 	// roleStore is the access-key → role index the leader-side
 	// gRPC AdminForward service uses to re-validate the principal
 	// on every forwarded write. Mirrors what admin.Config.RoleIndex
@@ -1503,7 +1535,7 @@ func (r *runtimeServerRunner) start() error {
 	); err != nil {
 		return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err)
 	}
-	sqsServer, err := startSQSServer(r.ctx, r.lc, r.eg, r.sqsAddress, r.shardStore, r.coordinate, r.leaderSQS, r.sqsRegion, r.sqsCredsFile)
+	sqsServer, err := startSQSServer(r.ctx, r.lc, r.eg, r.sqsAddress, r.shardStore, r.coordinate, r.leaderSQS, r.sqsRegion, r.sqsCredsFile, r.sqsPartitionResolver)
 	if err != nil {
 		return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err)
 	}
diff --git a/main_sqs.go b/main_sqs.go
index 7bb8623e..db8a28a3 100644
--- a/main_sqs.go
+++ b/main_sqs.go
@@ -26,6 +26,7 @@ func startSQSServer(
 	leaderSQS map[string]string,
 	region string,
 	credentialsFile string,
+	partitionResolver *adapter.SQSPartitionResolver,
 ) (*adapter.SQSServer, error) {
 	sqsAddr = strings.TrimSpace(sqsAddr)
 	if sqsAddr == "" {
@@ -47,6 +48,7 @@ func startSQSServer(
 		adapter.WithSQSLeaderMap(leaderSQS),
 		adapter.WithSQSRegion(region),
 		adapter.WithSQSStaticCredentials(staticCreds),
+		adapter.WithSQSPartitionResolver(partitionResolver),
 	)
 	// Two-goroutine shutdown pattern mirrors startS3Server: one goroutine waits
 	// on either ctx.Done() or Run completion to call Stop, the other runs the
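The two-goroutine shutdown pattern that last comment refers to might look like the following sketch, with a stdlib net/http server standing in for the SQS server. The 5-second grace period and all names here are assumptions, not this repo's code:

package main

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"time"

	"golang.org/x/sync/errgroup"
)

// run sketches the pattern: one goroutine runs the server, a second
// waits on ctx cancellation (or server exit) and calls the blocking
// stop, so neither Run nor Stop can wedge the other.
func run(ctx context.Context, srv *http.Server) error {
	done := make(chan struct{})
	eg, ctx := errgroup.WithContext(ctx)
	eg.Go(func() error {
		defer close(done)
		if err := srv.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
			return err
		}
		return nil
	})
	eg.Go(func() error {
		select {
		case <-ctx.Done():
		case <-done: // Run returned on its own; nothing to stop
			return nil
		}
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		return srv.Shutdown(shutdownCtx)
	})
	return eg.Wait()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	fmt.Println(run(ctx, &http.Server{Addr: "127.0.0.1:0"}))
}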