Skip to content

Commit 3d6d068

Browse files
d-csclaude
andcommitted
fix(run-ops split): resolve public wait tokens across the split boundary
The public wait-token routes (complete, HTTP callback, retrieve) resolved the waitpoint with a bare read-through that defaulted its new-side client to the dedicated run-ops replica and gated on the async mint flag. A standalone wait token has a cuid id and, having no owning run, is written to the control-plane store, so under the split topology the run-ops replica does not hold it and the routes returned 404 "Waitpoint not found". Resolve these routes the same way the working waiter route does: fan out through the run-ops replica first, then the control-plane replica, so both a co-located (run-owned) waitpoint and a standalone control-plane token are found. Gate the fan-out on the URL-presence read gate rather than the mint flag, so read visibility spans both DBs whenever both are configured — including the window where both database URLs are set but the mint flag is off. The retrieve route hands the same fan-out clients and gate to ApiWaitpointPresenter. Adds a two-database testcontainer regression proving a control-plane-resident standalone token resolves via the legacy fallback under the read gate, and that the passthrough branch (gate off) misses it. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 1012e5b commit 3d6d068

4 files changed

Lines changed: 204 additions & 42 deletions

apps/webapp/app/routes/api.v1.waitpoints.tokens.$waitpointFriendlyId.callback.$hash.ts

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,18 @@ import { type ActionFunctionArgs, json } from "@remix-run/server-runtime";
22
import { type CompleteWaitpointTokenResponseBody, stringifyIO } from "@trigger.dev/core/v3";
33
import { WaitpointId } from "@trigger.dev/core/v3/isomorphic";
44
import { z } from "zod";
5-
import type { PrismaReplicaClient } from "~/db.server";
5+
import {
6+
$replica,
7+
type PrismaReplicaClient,
8+
runOpsNewReplica,
9+
runOpsSplitReadEnabled,
10+
} from "~/db.server";
611
import { env } from "~/env.server";
712
import { processWaitpointCompletionPacket } from "~/runEngine/concerns/waitpointCompletionPacket.server";
13+
import { resolveWaitpointThroughReadThrough } from "~/runEngine/concerns/resolveWaitpointThroughReadThrough.server";
814
import { verifyHttpCallbackHash } from "~/services/httpCallback.server";
915
import { logger } from "~/services/logger.server";
1016
import { controlPlaneResolver } from "~/v3/runOpsMigration/controlPlaneResolver.server";
11-
import { readThroughRun } from "~/v3/runOpsMigration/readThrough.server";
1217
import { engine } from "~/v3/runEngine.server";
1318

