Skip to content

Commit 5be6a4f

Browse files
d-csclaude
andauthored
feat(run-ops): ClickHouse multi-source replication fan-in + admin ops (#4119)
## What Extends the ClickHouse runs-replication service to fan in from multiple Postgres sources (the control-plane DB and the run-ops DB) instead of a single source, plus the admin operations to run and observe it. - **Multi-source fan-in** (`services/runsReplicationService.server.ts`, new `runsReplicationInstance.server.ts`, `runsReplicationGlobal.server.ts`): factors the replication service into per-source instances and a coordinator so a single ClickHouse target is fed from more than one Postgres source. - **Admin ops** (`routes/admin.api.v1.runs-replication.status.ts`, `admin.api.v1.runs-replication.backfill.ts`, `v3/services/adminWorker.server.ts`): adds a status endpoint reporting per-source replication state and updates the backfill entrypoint for the multi-source shape. ## Why PR7 of the run-ops split stack, and the final piece: once run state can live in a separate run-ops DB (earlier PRs), the analytics replication into ClickHouse has to consume both sources so runs remain queryable regardless of residency. Behavior-changing for the replication service internals; the ClickHouse-facing output is unchanged (still one runs stream), and single-source operation is preserved when the split is not enabled. ## Tests New vitest coverage: `runsReplicationInstance.test.ts` (per-source instance behavior) and `runsReplicationService.part8`/`part9` suites exercising the multi-source coordinator. Testcontainers-backed (ClickHouse + Postgres); no mocks. ## Notes Draft, **stacked on #4118** (`runops/pr06-write-path`). Review that first; this diff is against it. Server-change / changeset note to be added at stack-assembly time. 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 84f3e1b commit 5be6a4f

10 files changed

Lines changed: 1766 additions & 138 deletions
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: feature
4+
---
5+
6+
Replicate task runs into ClickHouse from multiple source databases so the run-ops DB split can fan both databases into analytics, with an admin status endpoint reporting per-source replication leadership.

apps/webapp/app/routes/admin.api.v1.runs-replication.backfill.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { prisma } from "~/db.server";
55
import { runStore } from "~/v3/runStore.server";
66
import { logger } from "~/services/logger.server";
77
import { requireAdminApiRequest } from "~/services/personalAccessToken.server";
8+
import { getRunsReplicationGlobal } from "~/services/runsReplicationGlobal.server";
89
import { runsReplicationInstance } from "~/services/runsReplicationInstance.server";
910
import { FINAL_RUN_STATUSES } from "~/v3/taskStatus";
1011

@@ -40,11 +41,12 @@ export async function action({ request }: ActionFunctionArgs) {
4041
runs.push(...batchRuns);
4142
}
4243

43-
if (!runsReplicationInstance) {
44+
const service = getRunsReplicationGlobal() ?? runsReplicationInstance;
45+
if (!service) {
4446
throw new Error("Runs replication instance not found");
4547
}
4648

47-
await runsReplicationInstance.backfill(
49+
await service.backfill(
4850
runs.map((run) => ({
4951
...run,
5052
masterQueue: run.workerQueue,
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import { type LoaderFunctionArgs, json } from "@remix-run/server-runtime";
2+
import Redis from "ioredis";
3+
import { env } from "~/env.server";
4+
import { requireAdminApiRequest } from "~/services/personalAccessToken.server";
5+
import { getRunsReplicationConfiguredSources } from "~/services/runsReplicationGlobal.server";
6+
7+
/**
8+
* Probes per-source replication leadership via the redlock leader-lock key, which
9+
* is DOUBLE-PREFIXED with `logical-replication-client:` — once from the connection's
10+
* keyPrefix and once from redlock's resource string. So we prefix this connection
11+
* with `runs-replication:logical-replication-client:` and EXISTS on the resource
12+
* `logical-replication-client:runs-replication:<id>`, resolving to:
13+
* runs-replication:logical-replication-client:logical-replication-client:runs-replication:<id>
14+
*/
15+
async function probeLeadership(sourceIds: string[]): Promise<Map<string, boolean>> {
16+
const leaders = new Map<string, boolean>();
17+
18+
const redis = new Redis({
19+
keyPrefix: "runs-replication:logical-replication-client:",
20+
port: env.RUN_REPLICATION_REDIS_PORT ?? undefined,
21+
host: env.RUN_REPLICATION_REDIS_HOST ?? undefined,
22+
username: env.RUN_REPLICATION_REDIS_USERNAME ?? undefined,
23+
password: env.RUN_REPLICATION_REDIS_PASSWORD ?? undefined,
24+
enableAutoPipelining: true,
25+
...(env.RUN_REPLICATION_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
26+
});
27+
28+
try {
29+
for (const id of sourceIds) {
30+
const exists = await redis.exists(`logical-replication-client:runs-replication:${id}`);
31+
leaders.set(id, exists === 1);
32+
}
33+
} finally {
34+
await redis.quit();
35+
}
36+
37+
return leaders;
38+
}
39+
40+
export async function loader({ request }: LoaderFunctionArgs) {
41+
await requireAdminApiRequest(request);
42+
43+
const sources = getRunsReplicationConfiguredSources();
44+
45+
if (!sources || sources.length === 0) {
46+
return json({ enabled: false, sources: [] });
47+
}
48+
49+
const leaders = await probeLeadership(sources.map((s) => s.id));
50+
51+
return json({
52+
enabled: env.RUN_REPLICATION_ENABLED === "1" && sources.length > 0,
53+
sources: sources.map((s) => ({
54+
id: s.id,
55+
slotName: s.slotName,
56+
originGeneration: s.originGeneration,
57+
leader: leaders.get(s.id) ?? false,
58+
})),
59+
});
60+
}

apps/webapp/app/services/runsReplicationGlobal.server.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,14 @@ import type { RunsReplicationService } from "./runsReplicationService.server";
22

33
const GLOBAL_RUNS_REPLICATION_KEY = Symbol.for("dev.trigger.ts.runs-replication");
44
const GLOBAL_TCP_MONITOR_KEY = Symbol.for("dev.trigger.ts.tcp-monitor");
5+
const GLOBAL_RUNS_REPLICATION_SOURCES_KEY = Symbol.for("dev.trigger.ts.runs-replication-sources");
6+
7+
export type ConfiguredSource = { id: string; slotName: string; originGeneration: number };
58

69
type RunsReplicationGlobal = {
710
[GLOBAL_RUNS_REPLICATION_KEY]?: RunsReplicationService;
811
[GLOBAL_TCP_MONITOR_KEY]?: NodeJS.Timeout;
12+
[GLOBAL_RUNS_REPLICATION_SOURCES_KEY]?: ConfiguredSource[];
913
};
1014

1115
const _globalThis = typeof globalThis === "object" ? globalThis : global;
@@ -23,6 +27,14 @@ export function unregisterRunsReplicationGlobal() {
2327
delete _global[GLOBAL_RUNS_REPLICATION_KEY];
2428
}
2529

30+
export function getRunsReplicationConfiguredSources(): ConfiguredSource[] | undefined {
31+
return _global[GLOBAL_RUNS_REPLICATION_SOURCES_KEY];
32+
}
33+
34+
export function setRunsReplicationConfiguredSources(sources: ConfiguredSource[]) {
35+
_global[GLOBAL_RUNS_REPLICATION_SOURCES_KEY] = sources;
36+
}
37+
2638
export function getTcpMonitorGlobal(): NodeJS.Timeout | undefined {
2739
return _global[GLOBAL_TCP_MONITOR_KEY];
2840
}

apps/webapp/app/services/runsReplicationInstance.server.ts

Lines changed: 172 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,90 @@ import invariant from "tiny-invariant";
22
import { env } from "~/env.server";
33
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server";
44
import { singleton } from "~/utils/singleton";
5+
import { isSplitEnabled } from "~/v3/runOpsMigration/splitMode.server";
56
import { meter, provider } from "~/v3/tracer.server";
6-
import { RunsReplicationService } from "./runsReplicationService.server";
7+
import {
8+
setRunsReplicationConfiguredSources,
9+
setRunsReplicationGlobal,
10+
} from "./runsReplicationGlobal.server";
11+
import {
12+
RunsReplicationService,
13+
type RunsReplicationSource,
14+
} from "./runsReplicationService.server";
715
import { signalsEmitter } from "./signals.server";
816

917
export const runsReplicationInstance = singleton(
1018
"runsReplicationInstance",
1119
initializeRunsReplicationInstance
1220
);
1321

22+
export function buildReplicationSources(args: {
23+
splitEnabled: boolean;
24+
legacyUrl: string;
25+
newUrl?: string;
26+
/** `false` forces the new source off under split; undefined follows split. */
27+
newSourceOverride?: boolean;
28+
legacySlotName: string;
29+
legacyPublicationName: string;
30+
legacyOriginGeneration: number;
31+
newSlotName: string;
32+
newPublicationName: string;
33+
newOriginGeneration: number;
34+
}): RunsReplicationSource[] {
35+
const legacy: RunsReplicationSource = {
36+
id: "legacy",
37+
pgConnectionUrl: args.legacyUrl,
38+
slotName: args.legacySlotName,
39+
publicationName: args.legacyPublicationName,
40+
originGeneration: args.legacyOriginGeneration,
41+
};
42+
43+
const newSourceOn = args.splitEnabled && !!args.newUrl && args.newSourceOverride !== false;
44+
45+
if (!newSourceOn || !args.newUrl) {
46+
return [legacy];
47+
}
48+
49+
const next: RunsReplicationSource = {
50+
id: "new",
51+
pgConnectionUrl: args.newUrl,
52+
slotName: args.newSlotName,
53+
publicationName: args.newPublicationName,
54+
originGeneration: args.newOriginGeneration,
55+
};
56+
57+
return [legacy, next];
58+
}
59+
60+
/**
61+
* The residency-split gate and the `#new`->ClickHouse replication gate are
62+
* independent env vars. If split is on (ksuid runs are minted on the new DB) but the
63+
* constructed sources[] has no `"new"` source, every ksuid run is silently missing from
64+
* ClickHouse — under-counting all CH-fronted usage/cost/metrics aggregates with no
65+
* Postgres fallback. Couple the gates at boot: this misconfiguration must fail loudly
66+
* rather than ship a fleet-wide under-count.
67+
*/
68+
export class SplitReplicationMisconfiguredError extends Error {
69+
constructor() {
70+
super(
71+
'RUN_OPS_SPLIT_ENABLED is on but the runs-replication sources[] has no "new" source: ' +
72+
"ksuid runs on the new DB would not replicate to ClickHouse, under-counting every " +
73+
"ClickHouse-fronted aggregate. Enable the new replication source " +
74+
"(RUN_REPLICATION_NEW_ENABLED / RUN_OPS_DATABASE_URL) or turn the split off."
75+
);
76+
this.name = "SplitReplicationMisconfiguredError";
77+
}
78+
}
79+
80+
export function assertReplicationCoversSplit(args: {
81+
splitEnabled: boolean;
82+
sources: RunsReplicationSource[];
83+
}): void {
84+
if (args.splitEnabled && !args.sources.some((s) => s.id === "new")) {
85+
throw new SplitReplicationMisconfiguredError();
86+
}
87+
}
88+
1489
function initializeRunsReplicationInstance() {
1590
const { DATABASE_URL } = process.env;
1691
invariant(typeof DATABASE_URL === "string", "DATABASE_URL env var not set");
@@ -22,12 +97,11 @@ function initializeRunsReplicationInstance() {
2297

2398
console.log("🗃️ Runs replication service enabled");
2499

25-
const service = new RunsReplicationService({
100+
// Shared options for both the legacy-only and the multi-source constructions.
101+
// Excludes per-source identity (pgConnectionUrl/slotName/publicationName/sources).
102+
const baseReplicationOptions = {
26103
clickhouseFactory,
27-
pgConnectionUrl: DATABASE_URL,
28104
serviceName: "runs-replication",
29-
slotName: env.RUN_REPLICATION_SLOT_NAME,
30-
publicationName: env.RUN_REPLICATION_PUBLICATION_NAME,
31105
redisOptions: {
32106
keyPrefix: "runs-replication:",
33107
port: env.RUN_REPLICATION_REDIS_PORT ?? undefined,
@@ -55,24 +129,107 @@ function initializeRunsReplicationInstance() {
55129
insertStrategy: env.RUN_REPLICATION_INSERT_STRATEGY,
56130
disablePayloadInsert: env.RUN_REPLICATION_DISABLE_PAYLOAD_INSERT === "1",
57131
disableErrorFingerprinting: env.RUN_REPLICATION_DISABLE_ERROR_FINGERPRINTING === "1",
132+
};
133+
134+
// Construct the SINGLE legacy source synchronously (the split gate has not resolved
135+
// yet at module-init time, and singleton(...) memoizes this synchronous return value).
136+
let service = new RunsReplicationService({
137+
...baseReplicationOptions,
138+
pgConnectionUrl: DATABASE_URL,
139+
slotName: env.RUN_REPLICATION_SLOT_NAME,
140+
publicationName: env.RUN_REPLICATION_PUBLICATION_NAME,
141+
// Explicit legacy source so the leader-lock key matches the id the status
142+
// route probes from the registry below.
143+
sources: [
144+
{
145+
id: "legacy",
146+
pgConnectionUrl: DATABASE_URL,
147+
slotName: env.RUN_REPLICATION_SLOT_NAME,
148+
publicationName: env.RUN_REPLICATION_PUBLICATION_NAME,
149+
originGeneration: env.RUN_REPLICATION_LEGACY_ORIGIN_GENERATION,
150+
},
151+
],
58152
});
59153

154+
// Register the live handle so the status route + lifecycle routes can find it.
155+
setRunsReplicationGlobal(service);
156+
setRunsReplicationConfiguredSources([
157+
{
158+
id: "legacy",
159+
slotName: env.RUN_REPLICATION_SLOT_NAME,
160+
originGeneration: env.RUN_REPLICATION_LEGACY_ORIGIN_GENERATION,
161+
},
162+
]);
163+
60164
if (env.RUN_REPLICATION_ENABLED === "1") {
61-
clickhouseFactory
62-
.isReady()
63-
.then(() => service.start())
64-
.then(() => {
65-
console.log("🗃️ Runs replication service started");
165+
// Construct-after-gate: resolve the async split gate ONCE at boot, and
166+
// when both sources are enabled rebuild `service` with sources[] before starting.
167+
// The legacy-only instance above is never started in the dual path (no slot/lock
168+
// taken). runsReplicationService.server.ts is untouched. The create route also calls
169+
// setRunsReplicationGlobal — last-writer-wins is the existing contract.
170+
isSplitEnabled()
171+
.then(async (splitEnabled) => {
172+
const sources = buildReplicationSources({
173+
splitEnabled,
174+
legacyUrl: DATABASE_URL,
175+
newUrl: env.RUN_OPS_DATABASE_URL ?? env.TASK_RUN_DATABASE_URL,
176+
newSourceOverride: env.RUN_REPLICATION_NEW_ENABLED === "disabled" ? false : undefined,
177+
legacySlotName: env.RUN_REPLICATION_SLOT_NAME,
178+
legacyPublicationName: env.RUN_REPLICATION_PUBLICATION_NAME,
179+
legacyOriginGeneration: env.RUN_REPLICATION_LEGACY_ORIGIN_GENERATION,
180+
newSlotName: env.RUN_REPLICATION_NEW_SLOT_NAME,
181+
newPublicationName: env.RUN_REPLICATION_NEW_PUBLICATION_NAME,
182+
newOriginGeneration: env.RUN_REPLICATION_NEW_ORIGIN_GENERATION,
183+
});
184+
185+
// Refuse to start replication if split is on but `#new` is not a source.
186+
assertReplicationCoversSplit({ splitEnabled, sources });
187+
188+
if (sources.length > 1) {
189+
// Release the bootstrap instance's eager replication client (Redis + Redlock)
190+
// before replacing it, or it leaks for the process lifetime. shutdown() is idempotent.
191+
await service.shutdown();
192+
// The scalar pgConnectionUrl/slotName/publicationName remain required on the
193+
// options type, but are ignored when sources[] is non-empty — the
194+
// service normalizes off sources. Pass the legacy scalars to satisfy the type.
195+
service = new RunsReplicationService({
196+
...baseReplicationOptions,
197+
pgConnectionUrl: DATABASE_URL,
198+
slotName: env.RUN_REPLICATION_SLOT_NAME,
199+
publicationName: env.RUN_REPLICATION_PUBLICATION_NAME,
200+
sources,
201+
});
202+
setRunsReplicationGlobal(service);
203+
setRunsReplicationConfiguredSources(
204+
sources.map((s) => ({
205+
id: s.id,
206+
slotName: s.slotName,
207+
originGeneration: s.originGeneration,
208+
}))
209+
);
210+
}
211+
212+
return clickhouseFactory.isReady().then(() => service.start());
66213
})
214+
.then(() => console.log("🗃️ Runs replication service started"))
67215
.catch((error) => {
68-
console.error("🗃️ Runs replication service failed to start", {
69-
error,
70-
});
216+
if (error instanceof SplitReplicationMisconfiguredError) {
217+
// A silent ClickHouse under-count is worse than a crash — make it fatal.
218+
console.error("🚨 FATAL: run-ops split / ClickHouse replication misconfiguration", {
219+
error,
220+
});
221+
process.exit(1);
222+
}
223+
console.error("🗃️ Runs replication service failed to start", { error });
71224
});
72225

73-
signalsEmitter.on("SIGTERM", service.shutdown.bind(service));
74-
signalsEmitter.on("SIGINT", service.shutdown.bind(service));
226+
// Closures over the `let` so SIGTERM/SIGINT hit whichever instance is live (NOT a
227+
// stale .bind() to the discarded legacy-only instance).
228+
signalsEmitter.on("SIGTERM", () => service.shutdown());
229+
signalsEmitter.on("SIGINT", () => service.shutdown());
75230
}
76231

232+
// Returns the legacy-only instance synchronously (singleton memoizes this). Lifecycle
233+
// routes read getRunsReplicationGlobal() first, so they get the live multi-source one.
77234
return service;
78235
}

0 commit comments

Comments
 (0)