diff --git a/CHANGELOG.md b/CHANGELOG.md index d68dab0..78d8abb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Added `unique_skipped_as_duplicate` attributes to otel `insert_many` spans and `insert_count` metric. [PR #58](https://github.com/riverqueue/rivercontrib/pull/58). + ### Changed - Record snoozed jobs with status `ok` instead of `error` in `otelriver` middleware. Add new `snooze.duration` span attribute. [PR #59](https://github.com/riverqueue/rivercontrib/pull/59). diff --git a/otelriver/middleware.go b/otelriver/middleware.go index e8e79af..5878809 100644 --- a/otelriver/middleware.go +++ b/otelriver/middleware.go @@ -156,12 +156,32 @@ func (m *Middleware) InsertMany(ctx context.Context, manyParams []*rivertype.Job duration := m.durationInPreferredUnit(time.Since(begin)) setStatus(attrs, statusIndex, span, panicked, err) + + var skipped int64 + for _, r := range insertRes { + if r != nil && r.UniqueSkippedAsDuplicate { + skipped++ + } + } + span.SetAttributes(attrs...) // set after finalizing status + span.SetAttributes(attribute.Int64("unique_skipped_as_duplicate_count", skipped)) // This allocates a new slice, so make sure to do it as few times as possible. measurementOpt := metric.WithAttributes(attrs...) - m.metrics.insertCount.Add(ctx, int64(len(manyParams)), measurementOpt) + // Partition insert_count by unique_skipped_as_duplicate so the + // metric shows how many of the submitted jobs were dropped by + // UniqueOpts vs. actually enqueued. Sum across both data points + // still equals len(manyParams). + if inserted := int64(len(manyParams)) - skipped; inserted > 0 { + m.metrics.insertCount.Add(ctx, inserted, + metric.WithAttributes(append(attrs, attribute.Bool("unique_skipped_as_duplicate", false))...)) + } + if skipped > 0 { + m.metrics.insertCount.Add(ctx, skipped, + metric.WithAttributes(append(attrs, attribute.Bool("unique_skipped_as_duplicate", true))...)) + } m.metrics.insertManyCount.Add(ctx, 1, measurementOpt) m.metrics.insertManyDuration.Record(ctx, duration, measurementOpt) m.metrics.insertManyDurationHistogram.Record(ctx, duration, measurementOpt) diff --git a/otelriver/middleware_test.go b/otelriver/middleware_test.go index 72e619e..7097dfd 100644 --- a/otelriver/middleware_test.go +++ b/otelriver/middleware_test.go @@ -82,6 +82,7 @@ func TestMiddleware(t *testing.T) { require.Equal(t, "ok", getAttribute(t, span.Attributes, "status").AsString()) require.Equal(t, "river.insert_many", span.Name) require.Equal(t, codes.Ok, span.Status.Code) + require.EqualValues(t, 0, getAttribute(t, span.Attributes, "unique_skipped_as_duplicate_count").AsInt64()) var ( expectedAttrs = []attribute.KeyValue{ @@ -90,7 +91,13 @@ func TestMiddleware(t *testing.T) { metrics metricdata.ResourceMetrics ) require.NoError(t, bundle.metricReader.Collect(ctx, &metrics)) - requireSum(t, metrics, "river.insert_count", 1, expectedAttrs...) + requireSumByAttrs(t, metrics, "river.insert_count", 1, + attribute.String("status", "ok"), + attribute.Bool("unique_skipped_as_duplicate", false), + ) + requireSumByAttrs(t, metrics, "river.insert_count", 0, + attribute.Bool("unique_skipped_as_duplicate", true), + ) requireSum(t, metrics, "river.insert_many_count", 1, expectedAttrs...) { metric, _ := requireGaugeNotEmpty(t, metrics, "river.insert_many_duration", expectedAttrs...) @@ -122,6 +129,7 @@ func TestMiddleware(t *testing.T) { require.Equal(t, "river.insert_many", span.Name) require.Equal(t, codes.Error, span.Status.Code) require.Equal(t, "error from doInner", span.Status.Description) + require.EqualValues(t, 0, getAttribute(t, span.Attributes, "unique_skipped_as_duplicate_count").AsInt64()) var ( expectedAttrs = []attribute.KeyValue{ @@ -130,7 +138,13 @@ func TestMiddleware(t *testing.T) { metrics metricdata.ResourceMetrics ) require.NoError(t, bundle.metricReader.Collect(ctx, &metrics)) - requireSum(t, metrics, "river.insert_count", 1, expectedAttrs...) + requireSumByAttrs(t, metrics, "river.insert_count", 1, + attribute.String("status", "error"), + attribute.Bool("unique_skipped_as_duplicate", false), + ) + requireSumByAttrs(t, metrics, "river.insert_count", 0, + attribute.Bool("unique_skipped_as_duplicate", true), + ) requireSum(t, metrics, "river.insert_many_count", 1, expectedAttrs...) requireGaugeNotEmpty(t, metrics, "river.insert_many_duration", expectedAttrs...) requireHistogramCount(t, metrics, "river.insert_many_duration_histogram", 1, expectedAttrs...) @@ -157,6 +171,7 @@ func TestMiddleware(t *testing.T) { require.Equal(t, "river.insert_many", span.Name) require.Equal(t, codes.Error, span.Status.Code) require.Equal(t, "panic", span.Status.Description) + require.EqualValues(t, 0, getAttribute(t, span.Attributes, "unique_skipped_as_duplicate_count").AsInt64()) var ( expectedAttrs = []attribute.KeyValue{ @@ -191,6 +206,73 @@ func TestMiddleware(t *testing.T) { require.NoError(t, err) }) + t.Run("InsertManyAllDuplicates", func(t *testing.T) { + t.Parallel() + + middleware, bundle := setup(t) + + doInner := func(ctx context.Context) ([]*rivertype.JobInsertResult, error) { + return []*rivertype.JobInsertResult{ + {Job: &rivertype.JobRow{ID: 1}, UniqueSkippedAsDuplicate: true}, + {Job: &rivertype.JobRow{ID: 2}, UniqueSkippedAsDuplicate: true}, + }, nil + } + + _, err := middleware.InsertMany(ctx, []*rivertype.JobInsertParams{{}, {}}, doInner) + require.NoError(t, err) + + spans := bundle.traceExporter.GetSpans() + require.Len(t, spans, 1) + require.EqualValues(t, 2, getAttribute(t, spans[0].Attributes, "unique_skipped_as_duplicate_count").AsInt64()) + + var metrics metricdata.ResourceMetrics + require.NoError(t, bundle.metricReader.Collect(ctx, &metrics)) + requireSumByAttrs(t, metrics, "river.insert_count", 2, + attribute.String("status", "ok"), + attribute.Bool("unique_skipped_as_duplicate", true), + ) + requireSumByAttrs(t, metrics, "river.insert_count", 0, + attribute.Bool("unique_skipped_as_duplicate", false), + ) + }) + + t.Run("InsertManyMixedDuplicates", func(t *testing.T) { + t.Parallel() + + middleware, bundle := setup(t) + + doInner := func(ctx context.Context) ([]*rivertype.JobInsertResult, error) { + return []*rivertype.JobInsertResult{ + {Job: &rivertype.JobRow{ID: 1}, UniqueSkippedAsDuplicate: false}, + {Job: &rivertype.JobRow{ID: 2}, UniqueSkippedAsDuplicate: true}, + {Job: &rivertype.JobRow{ID: 3}, UniqueSkippedAsDuplicate: false}, + {Job: &rivertype.JobRow{ID: 4}, UniqueSkippedAsDuplicate: true}, + {Job: &rivertype.JobRow{ID: 5}, UniqueSkippedAsDuplicate: false}, + }, nil + } + + _, err := middleware.InsertMany(ctx, + []*rivertype.JobInsertParams{{}, {}, {}, {}, {}}, doInner) + require.NoError(t, err) + + spans := bundle.traceExporter.GetSpans() + require.Len(t, spans, 1) + require.EqualValues(t, 2, getAttribute(t, spans[0].Attributes, "unique_skipped_as_duplicate_count").AsInt64()) + + var metrics metricdata.ResourceMetrics + require.NoError(t, bundle.metricReader.Collect(ctx, &metrics)) + requireSumByAttrs(t, metrics, "river.insert_count", 3, + attribute.String("status", "ok"), + attribute.Bool("unique_skipped_as_duplicate", false), + ) + requireSumByAttrs(t, metrics, "river.insert_count", 2, + attribute.String("status", "ok"), + attribute.Bool("unique_skipped_as_duplicate", true), + ) + // Sum across both data points should still equal len(manyParams). + requireSumByAttrs(t, metrics, "river.insert_count", 5) + }) + t.Run("InsertManyDurationUnitMS", func(t *testing.T) { t.Parallel() @@ -737,3 +819,47 @@ func requireSum(t *testing.T, metrics metricdata.ResourceMetrics, name string, v metricdatatest.AssertHasAttributes(t, metric, attrs...) return metric, metricData } + +// requireSumByAttrs asserts that the sum of all data points on the named +// metric whose attributes are a superset of the given attrs equals the +// expected value. A missing metric is treated as sum=0 so callers don't +// need to distinguish "no data point" from "data point with value 0". +// Pass no attrs to assert against the metric's grand total. +func requireSumByAttrs(t *testing.T, metrics metricdata.ResourceMetrics, name string, expected int64, attrs ...attribute.KeyValue) { //nolint:unparam + t.Helper() + + _, metricData, ok := getMetric[metricdata.Sum[int64]](t, metrics, name) + var got int64 + if ok { + wantSet := attribute.NewSet(attrs...) + for _, dp := range metricData.DataPoints { + if attributeSetContains(dp.Attributes, wantSet) { + got += dp.Value + } + } + } + if got != expected { + t.Fatalf("sum of %s data points matching %v: got %d, want %d; data points: %v", + name, attrs, got, expected, formatDataPoints(metricData.DataPoints)) + } +} + +func attributeSetContains(got, want attribute.Set) bool { + iter := want.Iter() + for iter.Next() { + kv := iter.Attribute() + gotVal, ok := got.Value(kv.Key) + if !ok || gotVal != kv.Value { + return false + } + } + return true +} + +func formatDataPoints(dps []metricdata.DataPoint[int64]) string { + out := make([]string, 0, len(dps)) + for _, dp := range dps { + out = append(out, fmt.Sprintf("value=%d attrs=%v", dp.Value, dp.Attributes.ToSlice())) + } + return "[" + fmt.Sprint(out) + "]" +}