Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions proxy/dualwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,8 @@ func (d *DualWriter) writeSecondary(cmd string, iArgs []any) {
// structured warning with enough context to diagnose EVALSHA timeouts.
func (d *DualWriter) recordSecondaryWriteFailure(cmd string, iArgs []any, elapsed time.Duration, attempts int, usedNOSCRIPTFallback bool, sErr error) {
d.metrics.SecondaryWriteErrors.Inc()
reason := classifySecondaryWriteError(sErr)
d.metrics.SecondaryWriteErrorsByReason.WithLabelValues(cmd, reason).Inc()
d.metrics.CommandTotal.WithLabelValues(cmd, d.secondary.Name(), "error").Inc()
fingerprint := fmt.Sprintf("secondary_write_%s", cmd)
if d.sentry.ShouldReport(fingerprint) {
Expand Down Expand Up @@ -488,3 +490,42 @@ func argsToBytes(iArgs []any) [][]byte {
}
return out
}

// secondaryWriteErrorPatterns is the ordered lookup table that turns a
// secondary-write error message into a Prometheus reason label: an entry
// applies when its substr occurs anywhere in the message.
//
// Entries are tried top to bottom and the first hit wins, so a specific
// pattern has to sit above any generic pattern it can co-occur with. For
// example, "retry limit exceeded" messages embed "write conflict" and must be
// listed first in order to be reported as retry_limit.
var secondaryWriteErrorPatterns = []struct {
	substr, reason string
}{
	{substr: "retry limit exceeded", reason: "retry_limit"},
	{substr: "write conflict", reason: "write_conflict"},
	{substr: "deadline exceeded", reason: "deadline_exceeded"},
	// Both leadership phrasings share a single label.
	{substr: "not leader", reason: "not_leader"},
	{substr: "leader not found", reason: "not_leader"},
	// Committed and aborted transactions are reported under one label.
	{substr: "txn already committed", reason: "txn_already_finalized"},
	{substr: "txn already aborted", reason: "txn_already_finalized"},
	{substr: "txn locked", reason: "txn_locked"},
}

// classifySecondaryWriteError reduces a secondary-write error to one of a
// small, fixed set of reason labels suitable for a Prometheus counter.
//
// A nil error, and any error that matches nothing, is reported as "other". An
// error whose chain contains context.DeadlineExceeded is reported as
// "deadline_exceeded" before the message text is inspected at all. Everything
// else is decided by the first entry of secondaryWriteErrorPatterns whose
// substring occurs in err.Error(), so the order of that table is significant.
//
// Matching on message text is considered safe because the elastickv secondary
// backend is in-house.
func classifySecondaryWriteError(err error) string {
	const fallback = "other"

	switch {
	case err == nil:
		return fallback
	case errors.Is(err, context.DeadlineExceeded):
		return "deadline_exceeded"
	}

	text := err.Error()
	for i := range secondaryWriteErrorPatterns {
		entry := secondaryWriteErrorPatterns[i]
		if strings.Contains(text, entry.substr) {
			// First match wins; later entries are never consulted.
			return entry.reason
		}
	}
	return fallback
}
Comment on lines +512 to +531
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Confirm the exact wording of "retry limit" errors produced anywhere in the tree,
# and survey leadership-related error strings that the classifier should match.
rg -nP -C2 'retry[ _-]?limit' --type=go
rg -nP -C2 '"(not leader|leader not found|txn locked|txn already (committed|aborted)|write conflict)"' --type=go

Repository: bootjp/elastickv

Length of output: 10524


Fix classifier to handle additional error types and prevent misclassification.

The substring matching approach for error classification has gaps:

  1. The message of ErrLeaderNotFound (adapter/internal.go:40) is "leader not found", which does not contain the substring "not leader" — the only leadership pattern the classifier checks (line 513). Leadership-related failures are therefore misclassified as "other", even though they are distinct from transient write conflicts.

  2. Several transient error types fall to "other" and won't be visible on dashboards:

    • ErrTxnLocked ("txn locked") — common OCC contention signal
    • ErrTxnCommitTSRequired, ErrTxnMetaMissing, ErrTxnInvalidMeta, ErrTxnTimestampOverflow — metadata/timestamp issues
  3. The nil branch (line 502–503) is unreachable from recordSecondaryWriteFailure but safe to keep defensively.

Consider adding checks for the above error strings or extending the existing "not leader" check to match "leader not found".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@proxy/dualwrite.go` around lines 494 - 520, Update
classifySecondaryWriteError to detect the additional transient and leadership
error messages so they aren’t misclassified: in the function
classifySecondaryWriteError add checks (preserving order semantics) for "leader
not found" (or broaden the existing "not leader" check to include that phrase),
and for transaction/meta/transient error substrings such as "txn locked", "txn
commit ts required", "txn meta missing", "txn invalid meta", and "txn timestamp
overflow" (map them to appropriate labels like "not_leader", "txn_locked",
"txn_meta_error" or a single transient label as your metrics require); keep the
defensive nil branch intact and ensure the new cases are placed before the
generic default to avoid fallback to "other".

19 changes: 13 additions & 6 deletions proxy/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ type ProxyMetrics struct {
CommandTotal *prometheus.CounterVec
CommandDuration *prometheus.HistogramVec

PrimaryWriteErrors prometheus.Counter
SecondaryWriteErrors prometheus.Counter
PrimaryReadErrors prometheus.Counter
ShadowReadErrors prometheus.Counter
Divergences *prometheus.CounterVec
MigrationGaps *prometheus.CounterVec
PrimaryWriteErrors prometheus.Counter
SecondaryWriteErrors prometheus.Counter
SecondaryWriteErrorsByReason *prometheus.CounterVec
PrimaryReadErrors prometheus.Counter
ShadowReadErrors prometheus.Counter
Divergences *prometheus.CounterVec
MigrationGaps *prometheus.CounterVec

ActiveConnections prometheus.Gauge

Expand Down Expand Up @@ -48,6 +49,11 @@ func NewProxyMetrics(reg prometheus.Registerer) *ProxyMetrics {
Name: "secondary_write_errors_total",
Help: "Total write errors from the secondary backend.",
}),
SecondaryWriteErrorsByReason: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "proxy",
Name: "secondary_write_errors_by_reason_total",
Help: "secondary write failures broken out by redis command and error classification (write_conflict / retry_limit / not_leader / deadline_exceeded / txn_already_finalized / txn_locked / other)",
}, []string{"cmd", "reason"}),
PrimaryReadErrors: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "proxy",
Name: "primary_read_errors_total",
Expand Down Expand Up @@ -98,6 +104,7 @@ func NewProxyMetrics(reg prometheus.Registerer) *ProxyMetrics {
m.CommandDuration,
m.PrimaryWriteErrors,
m.SecondaryWriteErrors,
m.SecondaryWriteErrorsByReason,
m.PrimaryReadErrors,
m.ShadowReadErrors,
m.Divergences,
Expand Down
77 changes: 77 additions & 0 deletions proxy/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package proxy

import (
"context"
"errors"
"fmt"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
)

func TestClassifySecondaryWriteError(t *testing.T) {
tests := []struct {
name string
err error
want string
}{
{"nil error falls back to other", nil, "other"},
{"plain write conflict", errors.New("write conflict on key foo"), "write_conflict"},
{
"retry-limit message wins over embedded write-conflict substring",
errors.New("redis txn retry limit exceeded: write conflict"),
"retry_limit",
},
{"not leader", errors.New("not leader"), "not_leader"},
{"linearizable read not leader", errors.New("linearizable read: not leader"), "not_leader"},
{"leader not found (ErrLeaderNotFound message)", errors.New("leader not found"), "not_leader"},
{"context deadline exceeded via errors.Is", context.DeadlineExceeded, "deadline_exceeded"},
{"wrapped deadline exceeded", fmt.Errorf("dispatch failed: %w", context.DeadlineExceeded), "deadline_exceeded"},
{"deadline exceeded substring only", errors.New("rpc: deadline exceeded"), "deadline_exceeded"},
{"txn already committed", errors.New("txn already committed"), "txn_already_finalized"},
{"txn already aborted", errors.New("txn already aborted"), "txn_already_finalized"},
{"txn locked", errors.New("key: foo: txn locked"), "txn_locked"},
{"unknown", errors.New("some random failure"), "other"},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
got := classifySecondaryWriteError(tc.err)
assert.Equal(t, tc.want, got)
})
}
}

// TestRecordSecondaryWriteFailureEmitsBothCounters verifies that every call to
// recordSecondaryWriteFailure bumps the unlabelled SecondaryWriteErrors counter
// and also the SecondaryWriteErrorsByReason child for the failing command and
// its classified reason, without disturbing previously recorded label pairs.
func TestRecordSecondaryWriteFailureEmitsBothCounters(t *testing.T) {
	const tolerance = 0.001

	m := newTestMetrics()
	primaryBackend := newMockBackend("primary")
	secondaryBackend := newMockBackend("secondary")
	cfg := ProxyConfig{Mode: ModeDualWrite, SecondaryTimeout: time.Second}
	dw := NewDualWriter(primaryBackend, secondaryBackend, cfg, m, newTestSentry(), testLogger)

	// Small readers so each assertion states only what it is checking.
	totalErrors := func() float64 {
		return testutil.ToFloat64(m.SecondaryWriteErrors)
	}
	errorsFor := func(cmd, reason string) float64 {
		return testutil.ToFloat64(m.SecondaryWriteErrorsByReason.WithLabelValues(cmd, reason))
	}

	// First failure: a plain write conflict on SET.
	conflictErr := errors.New("write conflict on !txn|rb|foo")
	dw.recordSecondaryWriteFailure("SET", []any{"SET", "k", "v"}, 5*time.Millisecond, 1, false, conflictErr)

	assert.InDelta(t, 1, totalErrors(), tolerance,
		"unlabelled counter should still tick for dashboard backwards compatibility")
	assert.InDelta(t, 1, errorsFor("SET", "write_conflict"), tolerance,
		"labelled counter should record the write_conflict reason")

	// Second failure: different reason + command should populate a distinct label pair.
	retryErr := errors.New("redis txn retry limit exceeded: write conflict")
	dw.recordSecondaryWriteFailure("EVALSHA", []any{"EVALSHA", "deadbeef"}, time.Millisecond, 3, false, retryErr)

	assert.InDelta(t, 2, totalErrors(), tolerance)
	assert.InDelta(t, 1, errorsFor("EVALSHA", "retry_limit"), tolerance)
	assert.InDelta(t, 1, errorsFor("SET", "write_conflict"), tolerance,
		"previous label pair should be unchanged")
}
Loading