Skip to content

Commit 315baf2

Browse files
authored
refactor(run-engine,webapp): route TaskRun writes through a new RunStore adapter (#3981)
1 parent a6400f9 commit 315baf2

35 files changed

Lines changed: 3267 additions & 675 deletions

apps/webapp/app/routes/api.v1.runs.$runId.tags.ts

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
99
import { logger } from "~/services/logger.server";
1010
import { publishChangeRecord } from "~/services/realtime/runChangeNotifierInstance.server";
1111
import { mutateWithFallback } from "~/v3/mollifier/mutateWithFallback.server";
12+
import { runStore } from "~/v3/runStore.server";
1213

1314
// Pull the existing tags out of a buffer entry's serialised payload so
1415
// the buffer-path response can dedup against them, matching the
@@ -84,14 +85,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
8485
if (newTags.length === 0) {
8586
return json({ message: "No new tags to add" }, { status: 200 });
8687
}
87-
const updated = await prisma.taskRun.update({
88-
where: {
89-
id: taskRun.id,
90-
runtimeEnvironmentId: env.id,
91-
},
92-
data: { runTags: { push: newTags } },
93-
select: { updatedAt: true },
94-
});
88+
const updated = await runStore.pushTags(taskRun.id, newTags, { runtimeEnvironmentId: env.id }, prisma);
9589
// Publish a run-changed record with the NEW tag set so tag feeds reindex
9690
// (no-op unless enabled). updatedAt is the read-your-writes watermark.
9791
publishChangeRecord({

apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.append.ts

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { $replica, prisma } from "~/db.server";
66
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
77
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
88
import { ServiceValidationError } from "~/v3/services/common.server";
9+
import { runStore } from "~/v3/runStore.server";
910

1011
const ParamsSchema = z.object({
1112
runId: z.string(),
@@ -87,16 +88,7 @@ const { action } = createActionApiRoute(
8788
}
8889

8990
if (!targetRun.realtimeStreams.includes(params.streamId)) {
90-
await prisma.taskRun.update({
91-
where: {
92-
id: targetRun.id,
93-
},
94-
data: {
95-
realtimeStreams: {
96-
push: params.streamId,
97-
},
98-
},
99-
});
91+
await runStore.pushRealtimeStream(targetRun.id, params.streamId, prisma);
10092
}
10193

10294
const part = await request.text();

apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
createActionApiRoute,
77
createLoaderApiRoute,
88
} from "~/services/routeBuilders/apiBuilder.server";
9+
import { runStore } from "~/v3/runStore.server";
910

1011
const ParamsSchema = z.object({
1112
runId: z.string(),
@@ -86,12 +87,7 @@ const { action } = createActionApiRoute(
8687
}
8788

8889
if (!target.realtimeStreams.includes(params.streamId)) {
89-
await prisma.taskRun.update({
90-
where: { id: target.id },
91-
data: {
92-
realtimeStreams: { push: params.streamId },
93-
},
94-
});
90+
await runStore.pushRealtimeStream(target.id, params.streamId, prisma);
9591
}
9692

9793
const realtimeStream = getRealtimeStreamInstance(

apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server";
1010
import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server";
1111
import { claimOrAwait } from "~/v3/mollifier/idempotencyClaim.server";
1212
import { makeResolveMollifierFlag } from "~/v3/mollifier/mollifierGate.server";
13+
import { runStore } from "~/v3/runStore.server";
1314
import type { TraceEventConcern, TriggerTaskRequest } from "../types";
1415

1516
// In-memory per-org mollifier-enabled check, shared with `evaluateGate`
@@ -190,10 +191,10 @@ export class IdempotencyKeyConcern {
190191
});
191192

192193
// Update the existing run to remove the idempotency key
193-
await this.prisma.taskRun.updateMany({
194-
where: { id: existingRun.id, idempotencyKey },
195-
data: { idempotencyKey: null, idempotencyKeyExpiresAt: null },
196-
});
194+
await runStore.clearIdempotencyKey(
195+
{ byId: { runId: existingRun.id, idempotencyKey } },
196+
this.prisma
197+
);
197198

198199
return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt };
199200
}
@@ -207,10 +208,10 @@ export class IdempotencyKeyConcern {
207208
});
208209

209210
// Update the existing run to remove the idempotency key
210-
await this.prisma.taskRun.updateMany({
211-
where: { id: existingRun.id, idempotencyKey },
212-
data: { idempotencyKey: null, idempotencyKeyExpiresAt: null },
213-
});
211+
await runStore.clearIdempotencyKey(
212+
{ byId: { runId: existingRun.id, idempotencyKey } },
213+
this.prisma
214+
);
214215

215216
return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt };
216217
}

apps/webapp/app/services/metadata/updateMetadata.server.ts

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { Effect, Schedule, Duration, Fiber } from "effect";
1313
import { type RuntimeFiber } from "effect/Fiber";
1414
import { setTimeout } from "timers/promises";
1515
import { Logger, LogLevel } from "@trigger.dev/core/logger";
16+
import type { RunStore } from "@internal/run-store";
1617

1718
const RUN_UPDATABLE_WINDOW_MS = 60 * 60 * 1000; // 1 hour
1819

@@ -24,6 +25,7 @@ type BufferedRunMetadataChangeOperation = {
2425

2526
export type UpdateMetadataServiceOptions = {
2627
prisma: PrismaClientOrTransaction;
28+
runStore: RunStore;
2729
flushIntervalMs?: number;
2830
flushEnabled?: boolean;
2931
flushLoggingEnabled?: boolean;
@@ -49,6 +51,7 @@ export class UpdateMetadataService {
4951
private _bufferedOperations: Map<string, BufferedRunMetadataChangeOperation[]> = new Map();
5052
private _flushFiber: RuntimeFiber<void> | null = null;
5153
private readonly _prisma: PrismaClientOrTransaction;
54+
private readonly _runStore: RunStore;
5255
private readonly flushIntervalMs: number;
5356
private readonly flushEnabled: boolean;
5457
private readonly flushLoggingEnabled: boolean;
@@ -57,6 +60,7 @@ export class UpdateMetadataService {
5760

5861
constructor(private readonly options: UpdateMetadataServiceOptions) {
5962
this._prisma = options.prisma;
63+
this._runStore = options.runStore;
6064
this.flushIntervalMs = options.flushIntervalMs ?? 5000;
6165
this.flushEnabled = options.flushEnabled ?? true;
6266
this.flushLoggingEnabled = options.flushLoggingEnabled ?? true;
@@ -260,17 +264,16 @@ export class UpdateMetadataService {
260264
const writeTime = new Date();
261265
const result = yield* _(
262266
Effect.tryPromise(() =>
263-
this._prisma.taskRun.updateMany({
264-
where: {
265-
id: runId,
266-
metadataVersion: run.metadataVersion,
267-
},
268-
data: {
269-
metadata: newMetadataPacket.data,
267+
this._runStore.updateMetadata(
268+
runId,
269+
{
270+
metadata: newMetadataPacket.data!,
270271
metadataVersion: { increment: 1 },
271272
updatedAt: writeTime,
272273
},
273-
})
274+
{ expectedMetadataVersion: run.metadataVersion },
275+
this._prisma
276+
)
274277
)
275278
);
276279

@@ -469,20 +472,19 @@ export class UpdateMetadataService {
469472
// Update with optimistic locking; updatedAt stamped explicitly so the caller can
470473
// publish the exact committed watermark without a follow-up read.
471474
const writeTime = new Date();
472-
const result = await this._prisma.taskRun.updateMany({
473-
where: {
474-
id: runId,
475-
metadataVersion: run.metadataVersion,
476-
},
477-
data: {
478-
metadata: newMetadataPacket.data,
475+
const result = await this._runStore.updateMetadata(
476+
runId,
477+
{
478+
metadata: newMetadataPacket.data!,
479479
metadataType: newMetadataPacket.dataType,
480480
metadataVersion: {
481481
increment: 1,
482482
},
483483
updatedAt: writeTime,
484484
},
485-
});
485+
{ expectedMetadataVersion: run.metadataVersion },
486+
this._prisma
487+
);
486488

487489
if (result.count === 0) {
488490
if (this.flushLoggingEnabled) {
@@ -564,19 +566,19 @@ export class UpdateMetadataService {
564566
// Update the metadata without version check; updatedAt stamped explicitly so the
565567
// caller can publish the exact committed watermark.
566568
const writeTime = new Date();
567-
await this._prisma.taskRun.update({
568-
where: {
569-
id: runId,
570-
},
571-
data: {
572-
metadata: metadataPacket?.data,
569+
await this._runStore.updateMetadata(
570+
runId,
571+
{
572+
metadata: metadataPacket?.data!,
573573
metadataType: metadataPacket?.dataType,
574574
metadataVersion: {
575575
increment: 1,
576576
},
577577
updatedAt: writeTime,
578578
},
579-
});
579+
{},
580+
this._prisma
581+
);
580582
updatedAtMs = writeTime.getTime();
581583
}
582584

apps/webapp/app/services/metadata/updateMetadataInstance.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@ import { singleton } from "~/utils/singleton";
22
import { env } from "~/env.server";
33
import { UpdateMetadataService } from "./updateMetadata.server";
44
import { prisma } from "~/db.server";
5+
import { runStore } from "~/v3/runStore.server";
56
import { publishChangeRecord } from "~/services/realtime/runChangeNotifierInstance.server";
67

78
export const updateMetadataService = singleton(
89
"update-metadata-service",
910
() =>
1011
new UpdateMetadataService({
1112
prisma,
13+
runStore,
1214
flushIntervalMs: env.BATCH_METADATA_OPERATIONS_FLUSH_INTERVAL_MS,
1315
flushEnabled: env.BATCH_METADATA_OPERATIONS_FLUSH_ENABLED === "1",
1416
flushLoggingEnabled: env.BATCH_METADATA_OPERATIONS_FLUSH_LOGGING_ENABLED === "1",
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
import { PostgresRunStore } from "@internal/run-store";
2+
import { $replica, prisma } from "~/db.server";
3+
import { singleton } from "~/utils/singleton";
4+
5+
export const runStore = singleton(
6+
"PostgresRunStore",
7+
() => new PostgresRunStore({ prisma, readOnlyPrisma: $replica })
8+
);

apps/webapp/app/v3/services/baseService.server.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
11
import { Span, SpanKind } from "@opentelemetry/api";
2+
import type { RunStore } from "@internal/run-store";
23
import { $replica, PrismaClientOrTransaction, prisma } from "~/db.server";
34
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
45
import { attributesFromAuthenticatedEnv, tracer } from "../tracer.server";
56
import { engine, RunEngine } from "../runEngine.server";
7+
import { runStore as defaultRunStore } from "../runStore.server";
68
import { ServiceValidationError } from "./common.server";
79

810
export { ServiceValidationError };
911

1012
export abstract class BaseService {
1113
constructor(
1214
protected readonly _prisma: PrismaClientOrTransaction = prisma,
13-
protected readonly _replica: PrismaClientOrTransaction = $replica
15+
protected readonly _replica: PrismaClientOrTransaction = $replica,
16+
protected readonly runStore: RunStore = defaultRunStore
1417
) {}
1518

1619
protected async traceWithEnv<T>(

apps/webapp/app/v3/services/batchTriggerV3.server.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -408,10 +408,10 @@ export class BatchTriggerV3Service extends BaseService {
408408

409409
// Expire the cached runs that are no longer valid
410410
if (expiredRunIds.size) {
411-
await this._prisma.taskRun.updateMany({
412-
where: { friendlyId: { in: Array.from(expiredRunIds) } },
413-
data: { idempotencyKey: null },
414-
});
411+
await this.runStore.clearIdempotencyKey(
412+
{ byFriendlyIds: Array.from(expiredRunIds) },
413+
this._prisma
414+
);
415415
}
416416

417417
return runs;

apps/webapp/app/v3/services/rescheduleTaskRun.server.ts

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,14 @@ export class RescheduleTaskRunService extends BaseService {
1717
throw new ServiceValidationError(`Invalid delay: ${body.delay}`);
1818
}
1919

20-
const updatedRun = await this._prisma.taskRun.update({
21-
where: {
22-
id: taskRun.id,
23-
},
24-
data: {
20+
const updatedRun = await this.runStore.rescheduleRun(
21+
taskRun.id,
22+
{
2523
delayUntil: delay,
2624
queueTimestamp: delay,
2725
},
28-
});
26+
this._prisma
27+
);
2928

3029
if (updatedRun.engine === "V1") {
3130
await EnqueueDelayedRunService.reschedule(taskRun.id, delay);

0 commit comments

Comments
 (0)