Skip to content

Commit 29a7ea0

Browse files
d-csclaude
andcommitted
feat(run-ops): ClickHouse multi-source replication fan-in + admin ops
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent d5415e8 commit 29a7ea0

9 files changed

Lines changed: 1732 additions & 136 deletions

apps/webapp/app/routes/admin.api.v1.runs-replication.backfill.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { prisma } from "~/db.server";
55
import { runStore } from "~/v3/runStore.server";
66
import { logger } from "~/services/logger.server";
77
import { requireAdminApiRequest } from "~/services/personalAccessToken.server";
8+
import { getRunsReplicationGlobal } from "~/services/runsReplicationGlobal.server";
89
import { runsReplicationInstance } from "~/services/runsReplicationInstance.server";
910
import { FINAL_RUN_STATUSES } from "~/v3/taskStatus";
1011

@@ -40,11 +41,12 @@ export async function action({ request }: ActionFunctionArgs) {
4041
runs.push(...batchRuns);
4142
}
4243

43-
if (!runsReplicationInstance) {
44+
const service = getRunsReplicationGlobal() ?? runsReplicationInstance;
45+
if (!service) {
4446
throw new Error("Runs replication instance not found");
4547
}
4648

47-
await runsReplicationInstance.backfill(
49+
await service.backfill(
4850
runs.map((run) => ({
4951
...run,
5052
masterQueue: run.workerQueue,
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import { type LoaderFunctionArgs, json } from "@remix-run/server-runtime";
2+
import Redis from "ioredis";
3+
import { env } from "~/env.server";
4+
import { requireAdminApiRequest } from "~/services/personalAccessToken.server";
5+
import { getRunsReplicationConfiguredSources } from "~/services/runsReplicationGlobal.server";
6+
7+
/**
 * Probes per-source replication leadership by checking whether each source's redlock
 * leader-lock key currently exists in Redis.
 *
 * The lock key is DOUBLE-PREFIXED with `logical-replication-client:` — once from the
 * connection's keyPrefix and once from redlock's resource string. This connection is
 * therefore prefixed with `runs-replication:logical-replication-client:` and runs
 * EXISTS on the resource `logical-replication-client:runs-replication:<id>`, which
 * resolves to:
 *   runs-replication:logical-replication-client:logical-replication-client:runs-replication:<id>
 *
 * @param sourceIds - Replication source ids to probe.
 * @returns A map from each source id to `true` when its leader-lock key exists.
 */
async function probeLeadership(sourceIds: string[]): Promise<Map<string, boolean>> {
  const leaders = new Map<string, boolean>();

  // Nothing to probe: do not open (and tear down) a Redis connection for no work.
  if (sourceIds.length === 0) {
    return leaders;
  }

  const redis = new Redis({
    keyPrefix: "runs-replication:logical-replication-client:",
    port: env.RUN_REPLICATION_REDIS_PORT ?? undefined,
    host: env.RUN_REPLICATION_REDIS_HOST ?? undefined,
    username: env.RUN_REPLICATION_REDIS_USERNAME ?? undefined,
    password: env.RUN_REPLICATION_REDIS_PASSWORD ?? undefined,
    enableAutoPipelining: true,
    ...(env.RUN_REPLICATION_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
  });

  try {
    // Issue every EXISTS in the same tick so auto-pipelining can batch them, instead
    // of awaiting one round trip per source sequentially.
    const results = await Promise.all(
      sourceIds.map((id) => redis.exists(`logical-replication-client:runs-replication:${id}`))
    );

    sourceIds.forEach((id, index) => {
      leaders.set(id, results[index] === 1);
    });
  } finally {
    // A failed graceful QUIT must not mask the probe's own result or error; fall back
    // to dropping the socket so this short-lived connection is not left open.
    await redis.quit().catch(() => redis.disconnect());
  }

  return leaders;
}
39+
40+
/**
 * Admin-only status endpoint for runs replication.
 *
 * Rejects non-admin requests via `requireAdminApiRequest`, then reports each configured
 * replication source together with whether its leader-lock key currently exists.
 * Responds with `{ enabled: false, sources: [] }` when no sources are registered.
 */
export async function loader({ request }: LoaderFunctionArgs) {
  await requireAdminApiRequest(request);

  const configured = getRunsReplicationConfiguredSources() ?? [];
  if (configured.length === 0) {
    return json({ enabled: false, sources: [] });
  }

  const leadership = await probeLeadership(configured.map(({ id }) => id));

  const replicationOn = env.RUN_REPLICATION_ENABLED === "1" && configured.length > 0;
  const sources = configured.map(({ id, slotName, originGeneration }) => ({
    id,
    slotName,
    originGeneration,
    // A source missing from the probe result is reported as not leading.
    leader: leadership.get(id) ?? false,
  }));

  return json({ enabled: replicationOn, sources });
}

apps/webapp/app/services/runsReplicationGlobal.server.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,14 @@ import { RunsReplicationService } from "./runsReplicationService.server";
22

33
// Keys for the process-global slots used by this module. `Symbol.for` resolves through
// the global symbol registry, so the same description always yields the same symbol.
const GLOBAL_RUNS_REPLICATION_KEY = Symbol.for("dev.trigger.ts.runs-replication");
44
const GLOBAL_TCP_MONITOR_KEY = Symbol.for("dev.trigger.ts.tcp-monitor");
5+
// Slot holding the ConfiguredSource[] descriptors of the replication sources.
const GLOBAL_RUNS_REPLICATION_SOURCES_KEY = Symbol.for("dev.trigger.ts.runs-replication-sources");
6+
7+
/**
 * Descriptor of one replication source: its id, its slot name and its origin
 * generation. Stored under the global sources key and read back via
 * `getRunsReplicationConfiguredSources`.
 */
export type ConfiguredSource = { id: string; slotName: string; originGeneration: number };
58

69
/**
 * Shape of the symbol-keyed slots this module reads and writes on its global object.
 * Every slot is optional because nothing is registered until a setter runs.
 */
type RunsReplicationGlobal = {
710
// The registered replication service instance, if any.
[GLOBAL_RUNS_REPLICATION_KEY]?: RunsReplicationService;
811
// Timer handle for the TCP monitor, if one was registered.
[GLOBAL_TCP_MONITOR_KEY]?: NodeJS.Timeout;
12+
// Descriptors of the replication sources, if they were recorded.
[GLOBAL_RUNS_REPLICATION_SOURCES_KEY]?: ConfiguredSource[];
913
};
1014

1115
const _globalThis = typeof globalThis === "object" ? globalThis : global;
@@ -23,6 +27,14 @@ export function unregisterRunsReplicationGlobal() {
2327
delete _global[GLOBAL_RUNS_REPLICATION_KEY];
2428
}
2529

30+
/**
 * Reads the replication source descriptors from the global sources slot.
 *
 * @returns The stored descriptors, or `undefined` when none have been set.
 */
export function getRunsReplicationConfiguredSources(): ConfiguredSource[] | undefined {
  const registered = _global[GLOBAL_RUNS_REPLICATION_SOURCES_KEY];
  return registered;
}
33+
34+
/**
 * Stores the replication source descriptors in the global sources slot, replacing
 * whatever was stored there before.
 *
 * @param sources - Descriptors of the replication sources to record.
 */
export function setRunsReplicationConfiguredSources(sources: ConfiguredSource[]): void {
  _global[GLOBAL_RUNS_REPLICATION_SOURCES_KEY] = sources;
}
37+
2638
export function getTcpMonitorGlobal(): NodeJS.Timeout | undefined {
2739
return _global[GLOBAL_TCP_MONITOR_KEY];
2840
}

apps/webapp/app/services/runsReplicationInstance.server.ts

Lines changed: 159 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,90 @@ import invariant from "tiny-invariant";
22
import { env } from "~/env.server";
33
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server";
44
import { singleton } from "~/utils/singleton";
5+
import { isSplitEnabled } from "~/v3/runOpsMigration/splitMode.server";
56
import { meter, provider } from "~/v3/tracer.server";
6-
import { RunsReplicationService } from "./runsReplicationService.server";
7+
import {
8+
setRunsReplicationConfiguredSources,
9+
setRunsReplicationGlobal,
10+
} from "./runsReplicationGlobal.server";
11+
import {
12+
RunsReplicationService,
13+
type RunsReplicationSource,
14+
} from "./runsReplicationService.server";
715
import { signalsEmitter } from "./signals.server";
816

917
/**
 * Runs-replication service handle produced by `initializeRunsReplicationInstance` and
 * held via `singleton` under the "runsReplicationInstance" key.
 *
 * NOTE(review): per the initializer's own comments this is the instance it returns
 * synchronously (the legacy-only one); a multi-source instance, when built, is
 * registered through `setRunsReplicationGlobal` instead — confirm callers prefer
 * `getRunsReplicationGlobal()` over this export.
 */
export const runsReplicationInstance = singleton(
1018
"runsReplicationInstance",
1119
initializeRunsReplicationInstance
1220
);
1321

22+
/**
 * Builds the list of replication sources to hand to the replication service.
 *
 * The `"legacy"` source is always present and always first. The `"new"` source is
 * appended only when the split is enabled, a non-empty `newUrl` is provided, and
 * `newSourceOverride` is not explicitly `false`.
 *
 * @param args - Split gate, connection URLs, and per-source slot/publication/generation.
 * @returns `[legacy]` or `[legacy, new]`.
 */
export function buildReplicationSources(args: {
  splitEnabled: boolean;
  legacyUrl: string;
  newUrl?: string;
  /** `false` forces the new source off under split; undefined follows split. */
  newSourceOverride?: boolean;
  legacySlotName: string;
  legacyPublicationName: string;
  legacyOriginGeneration: number;
  newSlotName: string;
  newPublicationName: string;
  newOriginGeneration: number;
}): RunsReplicationSource[] {
  const { splitEnabled, newUrl, newSourceOverride } = args;

  const sources: RunsReplicationSource[] = [
    {
      id: "legacy",
      pgConnectionUrl: args.legacyUrl,
      slotName: args.legacySlotName,
      publicationName: args.legacyPublicationName,
      originGeneration: args.legacyOriginGeneration,
    },
  ];

  // Any one of these keeps replication on the legacy source alone.
  if (!splitEnabled || !newUrl || newSourceOverride === false) {
    return sources;
  }

  sources.push({
    id: "new",
    pgConnectionUrl: newUrl,
    slotName: args.newSlotName,
    publicationName: args.newPublicationName,
    originGeneration: args.newOriginGeneration,
  });

  return sources;
}
59+
60+
/**
 * Thrown when the residency split is on but the replication sources[] contains no
 * `"new"` source.
 *
 * The split gate and the `#new`->ClickHouse replication gate are independent env vars.
 * With split on, ksuid runs are minted on the new DB; without a `"new"` source they
 * would be silently missing from ClickHouse, under-counting every ClickHouse-fronted
 * usage/cost/metrics aggregate with no Postgres fallback. This error exists so that
 * misconfiguration fails loudly at boot instead.
 */
export class SplitReplicationMisconfiguredError extends Error {
  constructor() {
    super(
      [
        "RUN_OPS_SPLIT_ENABLED is on but the runs-replication sources[] has no \"new\" source: ",
        "ksuid runs on the new DB would not replicate to ClickHouse, under-counting every ",
        "ClickHouse-fronted aggregate. Enable the new replication source ",
        "(RUN_REPLICATION_NEW_ENABLED / RUN_OPS_DATABASE_URL) or turn the split off.",
      ].join("")
    );
    // Explicit name so logs identify this error class rather than a generic "Error".
    this.name = "SplitReplicationMisconfiguredError";
  }
}
79+
80+
/**
 * Guards against running with the split enabled but no `"new"` replication source.
 *
 * @param args.splitEnabled - Whether the residency split is on.
 * @param args.sources - The replication sources that were constructed.
 * @throws SplitReplicationMisconfiguredError when split is on and no source has id `"new"`.
 */
export function assertReplicationCoversSplit(args: {
  splitEnabled: boolean;
  sources: RunsReplicationSource[];
}): void {
  // With the split off there is nothing to cover; sources are not inspected.
  if (!args.splitEnabled) {
    return;
  }

  const newSourceMissing = args.sources.every((source) => source.id !== "new");
  if (newSourceMissing) {
    throw new SplitReplicationMisconfiguredError();
  }
}
88+
1489
function initializeRunsReplicationInstance() {
1590
const { DATABASE_URL } = process.env;
1691
invariant(typeof DATABASE_URL === "string", "DATABASE_URL env var not set");
@@ -22,12 +97,11 @@ function initializeRunsReplicationInstance() {
2297

2398
console.log("🗃️ Runs replication service enabled");
2499

25-
const service = new RunsReplicationService({
100+
// Shared options for both the legacy-only and the multi-source constructions.
101+
// Excludes per-source identity (pgConnectionUrl/slotName/publicationName/sources).
102+
const baseReplicationOptions = {
26103
clickhouseFactory,
27-
pgConnectionUrl: DATABASE_URL,
28104
serviceName: "runs-replication",
29-
slotName: env.RUN_REPLICATION_SLOT_NAME,
30-
publicationName: env.RUN_REPLICATION_PUBLICATION_NAME,
31105
redisOptions: {
32106
keyPrefix: "runs-replication:",
33107
port: env.RUN_REPLICATION_REDIS_PORT ?? undefined,
@@ -55,24 +129,94 @@ function initializeRunsReplicationInstance() {
55129
insertStrategy: env.RUN_REPLICATION_INSERT_STRATEGY,
56130
disablePayloadInsert: env.RUN_REPLICATION_DISABLE_PAYLOAD_INSERT === "1",
57131
disableErrorFingerprinting: env.RUN_REPLICATION_DISABLE_ERROR_FINGERPRINTING === "1",
132+
};
133+
134+
// Construct the SINGLE legacy source synchronously (the split gate has not resolved
135+
// yet at module-init time, and singleton(...) memoizes this synchronous return value).
136+
let service = new RunsReplicationService({
137+
...baseReplicationOptions,
138+
pgConnectionUrl: DATABASE_URL,
139+
slotName: env.RUN_REPLICATION_SLOT_NAME,
140+
publicationName: env.RUN_REPLICATION_PUBLICATION_NAME,
58141
});
59142

143+
// Register the live handle so the status route + lifecycle routes can find it.
144+
setRunsReplicationGlobal(service);
145+
setRunsReplicationConfiguredSources([
146+
{
147+
id: "legacy",
148+
slotName: env.RUN_REPLICATION_SLOT_NAME,
149+
originGeneration: env.RUN_REPLICATION_LEGACY_ORIGIN_GENERATION,
150+
},
151+
]);
152+
60153
if (env.RUN_REPLICATION_ENABLED === "1") {
61-
clickhouseFactory
62-
.isReady()
63-
.then(() => service.start())
64-
.then(() => {
65-
console.log("🗃️ Runs replication service started");
154+
// Shape B (construct-after-gate): resolve the async split gate ONCE at boot, and
155+
// when both sources are enabled rebuild `service` with sources[] before starting.
156+
// The legacy-only instance above is never started in the dual path (no slot/lock
157+
// taken). runsReplicationService.server.ts is untouched. The create route also calls
158+
// setRunsReplicationGlobal — last-writer-wins is the existing contract.
159+
isSplitEnabled()
160+
.then((splitEnabled) => {
161+
const sources = buildReplicationSources({
162+
splitEnabled,
163+
legacyUrl: DATABASE_URL,
164+
newUrl: env.RUN_OPS_DATABASE_URL ?? env.TASK_RUN_DATABASE_URL,
165+
newSourceOverride:
166+
env.RUN_REPLICATION_NEW_ENABLED === "disabled" ? false : undefined,
167+
legacySlotName: env.RUN_REPLICATION_SLOT_NAME,
168+
legacyPublicationName: env.RUN_REPLICATION_PUBLICATION_NAME,
169+
legacyOriginGeneration: env.RUN_REPLICATION_LEGACY_ORIGIN_GENERATION,
170+
newSlotName: env.RUN_REPLICATION_NEW_SLOT_NAME,
171+
newPublicationName: env.RUN_REPLICATION_NEW_PUBLICATION_NAME,
172+
newOriginGeneration: env.RUN_REPLICATION_NEW_ORIGIN_GENERATION,
173+
});
174+
175+
// Refuse to start replication if split is on but `#new` is not a source.
176+
assertReplicationCoversSplit({ splitEnabled, sources });
177+
178+
if (sources.length > 1) {
179+
// The scalar pgConnectionUrl/slotName/publicationName remain required on the
180+
// options type, but are ignored when sources[] is non-empty — the
181+
// service normalizes off sources. Pass the legacy scalars to satisfy the type.
182+
service = new RunsReplicationService({
183+
...baseReplicationOptions,
184+
pgConnectionUrl: DATABASE_URL,
185+
slotName: env.RUN_REPLICATION_SLOT_NAME,
186+
publicationName: env.RUN_REPLICATION_PUBLICATION_NAME,
187+
sources,
188+
});
189+
setRunsReplicationGlobal(service);
190+
setRunsReplicationConfiguredSources(
191+
sources.map((s) => ({
192+
id: s.id,
193+
slotName: s.slotName,
194+
originGeneration: s.originGeneration,
195+
}))
196+
);
197+
}
198+
199+
return clickhouseFactory.isReady().then(() => service.start());
66200
})
201+
.then(() => console.log("🗃️ Runs replication service started"))
67202
.catch((error) => {
68-
console.error("🗃️ Runs replication service failed to start", {
69-
error,
70-
});
203+
if (error instanceof SplitReplicationMisconfiguredError) {
204+
// A silent ClickHouse under-count is worse than a crash — make it fatal.
205+
console.error("🚨 FATAL: run-ops split / ClickHouse replication misconfiguration", {
206+
error,
207+
});
208+
process.exit(1);
209+
}
210+
console.error("🗃️ Runs replication service failed to start", { error });
71211
});
72212

73-
signalsEmitter.on("SIGTERM", service.shutdown.bind(service));
74-
signalsEmitter.on("SIGINT", service.shutdown.bind(service));
213+
// Closures over the `let` so SIGTERM/SIGINT hit whichever instance is live (NOT a
214+
// stale .bind() to the discarded legacy-only instance).
215+
signalsEmitter.on("SIGTERM", () => service.shutdown());
216+
signalsEmitter.on("SIGINT", () => service.shutdown());
75217
}
76218

219+
// Returns the legacy-only instance synchronously (singleton memoizes this). Lifecycle
220+
// routes read getRunsReplicationGlobal() first, so they get the live multi-source one.
77221
return service;
78222
}

0 commit comments

Comments
 (0)