Skip to content

Commit 65c545d

Browse files
authored
refactor(run-store,webapp,run-engine): route Postgres TaskRun reads through the run store (#3990)
## Summary Adds read methods to `RunStore` (`findRun`, `findRunOrThrow`, `findRuns`) and routes every Postgres read of `TaskRun` through them, mirroring how writes already go through the store. Behavior-preserving: each relocated read keeps its exact query, field selection, and database client (writer, replica, or transaction). This lets `TaskRun` reads be retargeted to a different backing store later without touching call sites. Stacked on #3981 (the write adapter); that PR is the base of this one. ## Scope In scope: the run engine, webapp services, presenters, and route loaders. Three reads that pulled `TaskRun` in through a parent model's relation `include` (alert delivery, batch results, attempt-dependency cancellation) are decomposed to fetch the run(s) through the store and stitch them back, since a relation include would not follow `TaskRun` to a new table. Left reading the existing table (out of scope): the legacy MarQS paths, the legacy trigger idempotency read, and one raw-SQL recovery script (commented for revisiting at cutover). ## Notes Reads default to the read replica; callers pass the writer or a transaction client wherever the original read did, so writer-vs-replica behavior is unchanged.
1 parent 7621601 commit 65c545d

100 files changed

Lines changed: 2612 additions & 1470 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: improvement
4+
---
5+
6+
Route Postgres task run reads through the run store so they can be retargeted to a different backing store without changing call sites.

apps/webapp/app/models/runtimeEnvironment.server.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import type { AuthenticatedEnvironment } from "@internal/run-engine";
22
import type { Prisma, PrismaClientOrTransaction, RuntimeEnvironment } from "@trigger.dev/database";
33
import { $replica, prisma } from "~/db.server";
4+
import { runStore } from "~/v3/runStore.server";
45
import { logger } from "~/services/logger.server";
56
import { getUsername } from "~/utils/username";
67
import { sanitizeBranchName } from "@trigger.dev/core/v3/utils/gitBranch";
@@ -251,14 +252,17 @@ export async function findEnvironmentFromRun(
251252
): Promise<EnvironmentFromRun | null> {
252253
// The include (no select) already pulls every taskRun scalar, so runTags/batchId
253254
// ride along for free — no extra query for the realtime publish to send a full record.
254-
const taskRun = await (tx ?? $replica).taskRun.findFirst({
255-
where: {
255+
const taskRun = await runStore.findRun(
256+
{
256257
id: runId,
257258
},
258-
include: {
259-
runtimeEnvironment: { include: authIncludeBase },
259+
{
260+
include: {
261+
runtimeEnvironment: { include: authIncludeBase },
262+
},
260263
},
261-
});
264+
tx ?? $replica
265+
);
262266
if (!taskRun?.runtimeEnvironment) {
263267
return null;
264268
}

apps/webapp/app/presenters/v3/ApiBatchResultsPresenter.server.ts

Lines changed: 43 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { BatchTaskRunExecutionResult } from "@trigger.dev/core/v3";
2-
import { executionResultForTaskRun } from "~/models/taskRun.server";
2+
import { executionResultForTaskRun, TaskRunWithAttempts } from "~/models/taskRun.server";
33
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
4+
import { runStore } from "~/v3/runStore.server";
45
import { BasePresenter } from "./basePresenter.server";
56

67
export class ApiBatchResultsPresenter extends BasePresenter {
@@ -16,16 +17,8 @@ export class ApiBatchResultsPresenter extends BasePresenter {
1617
},
1718
include: {
1819
items: {
19-
include: {
20-
taskRun: {
21-
include: {
22-
attempts: {
23-
orderBy: {
24-
createdAt: "desc",
25-
},
26-
},
27-
},
28-
},
20+
select: {
21+
taskRunId: true,
2922
},
3023
},
3124
},
@@ -35,10 +28,48 @@ export class ApiBatchResultsPresenter extends BasePresenter {
3528
return undefined;
3629
}
3730

31+
const taskRunIds = batchRun.items.map((item) => item.taskRunId);
32+
33+
if (taskRunIds.length === 0) {
34+
return {
35+
id: batchRun.friendlyId,
36+
items: [],
37+
};
38+
}
39+
40+
const taskRuns = await runStore.findRuns(
41+
{
42+
where: { id: { in: taskRunIds } },
43+
select: {
44+
id: true,
45+
friendlyId: true,
46+
status: true,
47+
taskIdentifier: true,
48+
attempts: {
49+
select: {
50+
status: true,
51+
output: true,
52+
outputType: true,
53+
error: true,
54+
},
55+
orderBy: {
56+
createdAt: "desc",
57+
},
58+
},
59+
},
60+
},
61+
this._prisma
62+
);
63+
64+
const runMap = new Map(taskRuns.map((run) => [run.id, run]));
65+
3866
return {
3967
id: batchRun.friendlyId,
4068
items: batchRun.items
41-
.map((item) => executionResultForTaskRun(item.taskRun))
69+
.map((item) => {
70+
const run = runMap.get(item.taskRunId);
71+
return run ? executionResultForTaskRun(run as TaskRunWithAttempts) : undefined;
72+
})
4273
.filter(Boolean),
4374
};
4475
});

apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts

Lines changed: 30 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import {
2222
type SyntheticRun,
2323
} from "~/v3/mollifier/readFallback.server";
2424
import { generatePresignedUrl } from "~/v3/objectStore.server";
25+
import { runStore } from "~/v3/runStore.server";
2526
import { tracer } from "~/v3/tracer.server";
2627
import { startSpanWithEnv } from "~/v3/tracing.server";
2728

@@ -110,38 +111,41 @@ export class ApiRetrieveRunPresenter {
110111
friendlyId: string,
111112
env: AuthenticatedEnvironment,
112113
): Promise<FoundRun | null> {
113-
const pgRow = await $replica.taskRun.findFirst({
114-
where: {
114+
const pgRow = await runStore.findRun(
115+
{
115116
friendlyId,
116117
runtimeEnvironmentId: env.id,
117118
},
118-
select: {
119-
...commonRunSelect,
120-
traceId: true,
121-
payload: true,
122-
payloadType: true,
123-
output: true,
124-
outputType: true,
125-
error: true,
126-
attempts: {
127-
select: {
128-
id: true,
119+
{
120+
select: {
121+
...commonRunSelect,
122+
traceId: true,
123+
payload: true,
124+
payloadType: true,
125+
output: true,
126+
outputType: true,
127+
error: true,
128+
attempts: {
129+
select: {
130+
id: true,
131+
},
132+
},
133+
attemptNumber: true,
134+
engine: true,
135+
taskEventStore: true,
136+
parentTaskRun: {
137+
select: commonRunSelect,
138+
},
139+
rootTaskRun: {
140+
select: commonRunSelect,
141+
},
142+
childRuns: {
143+
select: commonRunSelect,
129144
},
130-
},
131-
attemptNumber: true,
132-
engine: true,
133-
taskEventStore: true,
134-
parentTaskRun: {
135-
select: commonRunSelect,
136-
},
137-
rootTaskRun: {
138-
select: commonRunSelect,
139-
},
140-
childRuns: {
141-
select: commonRunSelect,
142145
},
143146
},
144-
});
147+
$replica
148+
);
145149

146150
if (pgRow) return { ...pgRow, isBuffered: false };
147151

apps/webapp/app/presenters/v3/ApiRunResultPresenter.server.ts

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { TaskRunExecutionResult } from "@trigger.dev/core/v3";
22
import { executionResultForTaskRun } from "~/models/taskRun.server";
33
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
4+
import { runStore } from "~/v3/runStore.server";
45
import { BasePresenter } from "./basePresenter.server";
56

67
export class ApiRunResultPresenter extends BasePresenter {
@@ -9,19 +10,22 @@ export class ApiRunResultPresenter extends BasePresenter {
910
env: AuthenticatedEnvironment
1011
): Promise<TaskRunExecutionResult | undefined> {
1112
return this.traceWithEnv("call", env, async (span) => {
12-
const taskRun = await this._prisma.taskRun.findFirst({
13-
where: {
13+
const taskRun = await runStore.findRun(
14+
{
1415
friendlyId,
1516
runtimeEnvironmentId: env.id,
1617
},
17-
include: {
18-
attempts: {
19-
orderBy: {
20-
createdAt: "desc",
18+
{
19+
include: {
20+
attempts: {
21+
orderBy: {
22+
createdAt: "desc",
23+
},
2124
},
2225
},
2326
},
24-
});
27+
this._prisma
28+
);
2529

2630
if (!taskRun) {
2731
return undefined;

apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { getTaskIdentifiers } from "~/models/task.server";
1313
import { RunsRepository } from "~/services/runsRepository/runsRepository.server";
1414
import { regionForDisplay } from "~/runEngine/concerns/workerQueueSplit.server";
1515
import { machinePresetFromRun } from "~/v3/machinePresets.server";
16+
import { runStore } from "~/v3/runStore.server";
1617
import { ServiceValidationError } from "~/v3/services/baseService.server";
1718
import { isCancellableRunStatus, isFinalRunStatus, isPendingRunStatus } from "~/v3/taskStatus";
1819

@@ -206,11 +207,12 @@ export class NextRunListPresenter {
206207
let hasAnyRuns = runs.length > 0;
207208

208209
if (!hasAnyRuns) {
209-
const firstRun = await this.replica.taskRun.findFirst({
210-
where: {
210+
const firstRun = await runStore.findRun(
211+
{
211212
runtimeEnvironmentId: environmentId,
212213
},
213-
});
214+
this.replica
215+
);
214216

215217
if (firstRun) {
216218
hasAnyRuns = true;

apps/webapp/app/presenters/v3/RunPresenter.server.ts

Lines changed: 56 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server";
88
import { isFinalRunStatus } from "~/v3/taskStatus";
99
import { env } from "~/env.server";
1010
import { getEventRepositoryForStore } from "~/v3/eventRepository/index.server";
11+
import { runStore } from "~/v3/runStore.server";
1112

1213
type Result = Awaited<ReturnType<RunPresenter["call"]>>;
1314
export type Run = Result["run"];
@@ -62,57 +63,8 @@ export class RunPresenter {
6263
// buffer view. `findFirstOrThrow` would log a `PrismaClient error`
6364
// every tick of the page poll, masking real DB issues with synthetic
6465
// not-found noise.
65-
const run = await this.#prismaClient.taskRun.findFirst({
66-
select: {
67-
id: true,
68-
createdAt: true,
69-
taskEventStore: true,
70-
taskIdentifier: true,
71-
number: true,
72-
traceId: true,
73-
spanId: true,
74-
parentSpanId: true,
75-
friendlyId: true,
76-
status: true,
77-
startedAt: true,
78-
completedAt: true,
79-
logsDeletedAt: true,
80-
annotations: true,
81-
rootTaskRun: {
82-
select: {
83-
friendlyId: true,
84-
spanId: true,
85-
createdAt: true,
86-
},
87-
},
88-
parentTaskRun: {
89-
select: {
90-
friendlyId: true,
91-
spanId: true,
92-
createdAt: true,
93-
},
94-
},
95-
runtimeEnvironment: {
96-
select: {
97-
id: true,
98-
type: true,
99-
slug: true,
100-
organizationId: true,
101-
orgMember: {
102-
select: {
103-
user: {
104-
select: {
105-
id: true,
106-
name: true,
107-
displayName: true,
108-
},
109-
},
110-
},
111-
},
112-
},
113-
},
114-
},
115-
where: {
66+
const run = await runStore.findRun(
67+
{
11668
friendlyId: runFriendlyId,
11769
project: {
11870
slug: projectSlug,
@@ -125,7 +77,59 @@ export class RunPresenter {
12577
},
12678
},
12779
},
128-
});
80+
{
81+
select: {
82+
id: true,
83+
createdAt: true,
84+
taskEventStore: true,
85+
taskIdentifier: true,
86+
number: true,
87+
traceId: true,
88+
spanId: true,
89+
parentSpanId: true,
90+
friendlyId: true,
91+
status: true,
92+
startedAt: true,
93+
completedAt: true,
94+
logsDeletedAt: true,
95+
annotations: true,
96+
rootTaskRun: {
97+
select: {
98+
friendlyId: true,
99+
spanId: true,
100+
createdAt: true,
101+
},
102+
},
103+
parentTaskRun: {
104+
select: {
105+
friendlyId: true,
106+
spanId: true,
107+
createdAt: true,
108+
},
109+
},
110+
runtimeEnvironment: {
111+
select: {
112+
id: true,
113+
type: true,
114+
slug: true,
115+
organizationId: true,
116+
orgMember: {
117+
select: {
118+
user: {
119+
select: {
120+
id: true,
121+
name: true,
122+
displayName: true,
123+
},
124+
},
125+
},
126+
},
127+
},
128+
},
129+
},
130+
},
131+
this.#prismaClient
132+
);
129133

130134
if (!run) {
131135
throw new RunNotInPgError(runFriendlyId);

0 commit comments

Comments
 (0)