diff --git a/.changeset/silly-planes-march.md b/.changeset/silly-planes-march.md new file mode 100644 index 00000000000..d728ac866b9 --- /dev/null +++ b/.changeset/silly-planes-march.md @@ -0,0 +1,5 @@ +--- +"trigger.dev": major +--- + +Kill orphaned trigger-dev-run-worker processes when CLI is killed with SIGKILL diff --git a/packages/cli-v3/src/dev/devSupervisor.ts b/packages/cli-v3/src/dev/devSupervisor.ts index 59b2d2a473b..8dd7dfff408 100644 --- a/packages/cli-v3/src/dev/devSupervisor.ts +++ b/packages/cli-v3/src/dev/devSupervisor.ts @@ -80,6 +80,9 @@ class DevSupervisor implements WorkerRuntime { private activeRunsPath?: string; private watchdogPidPath?: string; + private activeRunsUpdateInterval?: NodeJS.Timeout; + + constructor(public readonly options: WorkerRuntimeOptions) { } async init(): Promise { @@ -153,6 +156,10 @@ class DevSupervisor implements WorkerRuntime { // Spawn detached watchdog to cancel runs if CLI is killed (e.g. pnpm SIGKILL) this.#spawnWatchdog(); + // Keep active-runs.json current with latest worker PIDs + this.activeRunsUpdateInterval = setInterval(() => { + this.#updateActiveRunsFile(); + }, 2_000); //start dequeuing await this.#dequeueRuns(); @@ -198,6 +205,10 @@ class DevSupervisor implements WorkerRuntime { }); } } + + if (this.activeRunsUpdateInterval) { + clearInterval(this.activeRunsUpdateInterval); + } } #spawnWatchdog() { @@ -292,6 +303,7 @@ class DevSupervisor implements WorkerRuntime { const data = { parentPid: process.pid, runFriendlyIds: Array.from(this.runControllers.keys()), + workerPids: this.taskRunProcessPool?.getAllPids() ?? [], }; // Atomic write: write to temp file then rename to avoid corrupt reads const tmpPath = this.activeRunsPath + ".tmp"; diff --git a/packages/cli-v3/src/dev/devWatchdog.ts b/packages/cli-v3/src/dev/devWatchdog.ts index 15dfae929f3..922cdbd10dd 100644 --- a/packages/cli-v3/src/dev/devWatchdog.ts +++ b/packages/cli-v3/src/dev/devWatchdog.ts @@ -71,10 +71,10 @@ writeFileSync(pidFilePath, `${PID_FILE_PREFIX}${process.pid}`); function cleanup() { try { unlinkSync(pidFilePath); - } catch {} + } catch { } try { unlinkSync(activeRunsPath); - } catch {} + } catch { } } function cleanupTmpDir() { @@ -95,12 +95,39 @@ function isParentAlive(): boolean { } } -function readActiveRuns(): string[] { +function readActiveRuns(): { runFriendlyIds: string[]; workerPids: number[] } { try { const data = JSON.parse(readFileSync(activeRunsPath, "utf8")); - return data.runFriendlyIds ?? []; + return { + runFriendlyIds: data.runFriendlyIds ?? [], + workerPids: data.workerPids ?? [], + }; } catch { - return []; + return { runFriendlyIds: [], workerPids: [] }; + } +} + +async function killWorkerProcesses(pids: number[]): Promise { + for (const pid of pids) { + try { + process.kill(pid, "SIGTERM"); + } catch { + // Already dead + } + } + + if (pids.length === 0) return; + + // Give processes a moment to exit cleanly before SIGKILL + await new Promise((resolve) => setTimeout(resolve, 3_000)); + + for (const pid of pids) { + try { + process.kill(pid, 0); // Check if still alive + process.kill(pid, "SIGKILL"); + } catch { + // Already dead — good + } } } @@ -124,7 +151,10 @@ const MAX_DISCONNECT_ATTEMPTS = 5; const INITIAL_BACKOFF_MS = 500; async function onParentDied(): Promise { - const runFriendlyIds = readActiveRuns(); + const { runFriendlyIds, workerPids } = readActiveRuns(); + + //kill orphaned worker processes first + await killWorkerProcesses(workerPids); if (runFriendlyIds.length > 0) { for (let attempt = 0; attempt < MAX_DISCONNECT_ATTEMPTS; attempt++) { diff --git a/packages/cli-v3/src/dev/taskRunProcessPool.test.ts b/packages/cli-v3/src/dev/taskRunProcessPool.test.ts new file mode 100644 index 00000000000..23cdac5cb55 --- /dev/null +++ b/packages/cli-v3/src/dev/taskRunProcessPool.test.ts @@ -0,0 +1,25 @@ +import { describe, it, expect } from "vitest"; +import { TaskRunProcessPool } from "./taskRunProcessPool.js"; + +describe("TaskRunProcessPool", () => { + it("getAllPids returns empty array when pool is empty", () => { + const pool = new TaskRunProcessPool({ + env: {}, + cwd: "/tmp", + enableProcessReuse: false, + }); + + expect(pool.getAllPids()).toEqual([]); + }); + + it("getAllPids returns no undefined values", () => { + const pool = new TaskRunProcessPool({ + env: {}, + cwd: "/tmp", + enableProcessReuse: false, + }); + + const pids = pool.getAllPids(); + expect(pids.every((pid) => typeof pid === "number")).toBe(true); + }); +}); \ No newline at end of file diff --git a/packages/cli-v3/src/dev/taskRunProcessPool.ts b/packages/cli-v3/src/dev/taskRunProcessPool.ts index 810be7acb43..0156a4d4835 100644 --- a/packages/cli-v3/src/dev/taskRunProcessPool.ts +++ b/packages/cli-v3/src/dev/taskRunProcessPool.ts @@ -271,6 +271,24 @@ export class TaskRunProcessPool { } } + getAllPids(): number[] { + const pids: number[] = []; + + for (const processes of this.availableProcessesByVersion.values()) { + for (const process of processes) { + if (process.pid !== undefined) pids.push(process.pid); + } + } + + for (const processSet of this.busyProcessesByVersion.values()) { + for (const process of processSet) { + if (process.pid !== undefined) pids.push(process.pid); + } + } + + return pids; + } + async shutdown(): Promise { const totalAvailable = Array.from(this.availableProcessesByVersion.values()).reduce( (sum, processes) => sum + processes.length,