diff --git a/adapter/sqs.go b/adapter/sqs.go
index 14f961797..acff4ff09 100644
--- a/adapter/sqs.go
+++ b/adapter/sqs.go
@@ -200,8 +200,36 @@ type SQSServer struct {
 	// check so partitioned queues can land on a single-shard
 	// cluster and route through the engine's default group.
 	partitionResolver *SQSPartitionResolver
+	// partitionObserver records the
+	// elastickv_sqs_partition_messages_total{queue, partition,
+	// action} counter for HT-FIFO operations (PR 7a). nil on
+	// non-monitored test fixtures and on single-binary CLI
+	// tools that build SQSServer without a monitoring registry.
+	// Increment call sites use a nil-receiver-safe call so the
+	// metrics path costs nothing when unwired.
+	partitionObserver SQSPartitionObserver
 }
 
+// SQSPartitionObserver is the metrics-package interface
+// (monitoring.SQSPartitionObserver) re-declared here so the
+// adapter does not import monitoring at the package boundary —
+// matches the existing observer pattern for DynamoDB / Redis.
+type SQSPartitionObserver interface {
+	ObservePartitionMessage(queue string, partition uint32, action string)
+}
+
+// SQSPartitionAction* mirror the action label values from
+// monitoring.SQSPartitionAction*. Re-declared so adapter call
+// sites do not need a monitoring import; the observer interface
+// validates the value at runtime so a drift between these
+// constants and the monitoring side surfaces as a dropped
+// observation rather than a wedge.
+const (
+	SQSPartitionActionSend    = "send"
+	SQSPartitionActionReceive = "receive"
+	SQSPartitionActionDelete  = "delete"
+)
+
 // WithSQSLeaderMap configures the Raft-address-to-SQS-address mapping used to
 // forward requests from followers to the current leader. Format mirrors
 // WithDynamoDBLeaderMap / WithS3LeaderMap.
@@ -214,6 +242,28 @@ func WithSQSLeaderMap(m map[string]string) SQSServerOption {
 	}
 }
 
+// WithSQSPartitionObserver installs the
+// elastickv_sqs_partition_messages_total counter observer on the
+// SQS server. Pass nil (the default) on non-monitored test
+// fixtures; the partitioned send / receive / delete paths then
+// observe via a nil interface and the metric stays at zero. The
+// monitoring registry's SQSPartitionObserver() returns the
+// concrete implementation in production.
+func WithSQSPartitionObserver(o SQSPartitionObserver) SQSServerOption {
+	return func(s *SQSServer) { s.partitionObserver = o }
+}
+
+// observePartitionMessage is a nil-receiver-safe wrapper around
+// the configured observer. Pulled into a helper so the call
+// sites in send / receive / delete each cost one branch instead
+// of repeating the nil check.
+func (s *SQSServer) observePartitionMessage(queue string, partition uint32, action string) {
+	if s == nil || s.partitionObserver == nil {
+		return
+	}
+	s.partitionObserver.ObservePartitionMessage(queue, partition, action)
+}
+
 // WithSQSPartitionResolver installs the cluster's partition
 // resolver on the SQS server so the CreateQueue capability gate
 // (validateHTFIFOCapability) can verify routing coverage before
diff --git a/adapter/sqs_fifo.go b/adapter/sqs_fifo.go
index 4db6dec21..b4445adeb 100644
--- a/adapter/sqs_fifo.go
+++ b/adapter/sqs_fifo.go
@@ -254,6 +254,14 @@ func (s *SQSServer) sendFifoMessage(
 		}
 		return nil, false, errors.WithStack(err)
 	}
