Skip to content

Commit 84f3e1b

Browse files
d-csclaude
andauthored
feat(run-ops): webapp write path — trigger/batch minting, idempotency routing, run lifecycle (#4118)
## What Routes the webapp write path through the run-ops split seam: trigger/batch minting, idempotency-key resolution, and the run-lifecycle services now determine residency and dispatch writes to the correct store. - **Trigger & batch** (`runEngine/services/triggerTask.server.ts`, `batchTrigger.server.ts`, `createBatch.server.ts`, `streamBatchItems.server.ts`, `v3/services/batchTriggerV3.server.ts`): mint ids with the run-ops-aware minting and route creation/streaming through the store; batch children inherit the parent's residency. - **Idempotency** (`runEngine/concerns/idempotencyKeys.server.ts` + new `idempotencyResidency.server.ts`): idempotency-key lookup/dedup is residency-aware so a keyed retrigger resolves against the store that owns the original run. - **Run lifecycle services** (`createCheckpoint`, `createTaskRunAttempt`, `enqueueDelayedRun`, `expireEnqueuedRun`, `finalizeTaskRun`, `resumeBatchRun`, `cancelDevSessionRuns`, `executeTasksWaitingForDeploy`, `triggerFailedTask`): resolve their target run through the store rather than a fixed client. - **Reads that fan out from writes** (`runsRepository` + `clickhouseRunsRepository`, `BulkActionV2` + batch read-through, realtime `sessions`/`runReader`, alerts `deliverAlert`/`performTaskRunAlerts`): route through the read-through resolver. - `9535ae63d` — resolves the parent run through an injectable run store in `TriggerFailedTaskService`. - `bf8f7c881` — drops the "known-migrated" concept from write-path and read repos; residency is id-shape only. - `515b897ea` — self-defaults `resolveWaitpointThroughReadThrough` to the safe run-ops clients. ## Why PR6 of the run-ops split stack. This is the write-path counterpart to the read foundation in the previous PRs: with it in place, both reads and writes route through the seam. Additive when the split is disabled (id-shape resolution collapses to the control-plane client); behavior-changing on the minting, idempotency, and lifecycle paths when enabled. ## Tests Large new/expanded vitest suite under `apps/webapp/test/` and colocated service tests: trigger-task and batch-trigger store routing, residency inheritance, idempotency dedup residency + legacy-authority, bulk-action read routing, cancel-dev-session routing, alerts store routing, runs-repository read-through, realtime session/run-reader read-through and stream-registration routing, and the waitpoint read-through default. Testcontainers-backed; no mocks. ## Notes Draft, **stacked on #4117** (`runops/pr05-webapp-foundation`). Review that first; this diff is against it. Server-change / changeset note to be added at stack-assembly time. 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 8465ac5 commit 84f3e1b

54 files changed

Lines changed: 7188 additions & 665 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: feature
4+
---
5+
6+
Route the webapp write path — trigger/batch run minting, idempotency-key resolution, and run lifecycle writes — through the run store so runs can be created and mutated on the dedicated run-ops database.

apps/webapp/app/models/runtimeEnvironment.server.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -358,8 +358,11 @@ export async function disconnectSession(environmentId: string) {
358358
return session;
359359
}
360360

361-
export async function findLatestSession(environmentId: string) {
362-
const session = await $replica.runtimeEnvironmentSession.findFirst({
361+
export async function findLatestSession(
362+
environmentId: string,
363+
client: PrismaClientOrTransaction = $replica
364+
) {
365+
const session = await client.runtimeEnvironmentSession.findFirst({
363366
where: {
364367
environmentId,
365368
},

apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts

Lines changed: 49 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.se
1111
import { claimOrAwait } from "~/v3/mollifier/idempotencyClaim.server";
1212
import { makeResolveMollifierFlag } from "~/v3/mollifier/mollifierGate.server";
1313
import { runStore } from "~/v3/runStore.server";
14+
import { runOpsLegacyPrisma, runOpsNewPrisma } from "~/db.server";
15+
import { isSplitEnabled } from "~/v3/runOpsMigration/splitMode.server";
16+
import { resolveRunIdMintKind } from "~/v3/engineVersion.server";
17+
import { resolveIdempotencyDedupClient } from "./idempotencyResidency.server";
1418
import type { TraceEventConcern, TriggerTaskRequest } from "../types";
1519

1620
// In-memory per-org mollifier-enabled check, shared with `evaluateGate`
@@ -147,6 +151,28 @@ export class IdempotencyKeyConcern {
147151
return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt };
148152
}
149153

154+
// Probe and clears must hit the DB where the would-be run will physically live.
155+
const dedupClient = await resolveIdempotencyDedupClient(
156+
{
157+
environmentForMint: {
158+
organizationId: request.environment.organizationId,
159+
id: request.environment.id,
160+
orgFeatureFlags: request.environment.organization?.featureFlags,
161+
},
162+
parentRunFriendlyId: request.body.options?.parentRunId,
163+
},
164+
{
165+
isSplitEnabled,
166+
fallbackClient: this.prisma,
167+
newClient: runOpsNewPrisma,
168+
legacyClient: runOpsLegacyPrisma,
169+
resolveMintKind: resolveRunIdMintKind,
170+
// `isMigrated` is intentionally omitted: until a child of a swept
171+
// legacy-id parent can be born on the new DB, the swept-marker override
172+
// would never change the answer, so a child routes by parent id-shape.
173+
}
174+
);
175+
150176
const existingRun = idempotencyKey
151177
? await runStore.findRun(
152178
{
@@ -159,7 +185,7 @@ export class IdempotencyKeyConcern {
159185
associatedWaitpoint: true,
160186
},
161187
},
162-
this.prisma
188+
dedupClient
163189
)
164190
: undefined;
165191

@@ -193,7 +219,7 @@ export class IdempotencyKeyConcern {
193219
// Update the existing run to remove the idempotency key
194220
await runStore.clearIdempotencyKey(
195221
{ byId: { runId: existingRun.id, idempotencyKey } },
196-
this.prisma
222+
dedupClient
197223
);
198224

199225
return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt };
@@ -210,7 +236,7 @@ export class IdempotencyKeyConcern {
210236
// Update the existing run to remove the idempotency key
211237
await runStore.clearIdempotencyKey(
212238
{ byId: { runId: existingRun.id, idempotencyKey } },
213-
this.prisma
239+
dedupClient
214240
);
215241

216242
return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt };
@@ -249,7 +275,6 @@ export class IdempotencyKeyConcern {
249275
? `${event.traceparent.spanId}:${event.spanId}`
250276
: event.spanId;
251277

252-
//block run with waitpoint
253278
await this.engine.blockRunWithWaitpoint({
254279
runId: RunId.fromFriendlyId(parentRunId),
255280
waitpoints: associatedWaitpoint!.id,
@@ -262,7 +287,7 @@ export class IdempotencyKeyConcern {
262287
: undefined,
263288
projectId: request.environment.projectId,
264289
organizationId: request.environment.organizationId,
265-
tx: this.prisma,
290+
tx: dedupClient,
266291
});
267292
}
268293
);
@@ -277,24 +302,13 @@ export class IdempotencyKeyConcern {
277302
// (resumeParentOnCompletion) — that path bypasses the gate entirely
278303
// and its existing PG-side dedup is sufficient.
279304
//
280-
// Also gated on the same per-org mollifier flag the gate uses: when
281-
// `TRIGGER_MOLLIFIER_ENABLED=1` globally for staged rollout, the buffer
282-
// singleton is constructed and `claimOrAwait` would otherwise issue a
283-
// Redis SETNX for EVERY idempotency-keyed trigger — including orgs
284-
// that haven't opted in. Those orgs never enter the mollify branch
285-
// (the gate always returns pass_through for them), so there's no
286-
// buffer activity to serialise against; PG's unique constraint
287-
// already deduplicates concurrent same-key races. Resolving the org
288-
// flag is a pure in-memory read of `Organization.featureFlags` — no
289-
// DB query, same predicate the gate uses — keeping the claim's Redis
290-
// RTT off the hot path for non-opted-in orgs during incremental
291-
// rollout.
292-
// Match the gate's bypass list (`mollifierGate.server.ts:158-175`).
293-
// debounce + oneTimeUseToken triggers always return pass_through from
294-
// the gate, so claiming a Redis SETNX here is wasted RTT on the
295-
// trigger hot path. Excluding them keeps the claim aligned with the
296-
// gate — if the gate would never mollify the request, there's no
297-
// buffer to serialise against.
305+
// Gated on the same per-org mollifier flag the gate uses, and the same
306+
// bypass list (debounce + oneTimeUseToken): if the gate would never mollify
307+
// the request, there's no buffer to serialise against and PG's unique
308+
// constraint already deduplicates concurrent same-key races. Skipping the
309+
// claim's Redis SETNX keeps its RTT off the hot path for those requests
310+
// during staged rollout. The org-flag check is a pure in-memory read of
311+
// `Organization.featureFlags`, no DB query.
298312
const claimEligible =
299313
!request.body.options?.resumeParentOnCompletion &&
300314
!request.body.options?.debounce &&
@@ -336,7 +350,7 @@ export class IdempotencyKeyConcern {
336350
taskIdentifier: request.taskId,
337351
},
338352
{ include: { associatedWaitpoint: true } },
339-
this.prisma
353+
dedupClient
340354
);
341355
if (writerRun) {
342356
return { isCached: true, run: writerRun };
@@ -350,27 +364,18 @@ export class IdempotencyKeyConcern {
350364
if (buffered) {
351365
return { isCached: true, run: buffered };
352366
}
353-
// Claim resolved to a runId nothing can find — the run was
354-
// genuinely lost (claimant errored after publish, drain failed,
355-
// or both the PG row and buffer entry TTL'd out). This is
356-
// terminal, not transient: `lookupIdempotency` self-heals a
357-
// dangling pointer, and `ack` keeps the entry hash as a
358-
// read-fallback past the PG write, so re-polling cannot conjure
359-
// a run that is gone. Falling through to a fresh trigger is the
360-
// correct recovery.
367+
// Claim resolved to a runId nothing can find — the run was genuinely
368+
// lost (claimant errored after publish, or both the PG row and buffer
369+
// entry TTL'd out). Terminal, not transient, so falling through to a
370+
// fresh trigger is the correct recovery.
361371
//
362-
// Why falling through claimless is safe (no duplicate runs):
363-
// concurrent triggers that also fall through here converge on a
364-
// single run via the same dedup backstops the claim layer relies
365-
// on — the PG unique constraint on the idempotency key
366-
// (RunDuplicateIdempotencyKeyError → retry resolves to the
367-
// winner) for the pass-through path, and `accept`'s idempotency
368-
// SETNX (`duplicate_idempotency`) for the mollify path. Once the
369-
// first fall-through commits a run, later callers find it via the
370-
// writer-PG / buffer lookups above despite the stale `resolved:`
371-
// slot, which the slot's TTL clears within ~30s. The residual
372-
// cost is a few redundant (deduped) trigger attempts in that
373-
// window, not duplicate runs.
372+
// Falling through claimless doesn't duplicate runs: concurrent
373+
// fall-throughs converge on one run via the same dedup backstops the
374+
// claim layer relies on — PG's unique constraint on the idempotency key
375+
// (pass-through path) and `accept`'s SETNX (mollify path). Once the
376+
// first commits, later callers find it via the writer-PG / buffer
377+
// lookups above despite the stale `resolved:` slot (cleared by its ~30s
378+
// TTL). Residual cost is a few deduped trigger attempts, not dup runs.
374379
logger.warn("idempotency claim resolved but runId not findable", {
375380
envId: request.environment.id,
376381
taskIdentifier: request.taskId,
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
import { describe, expect, it } from "vitest";
2+
import { RunId } from "@trigger.dev/core/v3/isomorphic";
3+
import {
4+
resolveIdempotencyDedupClient,
5+
type ResolveIdempotencyClientDeps,
6+
} from "./idempotencyResidency.server";
7+
8+
// Distinct sentinel objects so we can assert WHICH client was selected by reference.
9+
const FALLBACK = { __tag: "fallback" } as never;
10+
const NEW_CLIENT = { __tag: "new" } as never;
11+
const LEGACY_CLIENT = { __tag: "legacy" } as never;
12+
13+
function makeDeps(over: Partial<ResolveIdempotencyClientDeps>): ResolveIdempotencyClientDeps {
14+
return {
15+
isSplitEnabled: async () => true,
16+
fallbackClient: FALLBACK,
17+
newClient: NEW_CLIENT,
18+
legacyClient: LEGACY_CLIENT,
19+
resolveMintKind: async () => "ksuid",
20+
classify: (id) => {
21+
if (id.length === 27) return "NEW";
22+
if (id.length === 25) return "LEGACY";
23+
throw new Error(`unclassifiable: ${id.length}`);
24+
},
25+
isMigrated: undefined,
26+
...over,
27+
};
28+
}
29+
30+
const env = { organizationId: "org_1", id: "env_1", orgFeatureFlags: {} };
31+
32+
describe("resolveIdempotencyDedupClient", () => {
33+
it("returns the fallback client unchanged when split is disabled", async () => {
34+
const client = await resolveIdempotencyDedupClient(
35+
{ environmentForMint: env, parentRunFriendlyId: undefined },
36+
makeDeps({ isSplitEnabled: async () => false })
37+
);
38+
expect(client).toBe(FALLBACK);
39+
});
40+
41+
it("routes a root run to the NEW client when the env mints ksuid", async () => {
42+
const client = await resolveIdempotencyDedupClient(
43+
{ environmentForMint: env, parentRunFriendlyId: undefined },
44+
makeDeps({ resolveMintKind: async () => "ksuid" })
45+
);
46+
expect(client).toBe(NEW_CLIENT);
47+
});
48+
49+
it("routes a root run to the LEGACY client when the env mints cuid", async () => {
50+
const client = await resolveIdempotencyDedupClient(
51+
{ environmentForMint: env, parentRunFriendlyId: undefined },
52+
makeDeps({ resolveMintKind: async () => "cuid" })
53+
);
54+
expect(client).toBe(LEGACY_CLIENT);
55+
});
56+
57+
it("routes a child to the NEW client when the ksuid parent is NEW-resident", async () => {
58+
const ksuidParent = RunId.toFriendlyId("a".repeat(27));
59+
const client = await resolveIdempotencyDedupClient(
60+
{ environmentForMint: env, parentRunFriendlyId: ksuidParent },
61+
makeDeps({ resolveMintKind: async () => "cuid" }) // mint flag must NOT win for a child
62+
);
63+
expect(client).toBe(NEW_CLIENT);
64+
});
65+
66+
it("routes a child to the LEGACY client when the cuid parent is LEGACY-resident", async () => {
67+
const cuidParent = RunId.toFriendlyId("b".repeat(25));
68+
const client = await resolveIdempotencyDedupClient(
69+
{ environmentForMint: env, parentRunFriendlyId: cuidParent },
70+
makeDeps({ resolveMintKind: async () => "ksuid" }) // mint flag must NOT win for a child
71+
);
72+
expect(client).toBe(LEGACY_CLIENT);
73+
});
74+
75+
it("routes a swept (migrated) cuid-parent child to the NEW client", async () => {
76+
const cuidParent = RunId.toFriendlyId("c".repeat(25));
77+
const client = await resolveIdempotencyDedupClient(
78+
{ environmentForMint: env, parentRunFriendlyId: cuidParent },
79+
makeDeps({ isMigrated: async () => true })
80+
);
81+
expect(client).toBe(NEW_CLIENT);
82+
});
83+
84+
it("routes a non-migrated cuid-parent child to the LEGACY client even when isMigrated is provided", async () => {
85+
const cuidParent = RunId.toFriendlyId("d".repeat(25));
86+
const client = await resolveIdempotencyDedupClient(
87+
{ environmentForMint: env, parentRunFriendlyId: cuidParent },
88+
makeDeps({ isMigrated: async () => false })
89+
);
90+
expect(client).toBe(LEGACY_CLIENT);
91+
});
92+
93+
it("falls back to the fallback client when a present parent id is unclassifiable", async () => {
94+
const client = await resolveIdempotencyDedupClient(
95+
{ environmentForMint: env, parentRunFriendlyId: "run_not-a-valid-length" },
96+
makeDeps({})
97+
);
98+
expect(client).toBe(FALLBACK);
99+
});
100+
});
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import { ownerEngine, RunId, type Residency } from "@trigger.dev/core/v3/isomorphic";
2+
import type { PrismaClientOrTransaction } from "@trigger.dev/database";
3+
4+
type MintKind = "cuid" | "ksuid";
5+
6+
export type ResolveIdempotencyClientDeps = {
7+
isSplitEnabled: () => Promise<boolean>;
8+
fallbackClient: PrismaClientOrTransaction;
9+
newClient: PrismaClientOrTransaction;
10+
legacyClient: PrismaClientOrTransaction;
11+
resolveMintKind: (environment: {
12+
organizationId: string;
13+
id: string;
14+
orgFeatureFlags?: unknown;
15+
}) => Promise<MintKind>;
16+
classify?: (id: string) => Residency;
17+
isMigrated?: (id: string) => Promise<boolean>;
18+
};
19+
20+
export async function resolveIdempotencyDedupClient(
21+
args: {
22+
environmentForMint: { organizationId: string; id: string; orgFeatureFlags?: unknown };
23+
parentRunFriendlyId: string | undefined;
24+
},
25+
deps: ResolveIdempotencyClientDeps
26+
): Promise<PrismaClientOrTransaction> {
27+
if (!(await deps.isSplitEnabled())) {
28+
return deps.fallbackClient;
29+
}
30+
31+
const classify = deps.classify ?? ownerEngine;
32+
const clientFor = (residency: Residency): PrismaClientOrTransaction =>
33+
residency === "NEW" ? deps.newClient : deps.legacyClient;
34+
35+
if (args.parentRunFriendlyId) {
36+
let parentInternalId: string;
37+
try {
38+
parentInternalId = RunId.fromFriendlyId(args.parentRunFriendlyId);
39+
} catch {
40+
return deps.fallbackClient;
41+
}
42+
let residency: Residency;
43+
try {
44+
residency = classify(parentInternalId);
45+
} catch {
46+
return deps.fallbackClient;
47+
}
48+
if (residency === "LEGACY" && deps.isMigrated && (await deps.isMigrated(parentInternalId))) {
49+
return deps.newClient;
50+
}
51+
return clientFor(residency);
52+
}
53+
54+
const kind = await deps.resolveMintKind(args.environmentForMint);
55+
return clientFor(kind === "ksuid" ? "NEW" : "LEGACY");
56+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import type { PrismaReplicaClient } from "~/db.server";
2+
import {
3+
$replica as defaultLegacyReplica,
4+
runOpsNewReplica as defaultNewClient,
5+
runOpsSplitReadEnabled as defaultSplitReadEnabled,
6+
} from "~/db.server";
7+
import { readThroughRun } from "~/v3/runOpsMigration/readThrough.server";
8+
9+
type ResolveWaitpointDeps = {
10+
newClient?: PrismaReplicaClient;
11+
legacyReplica?: PrismaReplicaClient;
12+
splitEnabled?: boolean;
13+
isPastRetention?: (id: string) => boolean;
14+
};
15+
16+
// Safe defaults matching the deps `complete`/`callback` pass, so a bare caller still fans
17+
// out to the dedicated run-ops replica (NEW-resident waitpoints) before control-plane.
18+
export type ResolveWaitpointReadThroughDefaults = {
19+
newClient: PrismaReplicaClient;
20+
legacyReplica: PrismaReplicaClient;
21+
splitEnabled: boolean;
22+
};
23+
24+
const productionDefaults: ResolveWaitpointReadThroughDefaults = {
25+
newClient: defaultNewClient,
26+
legacyReplica: defaultLegacyReplica,
27+
splitEnabled: defaultSplitReadEnabled,
28+
};
29+
30+
export async function resolveWaitpointThroughReadThrough<T>(opts: {
31+
waitpointId: string;
32+
environmentId: string;
33+
read: (client: PrismaReplicaClient) => Promise<T | null>;
34+
deps?: ResolveWaitpointDeps;
35+
defaults?: ResolveWaitpointReadThroughDefaults;
36+
}): Promise<T | null> {
37+
const defaults = opts.defaults ?? productionDefaults;
38+
39+
const result = await readThroughRun({
40+
runId: opts.waitpointId,
41+
environmentId: opts.environmentId,
42+
readNew: (client) => opts.read(client),
43+
readLegacy: (replica) => opts.read(replica),
44+
deps: {
45+
splitEnabled: opts.deps?.splitEnabled ?? defaults.splitEnabled,
46+
newClient: opts.deps?.newClient ?? defaults.newClient,
47+
legacyReplica: opts.deps?.legacyReplica ?? defaults.legacyReplica,
48+
isPastRetention: opts.deps?.isPastRetention,
49+
},
50+
});
51+
52+
return result.source === "new" || result.source === "legacy-replica" ? result.value : null;
53+
}

0 commit comments

Comments
 (0)