Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions adapter/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,36 @@ type SQSServer struct {
// check so partitioned queues can land on a single-shard
// cluster and route through the engine's default group.
partitionResolver *SQSPartitionResolver
// partitionObserver records the
// elastickv_sqs_partition_messages_total{queue, partition,
// action} counter for HT-FIFO operations (PR 7a). nil on
// non-monitored test fixtures and on single-binary CLI
// tools that build SQSServer without a monitoring registry.
// Increment call sites use a nil-receiver-safe call so the
// metrics path costs nothing when unwired.
partitionObserver SQSPartitionObserver
}

// SQSPartitionObserver is the metrics-package interface
// (monitoring.SQSPartitionObserver) re-declared here so the
// adapter does not import monitoring at the package boundary —
// matches the existing observer pattern for DynamoDB / Redis.
type SQSPartitionObserver interface {
	// ObservePartitionMessage increments the per-(queue, partition,
	// action) counter for one HT-FIFO operation. Action is one of the
	// SQSPartitionAction* constants; the monitoring-side implementation
	// validates the value at runtime, so an unknown action is dropped
	// rather than recorded.
	ObservePartitionMessage(queue string, partition uint32, action string)
}

// SQSPartitionAction* mirror the action label values from
// monitoring.SQSPartitionAction*. Re-declared so adapter call
// sites do not need a monitoring import; the observer interface
// validates the value at runtime so a drift between these
// constants and the monitoring side surfaces as a dropped
// observation rather than a wedge.
const (
	// SQSPartitionActionSend labels a partitioned SendMessage commit.
	SQSPartitionActionSend = "send"
	// SQSPartitionActionReceive labels a partitioned receive-rotation commit.
	SQSPartitionActionReceive = "receive"
	// SQSPartitionActionDelete labels a partitioned DeleteMessage commit.
	SQSPartitionActionDelete = "delete"
)

// WithSQSLeaderMap configures the Raft-address-to-SQS-address mapping used to
// forward requests from followers to the current leader. Format mirrors
// WithDynamoDBLeaderMap / WithS3LeaderMap.
Expand All @@ -214,6 +242,28 @@ func WithSQSLeaderMap(m map[string]string) SQSServerOption {
}
}

// WithSQSPartitionObserver wires the
// elastickv_sqs_partition_messages_total observer into the SQS
// server. In production the monitoring registry's
// SQSPartitionObserver() supplies the concrete implementation;
// non-monitored test fixtures pass nil (the default), in which
// case the partitioned send / receive / delete paths no-op the
// observation and the metric stays at zero.
func WithSQSPartitionObserver(o SQSPartitionObserver) SQSServerOption {
	return func(s *SQSServer) {
		s.partitionObserver = o
	}
}

// observePartitionMessage forwards one observation to the
// configured observer, tolerating both a nil receiver and a nil
// observer. Centralizing the nil checks here keeps each call
// site in send / receive / delete down to a single call.
func (s *SQSServer) observePartitionMessage(queue string, partition uint32, action string) {
	if s != nil && s.partitionObserver != nil {
		s.partitionObserver.ObservePartitionMessage(queue, partition, action)
	}
}

// WithSQSPartitionResolver installs the cluster's partition
// resolver on the SQS server so the CreateQueue capability gate
// (validateHTFIFOCapability) can verify routing coverage before
Expand Down
8 changes: 8 additions & 0 deletions adapter/sqs_fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,14 @@ func (s *SQSServer) sendFifoMessage(
}
return nil, false, errors.WithStack(err)
}
// Hot-partition observability (§11 PR 7): record the per-
// (queue, partition) send so dashboards can spot uneven
// MessageGroupId distributions. Only partitioned queues emit
// (PartitionCount > 1); for legacy queues partition is always
// 0 and the metric would be uninformative + cardinality cost.
if meta.PartitionCount > 1 {
s.observePartitionMessage(queueName, partition, SQSPartitionActionSend)
}
return map[string]string{
"MessageId": rec.MessageID,
"MD5OfMessageBody": rec.MD5OfBody,
Expand Down
15 changes: 15 additions & 0 deletions adapter/sqs_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -1269,6 +1269,15 @@ func (s *SQSServer) commitReceiveRotation(ctx context.Context, queueName string,
}
return nil, false, errors.WithStack(err)
}
// Hot-partition observability (§11 PR 7): record the per-
// (queue, partition) receive-rotation. Same gating rule as
// the send path — only partitioned queues emit (the metric
// would be uninformative for legacy queues that always sit
// on partition 0). meta is dereferenced above, so we don't
// re-check for nil here.
if meta.PartitionCount > 1 {
s.observePartitionMessage(queueName, cand.partition, SQSPartitionActionReceive)
}

