adapter/sqs.go (50 changes: 41 additions & 9 deletions)
@@ -53,8 +53,8 @@ const (
// group that hosts a partitioned queue when the binary itself does
// not advertise this string.
//
-// CreateQueue's catalog-polling gate (Phase 3.D PR 5 lifts the
-// dormancy and starts checking) reads this list off /sqs_health on
+// CreateQueue's cluster-wide capability gate (validateHTFIFOCapability,
+// landed in Phase 3.D PR 5b-3) reads this list off /sqs_health on
// every peer; a CreateQueue with PartitionCount > 1 is rejected
// unless every peer reports "htfifo" — fail-closed against rolling
// upgrades that have not yet finished.
@@ -75,20 +75,20 @@ const sqsCapabilityHTFIFO = "htfifo"
// keys fail closed for recognised-but-unresolved partitioned
// keys.
// - Capability poller: PollSQSHTFIFOCapability, merged via
-// #721 (Phase 3.D PR 4-B-3a). PR 5 will use this for the
-// CreateQueue capability gate.
+// #721 (Phase 3.D PR 4-B-3a) and now consumed by
+// validateHTFIFOCapability in the CreateQueue gate (PR 5b-3).
// - Leadership-refusal hook:
// raftengine.RegisterLeaderAcquiredCallback +
// main_sqs_leadership_refusal.go (Phase 3.D PR 4-B-3b, this
// PR). On startup AND on every leader-acquired transition,
// the hook refuses leadership of any Raft group hosting a
// partitioned queue when the binary lacks htfifo.
//
-// Both pieces are now in the binary, so the flag flips to true.
-// PR 5 lifts the PartitionCount > 1 dormancy gate AND wires the
-// CreateQueue capability poll in the same commit, at which point
-// a partitioned queue can land in production and every node in
-// the cluster must report htfifo for the gate to allow it.
+// Both pieces are in the binary and PR 5b-3 has lifted the
+// dormancy gate, so a partitioned queue can land in production —
+// CreateQueue's validateHTFIFOCapability gate refuses
+// PartitionCount > 1 unless every node in the cluster reports
+// htfifo on /sqs_health.
//
// Stays a const (not a var) because the flag is build-time. A
// future runtime override (env var, --no-htfifo flag for
@@ -184,6 +184,22 @@ type SQSServer struct {
// receives for in-process — bounded by the operator-controlled
// CreateQueue rate.
receiveFanoutCounters sync.Map
// partitionResolver, when non-nil, is the per-cluster resolver
// that maps (queue, partition) keys to operator-chosen Raft
// groups (built from --sqsFifoPartitionMap, see main.go). The
// CreateQueue capability gate (validateHTFIFOCapability) uses
// it to verify routing coverage on partitioned creates BEFORE
// the meta record commits — without that check, a queue could
// land with PartitionCount=N but only K<N routes installed,
// and SendMessage on the missing partitions would fail closed
// at the router with "no route for key" (Codex P1 review on
// PR #734).
//
// nil on single-shard / no --sqsFifoPartitionMap deployments;
// the gate's resolver==nil branch then skips the coverage
// check so partitioned queues can land on a single-shard
// cluster and route through the engine's default group.
partitionResolver *SQSPartitionResolver
}

// WithSQSLeaderMap configures the Raft-address-to-SQS-address mapping used to
@@ -198,6 +214,22 @@ func WithSQSLeaderMap(m map[string]string) SQSServerOption {
}
}

// WithSQSPartitionResolver installs the cluster's partition
// resolver on the SQS server so the CreateQueue capability gate
// (validateHTFIFOCapability) can verify routing coverage before
// admitting a partitioned create. Pass nil (the default) on
// single-shard / no --sqsFifoPartitionMap deployments — the gate
// then skips the coverage check.
//
// Callers must ensure the resolver passed here matches the one
// installed on the kv coordinator via WithPartitionResolver,
// otherwise the gate would admit a queue that the coordinator
// then fails to route. main.go builds the resolver once and
// hands the same pointer to both consumers.
func WithSQSPartitionResolver(r *SQSPartitionResolver) SQSServerOption {
return func(s *SQSServer) { s.partitionResolver = r }
}

func NewSQSServer(listen net.Listener, st store.MVCCStore, coordinate kv.Coordinator, opts ...SQSServerOption) *SQSServer {
reaperCtx, reaperCancel := context.WithCancel(context.Background())
s := &SQSServer{
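The resolver wiring that the WithSQSPartitionResolver doc comment describes is easiest to see end to end. A minimal sketch of the main.go side, assuming hypothetical buildPartitionResolver and kv.NewCoordinator helpers (only NewSQSServer, WithSQSPartitionResolver, and the kv-side WithPartitionResolver name appear in this PR):

	// Hypothetical wiring sketch: buildPartitionResolver and kv.NewCoordinator
	// are stand-ins for whatever main.go actually calls; they are NOT in this PR.
	resolver, err := buildPartitionResolver(*sqsFifoPartitionMap) // nil when the flag is unset
	if err != nil {
		log.Fatal(err)
	}
	// Hand the SAME pointer to both consumers, so the CreateQueue gate
	// admits exactly the queues the coordinator can route.
	coord := kv.NewCoordinator(st, kv.WithPartitionResolver(resolver))
	srv := adapter.NewSQSServer(ln, st, coord, adapter.WithSQSPartitionResolver(resolver))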
adapter/sqs_capability_gate.go (211 changes: 211 additions & 0 deletions)
@@ -0,0 +1,211 @@
package adapter

import (
"context"
"log/slog"
"net/http"
"sort"
"strings"
)

// htfifoCapabilityRejectionPublic is the sanitized client-facing
// reason returned from validateHTFIFOCapability when the cluster
// poll fails. Per CodeRabbit major review: peer addresses and raw
// poller error text MUST NOT leak to authenticated clients (the
// CreateQueue surface is part of the public AWS-shaped API), so
// the wire-level message is intentionally generic and the per-peer
// detail goes to slog.Warn for operator triage.
const htfifoCapabilityRejectionPublic = "PartitionCount > 1 requires every cluster peer to advertise the htfifo capability via /sqs_health; one or more peers did not — see server logs for details"

// htfifoRoutingCoverageRejectionPublic is the sanitized client-
// facing reason returned when the partition resolver does not
// cover every partition of the requested queue. Same redaction
// principle as htfifoCapabilityRejectionPublic: the operator
// detail (which queue, how many partitions are missing) goes to
// slog.Warn, the wire message is generic so an authenticated
// caller cannot probe the operator's --sqsFifoPartitionMap shape.
const htfifoRoutingCoverageRejectionPublic = "PartitionCount > 1 requires every requested partition to be covered by the cluster's partition routing map; one or more partitions are not routable — see server logs for details"

// validateHTFIFOCapability is the §11 PR 5b-3 gate that replaced the
// PR 2 dormancy reject. CreateQueue calls this on every request; it
// is a no-op for legacy / single-partition meta and the full
// cluster-wide capability check for partitioned FIFO meta.
//
// Two-stage check, both fail-closed:
//
// 1. Local: this binary must advertise the htfifo capability
// (htfifoCapabilityAdvertised). If false, no amount of peer
// polling can make the create safe — the leader handling the
// request will write the partitioned-shape meta but its own
// data plane does not understand the partitioned keyspace.
//
// 2. Peers: every entry in s.leaderSQS must report htfifo via
// /sqs_health within the poller's per-peer timeout. Any
// timeout, HTTP error, malformed body, or missing capability
// blocks the create. This catches mid-rolling-upgrade clusters
// where the leader is on a new binary but a follower is still
// on the old one — the follower would silently store a
// partitioned record under the legacy keyspace if it ever won
// leadership, so we refuse the create until everyone is on a
// binary that handles the new layout.
//
// The vacuous case (single-node cluster, leaderSQS empty) is
// allowed: the local check covers the only node that will ever
// host the queue. proxyToLeader has already steered the request to
// the leader, and the leadership-refusal hook (PR 4-B-3b) keeps
// non-htfifo binaries from acquiring leadership over partitioned-
// queue Raft groups, so the gate's fail-closed default holds even
// after the create succeeds.
func (s *SQSServer) validateHTFIFOCapability(ctx context.Context, requested *sqsQueueMeta) error {
if requested == nil || requested.PartitionCount <= 1 {
return nil
}
if !htfifoCapabilityAdvertised {
return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
"PartitionCount > 1 requires the htfifo capability, which this node does not advertise")
}
// Routing-coverage check runs INDEPENDENTLY of the peer poll
// (and BEFORE it) so the empty-peer-list short-circuit below
// does not bypass coverage. A single-node cluster with a
// partition resolver installed (rare but possible: operator
// pre-configures --sqsFifoPartitionMap before adding peers)
// must still reject a partitioned create whose partitions
// aren't all routable.
if err := s.validateHTFIFORoutingCoverage(requested); err != nil {
return err
}
peers := s.collectSQSPeers()
if len(peers) == 0 {
// Single-node deployment: the local check above is the
// whole cluster. Vacuously true on the peer side.
return nil
}
report := PollSQSHTFIFOCapability(ctx, peers, PollerConfig{})
if report == nil || !report.AllAdvertise {
Comment on lines +83 to +84 (P1): Reject partitioned CreateQueue without routing map coverage

validateHTFIFOCapability now allows PartitionCount > 1 queues after only checking peer /sqs_health, but it never verifies that the queue is actually routable by the partition resolver. In clusters where --sqsFifoPartitionMap is present but missing this queue (or has fewer partitions than requested), CreateQueue succeeds here, yet later Send/Receive/Delete operations build partitioned keys that ShardRouter.ResolveGroup treats as recognized-but-unresolved and fails closed, surfacing as coordinator "no route for key" errors (and then SQS InternalFailure). This change introduces a create-success/runtime-failure mode that leaves users with unusable queues; CreateQueue should fail early unless every partition for the requested queue can be routed.


// Log the full per-peer detail for operator triage. The
// client-visible message stays generic (no peer addresses,
// no raw poller error text) so the CreateQueue surface
// does not leak cluster topology to authenticated callers
// — CodeRabbit major review on PR #734.
slog.Warn("sqs: htfifo capability gate rejected partitioned CreateQueue",
"queueName", requested.Name,
"partitionCount", requested.PartitionCount,
"peerCount", len(peers),
"detail", formatHTFIFOCapabilityReportForLog(report))
return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
htfifoCapabilityRejectionPublic)
}
Comment on lines +77 to +97 (medium)

The validateHTFIFOCapability function performs a cluster-wide HTTP poll for every CreateQueue request where PartitionCount > 1. This introduces a blocking network dependency. Consider caching the capability status. If retries are implemented for these network calls, keep the maximum backoff duration short (e.g., 10ms) to minimize latency impact on the control plane, as is standard for our low-latency components.

References
  1. For low-latency systems, use a short maximum backoff duration for retries (e.g., 10ms) as long-lived contention is not the primary scenario to handle.

return nil
}
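A sketch of the caching the comment above suggests; the htfifoPollCache type, its fields, and the example TTL are illustrative and not part of this PR. One trade-off to note: a cached all-advertise result can admit a partitioned create for up to ttl after a peer downgrades, so the window should stay short.

	// Illustrative cache around the existing poller; not part of this PR.
	// Assumes "sync" and "time" are added to the imports.
	type htfifoPollCache struct {
		mu      sync.Mutex
		report  *HTFIFOCapabilityReport
		fetched time.Time
		ttl     time.Duration // keep short, e.g. 1s, to bound staleness
	}

	func (c *htfifoPollCache) poll(ctx context.Context, peers []string) *HTFIFOCapabilityReport {
		c.mu.Lock()
		defer c.mu.Unlock()
		if c.report != nil && time.Since(c.fetched) < c.ttl {
			return c.report // serve the cached verdict inside the TTL window
		}
		c.report = PollSQSHTFIFOCapability(ctx, peers, PollerConfig{})
		c.fetched = time.Now()
		return c.report
	}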

// validateHTFIFORoutingCoverage rejects a partitioned CreateQueue
// when the cluster's --sqsFifoPartitionMap does not cover every
// requested partition. Without this, a queue could land with
// PartitionCount=N but only K<N routes installed, and SendMessage
// on the missing partitions would fail closed at the
// kv.ShardRouter "no route for key" path → InternalFailure (Codex
// P1 review on PR #734, round 2).
//
// resolver==nil short-circuits to "OK": the cluster has no
// partitioned routing installed at all, so all keys (including
// partitioned ones) fall through to the byte-range engine's
// default group. This preserves the single-shard / no-flag
// deployment as a working CreateQueue path; the dormancy promise
// is upheld by the leadership-refusal hook on the catalog group.
//
// resolver!=nil + queue not in map → reject (the operator would
// need to add the queue to --sqsFifoPartitionMap and restart
// before the create can succeed).
//
// resolver!=nil + queue partially mapped (RoutedPartitionCount <
// PartitionCount) → reject (the missing partitions would
// fail-close on first SendMessage to a group ID they hash into).
func (s *SQSServer) validateHTFIFORoutingCoverage(requested *sqsQueueMeta) error {
if s.partitionResolver == nil {
return nil
}
routed := s.partitionResolver.RoutedPartitionCount(requested.Name)
// Compare in int64 — both sides widen losslessly (int → int64
// and uint32 → int64 are always safe), avoiding the gosec G115
// narrowing flag we'd hit on int → uint32 even though
// PartitionCount is bounded by htfifoMaxPartitions (=32) at the
// schema validator before we get here.
if int64(routed) >= int64(requested.PartitionCount) {
return nil
}
slog.Warn("sqs: htfifo capability gate rejected partitioned CreateQueue — incomplete routing map",
"queueName", requested.Name,
"partitionCount", requested.PartitionCount,
"routedPartitionCount", routed)
return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
htfifoRoutingCoverageRejectionPublic)
}

// collectSQSPeers returns every distinct, non-empty SQS-side address
// from s.leaderSQS in deterministic (sorted) order. Used by the
// CreateQueue capability gate so error messages and tests pin a
// stable peer order. The map may legitimately contain self (the
// proxy-to-leader path uses the same map to find the leader's SQS
// address by Raft addr); polling self over loopback is cheap and
// keeps the "every peer reports htfifo" invariant uniform.
func (s *SQSServer) collectSQSPeers() []string {
if len(s.leaderSQS) == 0 {
return nil
}
peers := make([]string, 0, len(s.leaderSQS))
seen := make(map[string]struct{}, len(s.leaderSQS))
for _, addr := range s.leaderSQS {
if addr == "" {
continue
}
if _, ok := seen[addr]; ok {
continue
}
seen[addr] = struct{}{}
peers = append(peers, addr)
}
sort.Strings(peers)
return peers
}
Comment on lines +151 to +169 (medium)

The collectSQSPeers method iterates over s.leaderSQS, which is a map. If this map is modified concurrently by another goroutine (e.g., during a cluster membership update), this iteration will cause a panic. While the PR description mentions that leaderSQS is only mutated at construction, it is safer to protect this access with a read lock. Additionally, if this operation is part of a read-modify-write sequence requiring leadership, ensure you verify leadership status after acquiring the lock to prevent race conditions.

References
  1. To prevent race conditions in read-modify-write operations that require leadership, verify leadership status after acquiring the lock that protects the operation.


// formatHTFIFOCapabilityReportForLog composes the per-peer detail
// surfaced to slog.Warn when the gate rejects. Lists each failing
// peer's address and reason (per-peer Error or "missing capability")
// so an operator triaging a partial-rolling-upgrade cluster can fix
// the lag from the server logs without rerunning the poll
// out-of-band. NEVER returned to the client — that path uses the
// sanitized htfifoCapabilityRejectionPublic constant. Order matches
// report.Peers, which matches collectSQSPeers' sorted input order —
// deterministic so log lines diff cleanly across runs.
func formatHTFIFOCapabilityReportForLog(report *HTFIFOCapabilityReport) string {
var b strings.Builder
if report == nil {
b.WriteString("(no report)")
return b.String()
}
first := true
for _, p := range report.Peers {
if p.HasHTFIFO {
continue
}
if !first {
b.WriteString(", ")
}
first = false
b.WriteString(p.Address)
b.WriteString(" (")
if p.Error != "" {
b.WriteString(p.Error)
} else {
b.WriteString("missing capability")
}
b.WriteString(")")
}
if first {
// Defensive: AllAdvertise was false but no peer surfaced a
// reason. Should never happen, but emit a non-empty hint
// rather than a truncated empty string.
b.WriteString("(unknown peer)")
}
return b.String()
}
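An illustrative input/output pair for this formatter; the addresses are made up and the Peers element type name is assumed (the Address, HasHTFIFO, and Error fields match the code above).

	// Illustrative only; HTFIFOPeerCapability is an assumed type name.
	report := &HTFIFOCapabilityReport{Peers: []HTFIFOPeerCapability{
		{Address: "10.0.0.2:4576", HasHTFIFO: true},                         // healthy peer, skipped
		{Address: "10.0.0.3:4576", HasHTFIFO: false, Error: "poll timeout"}, // per-peer error wins
		{Address: "10.0.0.4:4576", HasHTFIFO: false},                        // no error: "missing capability"
	}}
	// formatHTFIFOCapabilityReportForLog(report) yields:
	//   10.0.0.3:4576 (poll timeout), 10.0.0.4:4576 (missing capability)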