Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions client_pilot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type pilotSpy struct {
}

type pilotSpyTestSignals struct {
JobBegin testsignal.TestSignal[int64]
JobEnd testsignal.TestSignal[int64]
JobGetAvailable testsignal.TestSignal[struct{}]
JobSetStateIfRunningMany testsignal.TestSignal[struct{}]
PeriodicJobGetAll testsignal.TestSignal[struct{}]
Expand All @@ -47,6 +49,8 @@ type pilotSpyTestSignals struct {
}

func (ts *pilotSpyTestSignals) Init(tb testutil.TestingTB) {
ts.JobBegin.Init(tb)
ts.JobEnd.Init(tb)
ts.JobGetAvailable.Init(tb)
ts.JobSetStateIfRunningMany.Init(tb)
ts.PeriodicJobGetAll.Init(tb)
Expand All @@ -59,6 +63,11 @@ func (ts *pilotSpyTestSignals) Init(tb testutil.TestingTB) {
ts.QueueMetadataChanged.Init(tb)
}

// JobBegin emits the job's ID on the JobBegin test signal and then forwards
// the call to StandardPilot.
func (p *pilotSpy) JobBegin(ctx context.Context, job *rivertype.JobRow) {
	jobID := job.ID
	p.testSignals.JobBegin.Signal(jobID)

	p.StandardPilot.JobBegin(ctx, job)
}

func (p *pilotSpy) JobCancel(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobCancelParams) (*rivertype.JobRow, error) {
p.jobCancelCalls.Add(1)
return p.StandardPilot.JobCancel(ctx, exec, params)
Expand All @@ -69,6 +78,11 @@ func (p *pilotSpy) JobCleanerQueuesExcluded() []string {
return p.StandardPilot.JobCleanerQueuesExcluded()
}

// JobEnd emits the job's ID on the JobEnd test signal and then forwards the
// call to StandardPilot.
func (p *pilotSpy) JobEnd(ctx context.Context, job *rivertype.JobRow) {
	jobID := job.ID
	p.testSignals.JobEnd.Signal(jobID)

	p.StandardPilot.JobEnd(ctx, job)
}

func (p *pilotSpy) JobGetAvailable(ctx context.Context, exec riverdriver.Executor, state riverpilot.ProducerState, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) {
p.testSignals.JobGetAvailable.Signal(struct{}{})
return p.StandardPilot.JobGetAvailable(ctx, exec, state, params)
Expand Down Expand Up @@ -324,6 +338,8 @@ func Test_Client_PilotUsage(t *testing.T) {
riversharedtest.WaitOrTimeout(t, jobDone)
require.NotZero(t, insertRes.Job.ID)

require.Equal(t, insertRes.Job.ID, pilot.testSignals.JobBegin.WaitOrTimeout())
require.Equal(t, insertRes.Job.ID, pilot.testSignals.JobEnd.WaitOrTimeout())
pilot.testSignals.JobGetAvailable.WaitOrTimeout()
pilot.testSignals.JobSetStateIfRunningMany.WaitOrTimeout()
pilot.testSignals.ProducerInit.WaitOrTimeout()
Expand Down
11 changes: 11 additions & 0 deletions internal/jobexecutor/job_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/riverqueue/river/internal/workunit"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/riverpilot"
"github.com/riverqueue/river/rivertype"
)

Expand Down Expand Up @@ -114,6 +115,7 @@ type JobExecutor struct {
HookLookupGlobal hooklookup.HookLookupInterface
JobRow *rivertype.JobRow
MiddlewareLookupGlobal middlewarelookup.MiddlewareLookupInterface
Pilot riverpilot.Pilot
ProducerCallbacks struct {
JobDone func(jobRow *rivertype.JobRow)
Stuck func()
Expand Down Expand Up @@ -143,7 +145,16 @@ func (e *JobExecutor) Execute(ctx context.Context) {
QueueWaitDuration: e.start.Sub(e.JobRow.ScheduledAt),
}

if e.Pilot != nil {
e.Pilot.JobBegin(ctx, e.JobRow)
}

res := e.execute(ctx)

if e.Pilot != nil {
e.Pilot.JobEnd(ctx, e.JobRow)
}

if res.Err != nil && errors.Is(context.Cause(ctx), rivertype.ErrJobCancelledRemotely) {
res.Err = context.Cause(ctx)
}
Expand Down
103 changes: 103 additions & 0 deletions internal/jobexecutor/job_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,25 @@ func (h *testErrorHandler) HandlePanic(ctx context.Context, job *rivertype.JobRo
return h.HandlePanicFunc(ctx, job, panicVal, trace)
}

// testPilotWithJobCallbacks is a test pilot that embeds StandardPilot and
// carries optional functions that its JobBegin and JobEnd methods invoke, so a
// test can observe when those callbacks fire.
type testPilotWithJobCallbacks struct {
riverpilot.StandardPilot

// JobBeginFunc is invoked by JobBegin when non-nil.
JobBeginFunc func(ctx context.Context, job *rivertype.JobRow)
// JobEndFunc is invoked by JobEnd when non-nil.
JobEndFunc func(ctx context.Context, job *rivertype.JobRow)
}

// JobBegin forwards to JobBeginFunc when one is set and is otherwise a no-op.
// It does not call the embedded StandardPilot's JobBegin.
func (p *testPilotWithJobCallbacks) JobBegin(ctx context.Context, job *rivertype.JobRow) {
	begin := p.JobBeginFunc
	if begin == nil {
		return
	}

	begin(ctx, job)
}

// JobEnd forwards to JobEndFunc when one is set and is otherwise a no-op. It
// does not call the embedded StandardPilot's JobEnd.
func (p *testPilotWithJobCallbacks) JobEnd(ctx context.Context, job *rivertype.JobRow) {
	end := p.JobEndFunc
	if end == nil {
		return
	}

	end(ctx, job)
}

func TestJobExecutor_Execute(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -995,6 +1014,90 @@ func TestJobExecutor_Execute(t *testing.T) {
require.True(t, bundle.errorHandler.HandlePanicCalled)
})

t.Run("PilotJobCallbacksJobBeginAndEnd", func(t *testing.T) {
t.Parallel()

executor, bundle := setup(t)

var events []string

executor.Pilot = &testPilotWithJobCallbacks{
JobBeginFunc: func(ctx context.Context, job *rivertype.JobRow) {
require.Equal(t, bundle.jobRow.ID, job.ID)
events = append(events, "begin")
},
JobEndFunc: func(ctx context.Context, job *rivertype.JobRow) {
require.Equal(t, bundle.jobRow.ID, job.ID)
events = append(events, "end")
},
}
executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error {
events = append(events, "work")
return nil
}, nil).MakeUnit(bundle.jobRow)

executor.Execute(ctx)
riversharedtest.WaitOrTimeout(t, bundle.updateCh)

require.Equal(t, []string{"begin", "work", "end"}, events)
})

t.Run("PilotJobCallbacksJobEndInvokedOnError", func(t *testing.T) {
t.Parallel()

executor, bundle := setup(t)

var events []string

executor.Pilot = &testPilotWithJobCallbacks{
JobBeginFunc: func(ctx context.Context, job *rivertype.JobRow) {
require.Equal(t, bundle.jobRow.ID, job.ID)
events = append(events, "begin")
},
JobEndFunc: func(ctx context.Context, job *rivertype.JobRow) {
require.Equal(t, bundle.jobRow.ID, job.ID)
events = append(events, "end")
},
}
executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error {
events = append(events, "work")
return errors.New("work failed")
}, nil).MakeUnit(bundle.jobRow)

executor.Execute(ctx)
riversharedtest.WaitOrTimeout(t, bundle.updateCh)

require.Equal(t, []string{"begin", "work", "end"}, events)
})

t.Run("PilotJobCallbacksJobEndInvokedOnPanic", func(t *testing.T) {
t.Parallel()

executor, bundle := setup(t)

var events []string

executor.Pilot = &testPilotWithJobCallbacks{
JobBeginFunc: func(ctx context.Context, job *rivertype.JobRow) {
require.Equal(t, bundle.jobRow.ID, job.ID)
events = append(events, "begin")
},
JobEndFunc: func(ctx context.Context, job *rivertype.JobRow) {
require.Equal(t, bundle.jobRow.ID, job.ID)
events = append(events, "end")
},
}
executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error {
events = append(events, "work")
panic("panic val")
}, nil).MakeUnit(bundle.jobRow)

executor.Execute(ctx)
riversharedtest.WaitOrTimeout(t, bundle.updateCh)

require.Equal(t, []string{"begin", "work", "end"}, events)
})

t.Run("CancelFuncCleanedUpEvenWithoutCancel", func(t *testing.T) {
t.Parallel()

Expand Down
2 changes: 2 additions & 0 deletions internal/maintenance/job_rescuer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ func TestJobRescuer(t *testing.T) {

stuckToRetryJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), MaxAttempts: ptrutil.Ptr(5)})
stuckToRetryJob2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Minute)), MaxAttempts: ptrutil.Ptr(5)})
stuckToRetryJobWithLastSeen := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Minute)), Metadata: fmt.Appendf(nil, `{"%s": %q}`, riversharedmaintenance.MetadataKeyLastSeenAt, bundle.rescueHorizon.Add(-1*time.Minute).UTC().Format(time.RFC3339Nano)), MaxAttempts: ptrutil.Ptr(5)})
stuckToRetryJob3 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(1 * time.Minute)), MaxAttempts: ptrutil.Ptr(5)}) // won't be rescued

// Already at max attempts:
Expand Down Expand Up @@ -183,6 +184,7 @@ func TestJobRescuer(t *testing.T) {
var err error
confirmRetried(stuckToRetryJob1)
confirmRetried(stuckToRetryJob2)
confirmRetried(stuckToRetryJobWithLastSeen)

job3After, err := bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: stuckToRetryJob3.ID, Schema: rescuer.Config.Schema})
require.NoError(t, err)
Expand Down
1 change: 1 addition & 0 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,7 @@ func (p *producer) startNewExecutors(workCtx context.Context, jobs []*rivertype.
HookLookupGlobal: p.config.HookLookupGlobal,
MiddlewareLookupGlobal: p.config.MiddlewareLookupGlobal,
JobRow: job,
Pilot: p.pilot,
ProducerCallbacks: struct {
JobDone func(jobRow *rivertype.JobRow)
Stuck func()
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 19 additions & 5 deletions riverdriver/riverdrivertest/job_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package riverdrivertest

import (
"context"
"fmt"
"sort"
"strconv"
"testing"
Expand All @@ -12,6 +13,7 @@ import (

"github.com/riverqueue/river/internal/rivercommon"
"github.com/riverqueue/river/riverdriver"
riversharedmaintenance "github.com/riverqueue/river/rivershared/riversharedmaintenance"
"github.com/riverqueue/river/rivershared/testfactory"
"github.com/riverqueue/river/rivershared/util/ptrutil"
"github.com/riverqueue/river/rivershared/util/sliceutil"
Expand Down Expand Up @@ -519,16 +521,22 @@ func exerciseJobRead[TTx any](ctx context.Context, t *testing.T, executorWithTx
afterHorizon = horizon.Add(1 * time.Minute)
)

stuckJob1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)})
// metadataWithLastSeen builds a JSON metadata object whose only key is
// riversharedmaintenance.MetadataKeyLastSeenAt. Its value is lastSeenAt
// converted to UTC, rounded to the nearest millisecond, and formatted as a
// space-separated date and time with a numeric zone offset.
metadataWithLastSeen := func(lastSeenAt time.Time) []byte {
	formatted := lastSeenAt.UTC().Round(time.Millisecond).Format("2006-01-02 15:04:05.999-07:00")
	return fmt.Appendf(nil, `{"%s": %q}`, riversharedmaintenance.MetadataKeyLastSeenAt, formatted)
}

stuckJob1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, Metadata: []byte(`{"other":"value"}`), State: ptrutil.Ptr(rivertype.JobStateRunning)})
stuckJob2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)})
stuckJob3 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)})

t.Logf("horizon = %s", horizon)
t.Logf("stuckJob1 = %s", stuckJob1.AttemptedAt)
t.Logf("stuckJob2 = %s", stuckJob2.AttemptedAt)

t.Logf("stuckJob1 full = %s", spew.Sdump(stuckJob1))

// Not returned because we put a maximum of two.
// Not returned because we put a maximum of three.
_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)})

// Not stuck because not in running state.
Expand All @@ -537,13 +545,19 @@ func exerciseJobRead[TTx any](ctx context.Context, t *testing.T, executorWithTx
// Not stuck because after queried horizon.
_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &afterHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)})

// Max two stuck
// Not stuck because last seen is after queried horizon.
_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, Metadata: metadataWithLastSeen(afterHorizon), State: ptrutil.Ptr(rivertype.JobStateRunning)})

// Not stuck because attempted at is after queried horizon.
_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &afterHorizon, Metadata: metadataWithLastSeen(beforeHorizon), State: ptrutil.Ptr(rivertype.JobStateRunning)})

// Max three stuck.
stuckJobs, err := exec.JobGetStuck(ctx, &riverdriver.JobGetStuckParams{
Max: 2,
Max: 3,
StuckHorizon: horizon,
})
require.NoError(t, err)
require.Equal(t, []int64{stuckJob1.ID, stuckJob2.ID},
require.Equal(t, []int64{stuckJob1.ID, stuckJob2.ID, stuckJob3.ID},
sliceutil.Map(stuckJobs, func(j *rivertype.JobRow) int64 { return j.ID }))
})

Expand Down
10 changes: 8 additions & 2 deletions riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,13 @@ ORDER BY id;
SELECT *
FROM /* TEMPLATE: schema */river_job
WHERE state = 'running'
AND attempted_at < @stuck_horizon::timestamptz
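-- `last_seen_at` may still be present on a row from its last retry, so use
-- `greatest` to take `attempted_at` (set on the latest lock of the job) if
-- it's larger. `greatest` ignores `NULL` arguments, so a row with no
-- `last_seen_at` is compared on `attempted_at` alone.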
AND greatest(
attempted_at,
(metadata->>'river:last_seen_at')::timestamptz
) < @stuck_horizon::timestamptz
ORDER BY id
LIMIT @max;

Expand Down Expand Up @@ -726,4 +732,4 @@ SET
metadata = CASE WHEN @metadata_do_update::boolean THEN @metadata::jsonb ELSE metadata END,
state = CASE WHEN @state_do_update::boolean THEN @state::/* TEMPLATE: schema */river_job_state ELSE state END
WHERE id = @id
RETURNING *;
RETURNING *;
8 changes: 7 additions & 1 deletion riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 10 additions & 2 deletions riverdriver/riversqlite/internal/dbsqlc/river_job.sql
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,15 @@ ORDER BY id;
SELECT *
FROM /* TEMPLATE: schema */river_job
WHERE state = 'running'
AND attempted_at < cast(@stuck_horizon AS text)
-- `last_seen_at` may still be present on a row from its last retry, so use
-- `max` to take `attempted_at` (set on the latest lock of the job) if it's
-- larger.
--
-- `coalesce` is necessary because `max(NULL, ...)` always returns `NULL`.
AND max(
attempted_at,
coalesce(json_extract(metadata, '$."river:last_seen_at"'), attempted_at)
) < cast(@stuck_horizon AS text)
ORDER BY id
LIMIT @max;

Expand Down Expand Up @@ -513,4 +521,4 @@ SET
metadata = CASE WHEN cast(@metadata_do_update AS boolean) THEN json(cast(@metadata AS blob)) ELSE metadata END,
state = CASE WHEN cast(@state_do_update AS boolean) THEN @state ELSE state END
WHERE id = @id
RETURNING *;
RETURNING *;
10 changes: 9 additions & 1 deletion riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading