From f8e26774a1da590af029c54eb7215c2ceae35ed2 Mon Sep 17 00:00:00 2001 From: Elizabeth Chatman Date: Mon, 1 Jun 2026 13:05:16 -0700 Subject: [PATCH 1/4] otelriver: partition river.insert_count by skipped_as_duplicate Today every job submitted to InsertMany is counted in river.insert_count even if River dropped it via UniqueOpts. That makes the insert metric unable to distinguish "we accepted 100 new jobs" from "we accepted 0 new jobs because all 100 were dedup'd against in-flight work" - both look identical on a chart. This adds a skipped_as_duplicate boolean attribute to river.insert_count data points and a duplicate_skipped_count span attribute on the river.insert_many span: - river.insert_count{skipped_as_duplicate:false} - jobs actually enqueued - river.insert_count{skipped_as_duplicate:true} - jobs dropped by UniqueOpts - span.duplicate_skipped_count - how many of the batch were skipped The sum across both insert_count data points still equals len(manyParams), so existing dashboards that sum the metric without filtering see no change. New dashboards / monitors can now compute a duplicate rate directly from the otel metric rather than needing a separate counter in each service that uses UniqueOpts. Tests cover all-duplicates, no-duplicates, and mixed-batch shapes. Co-authored-by: Cursor --- otelriver/middleware.go | 22 +++++- otelriver/middleware_test.go | 130 ++++++++++++++++++++++++++++++++++- 2 files changed, 149 insertions(+), 3 deletions(-) diff --git a/otelriver/middleware.go b/otelriver/middleware.go index e8e79af..775f9c3 100644 --- a/otelriver/middleware.go +++ b/otelriver/middleware.go @@ -156,12 +156,32 @@ func (m *Middleware) InsertMany(ctx context.Context, manyParams []*rivertype.Job duration := m.durationInPreferredUnit(time.Since(begin)) setStatus(attrs, statusIndex, span, panicked, err) + + var skipped int64 + for _, r := range insertRes { + if r != nil && r.UniqueSkippedAsDuplicate { + skipped++ + } + } + span.SetAttributes(attrs...) // set after finalizing status + span.SetAttributes(attribute.Int64("duplicate_skipped_count", skipped)) // This allocates a new slice, so make sure to do it as few times as possible. measurementOpt := metric.WithAttributes(attrs...) - m.metrics.insertCount.Add(ctx, int64(len(manyParams)), measurementOpt) + // Partition insert_count by skipped_as_duplicate so the metric + // shows how many of the submitted jobs were dropped by UniqueOpts + // vs. actually enqueued. Sum across both data points still equals + // len(manyParams). + if inserted := int64(len(manyParams)) - skipped; inserted > 0 { + m.metrics.insertCount.Add(ctx, inserted, + metric.WithAttributes(append(attrs, attribute.Bool("skipped_as_duplicate", false))...)) + } + if skipped > 0 { + m.metrics.insertCount.Add(ctx, skipped, + metric.WithAttributes(append(attrs, attribute.Bool("skipped_as_duplicate", true))...)) + } m.metrics.insertManyCount.Add(ctx, 1, measurementOpt) m.metrics.insertManyDuration.Record(ctx, duration, measurementOpt) m.metrics.insertManyDurationHistogram.Record(ctx, duration, measurementOpt) diff --git a/otelriver/middleware_test.go b/otelriver/middleware_test.go index 72e619e..94dc772 100644 --- a/otelriver/middleware_test.go +++ b/otelriver/middleware_test.go @@ -82,6 +82,7 @@ func TestMiddleware(t *testing.T) { require.Equal(t, "ok", getAttribute(t, span.Attributes, "status").AsString()) require.Equal(t, "river.insert_many", span.Name) require.Equal(t, codes.Ok, span.Status.Code) + require.EqualValues(t, 0, getAttribute(t, span.Attributes, "duplicate_skipped_count").AsInt64()) var ( expectedAttrs = []attribute.KeyValue{ @@ -90,7 +91,13 @@ func TestMiddleware(t *testing.T) { metrics metricdata.ResourceMetrics ) require.NoError(t, bundle.metricReader.Collect(ctx, &metrics)) - requireSum(t, metrics, "river.insert_count", 1, expectedAttrs...) + requireSumByAttrs(t, metrics, "river.insert_count", 1, + attribute.String("status", "ok"), + attribute.Bool("skipped_as_duplicate", false), + ) + requireSumByAttrs(t, metrics, "river.insert_count", 0, + attribute.Bool("skipped_as_duplicate", true), + ) requireSum(t, metrics, "river.insert_many_count", 1, expectedAttrs...) { metric, _ := requireGaugeNotEmpty(t, metrics, "river.insert_many_duration", expectedAttrs...) @@ -122,6 +129,7 @@ func TestMiddleware(t *testing.T) { require.Equal(t, "river.insert_many", span.Name) require.Equal(t, codes.Error, span.Status.Code) require.Equal(t, "error from doInner", span.Status.Description) + require.EqualValues(t, 0, getAttribute(t, span.Attributes, "duplicate_skipped_count").AsInt64()) var ( expectedAttrs = []attribute.KeyValue{ @@ -130,7 +138,13 @@ func TestMiddleware(t *testing.T) { metrics metricdata.ResourceMetrics ) require.NoError(t, bundle.metricReader.Collect(ctx, &metrics)) - requireSum(t, metrics, "river.insert_count", 1, expectedAttrs...) + requireSumByAttrs(t, metrics, "river.insert_count", 1, + attribute.String("status", "error"), + attribute.Bool("skipped_as_duplicate", false), + ) + requireSumByAttrs(t, metrics, "river.insert_count", 0, + attribute.Bool("skipped_as_duplicate", true), + ) requireSum(t, metrics, "river.insert_many_count", 1, expectedAttrs...) requireGaugeNotEmpty(t, metrics, "river.insert_many_duration", expectedAttrs...) requireHistogramCount(t, metrics, "river.insert_many_duration_histogram", 1, expectedAttrs...) @@ -157,6 +171,7 @@ func TestMiddleware(t *testing.T) { require.Equal(t, "river.insert_many", span.Name) require.Equal(t, codes.Error, span.Status.Code) require.Equal(t, "panic", span.Status.Description) + require.EqualValues(t, 0, getAttribute(t, span.Attributes, "duplicate_skipped_count").AsInt64()) var ( expectedAttrs = []attribute.KeyValue{ @@ -191,6 +206,73 @@ func TestMiddleware(t *testing.T) { require.NoError(t, err) }) + t.Run("InsertManyAllDuplicates", func(t *testing.T) { + t.Parallel() + + middleware, bundle := setup(t) + + doInner := func(ctx context.Context) ([]*rivertype.JobInsertResult, error) { + return []*rivertype.JobInsertResult{ + {Job: &rivertype.JobRow{ID: 1}, UniqueSkippedAsDuplicate: true}, + {Job: &rivertype.JobRow{ID: 2}, UniqueSkippedAsDuplicate: true}, + }, nil + } + + _, err := middleware.InsertMany(ctx, []*rivertype.JobInsertParams{{}, {}}, doInner) + require.NoError(t, err) + + spans := bundle.traceExporter.GetSpans() + require.Len(t, spans, 1) + require.EqualValues(t, 2, getAttribute(t, spans[0].Attributes, "duplicate_skipped_count").AsInt64()) + + var metrics metricdata.ResourceMetrics + require.NoError(t, bundle.metricReader.Collect(ctx, &metrics)) + requireSumByAttrs(t, metrics, "river.insert_count", 2, + attribute.String("status", "ok"), + attribute.Bool("skipped_as_duplicate", true), + ) + requireSumByAttrs(t, metrics, "river.insert_count", 0, + attribute.Bool("skipped_as_duplicate", false), + ) + }) + + t.Run("InsertManyMixedDuplicates", func(t *testing.T) { + t.Parallel() + + middleware, bundle := setup(t) + + doInner := func(ctx context.Context) ([]*rivertype.JobInsertResult, error) { + return []*rivertype.JobInsertResult{ + {Job: &rivertype.JobRow{ID: 1}, UniqueSkippedAsDuplicate: false}, + {Job: &rivertype.JobRow{ID: 2}, UniqueSkippedAsDuplicate: true}, + {Job: &rivertype.JobRow{ID: 3}, UniqueSkippedAsDuplicate: false}, + {Job: &rivertype.JobRow{ID: 4}, UniqueSkippedAsDuplicate: true}, + {Job: &rivertype.JobRow{ID: 5}, UniqueSkippedAsDuplicate: false}, + }, nil + } + + _, err := middleware.InsertMany(ctx, + []*rivertype.JobInsertParams{{}, {}, {}, {}, {}}, doInner) + require.NoError(t, err) + + spans := bundle.traceExporter.GetSpans() + require.Len(t, spans, 1) + require.EqualValues(t, 2, getAttribute(t, spans[0].Attributes, "duplicate_skipped_count").AsInt64()) + + var metrics metricdata.ResourceMetrics + require.NoError(t, bundle.metricReader.Collect(ctx, &metrics)) + requireSumByAttrs(t, metrics, "river.insert_count", 3, + attribute.String("status", "ok"), + attribute.Bool("skipped_as_duplicate", false), + ) + requireSumByAttrs(t, metrics, "river.insert_count", 2, + attribute.String("status", "ok"), + attribute.Bool("skipped_as_duplicate", true), + ) + // Sum across both data points should still equal len(manyParams). + requireSumByAttrs(t, metrics, "river.insert_count", 5) + }) + t.Run("InsertManyDurationUnitMS", func(t *testing.T) { t.Parallel() @@ -737,3 +819,47 @@ func requireSum(t *testing.T, metrics metricdata.ResourceMetrics, name string, v metricdatatest.AssertHasAttributes(t, metric, attrs...) return metric, metricData } + +// requireSumByAttrs asserts that the sum of all data points on the named +// metric whose attributes are a superset of the given attrs equals the +// expected value. A missing metric is treated as sum=0 so callers don't +// need to distinguish "no data point" from "data point with value 0". +// Pass no attrs to assert against the metric's grand total. +func requireSumByAttrs(t *testing.T, metrics metricdata.ResourceMetrics, name string, expected int64, attrs ...attribute.KeyValue) { + t.Helper() + + _, metricData, ok := getMetric[metricdata.Sum[int64]](t, metrics, name) + var got int64 + if ok { + wantSet := attribute.NewSet(attrs...) + for _, dp := range metricData.DataPoints { + if attributeSetContains(dp.Attributes, wantSet) { + got += dp.Value + } + } + } + if got != expected { + t.Fatalf("sum of %s data points matching %v: got %d, want %d; data points: %v", + name, attrs, got, expected, formatDataPoints(metricData.DataPoints)) + } +} + +func attributeSetContains(got, want attribute.Set) bool { + iter := want.Iter() + for iter.Next() { + kv := iter.Attribute() + gotVal, ok := got.Value(kv.Key) + if !ok || gotVal != kv.Value { + return false + } + } + return true +} + +func formatDataPoints(dps []metricdata.DataPoint[int64]) string { + out := make([]string, 0, len(dps)) + for _, dp := range dps { + out = append(out, fmt.Sprintf("value=%d attrs=%v", dp.Value, dp.Attributes.ToSlice())) + } + return "[" + fmt.Sprint(out) + "]" +} From 921436da2c2b58fb3b551e1185270dc292bbf4e5 Mon Sep 17 00:00:00 2001 From: Elizabeth Chatman Date: Mon, 1 Jun 2026 13:07:51 -0700 Subject: [PATCH 2/4] otelriver: silence unparam lint on requireSumByAttrs Match the existing requireSum helper's //nolint:unparam pattern; the name parameter is constant for now but the helper is intended to grow more callers as more partitioned-by-attribute metrics show up. Co-authored-by: Cursor --- otelriver/middleware_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/otelriver/middleware_test.go b/otelriver/middleware_test.go index 94dc772..9863491 100644 --- a/otelriver/middleware_test.go +++ b/otelriver/middleware_test.go @@ -825,7 +825,7 @@ func requireSum(t *testing.T, metrics metricdata.ResourceMetrics, name string, v // expected value. A missing metric is treated as sum=0 so callers don't // need to distinguish "no data point" from "data point with value 0". // Pass no attrs to assert against the metric's grand total. -func requireSumByAttrs(t *testing.T, metrics metricdata.ResourceMetrics, name string, expected int64, attrs ...attribute.KeyValue) { +func requireSumByAttrs(t *testing.T, metrics metricdata.ResourceMetrics, name string, expected int64, attrs ...attribute.KeyValue) { //nolint:unparam t.Helper() _, metricData, ok := getMetric[metricdata.Sum[int64]](t, metrics, name) From 0aa8fcc44e1ed9a3ee6b41857ab10d2edfdc8d2d Mon Sep 17 00:00:00 2001 From: Elizabeth Chatman Date: Mon, 1 Jun 2026 13:46:41 -0700 Subject: [PATCH 3/4] Rename attributes to match UniqueSkippedAsDuplicate / unique_skipped_as_duplicate --- otelriver/middleware.go | 14 +++++++------- otelriver/middleware_test.go | 26 +++++++++++++------------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/otelriver/middleware.go b/otelriver/middleware.go index 775f9c3..5878809 100644 --- a/otelriver/middleware.go +++ b/otelriver/middleware.go @@ -165,22 +165,22 @@ func (m *Middleware) InsertMany(ctx context.Context, manyParams []*rivertype.Job } span.SetAttributes(attrs...) // set after finalizing status - span.SetAttributes(attribute.Int64("duplicate_skipped_count", skipped)) + span.SetAttributes(attribute.Int64("unique_skipped_as_duplicate_count", skipped)) // This allocates a new slice, so make sure to do it as few times as possible. measurementOpt := metric.WithAttributes(attrs...) - // Partition insert_count by skipped_as_duplicate so the metric - // shows how many of the submitted jobs were dropped by UniqueOpts - // vs. actually enqueued. Sum across both data points still equals - // len(manyParams). + // Partition insert_count by unique_skipped_as_duplicate so the + // metric shows how many of the submitted jobs were dropped by + // UniqueOpts vs. actually enqueued. Sum across both data points + // still equals len(manyParams). if inserted := int64(len(manyParams)) - skipped; inserted > 0 { m.metrics.insertCount.Add(ctx, inserted, - metric.WithAttributes(append(attrs, attribute.Bool("skipped_as_duplicate", false))...)) + metric.WithAttributes(append(attrs, attribute.Bool("unique_skipped_as_duplicate", false))...)) } if skipped > 0 { m.metrics.insertCount.Add(ctx, skipped, - metric.WithAttributes(append(attrs, attribute.Bool("skipped_as_duplicate", true))...)) + metric.WithAttributes(append(attrs, attribute.Bool("unique_skipped_as_duplicate", true))...)) } m.metrics.insertManyCount.Add(ctx, 1, measurementOpt) m.metrics.insertManyDuration.Record(ctx, duration, measurementOpt) diff --git a/otelriver/middleware_test.go b/otelriver/middleware_test.go index 9863491..7097dfd 100644 --- a/otelriver/middleware_test.go +++ b/otelriver/middleware_test.go @@ -82,7 +82,7 @@ func TestMiddleware(t *testing.T) { require.Equal(t, "ok", getAttribute(t, span.Attributes, "status").AsString()) require.Equal(t, "river.insert_many", span.Name) require.Equal(t, codes.Ok, span.Status.Code) - require.EqualValues(t, 0, getAttribute(t, span.Attributes, "duplicate_skipped_count").AsInt64()) + require.EqualValues(t, 0, getAttribute(t, span.Attributes, "unique_skipped_as_duplicate_count").AsInt64()) var ( expectedAttrs = []attribute.KeyValue{ @@ -93,10 +93,10 @@ func TestMiddleware(t *testing.T) { require.NoError(t, bundle.metricReader.Collect(ctx, &metrics)) requireSumByAttrs(t, metrics, "river.insert_count", 1, attribute.String("status", "ok"), - attribute.Bool("skipped_as_duplicate", false), + attribute.Bool("unique_skipped_as_duplicate", false), ) requireSumByAttrs(t, metrics, "river.insert_count", 0, - attribute.Bool("skipped_as_duplicate", true), + attribute.Bool("unique_skipped_as_duplicate", true), ) requireSum(t, metrics, "river.insert_many_count", 1, expectedAttrs...) { @@ -129,7 +129,7 @@ func TestMiddleware(t *testing.T) { require.Equal(t, "river.insert_many", span.Name) require.Equal(t, codes.Error, span.Status.Code) require.Equal(t, "error from doInner", span.Status.Description) - require.EqualValues(t, 0, getAttribute(t, span.Attributes, "duplicate_skipped_count").AsInt64()) + require.EqualValues(t, 0, getAttribute(t, span.Attributes, "unique_skipped_as_duplicate_count").AsInt64()) var ( expectedAttrs = []attribute.KeyValue{ @@ -140,10 +140,10 @@ func TestMiddleware(t *testing.T) { require.NoError(t, bundle.metricReader.Collect(ctx, &metrics)) requireSumByAttrs(t, metrics, "river.insert_count", 1, attribute.String("status", "error"), - attribute.Bool("skipped_as_duplicate", false), + attribute.Bool("unique_skipped_as_duplicate", false), ) requireSumByAttrs(t, metrics, "river.insert_count", 0, - attribute.Bool("skipped_as_duplicate", true), + attribute.Bool("unique_skipped_as_duplicate", true), ) requireSum(t, metrics, "river.insert_many_count", 1, expectedAttrs...) requireGaugeNotEmpty(t, metrics, "river.insert_many_duration", expectedAttrs...) @@ -171,7 +171,7 @@ func TestMiddleware(t *testing.T) { require.Equal(t, "river.insert_many", span.Name) require.Equal(t, codes.Error, span.Status.Code) require.Equal(t, "panic", span.Status.Description) - require.EqualValues(t, 0, getAttribute(t, span.Attributes, "duplicate_skipped_count").AsInt64()) + require.EqualValues(t, 0, getAttribute(t, span.Attributes, "unique_skipped_as_duplicate_count").AsInt64()) var ( expectedAttrs = []attribute.KeyValue{ @@ -223,16 +223,16 @@ func TestMiddleware(t *testing.T) { spans := bundle.traceExporter.GetSpans() require.Len(t, spans, 1) - require.EqualValues(t, 2, getAttribute(t, spans[0].Attributes, "duplicate_skipped_count").AsInt64()) + require.EqualValues(t, 2, getAttribute(t, spans[0].Attributes, "unique_skipped_as_duplicate_count").AsInt64()) var metrics metricdata.ResourceMetrics require.NoError(t, bundle.metricReader.Collect(ctx, &metrics)) requireSumByAttrs(t, metrics, "river.insert_count", 2, attribute.String("status", "ok"), - attribute.Bool("skipped_as_duplicate", true), + attribute.Bool("unique_skipped_as_duplicate", true), ) requireSumByAttrs(t, metrics, "river.insert_count", 0, - attribute.Bool("skipped_as_duplicate", false), + attribute.Bool("unique_skipped_as_duplicate", false), ) }) @@ -257,17 +257,17 @@ func TestMiddleware(t *testing.T) { spans := bundle.traceExporter.GetSpans() require.Len(t, spans, 1) - require.EqualValues(t, 2, getAttribute(t, spans[0].Attributes, "duplicate_skipped_count").AsInt64()) + require.EqualValues(t, 2, getAttribute(t, spans[0].Attributes, "unique_skipped_as_duplicate_count").AsInt64()) var metrics metricdata.ResourceMetrics require.NoError(t, bundle.metricReader.Collect(ctx, &metrics)) requireSumByAttrs(t, metrics, "river.insert_count", 3, attribute.String("status", "ok"), - attribute.Bool("skipped_as_duplicate", false), + attribute.Bool("unique_skipped_as_duplicate", false), ) requireSumByAttrs(t, metrics, "river.insert_count", 2, attribute.String("status", "ok"), - attribute.Bool("skipped_as_duplicate", true), + attribute.Bool("unique_skipped_as_duplicate", true), ) // Sum across both data points should still equal len(manyParams). requireSumByAttrs(t, metrics, "river.insert_count", 5) From a0631e4df316358f955e6dc11a888939c7758d87 Mon Sep 17 00:00:00 2001 From: Elizabeth Chatman Date: Mon, 1 Jun 2026 13:51:20 -0700 Subject: [PATCH 4/4] Also update changelog --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d68dab0..78d8abb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Added `unique_skipped_as_duplicate` attributes to otel `insert_many` spans and `insert_count` metric. [PR #58](https://github.com/riverqueue/rivercontrib/pull/58). + ### Changed - Record snoozed jobs with status `ok` instead of `error` in `otelriver` middleware. Add new `snooze.duration` span attribute. [PR #59](https://github.com/riverqueue/rivercontrib/pull/59).