Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Added `unique_skipped_as_duplicate` attributes to otel `insert_many` spans and `insert_count` metric. [PR #58](https://github.com/riverqueue/rivercontrib/pull/58).

### Changed

- Record snoozed jobs with status `ok` instead of `error` in `otelriver` middleware. Add new `snooze.duration` span attribute. [PR #59](https://github.com/riverqueue/rivercontrib/pull/59).
Expand Down
22 changes: 21 additions & 1 deletion otelriver/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,32 @@ func (m *Middleware) InsertMany(ctx context.Context, manyParams []*rivertype.Job
duration := m.durationInPreferredUnit(time.Since(begin))

setStatus(attrs, statusIndex, span, panicked, err)

var skipped int64
for _, r := range insertRes {
if r != nil && r.UniqueSkippedAsDuplicate {
skipped++
}
}

span.SetAttributes(attrs...) // set after finalizing status
span.SetAttributes(attribute.Int64("unique_skipped_as_duplicate_count", skipped))

// This allocates a new slice, so make sure to do it as few times as possible.
measurementOpt := metric.WithAttributes(attrs...)

m.metrics.insertCount.Add(ctx, int64(len(manyParams)), measurementOpt)
// Partition insert_count by unique_skipped_as_duplicate so the
// metric shows how many of the submitted jobs were dropped by
// UniqueOpts vs. actually enqueued. Sum across both data points
// still equals len(manyParams).
if inserted := int64(len(manyParams)) - skipped; inserted > 0 {
m.metrics.insertCount.Add(ctx, inserted,
metric.WithAttributes(append(attrs, attribute.Bool("unique_skipped_as_duplicate", false))...))
}
if skipped > 0 {
m.metrics.insertCount.Add(ctx, skipped,
metric.WithAttributes(append(attrs, attribute.Bool("unique_skipped_as_duplicate", true))...))
}
m.metrics.insertManyCount.Add(ctx, 1, measurementOpt)
m.metrics.insertManyDuration.Record(ctx, duration, measurementOpt)
m.metrics.insertManyDurationHistogram.Record(ctx, duration, measurementOpt)
Expand Down
130 changes: 128 additions & 2 deletions otelriver/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func TestMiddleware(t *testing.T) {
require.Equal(t, "ok", getAttribute(t, span.Attributes, "status").AsString())
require.Equal(t, "river.insert_many", span.Name)
require.Equal(t, codes.Ok, span.Status.Code)
require.EqualValues(t, 0, getAttribute(t, span.Attributes, "unique_skipped_as_duplicate_count").AsInt64())

var (
expectedAttrs = []attribute.KeyValue{
Expand All @@ -90,7 +91,13 @@ func TestMiddleware(t *testing.T) {
metrics metricdata.ResourceMetrics
)
require.NoError(t, bundle.metricReader.Collect(ctx, &metrics))
requireSum(t, metrics, "river.insert_count", 1, expectedAttrs...)
requireSumByAttrs(t, metrics, "river.insert_count", 1,
attribute.String("status", "ok"),
attribute.Bool("unique_skipped_as_duplicate", false),
)
requireSumByAttrs(t, metrics, "river.insert_count", 0,
attribute.Bool("unique_skipped_as_duplicate", true),
)
requireSum(t, metrics, "river.insert_many_count", 1, expectedAttrs...)
{
metric, _ := requireGaugeNotEmpty(t, metrics, "river.insert_many_duration", expectedAttrs...)
Expand Down Expand Up @@ -122,6 +129,7 @@ func TestMiddleware(t *testing.T) {
require.Equal(t, "river.insert_many", span.Name)
require.Equal(t, codes.Error, span.Status.Code)
require.Equal(t, "error from doInner", span.Status.Description)
require.EqualValues(t, 0, getAttribute(t, span.Attributes, "unique_skipped_as_duplicate_count").AsInt64())

var (
expectedAttrs = []attribute.KeyValue{
Expand All @@ -130,7 +138,13 @@ func TestMiddleware(t *testing.T) {
metrics metricdata.ResourceMetrics
)
require.NoError(t, bundle.metricReader.Collect(ctx, &metrics))
requireSum(t, metrics, "river.insert_count", 1, expectedAttrs...)
requireSumByAttrs(t, metrics, "river.insert_count", 1,
attribute.String("status", "error"),
attribute.Bool("unique_skipped_as_duplicate", false),
)
requireSumByAttrs(t, metrics, "river.insert_count", 0,
attribute.Bool("unique_skipped_as_duplicate", true),
)
requireSum(t, metrics, "river.insert_many_count", 1, expectedAttrs...)
requireGaugeNotEmpty(t, metrics, "river.insert_many_duration", expectedAttrs...)
requireHistogramCount(t, metrics, "river.insert_many_duration_histogram", 1, expectedAttrs...)
Expand All @@ -157,6 +171,7 @@ func TestMiddleware(t *testing.T) {
require.Equal(t, "river.insert_many", span.Name)
require.Equal(t, codes.Error, span.Status.Code)
require.Equal(t, "panic", span.Status.Description)
require.EqualValues(t, 0, getAttribute(t, span.Attributes, "unique_skipped_as_duplicate_count").AsInt64())

var (
expectedAttrs = []attribute.KeyValue{
Expand Down Expand Up @@ -191,6 +206,73 @@ func TestMiddleware(t *testing.T) {
require.NoError(t, err)
})

t.Run("InsertManyAllDuplicates", func(t *testing.T) {
t.Parallel()

middleware, bundle := setup(t)

doInner := func(ctx context.Context) ([]*rivertype.JobInsertResult, error) {
return []*rivertype.JobInsertResult{
{Job: &rivertype.JobRow{ID: 1}, UniqueSkippedAsDuplicate: true},
{Job: &rivertype.JobRow{ID: 2}, UniqueSkippedAsDuplicate: true},
}, nil
}

_, err := middleware.InsertMany(ctx, []*rivertype.JobInsertParams{{}, {}}, doInner)
require.NoError(t, err)

spans := bundle.traceExporter.GetSpans()
require.Len(t, spans, 1)
require.EqualValues(t, 2, getAttribute(t, spans[0].Attributes, "unique_skipped_as_duplicate_count").AsInt64())

var metrics metricdata.ResourceMetrics
require.NoError(t, bundle.metricReader.Collect(ctx, &metrics))
requireSumByAttrs(t, metrics, "river.insert_count", 2,
attribute.String("status", "ok"),
attribute.Bool("unique_skipped_as_duplicate", true),
)
requireSumByAttrs(t, metrics, "river.insert_count", 0,
attribute.Bool("unique_skipped_as_duplicate", false),
)
})

t.Run("InsertManyMixedDuplicates", func(t *testing.T) {
t.Parallel()

middleware, bundle := setup(t)

doInner := func(ctx context.Context) ([]*rivertype.JobInsertResult, error) {
return []*rivertype.JobInsertResult{
{Job: &rivertype.JobRow{ID: 1}, UniqueSkippedAsDuplicate: false},
{Job: &rivertype.JobRow{ID: 2}, UniqueSkippedAsDuplicate: true},
{Job: &rivertype.JobRow{ID: 3}, UniqueSkippedAsDuplicate: false},
{Job: &rivertype.JobRow{ID: 4}, UniqueSkippedAsDuplicate: true},
{Job: &rivertype.JobRow{ID: 5}, UniqueSkippedAsDuplicate: false},
}, nil
}

_, err := middleware.InsertMany(ctx,
[]*rivertype.JobInsertParams{{}, {}, {}, {}, {}}, doInner)
require.NoError(t, err)

spans := bundle.traceExporter.GetSpans()
require.Len(t, spans, 1)
require.EqualValues(t, 2, getAttribute(t, spans[0].Attributes, "unique_skipped_as_duplicate_count").AsInt64())

var metrics metricdata.ResourceMetrics
require.NoError(t, bundle.metricReader.Collect(ctx, &metrics))
requireSumByAttrs(t, metrics, "river.insert_count", 3,
attribute.String("status", "ok"),
attribute.Bool("unique_skipped_as_duplicate", false),
)
requireSumByAttrs(t, metrics, "river.insert_count", 2,
attribute.String("status", "ok"),
attribute.Bool("unique_skipped_as_duplicate", true),
)
// Sum across both data points should still equal len(manyParams).
requireSumByAttrs(t, metrics, "river.insert_count", 5)
})

t.Run("InsertManyDurationUnitMS", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -737,3 +819,47 @@ func requireSum(t *testing.T, metrics metricdata.ResourceMetrics, name string, v
metricdatatest.AssertHasAttributes(t, metric, attrs...)
return metric, metricData
}

// requireSumByAttrs asserts that the sum of all data points on the named
// metric whose attributes are a superset of the given attrs equals the
// expected value. A missing metric is treated as sum=0 so callers don't
// need to distinguish "no data point" from "data point with value 0".
// Pass no attrs to assert against the metric's grand total.
func requireSumByAttrs(t *testing.T, metrics metricdata.ResourceMetrics, name string, expected int64, attrs ...attribute.KeyValue) { //nolint:unparam
t.Helper()

_, metricData, ok := getMetric[metricdata.Sum[int64]](t, metrics, name)
var got int64
if ok {
wantSet := attribute.NewSet(attrs...)
for _, dp := range metricData.DataPoints {
if attributeSetContains(dp.Attributes, wantSet) {
got += dp.Value
}
}
}
if got != expected {
t.Fatalf("sum of %s data points matching %v: got %d, want %d; data points: %v",
name, attrs, got, expected, formatDataPoints(metricData.DataPoints))
}
}

func attributeSetContains(got, want attribute.Set) bool {
iter := want.Iter()
for iter.Next() {
kv := iter.Attribute()
gotVal, ok := got.Value(kv.Key)
if !ok || gotVal != kv.Value {
return false
}
}
return true
}

func formatDataPoints(dps []metricdata.DataPoint[int64]) string {
out := make([]string, 0, len(dps))
for _, dp := range dps {
out = append(out, fmt.Sprintf("value=%d attrs=%v", dp.Value, dp.Attributes.ToSlice()))
}
return "[" + fmt.Sprint(out) + "]"
}
Loading