Skip to content

Commit 618b921

Browse files
authored
feat(run-ops): webapp routes — friendlyId reads, cross-seam token resolution, co-location writes (#4123)
1 parent 0a51341 commit 618b921

41 files changed

Lines changed: 1925 additions & 549 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: feature
4+
---
5+
6+
Route dashboard and API run/waitpoint reads through the run store, and resolve public wait-token requests across both backing stores, so runs and tokens are found regardless of which store they reside on.

apps/webapp/app/components/admin/debugRun.tsx

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ function DebugRunData(props: UseDataFunctionReturn<typeof loader>) {
7777

7878
function DebugRunDataEngineV1({
7979
run,
80+
environment,
8081
queueConcurrencyLimit,
8182
queueCurrentConcurrency,
8283
envConcurrencyLimit,
@@ -121,7 +122,7 @@ function DebugRunDataEngineV1({
121122
<Property.Value className="flex items-center gap-2">
122123
<ClipboardField
123124
value={withPrefix(
124-
keys.queueKey(run.runtimeEnvironment, run.queue, run.concurrencyKey ?? undefined)
125+
keys.queueKey(environment, run.queue, run.concurrencyKey ?? undefined)
125126
)}
126127
variant="tertiary/small"
127128
iconButton
@@ -133,7 +134,7 @@ function DebugRunDataEngineV1({
133134
<Property.Value className="flex items-center gap-2">
134135
<ClipboardField
135136
value={`ZRANGE ${withPrefix(
136-
keys.queueKey(run.runtimeEnvironment, run.queue, run.concurrencyKey ?? undefined)
137+
keys.queueKey(environment, run.queue, run.concurrencyKey ?? undefined)
137138
)} 0 -1`}
138139
variant="tertiary/small"
139140
iconButton
@@ -146,7 +147,7 @@ function DebugRunDataEngineV1({
146147
<ClipboardField
147148
value={withPrefix(
148149
keys.queueCurrentConcurrencyKey(
149-
run.runtimeEnvironment,
150+
environment,
150151
run.queue,
151152
run.concurrencyKey ?? undefined
152153
)
@@ -163,7 +164,7 @@ function DebugRunDataEngineV1({
163164
<ClipboardField
164165
value={`SMEMBERS ${withPrefix(
165166
keys.queueCurrentConcurrencyKey(
166-
run.runtimeEnvironment,
167+
environment,
167168
run.queue,
168169
run.concurrencyKey ?? undefined
169170
)
@@ -185,7 +186,7 @@ function DebugRunDataEngineV1({
185186
<ClipboardField
186187
value={withPrefix(
187188
keys.queueReserveConcurrencyKeyFromQueue(
188-
keys.queueKey(run.runtimeEnvironment, run.queue, run.concurrencyKey ?? undefined)
189+
keys.queueKey(environment, run.queue, run.concurrencyKey ?? undefined)
189190
)
190191
)}
191192
variant="tertiary/small"
@@ -200,7 +201,7 @@ function DebugRunDataEngineV1({
200201
<ClipboardField
201202
value={`SMEMBERS ${withPrefix(
202203
keys.queueReserveConcurrencyKeyFromQueue(
203-
keys.queueKey(run.runtimeEnvironment, run.queue, run.concurrencyKey ?? undefined)
204+
keys.queueKey(environment, run.queue, run.concurrencyKey ?? undefined)
204205
)
205206
)}`}
206207
variant="tertiary/small"
@@ -218,7 +219,7 @@ function DebugRunDataEngineV1({
218219
<Property.Label>Queue concurrency limit key</Property.Label>
219220
<Property.Value className="flex items-center gap-2">
220221
<ClipboardField
221-
value={withPrefix(keys.queueConcurrencyLimitKey(run.runtimeEnvironment, run.queue))}
222+
value={withPrefix(keys.queueConcurrencyLimitKey(environment, run.queue))}
222223
variant="tertiary/small"
223224
iconButton
224225
/>
@@ -228,9 +229,7 @@ function DebugRunDataEngineV1({
228229
<Property.Label>GET queue concurrency limit</Property.Label>
229230
<Property.Value className="flex items-center gap-2">
230231
<ClipboardField
231-
value={`GET ${withPrefix(
232-
keys.queueConcurrencyLimitKey(run.runtimeEnvironment, run.queue)
233-
)}`}
232+
value={`GET ${withPrefix(keys.queueConcurrencyLimitKey(environment, run.queue))}`}
234233
variant="tertiary/small"
235234
iconButton
236235
/>
@@ -246,7 +245,7 @@ function DebugRunDataEngineV1({
246245
<Property.Label>Env current concurrency key</Property.Label>
247246
<Property.Value className="flex items-center gap-2">
248247
<ClipboardField
249-
value={withPrefix(keys.envCurrentConcurrencyKey(run.runtimeEnvironment))}
248+
value={withPrefix(keys.envCurrentConcurrencyKey(environment))}
250249
variant="tertiary/small"
251250
iconButton
252251
/>
@@ -256,7 +255,7 @@ function DebugRunDataEngineV1({
256255
<Property.Label>Get env current concurrency</Property.Label>
257256
<Property.Value className="flex items-center gap-2">
258257
<ClipboardField
259-
value={`SMEMBERS ${withPrefix(keys.envCurrentConcurrencyKey(run.runtimeEnvironment))}`}
258+
value={`SMEMBERS ${withPrefix(keys.envCurrentConcurrencyKey(environment))}`}
260259
variant="tertiary/small"
261260
iconButton
262261
/>
@@ -272,7 +271,7 @@ function DebugRunDataEngineV1({
272271
<Property.Label>Env reserve concurrency key</Property.Label>
273272
<Property.Value className="flex items-center gap-2">
274273
<ClipboardField
275-
value={withPrefix(keys.envReserveConcurrencyKey(run.runtimeEnvironment.id))}
274+
value={withPrefix(keys.envReserveConcurrencyKey(environment.id))}
276275
variant="tertiary/small"
277276
iconButton
278277
/>
@@ -282,9 +281,7 @@ function DebugRunDataEngineV1({
282281
<Property.Label>Get env reserve concurrency</Property.Label>
283282
<Property.Value className="flex items-center gap-2">
284283
<ClipboardField
285-
value={`SMEMBERS ${withPrefix(
286-
keys.envReserveConcurrencyKey(run.runtimeEnvironment.id)
287-
)}`}
284+
value={`SMEMBERS ${withPrefix(keys.envReserveConcurrencyKey(environment.id))}`}
288285
variant="tertiary/small"
289286
iconButton
290287
/>
@@ -300,7 +297,7 @@ function DebugRunDataEngineV1({
300297
<Property.Label>Env concurrency limit key</Property.Label>
301298
<Property.Value className="flex items-center gap-2">
302299
<ClipboardField
303-
value={withPrefix(keys.envConcurrencyLimitKey(run.runtimeEnvironment))}
300+
value={withPrefix(keys.envConcurrencyLimitKey(environment))}
304301
variant="tertiary/small"
305302
iconButton
306303
/>
@@ -310,7 +307,7 @@ function DebugRunDataEngineV1({
310307
<Property.Label>GET env concurrency limit</Property.Label>
311308
<Property.Value className="flex items-center gap-2">
312309
<ClipboardField
313-
value={`GET ${withPrefix(keys.envConcurrencyLimitKey(run.runtimeEnvironment))}`}
310+
value={`GET ${withPrefix(keys.envConcurrencyLimitKey(environment))}`}
314311
variant="tertiary/small"
315312
iconButton
316313
/>
@@ -326,7 +323,7 @@ function DebugRunDataEngineV1({
326323
<Property.Label>Shared queue key</Property.Label>
327324
<Property.Value className="flex items-center gap-2">
328325
<ClipboardField
329-
value={`GET ${withPrefix(keys.envSharedQueueKey(run.runtimeEnvironment))}`}
326+
value={`GET ${withPrefix(keys.envSharedQueueKey(environment))}`}
330327
variant="tertiary/small"
331328
iconButton
332329
/>
@@ -337,7 +334,7 @@ function DebugRunDataEngineV1({
337334
<Property.Value className="flex items-center gap-2">
338335
<ClipboardField
339336
value={`ZRANGEBYSCORE ${withPrefix(
340-
keys.envSharedQueueKey(run.runtimeEnvironment)
337+
keys.envSharedQueueKey(environment)
341338
)} -inf ${Date.now()} WITHSCORES`}
342339
variant="tertiary/small"
343340
iconButton

apps/webapp/app/routes/@.runs.$runParam.ts

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { redirect, type LoaderFunctionArgs } from "@remix-run/server-runtime";
22
import { z } from "zod";
33
import { prisma } from "~/db.server";
44
import { runStore } from "~/v3/runStore.server";
5+
import { controlPlaneResolver } from "~/v3/runOpsMigration/controlPlaneResolver.server";
56
import { redirectWithErrorMessage } from "~/models/message.server";
67
import { requireUser } from "~/services/session.server";
78
import { impersonate, rootPath, v3RunPath, v3RunSpanPath } from "~/utils/pathBuilder";
@@ -36,21 +37,7 @@ export async function loader({ params, request }: LoaderFunctionArgs) {
3637
{
3738
select: {
3839
spanId: true,
39-
runtimeEnvironment: {
40-
select: {
41-
slug: true,
42-
},
43-
},
44-
project: {
45-
select: {
46-
slug: true,
47-
organization: {
48-
select: {
49-
slug: true,
50-
},
51-
},
52-
},
53-
},
40+
runtimeEnvironmentId: true,
5441
},
5542
},
5643
prisma
@@ -90,10 +77,18 @@ export async function loader({ params, request }: LoaderFunctionArgs) {
9077
});
9178
}
9279

80+
const environment = await controlPlaneResolver.resolveAuthenticatedEnv(run.runtimeEnvironmentId);
81+
82+
if (!environment) {
83+
return redirectWithErrorMessage(rootPath(), request, "Run doesn't exist", {
84+
ephemeral: false,
85+
});
86+
}
87+
9388
const path = v3RunSpanPath(
94-
{ slug: run.project.organization.slug },
95-
{ slug: run.project.slug },
96-
{ slug: run.runtimeEnvironment.slug },
89+
{ slug: environment.organization.slug },
90+
{ slug: environment.project.slug },
91+
{ slug: environment.slug },
9792
{ friendlyId: runParam },
9893
{ spanId: run.spanId }
9994
);

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.batches/route.tsx

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,13 @@ import { findProjectBySlug } from "~/models/project.server";
4848
import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server";
4949
import { type BatchList, BatchListPresenter } from "~/presenters/v3/BatchListPresenter.server";
5050
import { requireUserId } from "~/services/session.server";
51+
import {
52+
$replica,
53+
runOpsNewReplicaClient,
54+
runOpsLegacyReplica,
55+
runOpsSplitReadEnabled,
56+
type PrismaClientOrTransaction,
57+
} from "~/db.server";
5158
import {
5259
docsPath,
5360
EnvironmentParamSchema,
@@ -90,7 +97,12 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
9097
};
9198
const filters = BatchListFilters.parse(s);
9299

93-
const presenter = new BatchListPresenter();
100+
const presenter = new BatchListPresenter(undefined, undefined, {
101+
runOpsNew: runOpsNewReplicaClient as unknown as PrismaClientOrTransaction,
102+
runOpsLegacyReplica: runOpsLegacyReplica as unknown as PrismaClientOrTransaction,
103+
controlPlaneReplica: $replica as unknown as PrismaClientOrTransaction,
104+
splitEnabled: runOpsSplitReadEnabled,
105+
});
94106
const list = await presenter.call({
95107
userId,
96108
projectId: project.id,

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.waitpoints.tokens.$waitpointParam/route.tsx

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@ import { useProject } from "~/hooks/useProject";
1111
import { findProjectBySlug } from "~/models/project.server";
1212
import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server";
1313
import { WaitpointPresenter } from "~/presenters/v3/WaitpointPresenter.server";
14+
import {
15+
runOpsNewReplicaClient,
16+
runOpsLegacyReplica,
17+
runOpsSplitReadEnabled,
18+
type PrismaClientOrTransaction,
19+
} from "~/db.server";
1420
import { requireUserId } from "~/services/session.server";
1521
import { cn } from "~/utils/cn";
1622
import { EnvironmentParamSchema, v3WaitpointTokensPath } from "~/utils/pathBuilder";
@@ -45,7 +51,11 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
4551
}
4652

4753
try {
48-
const presenter = new WaitpointPresenter();
54+
const presenter = new WaitpointPresenter(undefined, undefined, {
55+
newClient: runOpsNewReplicaClient as unknown as PrismaClientOrTransaction,
56+
legacyReplica: runOpsLegacyReplica as unknown as PrismaClientOrTransaction,
57+
splitEnabled: runOpsSplitReadEnabled,
58+
});
4959
const result = await presenter.call({
5060
friendlyId: waitpointParam,
5161
environmentId: environment.id,

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.waitpoints.tokens/route.tsx

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,12 @@ import { findProjectBySlug } from "~/models/project.server";
4242
import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server";
4343
import { WaitpointListPresenter } from "~/presenters/v3/WaitpointListPresenter.server";
4444
import { requireUserId } from "~/services/session.server";
45+
import {
46+
runOpsNewReplicaClient,
47+
runOpsLegacyReplica,
48+
runOpsSplitReadEnabled,
49+
type PrismaClientOrTransaction,
50+
} from "~/db.server";
4551
import { docsPath, EnvironmentParamSchema, v3WaitpointTokenPath } from "~/utils/pathBuilder";
4652

4753
export const meta: MetaFunction = () => {
@@ -88,7 +94,11 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
8894
}
8995

9096
try {
91-
const presenter = new WaitpointListPresenter();
97+
const presenter = new WaitpointListPresenter(undefined, undefined, {
98+
runOpsNew: runOpsNewReplicaClient as unknown as PrismaClientOrTransaction,
99+
runOpsLegacyReplica: runOpsLegacyReplica as unknown as PrismaClientOrTransaction,
100+
splitEnabled: runOpsSplitReadEnabled,
101+
});
92102
const result = await presenter.call({
93103
environment,
94104
...searchParams,

apps/webapp/app/routes/api.v1.batches.$batchId.ts

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { json } from "@remix-run/server-runtime";
22
import { z } from "zod";
3-
import { $replica } from "~/db.server";
43
import { anyResource, createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
4+
import { runStore } from "~/v3/runStore.server";
55

66
const ParamsSchema = z.object({
77
batchId: z.string(),
@@ -13,14 +13,8 @@ export const loader = createLoaderApiRoute(
1313
allowJWT: true,
1414
corsStrategy: "all",
1515
findResource: (params, auth) => {
16-
return $replica.batchTaskRun.findFirst({
17-
where: {
18-
friendlyId: params.batchId,
19-
runtimeEnvironmentId: auth.environment.id,
20-
},
21-
include: {
22-
errors: true,
23-
},
16+
return runStore.findBatchTaskRunByFriendlyId(params.batchId, auth.environment.id, {
17+
include: { errors: true },
2418
});
2519
},
2620
authorization: {

apps/webapp/app/routes/api.v1.batches.$batchParam.results.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import type { LoaderFunctionArgs } from "@remix-run/server-runtime";
22
import { json } from "@remix-run/server-runtime";
33
import { z } from "zod";
4+
import { $replica, runOpsNewReplica, runOpsSplitReadEnabled } from "~/db.server";
45
import { ApiBatchResultsPresenter } from "~/presenters/v3/ApiBatchResultsPresenter.server";
56
import { authenticateApiRequest } from "~/services/apiAuth.server";
67
import { logger } from "~/services/logger.server";
@@ -28,7 +29,11 @@ export async function loader({ request, params }: LoaderFunctionArgs) {
2829
const { batchParam } = parsed.data;
2930

3031
try {
31-
const presenter = new ApiBatchResultsPresenter();
32+
const presenter = new ApiBatchResultsPresenter(undefined, undefined, {
33+
newClient: runOpsNewReplica,
34+
legacyReplica: $replica,
35+
splitEnabled: runOpsSplitReadEnabled,
36+
});
3237
const result = await presenter.call(batchParam, authenticationResult.environment);
3338

3439
if (!result) {

apps/webapp/app/routes/api.v1.runs.$runFriendlyId.input-streams.wait.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,10 @@ const { action, loader } = createActionApiRoute(
7878
}
7979
}
8080

81-
// Step 1: Create the waitpoint
81+
// Create the waitpoint. Co-locate it with the owning run (run-ops split) so a ksuid
82+
// run's input-stream waitpoint lands on the run's DB and its block edge resolves.
8283
const result = await engine.createManualWaitpoint({
84+
runId: run.id,
8385
environmentId: authentication.environment.id,
8486
projectId: authentication.environment.projectId,
8587
idempotencyKey: body.idempotencyKey,
@@ -88,7 +90,7 @@ const { action, loader } = createActionApiRoute(
8890
tags: bodyTags,
8991
});
9092

91-
// Step 2: Cache the mapping in Redis for fast lookup from .send()
93+
// Cache the mapping in Redis for fast lookup from .send()
9294
const ttlMs = timeout ? timeout.getTime() - Date.now() : undefined;
9395
await setInputStreamWaitpoint(
9496
run.friendlyId,
@@ -97,7 +99,7 @@ const { action, loader } = createActionApiRoute(
9799
ttlMs && ttlMs > 0 ? ttlMs : undefined
98100
);
99101

100-
// Step 3: Check if data was already sent to this input stream (race condition handling).
102+
// Check if data was already sent to this input stream (race condition handling).
101103
// If .send() landed before .wait(), the data is in the S2 stream but no waitpoint
102104
// existed to complete. We check from the client's last known position.
103105
if (!result.isCached) {

0 commit comments

Comments
 (0)