Skip to content

Commit 4e08dc7

Browse files
d-csclaude
andcommitted
fix(run-ops): align single-source leader-lock id, guard shutdown stop, fix test status type
- Register the implicit single source with id "legacy" so its leader-lock key matches the id the admin status route probes; otherwise leadership always reads false in the non-split config. - Guard the shutdown-path client.stop() fan-out against re-firing per incoming transaction and add a catch so rejections don't surface as unhandled. - Use the TaskRunStatus type alias (not the const value) for status annotations in the dual-source dedup tests. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 226d3cf commit 4e08dc7

3 files changed

Lines changed: 28 additions & 9 deletions

File tree

apps/webapp/app/services/runsReplicationInstance.server.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,17 @@ function initializeRunsReplicationInstance() {
138138
pgConnectionUrl: DATABASE_URL,
139139
slotName: env.RUN_REPLICATION_SLOT_NAME,
140140
publicationName: env.RUN_REPLICATION_PUBLICATION_NAME,
141+
// Explicit legacy source so the leader-lock key matches the id the status
142+
// route probes from the registry below.
143+
sources: [
144+
{
145+
id: "legacy",
146+
pgConnectionUrl: DATABASE_URL,
147+
slotName: env.RUN_REPLICATION_SLOT_NAME,
148+
publicationName: env.RUN_REPLICATION_PUBLICATION_NAME,
149+
originGeneration: env.RUN_REPLICATION_LEGACY_ORIGIN_GENERATION,
150+
},
151+
],
141152
});
142153

143154
// Register the live handle so the status route + lifecycle routes can find it.

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ export class RunsReplicationService {
162162
private logger: Logger;
163163
private _isShuttingDown = false;
164164
private _isShutDownComplete = false;
165+
private _shutdownStopInFlight = false;
165166
private _tracer: Tracer;
166167
private _meter: Meter;
167168
private _acknowledgeTimeoutMs: number;
@@ -631,11 +632,18 @@ export class RunsReplicationService {
631632

632633
if (this._isShuttingDown) {
633634
// A global shutdown stops every source's client; mark complete once all
634-
// have stopped. For a single source this is identical to the prior
635-
// "stop the one client, then mark complete" behavior.
636-
Promise.all(Array.from(this._sources.values()).map((r) => r.client.stop())).finally(() => {
637-
this._isShutDownComplete = true;
638-
});
635+
// have stopped. Guard against re-firing per incoming transaction, and
636+
// swallow client.stop() rejections so they don't surface as unhandled.
637+
if (!this._shutdownStopInFlight) {
638+
this._shutdownStopInFlight = true;
639+
Promise.all(Array.from(this._sources.values()).map((r) => r.client.stop()))
640+
.catch((error) => {
641+
this.logger.error("Error stopping replication clients during shutdown", { error });
642+
})
643+
.finally(() => {
644+
this._isShutDownComplete = true;
645+
});
646+
}
639647
}
640648

641649
// If there are no events, do nothing

apps/webapp/test/runsReplicationService.part8.test.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { ClickHouse } from "@internal/clickhouse";
22
import { createPostgresContainer, replicationContainerTest } from "@internal/testcontainers";
3-
import { PrismaClient } from "@trigger.dev/database";
3+
import { PrismaClient, type TaskRunStatus as TaskRunStatusType } from "@trigger.dev/database";
44
import { setTimeout } from "node:timers/promises";
55
import { z } from "zod";
66
import { TaskRunStatus } from "~/database-types";
@@ -96,7 +96,7 @@ describe("RunsReplicationService (part 8/8) - dual-source dedup", () => {
9696
const seedFkRows = async (
9797
client: PrismaClient,
9898
tag: string,
99-
status: TaskRunStatus,
99+
status: TaskRunStatusType,
100100
friendlyId: string
101101
) => {
102102
await client.organization.create({
@@ -275,7 +275,7 @@ describe("RunsReplicationService (part 8/8) - dual-source dedup", () => {
275275
const seedFkRows = async (
276276
client: PrismaClient,
277277
tag: string,
278-
status: TaskRunStatus,
278+
status: TaskRunStatusType,
279279
friendlyId: string
280280
) => {
281281
await client.organization.create({
@@ -444,7 +444,7 @@ describe("RunsReplicationService (part 8/8) - dual-source dedup", () => {
444444
client: PrismaClient,
445445
tag: string,
446446
runId: string,
447-
status: TaskRunStatus
447+
status: TaskRunStatusType
448448
) => {
449449
const orgId = `org_${tag}_${suffix}`;
450450
const projectId = `proj_${tag}_${suffix}`;

0 commit comments

Comments
 (0)