From 8499d436cba5d266b38d65df900d949e0e0facd1 Mon Sep 17 00:00:00 2001 From: peter941221 Date: Mon, 25 May 2026 21:07:01 +0800 Subject: [PATCH 1/3] Add timeout to standard pilot fetch [skip ci] --- CHANGELOG.md | 4 + rivershared/riverpilot/standard_pilot.go | 18 +++- rivershared/riverpilot/standard_pilot_test.go | 93 +++++++++++++++++++ 3 files changed, 114 insertions(+), 1 deletion(-) create mode 100644 rivershared/riverpilot/standard_pilot_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 9b319a40..4d7f7cba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Add a 10-second timeout around `StandardPilot.JobGetAvailable` so a stalled standard-pilot fetch no longer hangs a producer indefinitely. [PR #TODO](https://github.com/riverqueue/river/pull/TODO) + ## [0.38.0] - 2026-05-22 ### Added diff --git a/rivershared/riverpilot/standard_pilot.go b/rivershared/riverpilot/standard_pilot.go index 22598eb9..0998dee5 100644 --- a/rivershared/riverpilot/standard_pilot.go +++ b/rivershared/riverpilot/standard_pilot.go @@ -3,14 +3,18 @@ package riverpilot import ( "context" "sync/atomic" + "time" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/baseservice" "github.com/riverqueue/river/rivertype" ) +const standardPilotJobGetAvailableTimeoutDefault = 10 * time.Second + type StandardPilot struct { - seq atomic.Int64 + jobGetAvailableTimeout time.Duration + seq atomic.Int64 } func (p *StandardPilot) JobCleanerQueuesExcluded() []string { return nil } @@ -19,6 +23,10 @@ func (p *StandardPilot) JobGetAvailable(ctx context.Context, exec riverdriver.Ex if params.MaxToLock <= 0 { return nil, nil } + + ctx, cancel := context.WithTimeoutCause(ctx, p.jobGetAvailableTimeoutOrDefault(), context.DeadlineExceeded) + defer cancel() + return exec.JobGetAvailable(ctx, params) } @@ -75,6 +83,14 @@ func (p *StandardPilot) QueueMetadataChanged(ctx context.Context, exec riverdriv return nil } +func (p *StandardPilot) jobGetAvailableTimeoutOrDefault() time.Duration { + if p.jobGetAvailableTimeout > 0 { + return p.jobGetAvailableTimeout + } + + return standardPilotJobGetAvailableTimeoutDefault +} + type standardProducerState struct{} func (s *standardProducerState) JobFinish(job *rivertype.JobRow) { diff --git a/rivershared/riverpilot/standard_pilot_test.go b/rivershared/riverpilot/standard_pilot_test.go new file mode 100644 index 00000000..42320944 --- /dev/null +++ b/rivershared/riverpilot/standard_pilot_test.go @@ -0,0 +1,93 @@ +package riverpilot + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/riverqueue/river/riverdriver" + "github.com/riverqueue/river/rivertype" + "github.com/stretchr/testify/require" +) + +type standardPilotExecutorMock struct { + riverdriver.Executor + + jobGetAvailableFunc func(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) +} + +func (m *standardPilotExecutorMock) JobGetAvailable(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) { + return m.jobGetAvailableFunc(ctx, params) +} + +func TestStandardPilot_JobGetAvailable(t *testing.T) { + t.Parallel() + + type testBundle struct { + exec *standardPilotExecutorMock + pilot *StandardPilot + } + + setup := func(t *testing.T) *testBundle { + t.Helper() + + return &testBundle{ + exec: &standardPilotExecutorMock{}, + pilot: &StandardPilot{ + jobGetAvailableTimeout: 5 * time.Millisecond, + }, + } + } + + t.Run("ReturnsNilWhenMaxToLockIsZero", func(t *testing.T) { + t.Parallel() + + bundle := setup(t) + + res, err := bundle.pilot.JobGetAvailable(context.Background(), bundle.exec, nil, &riverdriver.JobGetAvailableParams{}) + require.NoError(t, err) + require.Nil(t, res) + }) + + t.Run("TimesOutHungFetch", func(t *testing.T) { + t.Parallel() + + bundle := setup(t) + timeoutSeen := make(chan error, 1) + + bundle.exec.jobGetAvailableFunc = func(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) { + <-ctx.Done() + timeoutSeen <- ctx.Err() + return nil, ctx.Err() + } + + start := time.Now() + _, err := bundle.pilot.JobGetAvailable(context.Background(), bundle.exec, nil, &riverdriver.JobGetAvailableParams{ + MaxToLock: 1, + }) + require.Error(t, err) + require.ErrorIs(t, err, context.DeadlineExceeded) + require.WithinDuration(t, time.Now(), start.Add(bundle.pilot.jobGetAvailableTimeout), 25*time.Millisecond) + require.ErrorIs(t, <-timeoutSeen, context.DeadlineExceeded) + }) + + t.Run("PreservesParentCancellation", func(t *testing.T) { + t.Parallel() + + bundle := setup(t) + parentErr := errors.New("parent cancelled") + parentCtx, cancel := context.WithCancelCause(context.Background()) + cancel(parentErr) + + bundle.exec.jobGetAvailableFunc = func(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) { + <-ctx.Done() + return nil, context.Cause(ctx) + } + + _, err := bundle.pilot.JobGetAvailable(parentCtx, bundle.exec, nil, &riverdriver.JobGetAvailableParams{ + MaxToLock: 1, + }) + require.ErrorIs(t, err, parentErr) + }) +} From 9d5b60393b1b246ffe1c141763b9082820b6da63 Mon Sep 17 00:00:00 2001 From: peter941221 Date: Mon, 25 May 2026 21:08:34 +0800 Subject: [PATCH 2/3] Update changelog for standard pilot fetch timeout [skip ci] --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4d7f7cba..a5f2cbdb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed -- Add a 10-second timeout around `StandardPilot.JobGetAvailable` so a stalled standard-pilot fetch no longer hangs a producer indefinitely. [PR #TODO](https://github.com/riverqueue/river/pull/TODO) +- Add a 10-second timeout around `StandardPilot.JobGetAvailable` so a stalled standard-pilot fetch no longer hangs a producer indefinitely. [PR #1255](https://github.com/riverqueue/river/pull/1255) ## [0.38.0] - 2026-05-22 From 5cc9e930f19b89fa04194ce466d98c23db57f109 Mon Sep 17 00:00:00 2001 From: peter941221 Date: Tue, 26 May 2026 18:28:55 +0800 Subject: [PATCH 3/3] Simplify standard pilot fetch timeout --- rivershared/riverpilot/standard_pilot.go | 13 ++------ rivershared/riverpilot/standard_pilot_test.go | 32 +++---------------- 2 files changed, 6 insertions(+), 39 deletions(-) diff --git a/rivershared/riverpilot/standard_pilot.go b/rivershared/riverpilot/standard_pilot.go index 0998dee5..45e2ff2b 100644 --- a/rivershared/riverpilot/standard_pilot.go +++ b/rivershared/riverpilot/standard_pilot.go @@ -13,8 +13,7 @@ import ( const standardPilotJobGetAvailableTimeoutDefault = 10 * time.Second type StandardPilot struct { - jobGetAvailableTimeout time.Duration - seq atomic.Int64 + seq atomic.Int64 } func (p *StandardPilot) JobCleanerQueuesExcluded() []string { return nil } @@ -24,7 +23,7 @@ func (p *StandardPilot) JobGetAvailable(ctx context.Context, exec riverdriver.Ex return nil, nil } - ctx, cancel := context.WithTimeoutCause(ctx, p.jobGetAvailableTimeoutOrDefault(), context.DeadlineExceeded) + ctx, cancel := context.WithTimeoutCause(ctx, standardPilotJobGetAvailableTimeoutDefault, context.DeadlineExceeded) defer cancel() return exec.JobGetAvailable(ctx, params) @@ -83,14 +82,6 @@ func (p *StandardPilot) QueueMetadataChanged(ctx context.Context, exec riverdriv return nil } -func (p *StandardPilot) jobGetAvailableTimeoutOrDefault() time.Duration { - if p.jobGetAvailableTimeout > 0 { - return p.jobGetAvailableTimeout - } - - return standardPilotJobGetAvailableTimeoutDefault -} - type standardProducerState struct{} func (s *standardProducerState) JobFinish(job *rivertype.JobRow) { diff --git a/rivershared/riverpilot/standard_pilot_test.go b/rivershared/riverpilot/standard_pilot_test.go index 42320944..ae7d4254 100644 --- a/rivershared/riverpilot/standard_pilot_test.go +++ b/rivershared/riverpilot/standard_pilot_test.go @@ -4,11 +4,11 @@ import ( "context" "errors" "testing" - "time" + + "github.com/stretchr/testify/require" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivertype" - "github.com/stretchr/testify/require" ) type standardPilotExecutorMock struct { @@ -33,10 +33,8 @@ func TestStandardPilot_JobGetAvailable(t *testing.T) { t.Helper() return &testBundle{ - exec: &standardPilotExecutorMock{}, - pilot: &StandardPilot{ - jobGetAvailableTimeout: 5 * time.Millisecond, - }, + exec: &standardPilotExecutorMock{}, + pilot: &StandardPilot{}, } } @@ -50,28 +48,6 @@ func TestStandardPilot_JobGetAvailable(t *testing.T) { require.Nil(t, res) }) - t.Run("TimesOutHungFetch", func(t *testing.T) { - t.Parallel() - - bundle := setup(t) - timeoutSeen := make(chan error, 1) - - bundle.exec.jobGetAvailableFunc = func(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) { - <-ctx.Done() - timeoutSeen <- ctx.Err() - return nil, ctx.Err() - } - - start := time.Now() - _, err := bundle.pilot.JobGetAvailable(context.Background(), bundle.exec, nil, &riverdriver.JobGetAvailableParams{ - MaxToLock: 1, - }) - require.Error(t, err) - require.ErrorIs(t, err, context.DeadlineExceeded) - require.WithinDuration(t, time.Now(), start.Add(bundle.pilot.jobGetAvailableTimeout), 25*time.Millisecond) - require.ErrorIs(t, <-timeoutSeen, context.DeadlineExceeded) - }) - t.Run("PreservesParentCancellation", func(t *testing.T) { t.Parallel()