From 81ec1eb1865636836ec8dc809bb05630a5f42c71 Mon Sep 17 00:00:00 2001 From: Elizabeth Chatman Date: Mon, 1 Jun 2026 13:08:05 -0700 Subject: [PATCH 1/2] otelriver: treat JobSnoozeError as flow control, not a failure Today a job that returns JobSnoozeError (the standard way to ask River to re-schedule it) flows through the same path as a real failure: - the span status is set to codes.Error with the snooze error as the description - the river.work_count metric records status:error,snooze:true That means snoozes pollute span error rates in tracing UIs and force every chart that wants a true error rate to defensively exclude snooze:true. Snoozes aren't failures; they're flow control - the job asked to run later and will be retried. This change makes setStatus recognize *rivertype.JobSnoozeError and return status:ok + codes.Ok. The snooze:true span/metric attribute (already set by the existing switch on errors.As) is preserved so snoozes remain queryable as a dimension. river.work_count snooze data points are now emitted as status:ok,snooze:true. It also adds a snooze.duration string attribute to the span (e.g. "5s"), making it easy to see how long the job asked to be deferred without having to parse the error string. The duration is span-only because attaching it to the metric would explode cardinality. Tests cover both the wrapped-error path (JobSnoozeError test) and the batch-result path (WorkBatchResultWithJobSnoozeError test). Co-authored-by: Cursor --- otelriver/middleware.go | 38 ++++++++++++++++++++++-------------- otelriver/middleware_test.go | 37 ++++++++++++++++++++++++++++++++--- 2 files changed, 57 insertions(+), 18 deletions(-) diff --git a/otelriver/middleware.go b/otelriver/middleware.go index e47a3fd..e8e79af 100644 --- a/otelriver/middleware.go +++ b/otelriver/middleware.go @@ -210,6 +210,11 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu defer func() { duration := m.durationInPreferredUnit(time.Since(begin)) + var ( + cancelErr *river.JobCancelError + snoozeErr *river.JobSnoozeError + ) + if err != nil { var batchResult interface { // To be superseded if riverbatch.MultiError is moved to rivertype. ErrorsByID() map[int64]error @@ -218,11 +223,6 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu err = batchResult.ErrorsByID()[job.ID] } - var ( - cancelErr *river.JobCancelError - snoozeErr *river.JobSnoozeError - ) - switch { case errors.As(err, &cancelErr): attrs = append(attrs, attribute.Bool("cancel", true)) @@ -233,16 +233,17 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu setStatus(attrs, statusIndex, span, panicked, err) - { - // Add some higher cardinality attributes to spans, but keep them - // out of metrics given it's been traditional wisdom that metric - // attribute sets shouldn't be too large. - attrs := append(attrs, - attribute.Int64("id", job.ID), - attribute.String("created_at", job.CreatedAt.Format(time.RFC3339)), - attribute.String("scheduled_at", job.ScheduledAt.Format(time.RFC3339)), - ) - span.SetAttributes(attrs...) // set after finalizing status + // Add some higher cardinality attributes to spans, but keep them + // out of metrics given it's been traditional wisdom that metric + // attribute sets shouldn't be too large. + span.SetAttributes( + attribute.Int64("id", job.ID), + attribute.String("created_at", job.CreatedAt.Format(time.RFC3339)), + attribute.String("scheduled_at", job.ScheduledAt.Format(time.RFC3339)), + ) + span.SetAttributes(attrs...) // set after finalizing status + if snoozeErr != nil { + span.SetAttributes(attribute.String("snooze.duration", snoozeErr.Duration.String())) } // This allocates a new slice, so make sure to do it as few times as possible. @@ -316,6 +317,13 @@ func setStatus(attrs []attribute.KeyValue, statusIndex int, span trace.Span, pan case panicked: attrs[statusIndex] = attribute.String("status", "panic") span.SetStatus(codes.Error, "panic") + case errors.Is(err, &river.JobSnoozeError{}): + // Snooze is flow control, not failure: the job will be retried + // later. Record as ok so it doesn't pollute error rates; the + // snooze:true span/metric attribute (set by the caller) keeps + // snoozes queryable as a dimension. + attrs[statusIndex] = attribute.String("status", "ok") + span.SetStatus(codes.Ok, "") case err != nil: attrs[statusIndex] = attribute.String("status", "error") span.SetStatus(codes.Error, err.Error()) diff --git a/otelriver/middleware_test.go b/otelriver/middleware_test.go index 0b66d86..72e619e 100644 --- a/otelriver/middleware_test.go +++ b/otelriver/middleware_test.go @@ -387,19 +387,37 @@ func TestMiddleware(t *testing.T) { middleware, bundle := setup(t) doInner := func(ctx context.Context) error { - return fmt.Errorf("wrapped job snooze: %w", &rivertype.JobSnoozeError{}) + return fmt.Errorf("wrapped job snooze: %w", &rivertype.JobSnoozeError{Duration: 5 * time.Second}) } err := middleware.Work(ctx, &rivertype.JobRow{ Kind: "no_op", }, doInner) - require.EqualError(t, err, "wrapped job snooze: JobSnoozeError: 0s") + require.EqualError(t, err, "wrapped job snooze: JobSnoozeError: 5s") spans := bundle.traceExporter.GetSpans() require.Len(t, spans, 1) span := spans[0] require.True(t, getAttribute(t, span.Attributes, "snooze").AsBool()) + // Snooze is flow control, not failure: span status is ok and the + // snooze.duration is recorded as a span-only attribute. + require.Equal(t, "ok", getAttribute(t, span.Attributes, "status").AsString()) + require.Equal(t, codes.Ok, span.Status.Code) + require.Equal(t, "5s", getAttribute(t, span.Attributes, "snooze.duration").AsString()) + + // The metric records status:ok with snooze:true so snoozes remain + // queryable as a metric dimension without counting against the + // error rate. + var ( + expectedAttrs = []attribute.KeyValue{ + attribute.String("status", "ok"), + attribute.Bool("snooze", true), + } + metrics metricdata.ResourceMetrics + ) + require.NoError(t, bundle.metricReader.Collect(ctx, &metrics)) + requireSum(t, metrics, "river.work_count", 1, expectedAttrs...) }) t.Run("WorkBatchResultWithJobError", func(t *testing.T) { @@ -476,7 +494,7 @@ func TestMiddleware(t *testing.T) { doInner := func(ctx context.Context) error { return &fakeBatchError{errorsByID: map[int64]error{ - 123: &rivertype.JobSnoozeError{}, + 123: &rivertype.JobSnoozeError{Duration: 7 * time.Second}, }} } @@ -488,6 +506,19 @@ func TestMiddleware(t *testing.T) { span := spans[0] require.True(t, getAttribute(t, span.Attributes, "snooze").AsBool()) + require.Equal(t, "ok", getAttribute(t, span.Attributes, "status").AsString()) + require.Equal(t, codes.Ok, span.Status.Code) + require.Equal(t, "7s", getAttribute(t, span.Attributes, "snooze.duration").AsString()) + + var ( + expectedAttrs = []attribute.KeyValue{ + attribute.String("status", "ok"), + attribute.Bool("snooze", true), + } + metrics metricdata.ResourceMetrics + ) + require.NoError(t, bundle.metricReader.Collect(ctx, &metrics)) + requireSum(t, metrics, "river.work_count", 1, expectedAttrs...) }) t.Run("WorkPanic", func(t *testing.T) { From f90b3ffa2542acd7399209a0f99ce47df32d602e Mon Sep 17 00:00:00 2001 From: Elizabeth Chatman Date: Mon, 1 Jun 2026 13:40:51 -0700 Subject: [PATCH 2/2] Update changelog --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f3025e..d68dab0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- Record snoozed jobs with status `ok` instead of `error` in `otelriver` middleware. Add new `snooze.duration` span attribute. [PR #59](https://github.com/riverqueue/rivercontrib/pull/59). + ## [0.8.0] - 2026-05-15 ### Added