1419
const paramsSchema = z.object({
@@ -34,28 +39,27 @@ export async function action({ request, params }: ActionFunctionArgs) {
3439
const waitpointId = WaitpointId.toId(waitpointFriendlyId);
3540

3641
try {
37-
// Read through the split-aware run-ops read-through (passthrough in single-DB). The env is
38-
// resolved below from the row; residency is classified off the waitpoint id, so env "" is fine.
39-
const findWaitpoint = (client: PrismaReplicaClient) =>
40-
client.waitpoint.findFirst({
41-
where: {
42-
id: waitpointId,
43-
},
44-
select: { id: true, status: true, environmentId: true },
45-
});
46-
47-
const waitpointResult = await readThroughRun({
48-
runId: waitpointId,
42+
// Resolve wherever the waitpoint resides. The env is resolved below from the row; residency
43+
// is classified off the waitpoint id, so env "" is fine. Fan-out reads the run-ops replica
44+
// first, then the control-plane replica so both a co-located and a standalone token resolve,
45+
// gated on the URL-presence read gate so the fan-out spans both DBs independent of the mint flag.
46+
const waitpoint = await resolveWaitpointThroughReadThrough({
47+
waitpointId,
4948
environmentId: "",
50-
readNew: (client) => findWaitpoint(client),
51-
readLegacy: (replica) => findWaitpoint(replica),
49+
read: (client: PrismaReplicaClient) =>
50+
client.waitpoint.findFirst({
51+
where: {
52+
id: waitpointId,
53+
},
54+
select: { id: true, status: true, environmentId: true },
55+
}),
56+
deps: {
57+
newClient: runOpsNewReplica,
58+
legacyReplica: $replica,
59+
splitEnabled: runOpsSplitReadEnabled,
60+
},
5261
});
5362

54-
const waitpoint =
55-
waitpointResult.source === "new" || waitpointResult.source === "legacy-replica"
56-
? waitpointResult.value
57-
: null;
58-
5963
if (!waitpoint) {
6064
return json({ error: "Waitpoint not found" }, { status: 404 });
6165
}

apps/webapp/app/routes/api.v1.waitpoints.tokens.$waitpointFriendlyId.complete.ts

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,17 @@ import {
66
} from "@trigger.dev/core/v3";
77
import { WaitpointId } from "@trigger.dev/core/v3/isomorphic";
88
import { z } from "zod";
9-
import type { PrismaReplicaClient } from "~/db.server";
9+
import {
10+
$replica,
11+
type PrismaReplicaClient,
12+
runOpsNewReplica,
13+
runOpsSplitReadEnabled,
14+
} from "~/db.server";
1015
import { env } from "~/env.server";
1116
import { logger } from "~/services/logger.server";
1217
import { processWaitpointCompletionPacket } from "~/runEngine/concerns/waitpointCompletionPacket.server";
18+
import { resolveWaitpointThroughReadThrough } from "~/runEngine/concerns/resolveWaitpointThroughReadThrough.server";
1319
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
14-
import { readThroughRun } from "~/v3/runOpsMigration/readThrough.server";
1520
import { engine } from "~/v3/runEngine.server";
1621

1722
const { action, loader } = createActionApiRoute(
@@ -34,27 +39,27 @@ const { action, loader } = createActionApiRoute(
3439

3540
try {
3641
//check permissions
37-
// Read through the split-aware run-ops read-through (passthrough in single-DB).
38-
const findWaitpoint = (client: PrismaReplicaClient) =>
39-
client.waitpoint.findFirst({
40-
where: {
41-
id: waitpointId,
42-
environmentId: authentication.environment.id,
43-
},
44-
});
45-
46-
const waitpointResult = await readThroughRun({
47-
runId: waitpointId,
42+
// Resolve wherever the waitpoint resides: a standalone token lives on the control-plane
43+
// store, while a run-owned waitpoint co-locates with its run. Fan-out reads the run-ops
44+
// replica first, then the control-plane replica so both residencies resolve, gated on the
45+
// URL-presence read gate so the fan-out spans both DBs independent of the mint flag.
46+
const waitpoint = await resolveWaitpointThroughReadThrough({
47+
waitpointId,
4848
environmentId: authentication.environment.id,
49-
readNew: (client) => findWaitpoint(client),
50-
readLegacy: (replica) => findWaitpoint(replica),
49+
read: (client: PrismaReplicaClient) =>
50+
client.waitpoint.findFirst({
51+
where: {
52+
id: waitpointId,
53+
environmentId: authentication.environment.id,
54+
},
55+
}),
56+
deps: {
57+
newClient: runOpsNewReplica,
58+
legacyReplica: $replica,
59+
splitEnabled: runOpsSplitReadEnabled,
60+
},
5161
});
5262

53-
const waitpoint =
54-
waitpointResult.source === "new" || waitpointResult.source === "legacy-replica"
55-
? waitpointResult.value
56-
: null;
57-
5863
if (!waitpoint) {
5964
throw json({ error: "Waitpoint not found" }, { status: 404 });
6065
}

apps/webapp/app/routes/api.v1.waitpoints.tokens.$waitpointFriendlyId.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { json } from "@remix-run/server-runtime";
22
import { type WaitpointRetrieveTokenResponse } from "@trigger.dev/core/v3";
33
import { WaitpointId } from "@trigger.dev/core/v3/isomorphic";
44
import { z } from "zod";
5+
import { $replica, runOpsNewReplica, runOpsSplitReadEnabled } from "~/db.server";
56
import { ApiWaitpointPresenter } from "~/presenters/v3/ApiWaitpointPresenter.server";
67
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
78

@@ -13,7 +14,11 @@ export const loader = createLoaderApiRoute(
1314
findResource: async () => 1, // This is a dummy function, we don't need to find a resource
1415
},
1516
async ({ params, authentication }) => {
16-
const presenter = new ApiWaitpointPresenter();
17+
const presenter = new ApiWaitpointPresenter(undefined, undefined, {
18+
newClient: runOpsNewReplica,
19+
legacyReplica: $replica,
20+
splitEnabled: runOpsSplitReadEnabled,
21+
});
1722
const result: WaitpointRetrieveTokenResponse = await presenter.call(
1823
authentication.environment,
1924
WaitpointId.toId(params.waitpointFriendlyId)
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
// A standalone wait token is minted with a cuid id and, having no owning run, is written to the
2+
// control-plane store. Under the split topology the run-ops read replica is a distinct database
3+
// that does not hold it, so the public token routes must fan out to the control-plane replica or
4+
// the token is reported missing. These reads run as real queries against two containers.
5+
import { heteroPostgresTest } from "@internal/testcontainers";
6+
import { WaitpointId } from "@trigger.dev/core/v3/isomorphic";
7+
import { describe, expect, vi } from "vitest";
8+
import type { PrismaClient } from "@trigger.dev/database";
9+
import type { PrismaReplicaClient } from "~/db.server";
10+
import { resolveWaitpointThroughReadThrough } from "~/runEngine/concerns/resolveWaitpointThroughReadThrough.server";
11+
import { readThroughRun } from "./readThrough.server";
12+
13+
vi.setConfig({ testTimeout: 60_000 });
14+
15+
async function seedControlPlaneEnv(prisma: PrismaClient, suffix: string) {
16+
const organization = await prisma.organization.create({
17+
data: { title: `Org ${suffix}`, slug: `org-${suffix}` },
18+
});
19+
const project = await prisma.project.create({
20+
data: {
21+
name: `Project ${suffix}`,
22+
slug: `project-${suffix}`,
23+
externalRef: `proj_${suffix}`,
24+
organizationId: organization.id,
25+
},
26+
});
27+
const environment = await prisma.runtimeEnvironment.create({
28+
data: {
29+
type: "PRODUCTION",
30+
slug: `prod-${suffix}`,
31+
projectId: project.id,
32+
organizationId: organization.id,
33+
apiKey: `tr_prod_${suffix}`,
34+
pkApiKey: `pk_prod_${suffix}`,
35+
shortcode: `short_${suffix}`,
36+
maximumConcurrencyLimit: 10,
37+
},
38+
});
39+
return { organization, project, environment };
40+
}
41+
42+
async function seedStandaloneToken(prisma: PrismaClient, environmentId: string, projectId: string) {
43+
const { id, friendlyId } = WaitpointId.generate();
44+
await prisma.waitpoint.create({
45+
data: {
46+
id,
47+
friendlyId,
48+
type: "MANUAL",
49+
status: "PENDING",
50+
idempotencyKey: id,
51+
userProvidedIdempotencyKey: false,
52+
environmentId,
53+
projectId,
54+
},
55+
});
56+
return { id, friendlyId };
57+
}
58+
59+
describe("public wait-token resolution across the split boundary", () => {
60+
heteroPostgresTest(
61+
"a control-plane-resident standalone token is found when the run-ops replica does not hold it",
62+
async ({ prisma14, prisma17 }) => {
63+
const { project, environment } = await seedControlPlaneEnv(prisma14, "token_cp");
64+
const { id: waitpointId } = await seedStandaloneToken(prisma14, environment.id, project.id);
65+
66+
const waitpoint = await resolveWaitpointThroughReadThrough({
67+
waitpointId,
68+
environmentId: environment.id,
69+
read: (client: PrismaReplicaClient) =>
70+
client.waitpoint.findFirst({
71+
where: { id: waitpointId, environmentId: environment.id },
72+
}),
73+
deps: {
74+
newClient: prisma17 as unknown as PrismaReplicaClient,
75+
legacyReplica: prisma14 as unknown as PrismaReplicaClient,
76+
},
77+
});
78+
79+
expect(waitpoint).not.toBeNull();
80+
expect(waitpoint?.id).toBe(waitpointId);
81+
}
82+
);
83+
84+
heteroPostgresTest(
85+
"pinning both reads at the run-ops replica misses the control-plane token",
86+
async ({ prisma14, prisma17 }) => {
87+
const { project, environment } = await seedControlPlaneEnv(prisma14, "token_miss");
88+
const { id: waitpointId } = await seedStandaloneToken(prisma14, environment.id, project.id);
89+
90+
const waitpoint = await resolveWaitpointThroughReadThrough({
91+
waitpointId,
92+
environmentId: environment.id,
93+
read: (client: PrismaReplicaClient) =>
94+
client.waitpoint.findFirst({
95+
where: { id: waitpointId, environmentId: environment.id },
96+
}),
97+
deps: {
98+
newClient: prisma17 as unknown as PrismaReplicaClient,
99+
legacyReplica: prisma17 as unknown as PrismaReplicaClient,
100+
},
101+
});
102+
103+
expect(waitpoint).toBeNull();
104+
}
105+
);
106+
107+
heteroPostgresTest(
108+
"the read gate forces fan-out so a control-plane token resolves while the mint flag is off",
109+
async ({ prisma14, prisma17 }) => {
110+
const { project, environment } = await seedControlPlaneEnv(prisma14, "token_gate");
111+
const { id: waitpointId } = await seedStandaloneToken(prisma14, environment.id, project.id);
112+
113+
const read = (client: PrismaReplicaClient) =>
114+
client.waitpoint.findFirst({
115+
where: { id: waitpointId, environmentId: environment.id },
116+
});
117+
118+
const gated = await resolveWaitpointThroughReadThrough({
119+
waitpointId,
120+
environmentId: environment.id,
121+
read,
122+
deps: {
123+
splitEnabled: true,
124+
newClient: prisma17 as unknown as PrismaReplicaClient,
125+
legacyReplica: prisma14 as unknown as PrismaReplicaClient,
126+
},
127+
});
128+
129+
expect(gated).not.toBeNull();
130+
expect(gated?.id).toBe(waitpointId);
131+
132+
const passthrough = await readThroughRun({
133+
runId: waitpointId,
134+
environmentId: environment.id,
135+
readNew: (c) => read(c),
136+
readLegacy: (r) => read(r),
137+
deps: {
138+
splitEnabled: false,
139+
newClient: prisma17 as unknown as PrismaReplicaClient,
140+
legacyReplica: prisma14 as unknown as PrismaReplicaClient,
141+
},
142+
});
143+
144+
expect(gated).not.toBeNull();
145+
expect(passthrough.source).toBe("not-found");
146+
}
147+
);
148+
});

0 commit comments

Comments
 (0)