handle, err := encodeReceiptHandleDispatch(meta, cand.partition, gen, cand.messageID, newToken)
if err != nil {
Expand Down Expand Up @@ -1432,6 +1441,12 @@ func (s *SQSServer) deleteMessageWithRetry(ctx context.Context, queueName string
return err
}
if _, err := s.coordinator.Dispatch(ctx, req); err == nil {
// Hot-partition observability (§11 PR 7): record the
// successful delete on the partitioned commit branch
// only. Legacy queues stay off the metric.
if meta != nil && meta.PartitionCount > 1 {
s.observePartitionMessage(queueName, handle.Partition, SQSPartitionActionDelete)
}
return nil
} else if !isRetryableTransactWriteError(err) {
return errors.WithStack(err)
Expand Down
13 changes: 12 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,10 @@ func startServers(in serversInput) error {
// which case validateHTFIFOCapability skips the routing-
// coverage check (Codex P1 review on PR #734, round 2).
sqsPartitionResolver: buildSQSPartitionResolverConcrete(in.cfg.sqsFifoPartitionMap),
// sqsPartitionObserver: the metrics registry's HT-FIFO
// partition counter observer. nil when --metricsAddress is
// empty (the adapter then no-ops the observe call).
sqsPartitionObserver: in.metricsRegistry.SQSPartitionObserver(),
metricsAddress: *metricsAddr,
metricsToken: *metricsToken,
pprofAddress: *pprofAddr,
Expand Down Expand Up @@ -1480,6 +1484,13 @@ type runtimeServerRunner struct {
// the coverage check.
sqsPartitionResolver *adapter.SQSPartitionResolver

// sqsPartitionObserver records the
// elastickv_sqs_partition_messages_total counter (PR 7a) for
// HT-FIFO send / receive / delete operations. Sourced from
// the monitoring registry; nil-receiver-safe on the adapter
// side so a test fixture without a registry can omit it.
sqsPartitionObserver adapter.SQSPartitionObserver

// roleStore is the access-key → role index the leader-side
// gRPC AdminForward service uses to re-validate the principal
// on every forwarded write. Mirrors what admin.Config.RoleIndex
Expand Down Expand Up @@ -1535,7 +1546,7 @@ func (r *runtimeServerRunner) start() error {
); err != nil {
return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err)
}
sqsServer, err := startSQSServer(r.ctx, r.lc, r.eg, r.sqsAddress, r.shardStore, r.coordinate, r.leaderSQS, r.sqsRegion, r.sqsCredsFile, r.sqsPartitionResolver)
sqsServer, err := startSQSServer(r.ctx, r.lc, r.eg, r.sqsAddress, r.shardStore, r.coordinate, r.leaderSQS, r.sqsRegion, r.sqsCredsFile, r.sqsPartitionResolver, r.sqsPartitionObserver)
if err != nil {
return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err)
}
Expand Down
2 changes: 2 additions & 0 deletions main_sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func startSQSServer(
region string,
credentialsFile string,
partitionResolver *adapter.SQSPartitionResolver,
partitionObserver adapter.SQSPartitionObserver,
) (*adapter.SQSServer, error) {
sqsAddr = strings.TrimSpace(sqsAddr)
if sqsAddr == "" {
Expand All @@ -49,6 +50,7 @@ func startSQSServer(
adapter.WithSQSRegion(region),
adapter.WithSQSStaticCredentials(staticCreds),
adapter.WithSQSPartitionResolver(partitionResolver),
adapter.WithSQSPartitionObserver(partitionObserver),
)
// Two-goroutine shutdown pattern mirrors startS3Server: one goroutine waits
// on either ctx.Done() or Run completion to call Stop, the other runs the
Expand Down
14 changes: 14 additions & 0 deletions monitoring/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Registry struct {
hotPath *HotPathMetrics
pebble *PebbleMetrics
writeConflict *WriteConflictMetrics
sqs *SQSMetrics
}

// NewRegistry builds a registry with constant labels that identify the local node.
Expand All @@ -43,6 +44,7 @@ func NewRegistry(nodeID string, nodeAddress string) *Registry {
r.hotPath = newHotPathMetrics(registerer)
r.pebble = newPebbleMetrics(registerer)
r.writeConflict = newWriteConflictMetrics(registerer)
r.sqs = newSQSMetrics(registerer)
return r
}

Expand Down Expand Up @@ -158,6 +160,18 @@ func (r *Registry) SetFSMApplySyncMode(activeLabel string) {
r.pebble.SetFSMApplySyncMode(activeLabel)
}

// SQSPartitionObserver returns the HT-FIFO partition-messages
// observer backed by this registry. A nil registry yields a nil
// observer, so adapter call sites can pass the result straight
// through without a check; SQSMetrics.ObservePartitionMessage is
// itself nil-receiver safe as a second line of defense.
func (r *Registry) SQSPartitionObserver() SQSPartitionObserver {
	if r != nil {
		return r.sqs
	}
	return nil
}

// WriteConflictCollector returns a collector that polls each MVCC
// store's per-(kind, key_prefix) OCC conflict counters and mirrors
// them into the elastickv_store_write_conflict_total Prometheus
Expand Down
128 changes: 128 additions & 0 deletions monitoring/sqs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package monitoring

import (
"strconv"
"sync"

"github.com/prometheus/client_golang/prometheus"
)

// SQS HT-FIFO partition action labels. Stable string set so
// dashboards / alerts can rely on the values not changing.
const (
	SQSPartitionActionSend    = "send"
	SQSPartitionActionReceive = "receive"
	SQSPartitionActionDelete  = "delete"

	// sqsMaxTrackedQueues caps the number of distinct queue names
	// the metrics layer will emit a per-(queue, partition, action)
	// series for. Any queue beyond this cap collapses to the
	// _other label so a misbehaving caller (e.g. a script that
	// generates random queue names) cannot blow up the
	// Prometheus cardinality budget. Mirrors dynamoMaxTrackedTables.
	sqsMaxTrackedQueues = 512

	// sqsQueueOverflow is the placeholder label used when a queue
	// name is not in the tracked set (cap exceeded). Operators see
	// the overflow as a single _other series and know to look at
	// the application logs for the real names.
	sqsQueueOverflow = "_other"
)

// SQSPartitionObserver records per-(queue, partition, action)
// counters for HT-FIFO operations. The interface is small so
// adapter call sites can pass a no-op observer in tests without
// pulling in the full Prometheus registry.
type SQSPartitionObserver interface {
	// ObservePartitionMessage increments the
	// sqs_partition_messages_total counter for one operation on
	// one (queue, partition) pair. Action must be one of
	// SQSPartitionActionSend / Receive / Delete; any other value
	// is silently dropped so a typo at a future call site cannot
	// crash the process.
	ObservePartitionMessage(queue string, partition uint32, action string)
}

// SQSMetrics owns the Prometheus counter for HT-FIFO partition
// operations. Mirrors DynamoDBMetrics' shape: per-Registry
// instance, label-cardinality-bounded by sqsMaxTrackedQueues.
type SQSMetrics struct {
	// partitionMessages is the
	// elastickv_sqs_partition_messages_total counter, labeled by
	// (queue, partition, action).
	partitionMessages *prometheus.CounterVec

	// mu guards trackedQueues; taken on every observation via
	// queueLabelForCardinalityBudget.
	mu sync.Mutex
	// trackedQueues is the set of queue names already granted a
	// metric series; once len reaches sqsMaxTrackedQueues, new
	// names collapse to sqsQueueOverflow.
	trackedQueues map[string]struct{}
}

// newSQSMetrics builds the HT-FIFO partition counter, registers
// it with the supplied registerer (panicking on a duplicate
// registration, per MustRegister), and returns the metrics
// holder with an empty tracked-queues set.
func newSQSMetrics(registerer prometheus.Registerer) *SQSMetrics {
	partitionMessages := prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "elastickv_sqs_partition_messages_total",
			Help: "Total HT-FIFO partition operations by queue, partition, and action (send / receive / delete). Non-zero only for queues with PartitionCount > 1; use to spot uneven MessageGroupId distributions across partitions.",
		},
		[]string{"queue", "partition", "action"},
	)
	registerer.MustRegister(partitionMessages)
	return &SQSMetrics{
		partitionMessages: partitionMessages,
		trackedQueues:     make(map[string]struct{}),
	}
}

// ObservePartitionMessage implements SQSPartitionObserver. The
// call is nil-receiver safe; an unknown action or an empty queue
// name is dropped (an empty name would collapse all traffic onto
// one series — almost certainly a call-site bug — and a bad
// action would pollute the stable label set). Queue names past
// the sqsMaxTrackedQueues cap collapse to the overflow label.
func (m *SQSMetrics) ObservePartitionMessage(queue string, partition uint32, action string) {
	// All three guards are pure checks, so folding them into one
	// clause preserves the original drop semantics exactly.
	if m == nil || queue == "" || !sqsValidPartitionAction(action) {
		return
	}
	// WithLabelValues avoids the prometheus.Labels map allocation
	// on every observe call. Label order matches the NewCounterVec
	// declaration: queue, partition, action. Mirrors DynamoDBMetrics.
	m.partitionMessages.WithLabelValues(
		m.queueLabelForCardinalityBudget(queue),
		strconv.FormatUint(uint64(partition), 10),
		action,
	).Inc()
}

// queueLabelForCardinalityBudget returns queue when it is already
// tracked or there is still room in the tracked set; otherwise it
// returns sqsQueueOverflow. The cap-and-collapse pattern mirrors
// DynamoDBMetrics.tableLabel so a misbehaving caller cannot
// exhaust the Prometheus cardinality budget.
//
// NOTE(review): this takes a full mutex on every observation of a
// partitioned queue; since the set is read-mostly once warm, an
// RWMutex read fast path (or sync.Map) could cut contention — TODO
// confirm it matters under load before changing the field type.
func (m *SQSMetrics) queueLabelForCardinalityBudget(queue string) string {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, tracked := m.trackedQueues[queue]; !tracked {
		if len(m.trackedQueues) >= sqsMaxTrackedQueues {
			return sqsQueueOverflow
		}
		m.trackedQueues[queue] = struct{}{}
	}
	return queue
}
Comment on lines +106 to +117
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The queueLabelForCardinalityBudget method is called on every SQS operation for partitioned queues. Using a plain sync.Mutex and defer creates a global bottleneck by serializing all metrics observations. Since trackedQueues is read-mostly once initialized, using a sync.RWMutex with a fast-path RLock or a sync.Map would significantly reduce contention. This follows the general rule of preferring efficient synchronization on hot paths.

References
  1. For frequently accessed fields that require monotonic updates (like a configuration index), prefer atomic operations (e.g., CAS) over mutexes to improve performance on hot paths.


// sqsValidPartitionAction reports whether action is one of the
// stable label values (send / receive / delete). Guards the
// metric against a typo at a call site — e.g. "Send" vs "send" —
// leaking an unexpected label value into the series.
func sqsValidPartitionAction(action string) bool {
	return action == SQSPartitionActionSend ||
		action == SQSPartitionActionReceive ||
		action == SQSPartitionActionDelete
}
Loading
Loading