+	// Hot-partition observability (§11 PR 7): record the per-
+	// (queue, partition) send so dashboards can spot uneven
+	// MessageGroupId distributions. Only partitioned queues emit
+	// (PartitionCount > 1); for legacy queues partition is always
+	// 0 and the metric would be uninformative + cardinality cost.
+	if meta.PartitionCount > 1 {
+		s.observePartitionMessage(queueName, partition, SQSPartitionActionSend)
+	}
 	return map[string]string{
 		"MessageId":        rec.MessageID,
 		"MD5OfMessageBody": rec.MD5OfBody,
diff --git a/adapter/sqs_messages.go b/adapter/sqs_messages.go
index 8dafa8936..fbcbe0992 100644
--- a/adapter/sqs_messages.go
+++ b/adapter/sqs_messages.go
@@ -1269,6 +1269,15 @@ func (s *SQSServer) commitReceiveRotation(ctx context.Context, queueName string,
 		}
 		return nil, false, errors.WithStack(err)
 	}
+	// Hot-partition observability (§11 PR 7): record the per-
+	// (queue, partition) receive-rotation. Same gating rule as
+	// the send path — only partitioned queues emit (the metric
+	// would be uninformative for legacy queues that always sit
+	// on partition 0). meta is dereferenced above, so we don't
+	// re-check for nil here.
+	if meta.PartitionCount > 1 {
+		s.observePartitionMessage(queueName, cand.partition, SQSPartitionActionReceive)
+	}
 
 	handle, err := encodeReceiptHandleDispatch(meta, cand.partition, gen, cand.messageID, newToken)
 	if err != nil {
@@ -1432,6 +1441,12 @@ func (s *SQSServer) deleteMessageWithRetry(ctx context.Context, queueName string
 		return err
 	}
 	if _, err := s.coordinator.Dispatch(ctx, req); err == nil {
+		// Hot-partition observability (§11 PR 7): record the
+		// successful delete on the partitioned commit branch
+		// only. Legacy queues stay off the metric.
+		if meta != nil && meta.PartitionCount > 1 {
+			s.observePartitionMessage(queueName, handle.Partition, SQSPartitionActionDelete)
+		}
 		return nil
 	} else if !isRetryableTransactWriteError(err) {
 		return errors.WithStack(err)
diff --git a/main.go b/main.go
index dfccb2583..f94f60896 100644
--- a/main.go
+++ b/main.go
@@ -893,6 +893,10 @@ func startServers(in serversInput) error {
 		// which case validateHTFIFOCapability skips the routing-
 		// coverage check (Codex P1 review on PR #734, round 2).
 		sqsPartitionResolver: buildSQSPartitionResolverConcrete(in.cfg.sqsFifoPartitionMap),
+		// sqsPartitionObserver: the metrics registry's HT-FIFO
+		// partition counter observer. nil when --metricsAddress is
+		// empty (the adapter then no-ops the observe call).
+		sqsPartitionObserver: in.metricsRegistry.SQSPartitionObserver(),
 		metricsAddress:       *metricsAddr,
 		metricsToken:         *metricsToken,
 		pprofAddress:         *pprofAddr,
@@ -1480,6 +1484,13 @@ type runtimeServerRunner struct {
 	// the coverage check.
 	sqsPartitionResolver *adapter.SQSPartitionResolver
 
+	// sqsPartitionObserver records the
+	// elastickv_sqs_partition_messages_total counter (PR 7a) for
+	// HT-FIFO send / receive / delete operations. Sourced from
+	// the monitoring registry; nil-receiver-safe on the adapter
+	// side so a test fixture without a registry can omit it.
+	sqsPartitionObserver adapter.SQSPartitionObserver
+
 	// roleStore is the access-key → role index the leader-side
 	// gRPC AdminForward service uses to re-validate the principal
 	// on every forwarded write. Mirrors what admin.Config.RoleIndex
@@ -1535,7 +1546,7 @@ func (r *runtimeServerRunner) start() error {
 	); err != nil {
 		return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err)
 	}
-	sqsServer, err := startSQSServer(r.ctx, r.lc, r.eg, r.sqsAddress, r.shardStore, r.coordinate, r.leaderSQS, r.sqsRegion, r.sqsCredsFile, r.sqsPartitionResolver)
+	sqsServer, err := startSQSServer(r.ctx, r.lc, r.eg, r.sqsAddress, r.shardStore, r.coordinate, r.leaderSQS, r.sqsRegion, r.sqsCredsFile, r.sqsPartitionResolver, r.sqsPartitionObserver)
 	if err != nil {
 		return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err)
 	}
diff --git a/main_sqs.go b/main_sqs.go
index db8a28a31..23ad16036 100644
--- a/main_sqs.go
+++ b/main_sqs.go
@@ -27,6 +27,7 @@ func startSQSServer(
 	region string,
 	credentialsFile string,
 	partitionResolver *adapter.SQSPartitionResolver,
+	partitionObserver adapter.SQSPartitionObserver,
 ) (*adapter.SQSServer, error) {
 	sqsAddr = strings.TrimSpace(sqsAddr)
 	if sqsAddr == "" {
@@ -49,6 +50,7 @@ func startSQSServer(
 		adapter.WithSQSRegion(region),
 		adapter.WithSQSStaticCredentials(staticCreds),
 		adapter.WithSQSPartitionResolver(partitionResolver),
+		adapter.WithSQSPartitionObserver(partitionObserver),
 	)
 	// Two-goroutine shutdown pattern mirrors startS3Server: one goroutine waits
 	// on either ctx.Done() or Run completion to call Stop, the other runs the
diff --git a/monitoring/registry.go b/monitoring/registry.go
index 9f73d9798..8ce4e15d1 100644
--- a/monitoring/registry.go
+++ b/monitoring/registry.go
@@ -21,6 +21,7 @@ type Registry struct {
 	hotPath       *HotPathMetrics
 	pebble        *PebbleMetrics
 	writeConflict *WriteConflictMetrics
+	sqs           *SQSMetrics
 }
 
 // NewRegistry builds a registry with constant labels that identify the local node.
@@ -43,6 +44,7 @@ func NewRegistry(nodeID string, nodeAddress string) *Registry {
 	r.hotPath = newHotPathMetrics(registerer)
 	r.pebble = newPebbleMetrics(registerer)
 	r.writeConflict = newWriteConflictMetrics(registerer)
+	r.sqs = newSQSMetrics(registerer)
 	return r
 }
 
@@ -158,6 +160,18 @@ func (r *Registry) SetFSMApplySyncMode(activeLabel string) {
 	r.pebble.SetFSMApplySyncMode(activeLabel)
 }
 
+// SQSPartitionObserver returns the HT-FIFO partition-messages
+// observer backed by this registry. Returns nil when the registry
+// itself is nil so adapter call sites can pass the result through
+// without checking; SQSMetrics.ObservePartitionMessage is also
+// nil-receiver safe.
+func (r *Registry) SQSPartitionObserver() SQSPartitionObserver {
+	if r == nil {
+		return nil
+	}
+	return r.sqs
+}
+
 // WriteConflictCollector returns a collector that polls each MVCC
 // store's per-(kind, key_prefix) OCC conflict counters and mirrors
 // them into the elastickv_store_write_conflict_total Prometheus
diff --git a/monitoring/sqs.go b/monitoring/sqs.go
new file mode 100644
index 000000000..7616b124c
--- /dev/null
+++ b/monitoring/sqs.go
@@ -0,0 +1,128 @@
+package monitoring
+
+import (
+	"strconv"
+	"sync"
+
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+// SQS HT-FIFO partition action labels. Stable string set so
+// dashboards / alerts can rely on the values not changing.
+const (
+	SQSPartitionActionSend    = "send"
+	SQSPartitionActionReceive = "receive"
+	SQSPartitionActionDelete  = "delete"
+
+	// sqsMaxTrackedQueues caps the number of distinct queue names
+	// the metrics layer will emit a per-(queue, partition, action)
+	// series for. Any queue beyond this cap collapses to the
+	// _other label so a misbehaving caller (e.g. a script that
+	// generates random queue names) cannot blow up the
+	// Prometheus cardinality budget. Mirrors dynamoMaxTrackedTables.
+	sqsMaxTrackedQueues = 512
+
+	// sqsQueueOverflow is the placeholder label used when a queue
+	// name is not in the tracked set (cap exceeded). Operators see
+	// the overflow as a single _other series and know to look at
+	// the application logs for the real names.
+	sqsQueueOverflow = "_other"
+)
+
+// SQSPartitionObserver records per-(queue, partition, action)
+// counters for HT-FIFO operations. The interface is small so
+// adapter call sites can pass a no-op observer in tests without
+// pulling in the full Prometheus registry.
+type SQSPartitionObserver interface {
+	// ObservePartitionMessage increments the
+	// sqs_partition_messages_total counter for one operation on
+	// one (queue, partition) pair. Action must be one of
+	// SQSPartitionActionSend / Receive / Delete; any other value
+	// is silently dropped so a typo at a future call site cannot
+	// crash the process.
+	ObservePartitionMessage(queue string, partition uint32, action string)
+}
+
+// SQSMetrics owns the Prometheus counter for HT-FIFO partition
+// operations. Mirrors DynamoDBMetrics' shape: per-Registry
+// instance, label-cardinality-bounded by sqsMaxTrackedQueues.
+type SQSMetrics struct {
+	partitionMessages *prometheus.CounterVec
+
+	mu            sync.Mutex
+	trackedQueues map[string]struct{}
+}
+
+func newSQSMetrics(registerer prometheus.Registerer) *SQSMetrics {
+	m := &SQSMetrics{
+		partitionMessages: prometheus.NewCounterVec(
+			prometheus.CounterOpts{
+				Name: "elastickv_sqs_partition_messages_total",
+				Help: "Total HT-FIFO partition operations by queue, partition, and action (send / receive / delete). Non-zero only for queues with PartitionCount > 1; use to spot uneven MessageGroupId distributions across partitions.",
+			},
+			[]string{"queue", "partition", "action"},
+		),
+		trackedQueues: map[string]struct{}{},
+	}
+	registerer.MustRegister(m.partitionMessages)
+	return m
+}
+
+// ObservePartitionMessage implements SQSPartitionObserver. The
+// (queue, action) pair is validated and (queue) is collapsed to
+// the overflow label past sqsMaxTrackedQueues distinct names.
+func (m *SQSMetrics) ObservePartitionMessage(queue string, partition uint32, action string) {
+	if m == nil {
+		return
+	}
+	if !sqsValidPartitionAction(action) {
+		return
+	}
+	if queue == "" {
+		// Defensive: an empty queue name would collapse all
+		// requests onto a single series — almost certainly a bug
+		// at the call site. Drop silently rather than emit
+		// poisoned data.
+		return
+	}
+	queueLabel := m.queueLabelForCardinalityBudget(queue)
+	// WithLabelValues avoids the prometheus.Labels map allocation
+	// on every observe call. Label order matches the
+	// NewCounterVec declaration: queue, partition, action.
+	// Mirrors DynamoDBMetrics.
+	m.partitionMessages.WithLabelValues(
+		queueLabel,
+		strconv.FormatUint(uint64(partition), 10),
+		action,
+	).Inc()
+}
+
+// queueLabelForCardinalityBudget returns queue if the metric has
+// already emitted a series for it OR there is room in the
+// tracked-queues set; returns sqsQueueOverflow otherwise. The
+// cap-and-collapse pattern mirrors DynamoDBMetrics.tableLabel
+// so a misbehaving caller cannot exhaust the Prometheus
+// cardinality budget.
+func (m *SQSMetrics) queueLabelForCardinalityBudget(queue string) string {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	if _, ok := m.trackedQueues[queue]; ok {
+		return queue
+	}
+	if len(m.trackedQueues) >= sqsMaxTrackedQueues {
+		return sqsQueueOverflow
+	}
+	m.trackedQueues[queue] = struct{}{}
+	return queue
+}
+
+// sqsValidPartitionAction returns true iff action is one of the
+// stable label values. Keeps a typo at the call site (e.g.
+// "Send" vs "send") from polluting the metric.
+func sqsValidPartitionAction(action string) bool {
+	switch action {
+	case SQSPartitionActionSend, SQSPartitionActionReceive, SQSPartitionActionDelete:
+		return true
+	}
+	return false
+}
diff --git a/monitoring/sqs_test.go b/monitoring/sqs_test.go
new file mode 100644
index 000000000..5c3784782
--- /dev/null
+++ b/monitoring/sqs_test.go
@@ -0,0 +1,141 @@
+package monitoring
+
+import (
+	"strconv"
+	"strings"
+	"testing"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/testutil"
+	"github.com/stretchr/testify/require"
+)
+
+// TestSQSMetrics_ObservePartitionMessage_IncrementsByLabelTriple
+// pins the basic counter contract: each (queue, partition,
+// action) tuple gets its own series and only the matching
+// observation increments it.
+func TestSQSMetrics_ObservePartitionMessage_IncrementsByLabelTriple(t *testing.T) {
+	t.Parallel()
+	reg := prometheus.NewRegistry()
+	m := newSQSMetrics(reg)
+
+	m.ObservePartitionMessage("q.fifo", 0, SQSPartitionActionSend)
+	m.ObservePartitionMessage("q.fifo", 0, SQSPartitionActionSend)
+	m.ObservePartitionMessage("q.fifo", 1, SQSPartitionActionSend)
+	m.ObservePartitionMessage("q.fifo", 0, SQSPartitionActionReceive)
+	m.ObservePartitionMessage("q.fifo", 0, SQSPartitionActionDelete)
+	m.ObservePartitionMessage("other.fifo", 7, SQSPartitionActionSend)
+
+	require.InDelta(t, 2.0, testutil.ToFloat64(m.partitionMessages.WithLabelValues("q.fifo", "0", SQSPartitionActionSend)), 0.001)
+	require.InDelta(t, 1.0, testutil.ToFloat64(m.partitionMessages.WithLabelValues("q.fifo", "1", SQSPartitionActionSend)), 0.001)
+	require.InDelta(t, 1.0, testutil.ToFloat64(m.partitionMessages.WithLabelValues("q.fifo", "0", SQSPartitionActionReceive)), 0.001)
+	require.InDelta(t, 1.0, testutil.ToFloat64(m.partitionMessages.WithLabelValues("q.fifo", "0", SQSPartitionActionDelete)), 0.001)
+	require.InDelta(t, 1.0, testutil.ToFloat64(m.partitionMessages.WithLabelValues("other.fifo", "7", SQSPartitionActionSend)), 0.001)
+}
+
+// TestSQSMetrics_ObservePartitionMessage_DropsInvalidAction pins
+// the typo guard: an action label that isn't one of the three
+// stable values (send / receive / delete) is silently dropped so
+// a future call-site bug cannot pollute the metric with new
+// label values that dashboards / alerts have to learn about.
+func TestSQSMetrics_ObservePartitionMessage_DropsInvalidAction(t *testing.T) {
+	t.Parallel()
+	reg := prometheus.NewRegistry()
+	m := newSQSMetrics(reg)
+
+	for _, bad := range []string{"", "Send", "SEND", "publish", "create"} {
+		m.ObservePartitionMessage("q.fifo", 0, bad)
+	}
+	// Valid action increments after the dropped attempts to prove
+	// the metric is still healthy and only the bogus actions were
+	// rejected.
+	m.ObservePartitionMessage("q.fifo", 0, SQSPartitionActionSend)
+	require.InDelta(t, 1.0, testutil.ToFloat64(m.partitionMessages.WithLabelValues("q.fifo", "0", SQSPartitionActionSend)), 0.001)
+}
+
+// TestSQSMetrics_ObservePartitionMessage_DropsEmptyQueue pins
+// that an empty queue name is dropped — collapsing all empty-
+// name observations onto a single series would mask a call-site
+// bug. Mirrors the typo guard above.
+func TestSQSMetrics_ObservePartitionMessage_DropsEmptyQueue(t *testing.T) {
+	t.Parallel()
+	reg := prometheus.NewRegistry()
+	m := newSQSMetrics(reg)
+
+	m.ObservePartitionMessage("", 0, SQSPartitionActionSend)
+
+	// A subsequent valid observation lands on its own series —
+	// proves the empty-name observe was dropped, not collapsed
+	// into the valid one.
+	m.ObservePartitionMessage("q.fifo", 0, SQSPartitionActionSend)
+	require.InDelta(t, 1.0, testutil.ToFloat64(m.partitionMessages.WithLabelValues("q.fifo", "0", SQSPartitionActionSend)), 0.001)
+}
+
+// TestSQSMetrics_NilReceiverIsSafe pins the nil-receiver
+// short-circuit: adapter call sites pass the observer through
+// without checking, and the SQSPartitionObserver interface lets
+// a nil concrete value land here. The observe call must be a
+// no-op rather than a nil-pointer panic.
+func TestSQSMetrics_NilReceiverIsSafe(t *testing.T) {
+	t.Parallel()
+	var m *SQSMetrics
+	require.NotPanics(t, func() {
+		m.ObservePartitionMessage("q.fifo", 0, SQSPartitionActionSend)
+	})
+}
+
+// TestSQSMetrics_QueueLabelOverflow pins the cardinality cap:
+// after sqsMaxTrackedQueues distinct queue names, every new
+// name collapses to the _other label so a misbehaving caller
+// cannot exhaust Prometheus's series budget. The first
+// sqsMaxTrackedQueues names retain their real labels.
+func TestSQSMetrics_QueueLabelOverflow(t *testing.T) {
+	t.Parallel()
+	reg := prometheus.NewRegistry()
+	m := newSQSMetrics(reg)
+
+	for i := 0; i < sqsMaxTrackedQueues; i++ {
+		m.ObservePartitionMessage("q-"+strconv.Itoa(i)+".fifo", 0, SQSPartitionActionSend)
+	}
+	// All observations within the cap retain their real names.
+	require.InDelta(t, 1.0, testutil.ToFloat64(m.partitionMessages.WithLabelValues("q-0.fifo", "0", SQSPartitionActionSend)), 0.001)
+	require.InDelta(t, 1.0, testutil.ToFloat64(m.partitionMessages.WithLabelValues("q-"+strconv.Itoa(sqsMaxTrackedQueues-1)+".fifo", "0", SQSPartitionActionSend)), 0.001)
+
+	// One more name overflows to the placeholder label.
+	m.ObservePartitionMessage("overflow.fifo", 0, SQSPartitionActionSend)
+	require.InDelta(t, 1.0, testutil.ToFloat64(m.partitionMessages.WithLabelValues(sqsQueueOverflow, "0", SQSPartitionActionSend)), 0.001)
+
+	// Hitting the same overflow name again accumulates on the
+	// _other series, not on a fresh real-name series — the cap
+	// is a one-way collapse.
+	m.ObservePartitionMessage("overflow.fifo", 0, SQSPartitionActionSend)
+	require.InDelta(t, 2.0, testutil.ToFloat64(m.partitionMessages.WithLabelValues(sqsQueueOverflow, "0", SQSPartitionActionSend)), 0.001)
+}
+
+// TestSQSMetrics_RegistryWiring pins the integration with the
+// public Registry: a freshly-built Registry hands back an
+// observer whose increments show up on the underlying gatherer
+// under the documented metric name. Catches a regression that
+// either skipped the SQS metrics registration or exposed it
+// under the wrong name.
+func TestSQSMetrics_RegistryWiring(t *testing.T) {
+	t.Parallel()
+	r := NewRegistry("node-test", "127.0.0.1:50051")
+	obs := r.SQSPartitionObserver()
+	require.NotNil(t, obs)
+
+	obs.ObservePartitionMessage("q.fifo", 3, SQSPartitionActionReceive)
+
+	mfs, err := r.Gatherer().Gather()
+	require.NoError(t, err)
+	var found bool
+	for _, mf := range mfs {
+		if !strings.HasPrefix(mf.GetName(), "elastickv_sqs_partition_messages_total") {
+			continue
+		}
+		found = true
+		require.Len(t, mf.GetMetric(), 1)
+	}
+	require.True(t, found,
+		"elastickv_sqs_partition_messages_total must be registered on the public Registry")
+}