From 2e6ea3339dbec90f721440fae9e636403163813f Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 22 Apr 2026 23:17:42 +0900 Subject: [PATCH 1/3] obs(proxy): classify secondary write failures by reason Adds proxy_secondary_write_errors_by_reason_total{cmd,reason} so OCC "write conflict" incidents (e.g. the !txn|rb| races from the post-#581 log pattern) and other secondary-write failure classes are observable in Prometheus/Grafana rather than only in logs. Keeps the existing unlabelled counter so current dashboards keep working. --- proxy/dualwrite.go | 30 +++++++++++++++++ proxy/metrics.go | 19 +++++++---- proxy/metrics_test.go | 75 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 118 insertions(+), 6 deletions(-) create mode 100644 proxy/metrics_test.go diff --git a/proxy/dualwrite.go b/proxy/dualwrite.go index 9937c880d..95e46b2d4 100644 --- a/proxy/dualwrite.go +++ b/proxy/dualwrite.go @@ -319,6 +319,8 @@ func (d *DualWriter) writeSecondary(cmd string, iArgs []any) { // structured warning with enough context to diagnose EVALSHA timeouts. func (d *DualWriter) recordSecondaryWriteFailure(cmd string, iArgs []any, elapsed time.Duration, attempts int, usedNOSCRIPTFallback bool, sErr error) { d.metrics.SecondaryWriteErrors.Inc() + reason := classifySecondaryWriteError(sErr) + d.metrics.SecondaryWriteErrorsByReason.WithLabelValues(cmd, reason).Inc() d.metrics.CommandTotal.WithLabelValues(cmd, d.secondary.Name(), "error").Inc() fingerprint := fmt.Sprintf("secondary_write_%s", cmd) if d.sentry.ShouldReport(fingerprint) { @@ -488,3 +490,31 @@ func argsToBytes(iArgs []any) [][]byte { } return out } + +// classifySecondaryWriteError maps a secondary-write error to a small fixed set +// of reason labels suitable for a Prometheus counter. The elastickv secondary +// backend is in-house, so matching on substrings of the error message is safe. +// +// Order matters: "retry limit exceeded" is checked before "write conflict" +// because the retry-limit message embeds the underlying conflict string, and +// we want the outer (retry_limit) classification to win. +func classifySecondaryWriteError(err error) string { + if err == nil { + return "other" + } + msg := err.Error() + switch { + case strings.Contains(msg, "retry limit exceeded"): + return "retry_limit" + case strings.Contains(msg, "write conflict"): + return "write_conflict" + case errors.Is(err, context.DeadlineExceeded) || strings.Contains(msg, "deadline exceeded"): + return "deadline_exceeded" + case strings.Contains(msg, "not leader"): + return "not_leader" + case strings.Contains(msg, "txn already committed") || strings.Contains(msg, "txn already aborted"): + return "txn_already_finalized" + default: + return "other" + } +} diff --git a/proxy/metrics.go b/proxy/metrics.go index f2b4d406d..eb3d27979 100644 --- a/proxy/metrics.go +++ b/proxy/metrics.go @@ -7,12 +7,13 @@ type ProxyMetrics struct { CommandTotal *prometheus.CounterVec CommandDuration *prometheus.HistogramVec - PrimaryWriteErrors prometheus.Counter - SecondaryWriteErrors prometheus.Counter - PrimaryReadErrors prometheus.Counter - ShadowReadErrors prometheus.Counter - Divergences *prometheus.CounterVec - MigrationGaps *prometheus.CounterVec + PrimaryWriteErrors prometheus.Counter + SecondaryWriteErrors prometheus.Counter + SecondaryWriteErrorsByReason *prometheus.CounterVec + PrimaryReadErrors prometheus.Counter + ShadowReadErrors prometheus.Counter + Divergences *prometheus.CounterVec + MigrationGaps *prometheus.CounterVec ActiveConnections prometheus.Gauge @@ -48,6 +49,11 @@ func NewProxyMetrics(reg prometheus.Registerer) *ProxyMetrics { Name: "secondary_write_errors_total", Help: "Total write errors from the secondary backend.", }), + SecondaryWriteErrorsByReason: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "proxy", + Name: "secondary_write_errors_by_reason_total", + Help: "secondary write failures broken out by redis command and error classification (write_conflict / retry_limit / not_leader / deadline_exceeded / other)", + }, []string{"cmd", "reason"}), PrimaryReadErrors: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: "proxy", Name: "primary_read_errors_total", @@ -98,6 +104,7 @@ func NewProxyMetrics(reg prometheus.Registerer) *ProxyMetrics { m.CommandDuration, m.PrimaryWriteErrors, m.SecondaryWriteErrors, + m.SecondaryWriteErrorsByReason, m.PrimaryReadErrors, m.ShadowReadErrors, m.Divergences, diff --git a/proxy/metrics_test.go b/proxy/metrics_test.go new file mode 100644 index 000000000..5449bbbf9 --- /dev/null +++ b/proxy/metrics_test.go @@ -0,0 +1,75 @@ +package proxy + +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/assert" +) + +func TestClassifySecondaryWriteError(t *testing.T) { + tests := []struct { + name string + err error + want string + }{ + {"nil error falls back to other", nil, "other"}, + {"plain write conflict", errors.New("write conflict on key foo"), "write_conflict"}, + { + "retry-limit message wins over embedded write-conflict substring", + errors.New("redis txn retry limit exceeded: write conflict"), + "retry_limit", + }, + {"not leader", errors.New("not leader"), "not_leader"}, + {"linearizable read not leader", errors.New("linearizable read: not leader"), "not_leader"}, + {"context deadline exceeded via errors.Is", context.DeadlineExceeded, "deadline_exceeded"}, + {"wrapped deadline exceeded", fmt.Errorf("dispatch failed: %w", context.DeadlineExceeded), "deadline_exceeded"}, + {"deadline exceeded substring only", errors.New("rpc: deadline exceeded"), "deadline_exceeded"}, + {"txn already committed", errors.New("txn already committed"), "txn_already_finalized"}, + {"txn already aborted", errors.New("txn already aborted"), "txn_already_finalized"}, + {"unknown", errors.New("some random failure"), "other"}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got := classifySecondaryWriteError(tc.err) + assert.Equal(t, tc.want, got) + }) + } +} + +func TestRecordSecondaryWriteFailureEmitsBothCounters(t *testing.T) { + metrics := newTestMetrics() + primary := newMockBackend("primary") + secondary := newMockBackend("secondary") + d := NewDualWriter( + primary, secondary, + ProxyConfig{Mode: ModeDualWrite, SecondaryTimeout: time.Second}, + metrics, newTestSentry(), testLogger, + ) + + err := errors.New("write conflict on !txn|rb|foo") + d.recordSecondaryWriteFailure("SET", []any{"SET", "k", "v"}, 5*time.Millisecond, 1, false, err) + + assert.InDelta(t, 1, testutil.ToFloat64(metrics.SecondaryWriteErrors), 0.001, + "unlabelled counter should still tick for dashboard backwards compatibility") + assert.InDelta(t, 1, testutil.ToFloat64( + metrics.SecondaryWriteErrorsByReason.WithLabelValues("SET", "write_conflict"), + ), 0.001, "labelled counter should record the write_conflict reason") + + // Second failure: different reason + command should populate a distinct label pair. + d.recordSecondaryWriteFailure("EVALSHA", []any{"EVALSHA", "deadbeef"}, time.Millisecond, 3, false, + errors.New("redis txn retry limit exceeded: write conflict")) + + assert.InDelta(t, 2, testutil.ToFloat64(metrics.SecondaryWriteErrors), 0.001) + assert.InDelta(t, 1, testutil.ToFloat64( + metrics.SecondaryWriteErrorsByReason.WithLabelValues("EVALSHA", "retry_limit"), + ), 0.001) + assert.InDelta(t, 1, testutil.ToFloat64( + metrics.SecondaryWriteErrorsByReason.WithLabelValues("SET", "write_conflict"), + ), 0.001, "previous label pair should be unchanged") +} From f3d03e684458e14333479f5f2153d57d2bda2684 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 23 Apr 2026 00:43:26 +0900 Subject: [PATCH 2/3] docs(proxy): include txn_already_finalized in metric help string --- proxy/metrics.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proxy/metrics.go b/proxy/metrics.go index eb3d27979..25297f544 100644 --- a/proxy/metrics.go +++ b/proxy/metrics.go @@ -52,7 +52,7 @@ func NewProxyMetrics(reg prometheus.Registerer) *ProxyMetrics { SecondaryWriteErrorsByReason: prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "proxy", Name: "secondary_write_errors_by_reason_total", - Help: "secondary write failures broken out by redis command and error classification (write_conflict / retry_limit / not_leader / deadline_exceeded / other)", + Help: "secondary write failures broken out by redis command and error classification (write_conflict / retry_limit / not_leader / deadline_exceeded / txn_already_finalized / other)", }, []string{"cmd", "reason"}), PrimaryReadErrors: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: "proxy", From f17add37d40289a01528a234972a167d8c74c3dc Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 23 Apr 2026 03:36:10 +0900 Subject: [PATCH 3/3] obs(proxy): classify leader-not-found and txn-locked secondary errors Address reviewer feedback on the secondary-write error classifier: - map "leader not found" (kv.ErrLeaderNotFound / adapter.ErrLeaderNotFound) to not_leader alongside the existing "not leader" substring check, so leadership failures stop leaking into the generic "other" bucket. - introduce a dedicated txn_locked reason for kv.ErrTxnLocked, a common OCC contention signal that was previously classified as other. - pull the substring table into a package-level slice and iterate it, keeping the function under the cyclomatic-complexity limit. - extend metrics_test and the metric help string accordingly. --- proxy/dualwrite.go | 43 +++++++++++++++++++++++++++---------------- proxy/metrics.go | 2 +- proxy/metrics_test.go | 2 ++ 3 files changed, 30 insertions(+), 17 deletions(-) diff --git a/proxy/dualwrite.go b/proxy/dualwrite.go index 95e46b2d4..9ec0b1888 100644 --- a/proxy/dualwrite.go +++ b/proxy/dualwrite.go @@ -491,30 +491,41 @@ func argsToBytes(iArgs []any) [][]byte { return out } +// secondaryWriteErrorPatterns maps substrings found in secondary-write error +// messages to their Prometheus reason label. The list is scanned in order so +// more-specific patterns (e.g. "retry limit exceeded" which embeds "write +// conflict") must precede the generic ones to win the classification. +var secondaryWriteErrorPatterns = []struct { + substr string + reason string +}{ + {"retry limit exceeded", "retry_limit"}, + {"write conflict", "write_conflict"}, + {"deadline exceeded", "deadline_exceeded"}, + {"not leader", "not_leader"}, + {"leader not found", "not_leader"}, + {"txn already committed", "txn_already_finalized"}, + {"txn already aborted", "txn_already_finalized"}, + {"txn locked", "txn_locked"}, +} + // classifySecondaryWriteError maps a secondary-write error to a small fixed set // of reason labels suitable for a Prometheus counter. The elastickv secondary // backend is in-house, so matching on substrings of the error message is safe. // -// Order matters: "retry limit exceeded" is checked before "write conflict" -// because the retry-limit message embeds the underlying conflict string, and -// we want the outer (retry_limit) classification to win. +// Order in secondaryWriteErrorPatterns matters (see its doc). func classifySecondaryWriteError(err error) string { if err == nil { return "other" } - msg := err.Error() - switch { - case strings.Contains(msg, "retry limit exceeded"): - return "retry_limit" - case strings.Contains(msg, "write conflict"): - return "write_conflict" - case errors.Is(err, context.DeadlineExceeded) || strings.Contains(msg, "deadline exceeded"): + if errors.Is(err, context.DeadlineExceeded) { return "deadline_exceeded" - case strings.Contains(msg, "not leader"): - return "not_leader" - case strings.Contains(msg, "txn already committed") || strings.Contains(msg, "txn already aborted"): - return "txn_already_finalized" - default: - return "other" } + msg := err.Error() + for _, p := range secondaryWriteErrorPatterns { + if strings.Contains(msg, p.substr) { + return p.reason + } + } + return "other" } diff --git a/proxy/metrics.go b/proxy/metrics.go index 25297f544..cd8b3837e 100644 --- a/proxy/metrics.go +++ b/proxy/metrics.go @@ -52,7 +52,7 @@ func NewProxyMetrics(reg prometheus.Registerer) *ProxyMetrics { SecondaryWriteErrorsByReason: prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "proxy", Name: "secondary_write_errors_by_reason_total", - Help: "secondary write failures broken out by redis command and error classification (write_conflict / retry_limit / not_leader / deadline_exceeded / txn_already_finalized / other)", + Help: "secondary write failures broken out by redis command and error classification (write_conflict / retry_limit / not_leader / deadline_exceeded / txn_already_finalized / txn_locked / other)", }, []string{"cmd", "reason"}), PrimaryReadErrors: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: "proxy", diff --git a/proxy/metrics_test.go b/proxy/metrics_test.go index 5449bbbf9..4f75d0866 100644 --- a/proxy/metrics_test.go +++ b/proxy/metrics_test.go @@ -26,11 +26,13 @@ func TestClassifySecondaryWriteError(t *testing.T) { }, {"not leader", errors.New("not leader"), "not_leader"}, {"linearizable read not leader", errors.New("linearizable read: not leader"), "not_leader"}, + {"leader not found (ErrLeaderNotFound message)", errors.New("leader not found"), "not_leader"}, {"context deadline exceeded via errors.Is", context.DeadlineExceeded, "deadline_exceeded"}, {"wrapped deadline exceeded", fmt.Errorf("dispatch failed: %w", context.DeadlineExceeded), "deadline_exceeded"}, {"deadline exceeded substring only", errors.New("rpc: deadline exceeded"), "deadline_exceeded"}, {"txn already committed", errors.New("txn already committed"), "txn_already_finalized"}, {"txn already aborted", errors.New("txn already aborted"), "txn_already_finalized"}, + {"txn locked", errors.New("key: foo: txn locked"), "txn_locked"}, {"unknown", errors.New("some random failure"), "other"}, }