diff --git a/packages/agent/src/server/agent-server.ts b/packages/agent/src/server/agent-server.ts index 9875db0047..aa5777f0a3 100644 --- a/packages/agent/src/server/agent-server.ts +++ b/packages/agent/src/server/agent-server.ts @@ -357,6 +357,7 @@ export class AgentServer { if (config.eventIngestToken) { this.eventStreamSender = new TaskRunEventStreamSender({ apiUrl: config.apiUrl, + eventIngestBaseUrl: config.eventIngestBaseUrl, projectId: config.projectId, taskId: config.taskId, runId: config.runId, diff --git a/packages/agent/src/server/bin.ts b/packages/agent/src/server/bin.ts index 1987249968..d840ddadc0 100644 --- a/packages/agent/src/server/bin.ts +++ b/packages/agent/src/server/bin.ts @@ -33,6 +33,8 @@ const envSchema = z.object({ .enum(["low", "medium", "high", "xhigh", "max"]) .optional(), POSTHOG_TASK_RUN_EVENT_INGEST_TOKEN: z.string().min(1).optional(), + // Base URL for the event-ingest POST only; falls back to POSTHOG_API_URL when unset. + POSTHOG_TASK_RUN_EVENT_INGEST_URL: z.url().optional(), POSTHOG_TASK_RUN_EVENT_INGEST_STREAM_WINDOW_MS: z .string() .regex( @@ -162,6 +164,7 @@ program port: parseInt(options.port, 10), jwtPublicKey: env.JWT_PUBLIC_KEY, eventIngestToken: env.POSTHOG_TASK_RUN_EVENT_INGEST_TOKEN, + eventIngestBaseUrl: env.POSTHOG_TASK_RUN_EVENT_INGEST_URL, eventIngestStreamWindowMs: env.POSTHOG_TASK_RUN_EVENT_INGEST_STREAM_WINDOW_MS, repositoryPath: options.repositoryPath, @@ -192,11 +195,8 @@ program process.exit(0); }); - // A hard crash would otherwise leave the run non-terminal and the user staring - // at a generic "Cloud stream disconnected". Mark the run failed before exiting - // so the desktop surfaces a real error instead of a silent stall. The deadline - // guarantees we exit even if reportFatalError's network calls hang at crash time - // (e.g. API unreachable during a restart), so we never block pod shutdown. + // Mark the run failed before exiting so a hard crash surfaces a real error instead of a + // silent stall. The deadline guarantees we exit even if the report hangs at crash time. const FATAL_ERROR_REPORT_DEADLINE_MS = 5_000; const handleFatalError = async (error: unknown) => { try { diff --git a/packages/agent/src/server/event-stream-sender.test.ts b/packages/agent/src/server/event-stream-sender.test.ts index e4e936362d..c57825d318 100644 --- a/packages/agent/src/server/event-stream-sender.test.ts +++ b/packages/agent/src/server/event-stream-sender.test.ts @@ -187,6 +187,27 @@ describe("TaskRunEventStreamSender", () => { ]); }); + it("routes the ingest POST to the agent-proxy run-scoped path when eventIngestBaseUrl is set", async () => { + const fetchMock = vi.fn( + async (_url: string | URL | Request, init?: RequestInit) => { + const body = await readRequestBody(init); + return responseForBody(body); + }, + ); + vi.stubGlobal("fetch", fetchMock); + + const sender = createSender({ + eventIngestBaseUrl: "http://agent-proxy:8003/", + }); + sender.enqueue({ type: "notification", notification: { method: "first" } }); + await sender.stop(); + + expect(fetchMock).toHaveBeenCalled(); + const lastCall = fetchMock.mock.calls[fetchMock.mock.calls.length - 1]; + expect(lastCall[0]).toBe("http://agent-proxy:8003/v1/runs/run-1/ingest"); + expect(lastCall[0]).not.toContain("/api/projects/"); + }); + it("keeps the active ingest request open across scheduled flushes", async () => { const requestBodies: string[] = []; let activeStreamClosed = false; diff --git a/packages/agent/src/server/event-stream-sender.ts b/packages/agent/src/server/event-stream-sender.ts index 0620bd758e..5e254ded40 100644 --- a/packages/agent/src/server/event-stream-sender.ts +++ b/packages/agent/src/server/event-stream-sender.ts @@ -8,6 +8,8 @@ import { interface TaskRunEventStreamSenderConfig { apiUrl: string; + // Base URL for the event-ingest POST only; falls back to apiUrl (Django path) when unset. + eventIngestBaseUrl?: string; projectId: number; taskId: string; runId: string; @@ -85,10 +87,20 @@ export class TaskRunEventStreamSender { private bufferRevision = 0; constructor(private readonly config: TaskRunEventStreamSenderConfig) { - const apiUrl = config.apiUrl.replace(/\/$/, ""); - this.ingestUrl = `${apiUrl}/api/projects/${config.projectId}/tasks/${encodeURIComponent( - config.taskId, - )}/runs/${encodeURIComponent(config.runId)}/event_stream/`; + const usingProxy = Boolean(config.eventIngestBaseUrl); + const ingestBase = (config.eventIngestBaseUrl ?? config.apiUrl).replace( + /\/$/, + "", + ); + this.ingestUrl = usingProxy + ? `${ingestBase}/v1/runs/${encodeURIComponent(config.runId)}/ingest` + : `${ingestBase}/api/projects/${config.projectId}/tasks/${encodeURIComponent( + config.taskId, + )}/runs/${encodeURIComponent(config.runId)}/event_stream/`; + config.logger.info("Event ingest target resolved", { + ingestUrl: this.ingestUrl, + routedToProxy: usingProxy, + }); this.maxBufferedEvents = config.maxBufferedEvents ?? DEFAULT_MAX_BUFFERED_EVENTS; this.maxStreamEvents = config.maxStreamEvents ?? DEFAULT_MAX_STREAM_EVENTS; @@ -341,8 +353,7 @@ export class TaskRunEventStreamSender { delayOverrideMs?: number, ): void { this.clearStreamWindowClose(stream); - // Rotate long-lived uploads even when the agent goes idle; this is a - // transport boundary, not a batching window. + // Rotate long-lived uploads even when idle: a transport boundary, not a batching window. const delayMs = delayOverrideMs ?? Math.max(0, stream.startedAtMs + this.streamWindowMs - Date.now()); @@ -671,7 +682,7 @@ export class TaskRunEventStreamSender { } if (this.droppedBeforeSequenceCount > 0) { - this.config.logger.warn("Task run event ingest recovered after drops", { + this.config.logger.info("Task run event ingest recovered after drops", { dropped: this.droppedBeforeSequenceCount, }); this.droppedBeforeSequenceCount = 0; diff --git a/packages/agent/src/server/types.ts b/packages/agent/src/server/types.ts index 478a080b0a..f0ece6174f 100644 --- a/packages/agent/src/server/types.ts +++ b/packages/agent/src/server/types.ts @@ -17,6 +17,8 @@ export interface AgentServerConfig { projectId: number; jwtPublicKey: string; // RS256 public key for JWT verification eventIngestToken?: string; + // Base URL for the event-ingest POST only; falls back to apiUrl when unset. + eventIngestBaseUrl?: string; eventIngestStreamWindowMs?: number; mode: AgentMode; taskId: string; diff --git a/packages/core/src/cloud-task/cloud-task.test.ts b/packages/core/src/cloud-task/cloud-task.test.ts index 97f8595f49..0acd9ee5fe 100644 --- a/packages/core/src/cloud-task/cloud-task.test.ts +++ b/packages/core/src/cloud-task/cloud-task.test.ts @@ -3,15 +3,21 @@ import { CloudTaskEvent } from "./schemas"; const mockNetFetch = vi.hoisted(() => vi.fn()); const mockStreamFetch = vi.hoisted(() => vi.fn()); +const mockStreamTokenFetch = vi.hoisted(() => vi.fn()); // The service now uses global fetch for BOTH authenticated API calls (JSON) // and SSE streaming. The two used to be distinct (net.fetch vs global fetch). -// To preserve the existing test fixtures, route by URL: /stream/ → stream mock, -// everything else → API mock. +// Route by URL: /stream_token/ → token mock (read-leg resolution), the stream leg +// (Django /stream/ or proxy /v1/runs/:run/stream) → stream mock, everything else → API mock. +// The token mock has a Django-path default so existing fixtures (which never set it) are untouched. const fetchRouter = vi.hoisted(() => vi.fn((input: string | Request, init?: RequestInit) => { const url = typeof input === "string" ? input : input.url; - const impl = url.includes("/stream/") ? mockStreamFetch : mockNetFetch; + const impl = url.includes("/stream_token/") + ? mockStreamTokenFetch + : /\/stream(\/|\?|$)/.test(url) + ? mockStreamFetch + : mockNetFetch; return impl(input, init); }), ); @@ -98,6 +104,15 @@ describe("CloudTaskService", () => { ); mockNetFetch.mockReset(); mockStreamFetch.mockReset(); + mockStreamTokenFetch.mockReset(); + // Default read-leg resolution: no proxy URL, so the stream reads from Django directly. + // A resolving stream_token endpoint implies the durable-stream contract (stream-end); + // legacy-mode tests override this with a 404 to model old servers. + mockStreamTokenFetch.mockImplementation(() => + Promise.resolve( + createJsonResponse({ token: "test-token", stream_base_url: null }), + ), + ); mockAuthService.authenticatedFetch.mockReset(); vi.stubGlobal("fetch", fetchRouter); @@ -601,23 +616,11 @@ describe("CloudTaskService", () => { ).toBe(true); }); - it("fails the watcher after exhausting the cumulative reconnect budget on clean-EOF loops", async () => { + it("stops without reconnecting when the server emits stream-end on a non-terminal run", async () => { vi.useFakeTimers(); - const updates: unknown[] = []; - service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); - - const makeInProgressRun = () => - createJsonResponse({ - id: "run-1", - status: "in_progress", - stage: null, - output: null, - error_message: null, - branch: "main", - updated_at: "2026-01-01T00:00:00Z", - }); - + // Run status stays non-terminal the whole time. Pre-durable-contract, a clean EOF on a + // non-terminal run reconnects (see the test above); the stream-end sentinel must override that. mockNetFetch.mockImplementation((input: string | Request) => { const url = typeof input === "string" ? input : input.url; if (url.includes("/session_logs/")) { @@ -625,11 +628,25 @@ describe("CloudTaskService", () => { createJsonResponse([], 200, { "X-Has-More": "false" }), ); } - return Promise.resolve(makeInProgressRun()); + return Promise.resolve( + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: "build", + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }), + ); }); mockStreamFetch.mockImplementation(() => - Promise.resolve(createSseResponse("")), + Promise.resolve( + createSseResponse( + 'id: 1\ndata: {"type":"notification","timestamp":"2026-01-01T00:00:01Z","notification":{"jsonrpc":"2.0","method":"_posthog/console","params":{"sessionId":"run-1","level":"info","message":"hi"}}}\n\nevent: stream-end\ndata: {}\n\n', + ), + ), ); service.watch({ @@ -639,62 +656,63 @@ describe("CloudTaskService", () => { teamId: 2, }); - await waitFor(() => mockStreamFetch.mock.calls.length === 1); - await vi.advanceTimersByTimeAsync(60 * 60_000); + const hasWatcher = (): boolean => + (service as unknown as { watchers: Map }).watchers.has( + "task-1:run-1", + ); - await waitFor( - () => - updates.some( - (u) => - typeof u === "object" && - u !== null && - (u as { kind?: string }).kind === "error", - ), - 10_000, - ); + await waitFor(() => mockStreamFetch.mock.calls.length === 1); + // Let the reconnect delay (2s base) elapse; with stream-end honored, none is scheduled. + await vi.advanceTimersByTimeAsync(10_000); - expect(updates).toContainEqual({ - taskId: "task-1", - runId: "run-1", - kind: "error", - errorTitle: "Cloud run unreachable", - errorMessage: - "Could not maintain a connection to the cloud run after many attempts. Click retry once the issue is resolved.", - retryable: true, - }); + expect(mockStreamFetch.mock.calls.length).toBe(1); + await waitFor(() => !hasWatcher()); }); - it("emits a retryable cloud error after repeated stream failures", async () => { + it("emits the bootstrap snapshot when stream-end arrives mid-bootstrap", async () => { vi.useFakeTimers(); const updates: unknown[] = []; service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); - const makeInProgressRun = () => - createJsonResponse({ + const historicalEntry = { + type: "notification", + timestamp: "2026-01-01T00:00:01Z", + notification: { + jsonrpc: "2.0", + method: "_posthog/console", + params: { sessionId: "run-1", level: "info", message: "backlog" }, + }, + }; + + // Hold the session_logs fetch open until the stream has already delivered + // stream-end and closed, so completion races the bootstrap snapshot. + let releaseSessionLogs: (() => void) | undefined; + const sessionLogsGate = new Promise((resolve) => { + releaseSessionLogs = resolve; + }); + + mockNetFetch.mockImplementation(async (input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { + await sessionLogsGate; + return createJsonResponse([historicalEntry], 200, { + "X-Has-More": "false", + }); + } + return createJsonResponse({ id: "run-1", status: "in_progress", - stage: null, + stage: "build", output: null, error_message: null, branch: "main", updated_at: "2026-01-01T00:00:00Z", }); - - mockNetFetch - .mockResolvedValueOnce(makeInProgressRun()) // bootstrap: fetchTaskRun - .mockResolvedValueOnce( - createJsonResponse([], 200, { "X-Has-More": "false" }), - ) // bootstrap: fetchSessionLogs - // Each stream error triggers handleStreamCompletion → fetchTaskRun - .mockImplementation(() => Promise.resolve(makeInProgressRun())); + }); mockStreamFetch.mockImplementation(() => - Promise.resolve( - createSseResponse( - 'event: keepalive\ndata: {"type":"keepalive"}\n\nevent: error\ndata: {"error":"boom"}\n\n', - ), - ), + Promise.resolve(createSseResponse("event: stream-end\ndata: {}\n\n")), ); service.watch({ @@ -704,87 +722,80 @@ describe("CloudTaskService", () => { teamId: 2, }); + // Stream connects, delivers stream-end and EOFs while session_logs hangs. await waitFor(() => mockStreamFetch.mock.calls.length === 1); - await vi.advanceTimersByTimeAsync(70_000); - await waitFor( - () => - updates.some( - (u) => - typeof u === "object" && - u !== null && - (u as { kind?: string }).kind === "error", - ), - 10_000, - ); + await vi.advanceTimersByTimeAsync(1_000); - expect(mockStreamFetch.mock.calls.length).toBe(6); - // 2 bootstrap calls + 1 post-bootstrap status verification + 6 - // handleStreamCompletion calls (one per stream error) - expect(mockNetFetch).toHaveBeenCalledTimes(9); - expect(updates).toContainEqual({ - taskId: "task-1", - runId: "run-1", - kind: "error", - errorTitle: "Cloud stream disconnected", - errorMessage: - "Lost connection to the cloud run stream. Retry to reconnect.", - retryable: true, - }); + releaseSessionLogs?.(); + + const hasWatcher = (): boolean => + (service as unknown as { watchers: Map }).watchers.has( + "task-1:run-1", + ); + await waitFor(() => !hasWatcher()); + + const snapshots = updates.filter( + (u) => + typeof u === "object" && + u !== null && + (u as { kind?: string }).kind === "snapshot", + ); + expect(snapshots).toHaveLength(1); + expect( + (snapshots[0] as { newEntries: unknown[] }).newEntries, + ).toContainEqual(historicalEntry); + // The stream-end stop must not schedule another connection. + await vi.advanceTimersByTimeAsync(10_000); + expect(mockStreamFetch.mock.calls.length).toBe(1); }); - it("clears the backend-error budget after a healthy long-lived cut", async () => { + it("repairs the final status when stream-end arrives without a terminal state event", async () => { vi.useFakeTimers(); const updates: unknown[] = []; service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); - const makeInProgressRun = () => - createJsonResponse({ - id: "run-1", - status: "in_progress", - stage: null, - output: null, - error_message: null, - branch: "main", - updated_at: "2026-01-01T00:00:00Z", - }); - - mockNetFetch - .mockResolvedValueOnce(makeInProgressRun()) // bootstrap: fetchTaskRun - .mockResolvedValueOnce( - createJsonResponse([], 200, { "X-Has-More": "false" }), - ) // bootstrap: fetchSessionLogs - .mockImplementation(() => Promise.resolve(makeInProgressRun())); - - // First connection delivers an explicit backend error frame (accruing the - // backend-error budget). Subsequent connections are healthy long-lived cuts - // (>= SSE_HEALTHY_CONNECTION_MS): each proves the stream recovered and must - // clear the backend-error budget, so it never accumulates for the run's life. - let streamCall = 0; - mockStreamFetch.mockImplementation(() => { - streamCall += 1; - if (streamCall === 1) { + let runFetchCount = 0; + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { return Promise.resolve( - createSseResponse('event: error\ndata: {"error":"boom"}\n\n'), + createJsonResponse([], 200, { "X-Has-More": "false" }), ); } - const encoder = new TextEncoder(); - const stream = new ReadableStream({ - start(controller) { - controller.enqueue( - encoder.encode('event: keepalive\ndata: {"type":"keepalive"}\n\n'), - ); - setTimeout(() => controller.error(new Error("terminated")), 65_000); - }, - }); + runFetchCount += 1; + // First fetch bootstraps an active run; the stream then ends without ever + // carrying a terminal task_run_state event, so the stop path must fetch + // the completed status itself. return Promise.resolve( - new Response(stream, { - status: 200, - headers: { "Content-Type": "text/event-stream" }, - }), + createJsonResponse( + runFetchCount === 1 + ? { + id: "run-1", + status: "in_progress", + stage: "build", + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + } + : { + id: "run-1", + status: "completed", + stage: null, + output: { pr_url: "https://github.com/PostHog/code/pull/9" }, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:05Z", + }, + ), ); }); + mockStreamFetch.mockImplementation(() => + Promise.resolve(createSseResponse("event: stream-end\ndata: {}\n\n")), + ); + service.watch({ taskId: "task-1", runId: "run-1", @@ -792,110 +803,78 @@ describe("CloudTaskService", () => { teamId: 2, }); - const getWatcher = () => - ( - service as unknown as { - watchers: Map< - string, - { - reconnectAttempts: number; - streamErrorAttempts: number; - failed: boolean; - } - >; - } - ).watchers.get("task-1:run-1"); - - // The backend error must have accrued the backend-error budget first... - await waitFor(() => (getWatcher()?.streamErrorAttempts ?? 0) >= 1, 20_000); - // ...then the healthy long-lived cut on the next connection clears it. - await vi.advanceTimersByTimeAsync(67_000 * 2); - await waitFor(() => getWatcher()?.streamErrorAttempts === 0, 20_000); + const hasWatcher = (): boolean => + (service as unknown as { watchers: Map }).watchers.has( + "task-1:run-1", + ); + await waitFor(() => !hasWatcher()); - const watcher = getWatcher(); - expect(watcher?.failed).toBe(false); - expect(watcher?.streamErrorAttempts).toBe(0); - expect(watcher?.reconnectAttempts).toBe(0); - expect( - updates.some( - (u) => - typeof u === "object" && - u !== null && - (u as { kind?: string }).kind === "error", - ), - ).toBe(false); + expect(updates).toContainEqual( + expect.objectContaining({ + kind: "status", + status: "completed", + output: { pr_url: "https://github.com/PostHog/code/pull/9" }, + }), + ); }); - it("counts quick stream failures and surfaces a retryable error", async () => { + it("reads via the agent-proxy with a Bearer token when the server resolves a base url", async () => { vi.useFakeTimers(); - const updates: unknown[] = []; - service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); - - const makeInProgressRun = () => - createJsonResponse({ - id: "run-1", - status: "in_progress", - stage: null, - output: null, - error_message: null, - branch: "main", - updated_at: "2026-01-01T00:00:00Z", - }); - - mockNetFetch - .mockResolvedValueOnce(makeInProgressRun()) - .mockResolvedValueOnce( - createJsonResponse([], 200, { "X-Has-More": "false" }), - ) - .mockImplementation(() => Promise.resolve(makeInProgressRun())); - - // Connections that fail immediately (under SSE_HEALTHY_CONNECTION_MS) are - // genuine churn and must keep counting toward the retry budget. - mockStreamFetch.mockImplementation(() => + mockStreamTokenFetch.mockImplementation(() => Promise.resolve( - createSseResponse('event: error\ndata: {"error":"boom"}\n\n'), + createJsonResponse({ + token: "proxy-token", + stream_base_url: "https://proxy.example", + }), ), ); - service.watch({ - taskId: "task-1", - runId: "run-1", - apiHost: "https://app.example.com", - teamId: 2, - }); - - await waitFor(() => mockStreamFetch.mock.calls.length === 1); - await vi.advanceTimersByTimeAsync(70_000); - await waitFor( - () => - updates.some( - (u) => - typeof u === "object" && - u !== null && - (u as { kind?: string }).kind === "error", - ), - 10_000, + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + return Promise.resolve( + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: "build", + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }), + ); + }); + + mockStreamFetch.mockImplementation(() => + Promise.resolve(createSseResponse("event: stream-end\ndata: {}\n\n")), ); - expect(updates).toContainEqual({ + service.watch({ taskId: "task-1", runId: "run-1", - kind: "error", - errorTitle: "Cloud stream disconnected", - errorMessage: - "Lost connection to the cloud run stream. Retry to reconnect.", - retryable: true, + apiHost: "https://app.example.com", + teamId: 2, }); + + await waitFor(() => mockStreamFetch.mock.calls.length === 1); + + const [calledUrl, init] = mockStreamFetch.mock.calls[0]; + expect(String(calledUrl)).toMatch( + /^https:\/\/proxy\.example\/v1\/runs\/run-1\/stream(\?|$)/, + ); + expect((init?.headers as Record)?.Authorization).toBe( + "Bearer proxy-token", + ); }); - it("stops the watcher without reconnecting once the run is terminal", async () => { + it("drops the resume position when the stream leg changes", async () => { vi.useFakeTimers(); - const updates: unknown[] = []; - service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); - - let statusFetchCount = 0; mockNetFetch.mockImplementation((input: string | Request) => { const url = typeof input === "string" ? input : input.url; if (url.includes("/session_logs/")) { @@ -903,27 +882,48 @@ describe("CloudTaskService", () => { createJsonResponse([], 200, { "X-Has-More": "false" }), ); } - statusFetchCount += 1; - // Bootstrap sees an active run; the post-stream status check sees terminal. return Promise.resolve( createJsonResponse({ id: "run-1", - status: statusFetchCount === 1 ? "in_progress" : "completed", + status: "in_progress", stage: null, output: null, error_message: null, branch: "main", - updated_at: - statusFetchCount === 1 - ? "2026-01-01T00:00:00Z" - : "2026-01-01T00:00:01Z", + updated_at: "2026-01-01T00:00:00Z", }), ); }); - mockStreamFetch.mockImplementation(() => - Promise.resolve(createSseResponse("")), - ); + // First resolution fails transiently (connection falls back to Django and + // records a Django-id-space resume position); the retried resolution routes + // to the proxy, whose id space is unrelated. + mockStreamTokenFetch + .mockRejectedValueOnce(new Error("network blip")) + .mockImplementation(() => + Promise.resolve( + createJsonResponse({ + token: "proxy-token", + stream_base_url: "https://proxy.example", + }), + ), + ); + + mockStreamFetch + .mockImplementationOnce(() => + Promise.resolve( + createSseResponse( + 'id: 7\nevent: keepalive\ndata: {"type":"keepalive"}\n\n', + ), + ), + ) + .mockImplementation(() => + Promise.resolve( + createOpenSseResponse( + 'event: keepalive\ndata: {"type":"keepalive"}\n\n', + ), + ), + ); service.watch({ taskId: "task-1", @@ -932,76 +932,65 @@ describe("CloudTaskService", () => { teamId: 2, }); - await waitFor(() => mockStreamFetch.mock.calls.length === 1); - await vi.advanceTimersByTimeAsync(10_000); + await waitFor(() => mockStreamFetch.mock.calls.length >= 2, 10_000); - expect(updates).toContainEqual( - expect.objectContaining({ - taskId: "task-1", - runId: "run-1", - kind: "status", - status: "completed", - }), + const [firstUrl] = mockStreamFetch.mock.calls[0]; + const [secondUrl, secondInit] = mockStreamFetch.mock.calls[1]; + expect(String(firstUrl)).toContain("https://app.example.com/api/"); + expect(String(secondUrl)).toMatch( + /^https:\/\/proxy\.example\/v1\/runs\/run-1\/stream/, ); - expect(mockStreamFetch.mock.calls.length).toBe(1); + // The Django resume position must not leak into the proxy leg. expect( - (service as unknown as { watchers: Map }).watchers.has( - "task-1:run-1", - ), - ).toBe(false); + (secondInit?.headers as Record)?.["Last-Event-ID"], + ).toBeUndefined(); + expect(String(secondUrl)).toContain("start=latest"); }); - it("surfaces a retryable error when the backend errors even on a long-lived stream", async () => { + it("old servers without stream_token use legacy polling to reconnect and stop", async () => { vi.useFakeTimers(); const updates: unknown[] = []; service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); - const makeInProgressRun = () => - createJsonResponse({ - id: "run-1", - status: "in_progress", - stage: null, - output: null, - error_message: null, - branch: "main", - updated_at: "2026-01-01T00:00:00Z", - }); - - mockNetFetch - .mockResolvedValueOnce(makeInProgressRun()) - .mockResolvedValueOnce( - createJsonResponse([], 200, { "X-Has-More": "false" }), - ) - .mockImplementation(() => Promise.resolve(makeInProgressRun())); + // Old server: the endpoint does not exist, so no stream-end ever arrives and the + // client must poll run status on each clean EOF to decide stop vs reconnect. + mockStreamTokenFetch.mockImplementation(() => + Promise.resolve(createJsonResponse({ detail: "Not found" }, 404)), + ); - // Each connection stays open with a keepalive for 65s (> the healthy - // threshold) and only THEN emits an explicit backend `event: error` frame. - // An explicit backend error must always count toward the budget, so even a - // long-lived stream eventually surfaces the retryable disconnect error. - mockStreamFetch.mockImplementation(() => { - const encoder = new TextEncoder(); - const stream = new ReadableStream({ - start(controller) { - controller.enqueue( - encoder.encode('event: keepalive\ndata: {"type":"keepalive"}\n\n'), - ); - setTimeout(() => { - controller.enqueue( - encoder.encode('event: error\ndata: {"error":"boom"}\n\n'), - ); - controller.close(); - }, 65_000); - }, - }); + let runFetchCount = 0; + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + runFetchCount += 1; + // Calls 1-3 (bootstrap, post-bootstrap verify, first legacy poll) report an + // active run; the second legacy poll reports terminal. + const terminal = runFetchCount >= 4; return Promise.resolve( - new Response(stream, { - status: 200, - headers: { "Content-Type": "text/event-stream" }, + createJsonResponse({ + id: "run-1", + status: terminal ? "completed" : "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: terminal + ? "2026-01-01T00:00:05Z" + : "2026-01-01T00:00:00Z", }), ); }); + // The flag-off server never emits stream-end; every connection just EOFs. + mockStreamFetch.mockImplementation(() => + Promise.resolve(createSseResponse("")), + ); + service.watch({ taskId: "task-1", runId: "run-1", @@ -1009,73 +998,77 @@ describe("CloudTaskService", () => { teamId: 2, }); - await waitFor(() => mockStreamFetch.mock.calls.length === 1); - // Drive >= 6 long-lived-then-backend-error cycles (65s open + backoff each). - await vi.advanceTimersByTimeAsync(65_000 * 7 + 70_000); - await waitFor( - () => - updates.some( - (u) => - typeof u === "object" && - u !== null && - (u as { kind?: string }).kind === "error", - ), - 10_000, - ); + const hasWatcher = (): boolean => + (service as unknown as { watchers: Map }).watchers.has( + "task-1:run-1", + ); + await waitFor(() => !hasWatcher(), 10_000); - expect(updates).toContainEqual({ - taskId: "task-1", - runId: "run-1", - kind: "error", - errorTitle: "Cloud stream disconnected", - errorMessage: - "Lost connection to the cloud run stream. Retry to reconnect.", - retryable: true, - }); + // At least one legacy reconnect happened while the run was active, then the + // terminal poll stopped the watcher; no further connections after that. + const connectionsAtStop = mockStreamFetch.mock.calls.length; + expect(connectionsAtStop).toBeGreaterThanOrEqual(2); + expect(updates).toContainEqual( + expect.objectContaining({ kind: "status", status: "completed" }), + ); + await vi.advanceTimersByTimeAsync(10_000); + expect(mockStreamFetch.mock.calls.length).toBe(connectionsAtStop); + // The refused resolution is cached: one stream_token call for the whole watch. + expect(mockStreamTokenFetch.mock.calls.length).toBe(1); }); - it("treats a long-lived transport cut as healthy even with no frames received", async () => { + it("a transient stream_token failure retries resolution instead of pinning to Django", async () => { vi.useFakeTimers(); - const updates: unknown[] = []; - service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); - - const makeInProgressRun = () => - createJsonResponse({ - id: "run-1", - status: "in_progress", - stage: null, - output: null, - error_message: null, - branch: "main", - updated_at: "2026-01-01T00:00:00Z", - }); - - mockNetFetch - .mockResolvedValueOnce(makeInProgressRun()) - .mockResolvedValueOnce( - createJsonResponse([], 200, { "X-Has-More": "false" }), + // The endpoint is momentarily down (503): unlike a 404, this must not cache a Django fallback. + // The next reconnect re-resolves and the watch upgrades to the durable proxy leg. + mockStreamTokenFetch + .mockImplementationOnce(() => + Promise.resolve(createJsonResponse({ detail: "unavailable" }, 503)), ) - .mockImplementation(() => Promise.resolve(makeInProgressRun())); + .mockImplementation(() => + Promise.resolve( + createJsonResponse({ + token: "fresh-token", + stream_base_url: "https://proxy.example", + }), + ), + ); - // Each connection opens but delivers NOTHING, then is transport-cut at 65s. - // Healthiness is duration-only on purpose — it must NOT depend on keepalive - // frames surviving the proxy — so even a frame-less long-lived cut is healthy - // and never exhausts the budget. - mockStreamFetch.mockImplementation(() => { - const stream = new ReadableStream({ - start(controller) { - setTimeout(() => controller.error(new Error("terminated")), 65_000); - }, - }); + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } return Promise.resolve( - new Response(stream, { - status: 200, - headers: { "Content-Type": "text/event-stream" }, + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", }), ); }); + // Django leg (transient round) just EOFs to force a reconnect; once the proxy resolves it + // emits stream-end so the watch completes, proving durable streaming engaged after the retry. + const usedProxyLeg = (input: string | Request): boolean => { + const url = typeof input === "string" ? input : input.url; + return url.includes("proxy.example"); + }; + mockStreamFetch.mockImplementation((input: string | Request) => + Promise.resolve( + usedProxyLeg(input) + ? createSseResponse("event: stream-end\ndata: {}\n\n") + : createSseResponse(""), + ), + ); + service.watch({ taskId: "task-1", runId: "run-1", @@ -1083,91 +1076,72 @@ describe("CloudTaskService", () => { teamId: 2, }); - await waitFor(() => mockStreamFetch.mock.calls.length === 1); - await vi.advanceTimersByTimeAsync(67_000 * 8); - await waitFor(() => mockStreamFetch.mock.calls.length >= 6, 20_000); + const hasWatcher = (): boolean => + (service as unknown as { watchers: Map }).watchers.has( + "task-1:run-1", + ); + await waitFor(() => !hasWatcher(), 10_000); + // The 503 was not cached: resolution retried and the stream switched to the durable proxy leg. + expect(mockStreamTokenFetch.mock.calls.length).toBeGreaterThanOrEqual(2); expect( - updates.some( - (u) => - typeof u === "object" && - u !== null && - (u as { kind?: string }).kind === "error", - ), - ).toBe(false); - - const watcher = ( - service as unknown as { - watchers: Map; - } - ).watchers.get("task-1:run-1"); - expect(watcher?.failed).toBe(false); - expect(watcher?.reconnectAttempts).toBe(0); + mockStreamFetch.mock.calls.some(([input]) => usedProxyLeg(input)), + ).toBe(true); }); - it("resets the transport reconnect budget once a keepalive proves recovery", async () => { + it("proxy 401 re-resolves the read target and resumes with a fresh token", async () => { vi.useFakeTimers(); - const updates: unknown[] = []; - service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + mockStreamTokenFetch + .mockImplementationOnce(() => + Promise.resolve( + createJsonResponse({ + token: "expired-token", + stream_base_url: "https://proxy.example", + }), + ), + ) + .mockImplementation(() => + Promise.resolve( + createJsonResponse({ + token: "fresh-token", + stream_base_url: "https://proxy.example", + }), + ), + ); - const makeInProgressRun = () => - createJsonResponse({ - id: "run-1", - status: "in_progress", - stage: null, - output: null, - error_message: null, - branch: "main", - updated_at: "2026-01-01T00:00:00Z", - }); - - mockNetFetch - .mockResolvedValueOnce(makeInProgressRun()) - .mockResolvedValueOnce( - createJsonResponse([], 200, { "X-Has-More": "false" }), - ) - .mockImplementation(() => Promise.resolve(makeInProgressRun())); - - // First 3 connections fail fast at the transport level (established, then - // errored immediately, no frame) and accrue reconnect attempts. The 4th - // delivers a keepalive and stays open — proving the transport recovered, so - // the accrued attempts must reset rather than carry forward into the budget. - let streamCall = 0; - const keepaliveControllerRef: { - current: ReadableStreamDefaultController | null; - } = { current: null }; - const encoder = new TextEncoder(); - mockStreamFetch.mockImplementation(() => { - streamCall += 1; - if (streamCall <= 3) { - const stream = new ReadableStream({ - start(controller) { - controller.error(new Error("terminated")); - }, - }); + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { return Promise.resolve( - new Response(stream, { - status: 200, - headers: { "Content-Type": "text/event-stream" }, - }), + createJsonResponse([], 200, { "X-Has-More": "false" }), ); } - // 4th connection stays open with no frame; the test injects the keepalive - // below so it can observe the accrued budget BEFORE the reset. - const stream = new ReadableStream({ - start(controller) { - keepaliveControllerRef.current = controller; - }, - }); return Promise.resolve( - new Response(stream, { - status: 200, - headers: { "Content-Type": "text/event-stream" }, + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", }), ); }); + mockStreamFetch + .mockImplementationOnce(() => + Promise.resolve(createJsonResponse({ detail: "expired" }, 401)), + ) + .mockImplementation(() => + Promise.resolve( + createOpenSseResponse( + 'event: keepalive\ndata: {"type":"keepalive"}\n\n', + ), + ), + ); + service.watch({ taskId: "task-1", runId: "run-1", @@ -1175,99 +1149,77 @@ describe("CloudTaskService", () => { teamId: 2, }); - const getWatcher = () => - ( - service as unknown as { - watchers: Map; - } - ).watchers.get("task-1:run-1"); + await waitFor(() => mockStreamFetch.mock.calls.length >= 2, 10_000); - await waitFor(() => mockStreamFetch.mock.calls.length === 1); - // Drive the 3 fast transport failures and open the held 4th connection. - await vi.advanceTimersByTimeAsync(30_000); - await waitFor( - () => streamCall >= 4 && !!keepaliveControllerRef.current, - 20_000, + expect(mockStreamTokenFetch.mock.calls.length).toBe(2); + const [secondUrl, secondInit] = mockStreamFetch.mock.calls[1]; + expect(String(secondUrl)).toMatch( + /^https:\/\/proxy\.example\/v1\/runs\/run-1\/stream/, ); - - // Non-vacuous precondition: the fast failures actually accrued the budget. - expect(getWatcher()?.reconnectAttempts ?? 0).toBeGreaterThan(0); - - // A keepalive on the recovered connection must reset the transport budget. - keepaliveControllerRef.current?.enqueue( - encoder.encode('event: keepalive\ndata: {"type":"keepalive"}\n\n'), + expect((secondInit?.headers as Record)?.Authorization).toBe( + "Bearer fresh-token", ); - await waitFor(() => getWatcher()?.reconnectAttempts === 0, 20_000); - - const watcher = getWatcher(); - expect(watcher?.failed).toBe(false); - expect(watcher?.reconnectAttempts).toBe(0); expect( - updates.some( - (u) => - typeof u === "object" && - u !== null && - (u as { kind?: string }).kind === "error", + (service as unknown as { watchers: Map }).watchers.has( + "task-1:run-1", ), - ).toBe(false); + ).toBe(true); }); - it("does not let a stale backend-error count inflate a transport reconnect delay", async () => { + it("proxy 401 falls back to Django when the proxy is withdrawn", async () => { vi.useFakeTimers(); - const updates: unknown[] = []; - service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); - - const makeInProgressRun = () => - createJsonResponse({ - id: "run-1", - status: "in_progress", - stage: null, - output: null, - error_message: null, - branch: "main", - updated_at: "2026-01-01T00:00:00Z", - }); - - mockNetFetch - .mockResolvedValueOnce(makeInProgressRun()) // bootstrap: fetchTaskRun - .mockResolvedValueOnce( - createJsonResponse([], 200, { "X-Has-More": "false" }), - ) // bootstrap: fetchSessionLogs - .mockImplementation(() => Promise.resolve(makeInProgressRun())); + // The re-resolution after the 401 no longer offers a proxy (rollout flag turned + // off mid-run); the watcher continues on the Django leg, which still emits the + // stream-end sentinel. + mockStreamTokenFetch + .mockImplementationOnce(() => + Promise.resolve( + createJsonResponse({ + token: "expired-token", + stream_base_url: "https://proxy.example", + }), + ), + ) + .mockImplementation(() => + Promise.resolve( + createJsonResponse({ token: "django-token", stream_base_url: null }), + ), + ); - // Connections 1-4 each emit a backend `event: error` frame, building the - // backend-error budget to 4 — those reconnects correctly pace on - // streamErrorAttempts. Connection 5 is held open until the test injects a - // quick TRANSPORT cut, which must pace its reconnect on the just-incremented - // transport budget (1 -> ~2s), NOT on the stale backend-error budget - // (4 -> ~16s). Math.max(both) for the delay would wrongly use the latter. - let streamCall = 0; - const transportControllerRef: { - current: ReadableStreamDefaultController | null; - } = { current: null }; - mockStreamFetch.mockImplementation(() => { - streamCall += 1; - if (streamCall <= 4) { + let runFetchCount = 0; + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { return Promise.resolve( - createSseResponse('event: error\ndata: {"error":"boom"}\n\n'), + createJsonResponse([], 200, { "X-Has-More": "false" }), ); } - const stream = new ReadableStream({ - start(controller) { - if (streamCall === 5) { - transportControllerRef.current = controller; - } - }, - }); + runFetchCount += 1; + const terminal = runFetchCount >= 2; return Promise.resolve( - new Response(stream, { - status: 200, - headers: { "Content-Type": "text/event-stream" }, + createJsonResponse({ + id: "run-1", + status: terminal ? "completed" : "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: terminal + ? "2026-01-01T00:00:05Z" + : "2026-01-01T00:00:00Z", }), ); }); + mockStreamFetch + .mockImplementationOnce(() => + Promise.resolve(createJsonResponse({ detail: "expired" }, 401)), + ) + .mockImplementation(() => + Promise.resolve(createSseResponse("event: stream-end\ndata: {}\n\n")), + ); + service.watch({ taskId: "task-1", runId: "run-1", @@ -1275,103 +1227,57 @@ describe("CloudTaskService", () => { teamId: 2, }); - const getWatcher = () => - ( - service as unknown as { - watchers: Map< - string, - { - reconnectAttempts: number; - streamErrorAttempts: number; - failed: boolean; - } - >; - } - ).watchers.get("task-1:run-1"); - - await waitFor(() => mockStreamFetch.mock.calls.length === 1); - // Drive the four backend-error reconnects (2s + 4s + 8s + 16s of backoff) - // and open the held fifth connection. - await vi.advanceTimersByTimeAsync(35_000); - await waitFor( - () => streamCall >= 5 && !!transportControllerRef.current, - 20_000, - ); - - // Non-vacuous precondition: the backend-error budget is stale-high while the - // transport budget is still zero. - expect(getWatcher()?.streamErrorAttempts).toBe(4); - expect(getWatcher()?.reconnectAttempts).toBe(0); - expect(getWatcher()?.failed).toBe(false); - - // A quick transport cut on the open fifth connection charges ONE transport - // attempt; its reconnect must wait ~2s (transport budget), not ~16s. - transportControllerRef.current?.error(new Error("terminated")); - await waitFor(() => getWatcher()?.reconnectAttempts === 1, 20_000); - expect(getWatcher()?.streamErrorAttempts).toBe(4); + const hasWatcher = (): boolean => + (service as unknown as { watchers: Map }).watchers.has( + "task-1:run-1", + ); + await waitFor(() => !hasWatcher(), 10_000); - const callsBeforeProbe = mockStreamFetch.mock.calls.length; - // 5s is past the fixed ~2s transport backoff but well short of the buggy - // ~16s backend-error backoff, so the sixth connection only opens if the - // delay was paced on the transport budget. - await vi.advanceTimersByTimeAsync(5_000); - expect(mockStreamFetch.mock.calls.length).toBe(callsBeforeProbe + 1); - expect(getWatcher()?.failed).toBe(false); + expect(mockStreamTokenFetch.mock.calls.length).toBe(2); + const [secondUrl] = mockStreamFetch.mock.calls[1]; + expect(String(secondUrl)).toContain("https://app.example.com/api/"); }); - it("surfaces an error instead of retrying forever when run-state fetch keeps failing after a clean stream end", async () => { + it("stream-end still stops the watcher in legacy mode", async () => { vi.useFakeTimers(); - const updates: unknown[] = []; - service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + // Old server detected via 404, yet the stream delivers stream-end anyway (e.g. a + // server upgraded mid-watch). The sentinel is honored in both modes. + mockStreamTokenFetch.mockImplementation(() => + Promise.resolve(createJsonResponse({ detail: "Not found" }, 404)), + ); - // Bootstrap succeeds (run + empty backlog); every subsequent run-state - // fetch returns 500 (a non-fatal status -> fetchTaskRun resolves null). - mockNetFetch - .mockResolvedValueOnce( + let runFetchCount = 0; + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + runFetchCount += 1; + // Bootstrap sees an active run so the stream actually opens; the + // stream-end stop path then repairs the status to terminal. + const terminal = runFetchCount >= 2; + return Promise.resolve( createJsonResponse({ id: "run-1", - status: "in_progress", + status: terminal ? "completed" : "in_progress", stage: null, output: null, error_message: null, branch: "main", - updated_at: "2026-01-01T00:00:00Z", - }), - ) // bootstrap: fetchTaskRun - .mockResolvedValueOnce( - createJsonResponse([], 200, { "X-Has-More": "false" }), - ) // bootstrap: fetchSessionLogs - .mockImplementation(() => - Promise.resolve(createJsonResponse({ detail: "boom" }, 500)), - ); - - // First connection is held open so bootstrap can finish; the test then - // closes it cleanly. Every later connection ends cleanly on its own, so the - // only thing that can fail is the post-stream run-state fetch (500). - let streamCall = 0; - const firstControllerRef: { - current: ReadableStreamDefaultController | null; - } = { current: null }; - mockStreamFetch.mockImplementation(() => { - streamCall += 1; - const stream = new ReadableStream({ - start(controller) { - if (streamCall === 1) { - firstControllerRef.current = controller; - } else { - controller.close(); - } - }, - }); - return Promise.resolve( - new Response(stream, { - status: 200, - headers: { "Content-Type": "text/event-stream" }, + updated_at: terminal + ? "2026-01-01T00:00:05Z" + : "2026-01-01T00:00:00Z", }), ); }); + mockStreamFetch.mockImplementation(() => + Promise.resolve(createSseResponse("event: stream-end\ndata: {}\n\n")), + ); + service.watch({ taskId: "task-1", runId: "run-1", @@ -1379,192 +1285,125 @@ describe("CloudTaskService", () => { teamId: 2, }); - // Wait for bootstrap to emit its snapshot and hold the live connection open. - await waitFor( - () => - !!firstControllerRef.current && - updates.some( - (u) => - typeof u === "object" && - u !== null && - (u as { kind?: string }).kind === "snapshot", - ), - ); + const hasWatcher = (): boolean => + (service as unknown as { watchers: Map }).watchers.has( + "task-1:run-1", + ); + await waitFor(() => !hasWatcher(), 10_000); - // Close the live stream cleanly: each clean end now fetches run state, which - // 500s. The reconnect must charge the budget so it eventually gives up. - firstControllerRef.current?.close(); + expect(mockStreamFetch.mock.calls.length).toBe(1); + }); - // Budget is 5 attempts (2s + 4s + 8s + 16s + 30s + 30s of backoff). - await vi.advanceTimersByTimeAsync(120_000); - await waitFor( - () => - updates.some( - (u) => - typeof u === "object" && - u !== null && - (u as { kind?: string }).kind === "error", - ), - 20_000, + it("re-bootstraps once on a clean-EOF loop and fails when it persists", async () => { + vi.useFakeTimers(); + + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + const makeInProgressRun = () => + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }); + + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + return Promise.resolve(makeInProgressRun()); + }); + + mockStreamFetch.mockImplementation(() => + Promise.resolve(createSseResponse("")), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await waitFor(() => mockStreamFetch.mock.calls.length === 1); + await vi.advanceTimersByTimeAsync(60 * 60_000); + + await waitFor( + () => + updates.some( + (u) => + typeof u === "object" && + u !== null && + (u as { kind?: string }).kind === "error", + ), + 10_000, ); expect(updates).toContainEqual({ taskId: "task-1", runId: "run-1", kind: "error", - errorTitle: "Cloud run state unavailable", + errorTitle: "Cloud run unreachable", errorMessage: - "Could not fetch the latest cloud run state after the stream ended. Retry to reconnect.", + "Could not maintain a connection to the cloud run after many attempts. Click retry once the issue is resolved.", retryable: true, }); - }); - - const guardedFetchStatusExpectations = [ - [ - 401, - { - errorTitle: "Cloud authentication expired", - errorMessage: "Please reauthenticate and retry the cloud run stream.", - retryable: true, - }, - ], - [ - 403, - { - errorTitle: "Cloud access denied", - errorMessage: - "You no longer have access to this cloud run. Reauthenticate and retry.", - retryable: true, - }, - ], - [ - 404, - { - errorTitle: "Cloud run not found", - errorMessage: - "This cloud run could not be found. It may have been deleted or moved.", - retryable: false, - }, - ], - ] as const; - - const guardedFetchStatusCases = ( - ["status fetch", "persisted log fetch"] as const - ).flatMap((fetchPhase) => - guardedFetchStatusExpectations.map(([status, expectedError]) => ({ - fetchPhase, - status, - expectedError, - })), - ); - - it.each(guardedFetchStatusCases)( - "fails the watcher when $fetchPhase returns $status", - async ({ fetchPhase, status, expectedError }) => { - const updates: unknown[] = []; - service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); - - if (fetchPhase === "status fetch") { - mockNetFetch.mockResolvedValueOnce( - createJsonResponse({ detail: "Access denied" }, status), - ); - } else { - mockNetFetch - .mockResolvedValueOnce( - createJsonResponse({ - id: "run-1", - status: "completed", - stage: null, - output: null, - error_message: null, - branch: "main", - updated_at: "2026-01-01T00:00:00Z", - completed_at: "2026-01-01T00:00:01Z", - }), - ) - .mockResolvedValueOnce( - createJsonResponse({ detail: "Access denied" }, status), - ); - } - service.watch({ - taskId: "task-1", - runId: "run-1", - apiHost: "https://app.example.com", - teamId: 2, - }); - - await waitFor(() => updates.length === 1); + // The first budget exhaustion self-heals with a full rebuild (fresh read-leg + // resolution and a second snapshot); only the second exhaustion fails. + expect( + updates.filter( + (u) => + typeof u === "object" && + u !== null && + (u as { kind?: string }).kind === "snapshot", + ), + ).toHaveLength(2); + expect(mockStreamTokenFetch.mock.calls.length).toBe(2); + }); - expect(mockStreamFetch).not.toHaveBeenCalled(); - expect(updates).toContainEqual({ - taskId: "task-1", - runId: "run-1", - kind: "error", - ...expectedError, - }); - }, - ); + it("retry rebuilds the watcher from scratch after a failure", async () => { + vi.useFakeTimers(); - it("loads paginated persisted logs once for an already terminal run", async () => { const updates: unknown[] = []; service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); - mockNetFetch - .mockResolvedValueOnce( + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + return Promise.resolve( createJsonResponse({ id: "run-1", - status: "completed", - stage: "build", + status: "in_progress", + stage: null, output: null, error_message: null, branch: "main", updated_at: "2026-01-01T00:00:00Z", - completed_at: "2026-01-01T00:00:00Z", }), - ) - .mockResolvedValueOnce( - createJsonResponse( - [ - { - type: "notification", - timestamp: "2026-01-01T00:00:01Z", - notification: { - jsonrpc: "2.0", - method: "_posthog/console", - params: { - sessionId: "run-1", - level: "info", - message: "done-1", - }, - }, - }, - ], - 200, - { "X-Has-More": "true" }, - ), - ) - .mockResolvedValueOnce( - createJsonResponse( - [ - { - type: "notification", - timestamp: "2026-01-01T00:00:02Z", - notification: { - jsonrpc: "2.0", - method: "_posthog/console", - params: { - sessionId: "run-1", - level: "info", - message: "done-2", - }, - }, - }, - ], - 200, - { "X-Has-More": "false" }, - ), ); + }); + + // Every connection records a resume position, then dies on a backend error + // frame until the error budget fails the watcher. + mockStreamFetch.mockImplementation(() => + Promise.resolve( + createSseResponse( + 'id: 42\nevent: keepalive\ndata: {"type":"keepalive"}\n\nevent: error\ndata: {"error":"boom"}\n\n', + ), + ), + ); service.watch({ taskId: "task-1", @@ -1573,49 +1412,1362 @@ describe("CloudTaskService", () => { teamId: 2, }); - await waitFor(() => updates.length >= 1); + await waitFor(() => mockStreamFetch.mock.calls.length === 1); + await vi.advanceTimersByTimeAsync(70_000); + await waitFor( + () => + updates.some( + (u) => + typeof u === "object" && + u !== null && + (u as { kind?: string }).kind === "error", + ), + 10_000, + ); - expect(updates).toEqual([ - { - taskId: "task-1", - runId: "run-1", - kind: "snapshot", - newEntries: [ - { - type: "notification", - timestamp: "2026-01-01T00:00:01Z", - notification: { - jsonrpc: "2.0", - method: "_posthog/console", - params: { - sessionId: "run-1", - level: "info", - message: "done-1", - }, - }, - }, - { - type: "notification", - timestamp: "2026-01-01T00:00:02Z", - notification: { - jsonrpc: "2.0", - method: "_posthog/console", - params: { - sessionId: "run-1", - level: "info", - message: "done-2", - }, - }, - }, - ], - totalEntryCount: 2, - status: "completed", - stage: "build", + const countSnapshots = (): number => + updates.filter( + (u) => + typeof u === "object" && + u !== null && + (u as { kind?: string }).kind === "snapshot", + ).length; + const tokenCallsBeforeRetry = mockStreamTokenFetch.mock.calls.length; + const snapshotsBeforeRetry = countSnapshots(); + const streamCallsBeforeRetry = mockStreamFetch.mock.calls.length; + + mockStreamFetch.mockImplementation(() => + Promise.resolve( + createOpenSseResponse( + 'event: keepalive\ndata: {"type":"keepalive"}\n\n', + ), + ), + ); + + service.retry("task-1", "run-1"); + + await waitFor( + () => mockStreamFetch.mock.calls.length === streamCallsBeforeRetry + 1, + ); + await waitFor(() => countSnapshots() === snapshotsBeforeRetry + 1); + + // Retry must rebuild from server truth: re-resolve the read leg, re-emit a + // fresh snapshot and drop the poisoned resume position instead of resuming + // from the failed stream's Last-Event-ID. + expect(mockStreamTokenFetch.mock.calls.length).toBe( + tokenCallsBeforeRetry + 1, + ); + const [, init] = mockStreamFetch.mock.calls[streamCallsBeforeRetry]; + expect( + (init?.headers as Record)?.["Last-Event-ID"], + ).toBeUndefined(); + }); + + it("emits a retryable cloud error after repeated stream failures", async () => { + vi.useFakeTimers(); + + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + const makeInProgressRun = () => + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: null, output: null, - errorMessage: null, + error_message: null, branch: "main", - }, - ]); - expect(mockNetFetch).toHaveBeenCalledTimes(3); + updated_at: "2026-01-01T00:00:00Z", + }); + + mockNetFetch + .mockResolvedValueOnce(makeInProgressRun()) // bootstrap: fetchTaskRun + .mockResolvedValueOnce( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ) // bootstrap: fetchSessionLogs + // Each stream error triggers handleStreamCompletion → fetchTaskRun + .mockImplementation(() => Promise.resolve(makeInProgressRun())); + + mockStreamFetch.mockImplementation(() => + Promise.resolve( + createSseResponse( + 'event: keepalive\ndata: {"type":"keepalive"}\n\nevent: error\ndata: {"error":"boom"}\n\n', + ), + ), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await waitFor(() => mockStreamFetch.mock.calls.length === 1); + await vi.advanceTimersByTimeAsync(70_000); + await waitFor( + () => + updates.some( + (u) => + typeof u === "object" && + u !== null && + (u as { kind?: string }).kind === "error", + ), + 10_000, + ); + + expect(mockStreamFetch.mock.calls.length).toBe(10); + // Status is no longer polled per reconnect. Only the 2 bootstrap calls plus the single + // post-bootstrap verification touch the status endpoint; reconnects never do. + expect(mockNetFetch.mock.calls.length).toBeLessThanOrEqual(3); + expect(updates).toContainEqual({ + taskId: "task-1", + runId: "run-1", + kind: "error", + errorTitle: "Cloud stream disconnected", + errorMessage: + "Lost connection to the cloud run stream. Retry to reconnect.", + retryable: true, + }); + }); + + it("clears the backend-error budget after a healthy long-lived cut", async () => { + vi.useFakeTimers(); + + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + const makeInProgressRun = () => + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }); + + mockNetFetch + .mockResolvedValueOnce(makeInProgressRun()) // bootstrap: fetchTaskRun + .mockResolvedValueOnce( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ) // bootstrap: fetchSessionLogs + .mockImplementation(() => Promise.resolve(makeInProgressRun())); + + // First connection delivers an explicit backend error frame (accruing the + // backend-error budget). Subsequent connections are healthy long-lived cuts + // (>= SSE_HEALTHY_CONNECTION_MS): each proves the stream recovered and must + // clear the backend-error budget, so it never accumulates for the run's life. + let streamCall = 0; + mockStreamFetch.mockImplementation(() => { + streamCall += 1; + if (streamCall === 1) { + return Promise.resolve( + createSseResponse('event: error\ndata: {"error":"boom"}\n\n'), + ); + } + const encoder = new TextEncoder(); + const stream = new ReadableStream({ + start(controller) { + controller.enqueue( + encoder.encode('event: keepalive\ndata: {"type":"keepalive"}\n\n'), + ); + setTimeout(() => controller.error(new Error("terminated")), 65_000); + }, + }); + return Promise.resolve( + new Response(stream, { + status: 200, + headers: { "Content-Type": "text/event-stream" }, + }), + ); + }); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + const getWatcher = () => + ( + service as unknown as { + watchers: Map< + string, + { + reconnectAttempts: number; + streamErrorAttempts: number; + failed: boolean; + } + >; + } + ).watchers.get("task-1:run-1"); + + // The backend error must have accrued the backend-error budget first... + await waitFor(() => (getWatcher()?.streamErrorAttempts ?? 0) >= 1, 20_000); + // ...then the healthy long-lived cut on the next connection clears it. + await vi.advanceTimersByTimeAsync(67_000 * 2); + await waitFor(() => getWatcher()?.streamErrorAttempts === 0, 20_000); + + const watcher = getWatcher(); + expect(watcher?.failed).toBe(false); + expect(watcher?.streamErrorAttempts).toBe(0); + expect(watcher?.reconnectAttempts).toBe(0); + expect( + updates.some( + (u) => + typeof u === "object" && + u !== null && + (u as { kind?: string }).kind === "error", + ), + ).toBe(false); + }); + + it("counts quick stream failures and surfaces a retryable error", async () => { + vi.useFakeTimers(); + + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + const makeInProgressRun = () => + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }); + + mockNetFetch + .mockResolvedValueOnce(makeInProgressRun()) + .mockResolvedValueOnce( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ) + .mockImplementation(() => Promise.resolve(makeInProgressRun())); + + // Connections that fail immediately (under SSE_HEALTHY_CONNECTION_MS) are + // genuine churn and must keep counting toward the retry budget. + mockStreamFetch.mockImplementation(() => + Promise.resolve( + createSseResponse('event: error\ndata: {"error":"boom"}\n\n'), + ), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await waitFor(() => mockStreamFetch.mock.calls.length === 1); + await vi.advanceTimersByTimeAsync(70_000); + await waitFor( + () => + updates.some( + (u) => + typeof u === "object" && + u !== null && + (u as { kind?: string }).kind === "error", + ), + 10_000, + ); + + expect(updates).toContainEqual({ + taskId: "task-1", + runId: "run-1", + kind: "error", + errorTitle: "Cloud stream disconnected", + errorMessage: + "Lost connection to the cloud run stream. Retry to reconnect.", + retryable: true, + }); + }); + + it("reconnects on a clean EOF even after the run status goes terminal (status-unaware)", async () => { + vi.useFakeTimers(); + + // Bootstrap sees an active run (so it streams); every later status fetch reports terminal. + // Pre-decoupling, a clean EOF on a terminal run stopped the watch. Now run status is never + // consulted to decide reconnects, so the clean EOFs keep reconnecting until stream-end. + let statusFetchCount = 0; + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + statusFetchCount += 1; + return Promise.resolve( + createJsonResponse({ + id: "run-1", + status: statusFetchCount === 1 ? "in_progress" : "completed", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: + statusFetchCount === 1 + ? "2026-01-01T00:00:00Z" + : "2026-01-01T00:00:01Z", + }), + ); + }); + + mockStreamFetch.mockImplementation(() => + Promise.resolve(createSseResponse("")), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await waitFor(() => mockStreamFetch.mock.calls.length === 1); + await waitFor(() => mockStreamFetch.mock.calls.length >= 3, 20_000); + + // Terminal status did not stop the watch; the watcher is still reconnecting. + expect( + (service as unknown as { watchers: Map }).watchers.has( + "task-1:run-1", + ), + ).toBe(true); + }); + + it("surfaces a retryable error when the backend errors even on a long-lived stream", async () => { + vi.useFakeTimers(); + + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + const makeInProgressRun = () => + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }); + + mockNetFetch + .mockResolvedValueOnce(makeInProgressRun()) + .mockResolvedValueOnce( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ) + .mockImplementation(() => Promise.resolve(makeInProgressRun())); + + // Each connection stays open with a keepalive for 65s (> the healthy + // threshold) and only THEN emits an explicit backend `event: error` frame. + // An explicit backend error must always count toward the budget, so even a + // long-lived stream eventually surfaces the retryable disconnect error. + mockStreamFetch.mockImplementation(() => { + const encoder = new TextEncoder(); + const stream = new ReadableStream({ + start(controller) { + controller.enqueue( + encoder.encode('event: keepalive\ndata: {"type":"keepalive"}\n\n'), + ); + setTimeout(() => { + controller.enqueue( + encoder.encode('event: error\ndata: {"error":"boom"}\n\n'), + ); + controller.close(); + }, 65_000); + }, + }); + return Promise.resolve( + new Response(stream, { + status: 200, + headers: { "Content-Type": "text/event-stream" }, + }), + ); + }); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await waitFor(() => mockStreamFetch.mock.calls.length === 1); + // Drive >= 10 long-lived-then-backend-error cycles (65s open + backoff each). + await vi.advanceTimersByTimeAsync(65_000 * 11 + 70_000); + await waitFor( + () => + updates.some( + (u) => + typeof u === "object" && + u !== null && + (u as { kind?: string }).kind === "error", + ), + 10_000, + ); + + expect(updates).toContainEqual({ + taskId: "task-1", + runId: "run-1", + kind: "error", + errorTitle: "Cloud stream disconnected", + errorMessage: + "Lost connection to the cloud run stream. Retry to reconnect.", + retryable: true, + }); + }); + + it("treats a long-lived transport cut as healthy even with no frames received", async () => { + vi.useFakeTimers(); + + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + const makeInProgressRun = () => + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }); + + mockNetFetch + .mockResolvedValueOnce(makeInProgressRun()) + .mockResolvedValueOnce( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ) + .mockImplementation(() => Promise.resolve(makeInProgressRun())); + + // Each connection opens, delivers nothing, then is transport-cut at 65s. Healthiness is + // duration-only (not keepalive frames), so even a frame-less long-lived cut never exhausts the budget. + mockStreamFetch.mockImplementation(() => { + const stream = new ReadableStream({ + start(controller) { + setTimeout(() => controller.error(new Error("terminated")), 65_000); + }, + }); + return Promise.resolve( + new Response(stream, { + status: 200, + headers: { "Content-Type": "text/event-stream" }, + }), + ); + }); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await waitFor(() => mockStreamFetch.mock.calls.length === 1); + await vi.advanceTimersByTimeAsync(67_000 * 8); + await waitFor(() => mockStreamFetch.mock.calls.length >= 6, 20_000); + + expect( + updates.some( + (u) => + typeof u === "object" && + u !== null && + (u as { kind?: string }).kind === "error", + ), + ).toBe(false); + + const watcher = ( + service as unknown as { + watchers: Map; + } + ).watchers.get("task-1:run-1"); + expect(watcher?.failed).toBe(false); + expect(watcher?.reconnectAttempts).toBe(0); + }); + + it("never fails an idle run riding healthy clean-EOF reconnect cycles", async () => { + vi.useFakeTimers(); + + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + return Promise.resolve( + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }), + ); + }); + + // An idle run: every connection delivers only a keepalive, lives a healthy + // 65s, then the server closes it cleanly. No data events ever arrive, so + // nothing else resets the cumulative budget across the cycles. + mockStreamFetch.mockImplementation(() => { + const encoder = new TextEncoder(); + const stream = new ReadableStream({ + start(controller) { + controller.enqueue( + encoder.encode('event: keepalive\ndata: {"type":"keepalive"}\n\n'), + ); + setTimeout(() => controller.close(), 65_000); + }, + }); + return Promise.resolve( + new Response(stream, { + status: 200, + headers: { "Content-Type": "text/event-stream" }, + }), + ); + }); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await waitFor(() => mockStreamFetch.mock.calls.length === 1); + // Ride out more cycles than the cumulative budget allows for loops. + await vi.advanceTimersByTimeAsync(67_000 * 35); + await waitFor(() => mockStreamFetch.mock.calls.length >= 32, 20_000); + + expect( + updates.some( + (u) => + typeof u === "object" && + u !== null && + (u as { kind?: string }).kind === "error", + ), + ).toBe(false); + expect( + (service as unknown as { watchers: Map }).watchers.has( + "task-1:run-1", + ), + ).toBe(true); + }); + + it("resets the transport reconnect budget once a keepalive proves recovery", async () => { + vi.useFakeTimers(); + + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + const makeInProgressRun = () => + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }); + + mockNetFetch + .mockResolvedValueOnce(makeInProgressRun()) + .mockResolvedValueOnce( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ) + .mockImplementation(() => Promise.resolve(makeInProgressRun())); + + // First 3 connections fail fast at the transport level and accrue reconnect attempts. The 4th + // delivers a keepalive and stays open, proving recovery, so the accrued attempts must reset. + let streamCall = 0; + const keepaliveControllerRef: { + current: ReadableStreamDefaultController | null; + } = { current: null }; + const encoder = new TextEncoder(); + mockStreamFetch.mockImplementation(() => { + streamCall += 1; + if (streamCall <= 3) { + const stream = new ReadableStream({ + start(controller) { + controller.error(new Error("terminated")); + }, + }); + return Promise.resolve( + new Response(stream, { + status: 200, + headers: { "Content-Type": "text/event-stream" }, + }), + ); + } + // 4th connection stays open with no frame; the test injects the keepalive + // below so it can observe the accrued budget BEFORE the reset. + const stream = new ReadableStream({ + start(controller) { + keepaliveControllerRef.current = controller; + }, + }); + return Promise.resolve( + new Response(stream, { + status: 200, + headers: { "Content-Type": "text/event-stream" }, + }), + ); + }); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + const getWatcher = () => + ( + service as unknown as { + watchers: Map; + } + ).watchers.get("task-1:run-1"); + + await waitFor(() => mockStreamFetch.mock.calls.length === 1); + // Drive the 3 fast transport failures and open the held 4th connection. + await vi.advanceTimersByTimeAsync(30_000); + await waitFor( + () => streamCall >= 4 && !!keepaliveControllerRef.current, + 20_000, + ); + + // Non-vacuous precondition: the fast failures actually accrued the budget. + expect(getWatcher()?.reconnectAttempts ?? 0).toBeGreaterThan(0); + + // A keepalive on the recovered connection must reset the transport budget. + keepaliveControllerRef.current?.enqueue( + encoder.encode('event: keepalive\ndata: {"type":"keepalive"}\n\n'), + ); + await waitFor(() => getWatcher()?.reconnectAttempts === 0, 20_000); + + const watcher = getWatcher(); + expect(watcher?.failed).toBe(false); + expect(watcher?.reconnectAttempts).toBe(0); + expect( + updates.some( + (u) => + typeof u === "object" && + u !== null && + (u as { kind?: string }).kind === "error", + ), + ).toBe(false); + }); + + it("does not let a stale backend-error count inflate a transport reconnect delay", async () => { + vi.useFakeTimers(); + + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + const makeInProgressRun = () => + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }); + + mockNetFetch + .mockResolvedValueOnce(makeInProgressRun()) // bootstrap: fetchTaskRun + .mockResolvedValueOnce( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ) // bootstrap: fetchSessionLogs + .mockImplementation(() => Promise.resolve(makeInProgressRun())); + + // Connections 1-4 each emit a backend error frame, pacing reconnects on streamErrorAttempts. + // Connection 5 is held open until a quick transport cut, which must pace on the transport budget + // (1 -> ~2s), not the stale backend-error budget (4 -> ~16s). Math.max(both) would use the latter. + let streamCall = 0; + const transportControllerRef: { + current: ReadableStreamDefaultController | null; + } = { current: null }; + mockStreamFetch.mockImplementation(() => { + streamCall += 1; + if (streamCall <= 4) { + return Promise.resolve( + createSseResponse('event: error\ndata: {"error":"boom"}\n\n'), + ); + } + const stream = new ReadableStream({ + start(controller) { + if (streamCall === 5) { + transportControllerRef.current = controller; + } + }, + }); + return Promise.resolve( + new Response(stream, { + status: 200, + headers: { "Content-Type": "text/event-stream" }, + }), + ); + }); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + const getWatcher = () => + ( + service as unknown as { + watchers: Map< + string, + { + reconnectAttempts: number; + streamErrorAttempts: number; + failed: boolean; + } + >; + } + ).watchers.get("task-1:run-1"); + + await waitFor(() => mockStreamFetch.mock.calls.length === 1); + // Drive the four backend-error reconnects (2s + 4s + 8s + 16s of backoff) + // and open the held fifth connection. + await vi.advanceTimersByTimeAsync(35_000); + await waitFor( + () => streamCall >= 5 && !!transportControllerRef.current, + 20_000, + ); + + // Non-vacuous precondition: the backend-error budget is stale-high while the + // transport budget is still zero. + expect(getWatcher()?.streamErrorAttempts).toBe(4); + expect(getWatcher()?.reconnectAttempts).toBe(0); + expect(getWatcher()?.failed).toBe(false); + + // A quick transport cut on the open fifth connection charges ONE transport + // attempt; its reconnect must wait ~2s (transport budget), not ~16s. + transportControllerRef.current?.error(new Error("terminated")); + await waitFor(() => getWatcher()?.reconnectAttempts === 1, 20_000); + expect(getWatcher()?.streamErrorAttempts).toBe(4); + + const callsBeforeProbe = mockStreamFetch.mock.calls.length; + // 5s is past the fixed ~2s transport backoff but well short of the buggy + // ~16s backend-error backoff, so the sixth connection only opens if the + // delay was paced on the transport budget. + await vi.advanceTimersByTimeAsync(5_000); + expect(mockStreamFetch.mock.calls.length).toBe(callsBeforeProbe + 1); + expect(getWatcher()?.failed).toBe(false); + }); + + it("does not poll run status per reconnect on clean EOFs (status-unaware)", async () => { + vi.useFakeTimers(); + + let statusFetchCount = 0; + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + statusFetchCount += 1; + return Promise.resolve( + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }), + ); + }); + + // Every connection ends cleanly with no stream-end sentinel, forcing reconnect after reconnect. + mockStreamFetch.mockImplementation(() => + Promise.resolve(createSseResponse("")), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await waitFor(() => mockStreamFetch.mock.calls.length >= 5, 20_000); + + // Bootstrap fetches status once and the post-bootstrap verification once more; reconnects add + // none. Pre-decoupling, every clean EOF polled status, so this count would climb with reconnects. + expect(statusFetchCount).toBeLessThanOrEqual(2); + }); + + const guardedFetchStatusExpectations = [ + [ + 401, + { + errorTitle: "Cloud authentication expired", + errorMessage: "Please reauthenticate and retry the cloud run stream.", + retryable: true, + }, + ], + [ + 403, + { + errorTitle: "Cloud access denied", + errorMessage: + "You no longer have access to this cloud run. Reauthenticate and retry.", + retryable: true, + }, + ], + [ + 404, + { + errorTitle: "Cloud run not found", + errorMessage: + "This cloud run could not be found. It may have been deleted or moved.", + retryable: false, + }, + ], + ] as const; + + const guardedFetchStatusCases = ( + ["status fetch", "persisted log fetch"] as const + ).flatMap((fetchPhase) => + guardedFetchStatusExpectations.map(([status, expectedError]) => ({ + fetchPhase, + status, + expectedError, + })), + ); + + it.each(guardedFetchStatusCases)( + "fails the watcher when $fetchPhase returns $status", + async ({ fetchPhase, status, expectedError }) => { + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + if (fetchPhase === "status fetch") { + mockNetFetch.mockResolvedValueOnce( + createJsonResponse({ detail: "Access denied" }, status), + ); + } else { + mockNetFetch + .mockResolvedValueOnce( + createJsonResponse({ + id: "run-1", + status: "completed", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + completed_at: "2026-01-01T00:00:01Z", + }), + ) + .mockResolvedValueOnce( + createJsonResponse({ detail: "Access denied" }, status), + ); + } + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await waitFor(() => updates.length === 1); + + expect(mockStreamFetch).not.toHaveBeenCalled(); + expect(updates).toContainEqual({ + taskId: "task-1", + runId: "run-1", + kind: "error", + ...expectedError, + }); + }, + ); + + it("loads paginated persisted logs once for an already terminal run", async () => { + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + mockNetFetch + .mockResolvedValueOnce( + createJsonResponse({ + id: "run-1", + status: "completed", + stage: "build", + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + completed_at: "2026-01-01T00:00:00Z", + }), + ) + .mockResolvedValueOnce( + createJsonResponse( + [ + { + type: "notification", + timestamp: "2026-01-01T00:00:01Z", + notification: { + jsonrpc: "2.0", + method: "_posthog/console", + params: { + sessionId: "run-1", + level: "info", + message: "done-1", + }, + }, + }, + ], + 200, + { "X-Has-More": "true" }, + ), + ) + .mockResolvedValueOnce( + createJsonResponse( + [ + { + type: "notification", + timestamp: "2026-01-01T00:00:02Z", + notification: { + jsonrpc: "2.0", + method: "_posthog/console", + params: { + sessionId: "run-1", + level: "info", + message: "done-2", + }, + }, + }, + ], + 200, + { "X-Has-More": "false" }, + ), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await waitFor(() => updates.length >= 1); + + expect(updates).toEqual([ + { + taskId: "task-1", + runId: "run-1", + kind: "snapshot", + newEntries: [ + { + type: "notification", + timestamp: "2026-01-01T00:00:01Z", + notification: { + jsonrpc: "2.0", + method: "_posthog/console", + params: { + sessionId: "run-1", + level: "info", + message: "done-1", + }, + }, + }, + { + type: "notification", + timestamp: "2026-01-01T00:00:02Z", + notification: { + jsonrpc: "2.0", + method: "_posthog/console", + params: { + sessionId: "run-1", + level: "info", + message: "done-2", + }, + }, + }, + ], + totalEntryCount: 2, + status: "completed", + stage: "build", + output: null, + errorMessage: null, + branch: "main", + }, + ]); + expect(mockNetFetch).toHaveBeenCalledTimes(3); + }); + + it("fails a Django-leg watcher on a stream 401 without re-resolving the read target", async () => { + vi.useFakeTimers(); + + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + // Default stream_token resolves with stream_base_url: null, so the read leg is Django. + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + return Promise.resolve( + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }), + ); + }); + + // A Django-leg 401 is fatal (autoRetry: false). The proxy re-resolve path is guarded on a + // non-null streamBaseUrl, so a Django leg must fail rather than re-mint a stream_token. + mockStreamFetch.mockImplementation(() => + Promise.resolve(createJsonResponse({ detail: "expired" }, 401)), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await waitFor( + () => + updates.some( + (u) => + typeof u === "object" && + u !== null && + (u as { kind?: string }).kind === "error", + ), + 10_000, + ); + + expect(updates).toContainEqual({ + taskId: "task-1", + runId: "run-1", + kind: "error", + errorTitle: "Cloud authentication expired", + errorMessage: "Please reauthenticate and retry the cloud run stream.", + retryable: true, + }); + + // The Django leg did not re-resolve, and the fatal failure schedules no reconnect. + expect(mockStreamTokenFetch.mock.calls.length).toBe(1); + const streamCallsAtFailure = mockStreamFetch.mock.calls.length; + await vi.advanceTimersByTimeAsync(10_000); + expect(mockStreamFetch.mock.calls.length).toBe(streamCallsAtFailure); + expect(mockStreamTokenFetch.mock.calls.length).toBe(1); + }); + + it("treats a 429 from stream_token as transient and retries the read-target resolution", async () => { + vi.useFakeTimers(); + + // 429 is momentary like a 503: it must not cache a Django fallback. The next reconnect + // re-resolves and the watch upgrades to the durable proxy leg. + mockStreamTokenFetch + .mockImplementationOnce(() => + Promise.resolve(createJsonResponse({ detail: "slow down" }, 429)), + ) + .mockImplementation(() => + Promise.resolve( + createJsonResponse({ + token: "fresh-token", + stream_base_url: "https://proxy.example", + }), + ), + ); + + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + return Promise.resolve( + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }), + ); + }); + + const usedProxyLeg = (input: string | Request): boolean => { + const url = typeof input === "string" ? input : input.url; + return url.includes("proxy.example"); + }; + mockStreamFetch.mockImplementation((input: string | Request) => + Promise.resolve( + usedProxyLeg(input) + ? createSseResponse("event: stream-end\ndata: {}\n\n") + : createSseResponse(""), + ), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + const hasWatcher = (): boolean => + (service as unknown as { watchers: Map }).watchers.has( + "task-1:run-1", + ); + await waitFor(() => !hasWatcher(), 10_000); + + // The 429 was not cached: resolution retried and the stream switched to the durable proxy leg. + expect(mockStreamTokenFetch.mock.calls.length).toBeGreaterThanOrEqual(2); + expect( + mockStreamFetch.mock.calls.some(([input]) => usedProxyLeg(input)), + ).toBe(true); + }); + + it("caches a 403 from stream_token and falls back to Django legacy polling", async () => { + vi.useFakeTimers(); + + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + // 403 is not transient: like a 404 it pins the watch to the Django leg with status polling. + mockStreamTokenFetch.mockImplementation(() => + Promise.resolve(createJsonResponse({ detail: "forbidden" }, 403)), + ); + + let runFetchCount = 0; + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + runFetchCount += 1; + // Calls 1-3 (bootstrap, post-bootstrap verify, first legacy poll) report an active run; the + // second legacy poll reports terminal. + const terminal = runFetchCount >= 4; + return Promise.resolve( + createJsonResponse({ + id: "run-1", + status: terminal ? "completed" : "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: terminal + ? "2026-01-01T00:00:05Z" + : "2026-01-01T00:00:00Z", + }), + ); + }); + + // Legacy mode: no stream-end ever arrives, so each clean EOF triggers a status poll. + mockStreamFetch.mockImplementation(() => + Promise.resolve(createSseResponse("")), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + const hasWatcher = (): boolean => + (service as unknown as { watchers: Map }).watchers.has( + "task-1:run-1", + ); + await waitFor(() => !hasWatcher(), 10_000); + + // Every stream read used the Django leg (no proxy URL), the watcher stopped on the terminal + // poll, and the refused resolution was cached: one stream_token call for the whole watch. + expect( + mockStreamFetch.mock.calls.every(([input]) => { + const url = typeof input === "string" ? input : (input as Request).url; + const { origin, pathname } = new URL(url); + return ( + origin === "https://app.example.com" && pathname.startsWith("/api/") + ); + }), + ).toBe(true); + expect(updates).toContainEqual( + expect.objectContaining({ kind: "status", status: "completed" }), + ); + expect(mockStreamTokenFetch.mock.calls.length).toBe(1); + }); + + it("stops on the last-known status when the post-stream-end repair fetch fails", async () => { + vi.useFakeTimers(); + + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + let runFetchCount = 0; + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + runFetchCount += 1; + // Bootstrap (call 1) reports an active run so the stream opens; the stream-end stop path's + // status-repair fetch (call 2) fails the network instead of returning a terminal run. + if (runFetchCount >= 2) { + return Promise.reject(new Error("network down")); + } + return Promise.resolve( + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: "build", + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }), + ); + }); + + mockStreamFetch.mockImplementation(() => + Promise.resolve(createSseResponse("event: stream-end\ndata: {}\n\n")), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + const hasWatcher = (): boolean => + (service as unknown as { watchers: Map }).watchers.has( + "task-1:run-1", + ); + await waitFor(() => !hasWatcher(), 10_000); + + // The failed repair fetch must not strand the watcher; it stops on the last-known status. + expect(updates).toContainEqual( + expect.objectContaining({ kind: "status", status: "in_progress" }), + ); + // Exactly one stream connection (the bootstrap stream-end); no reconnect after the clean stop. + expect(mockStreamFetch.mock.calls.length).toBe(1); + }); + + it("re-arms self-heal after a data event and re-bootstraps a second time before failing", async () => { + vi.useFakeTimers(); + + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + return Promise.resolve( + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }), + ); + }); + + // Clean-EOF loop by default. Each re-bootstrap re-resolves the read target, so a second + // stream_token call marks the first self-heal. Deliver exactly one real data event on that + // re-bootstrap's connection to re-arm self-heal, then resume clean-EOF looping. + let dataEventDelivered = false; + mockStreamFetch.mockImplementation(() => { + if (mockStreamTokenFetch.mock.calls.length >= 2 && !dataEventDelivered) { + dataEventDelivered = true; + return Promise.resolve( + createSseResponse( + 'id: 1\ndata: {"type":"notification","timestamp":"2026-01-01T00:00:02Z","notification":{"jsonrpc":"2.0","method":"_posthog/console","params":{"sessionId":"run-1","level":"info","message":"alive"}}}\n\n', + ), + ); + } + return Promise.resolve(createSseResponse("")); + }); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await waitFor(() => mockStreamFetch.mock.calls.length === 1); + await vi.advanceTimersByTimeAsync(2 * 60 * 60_000); + + await waitFor( + () => + updates.some( + (u) => + typeof u === "object" && + u !== null && + (u as { kind?: string }).kind === "error", + ), + 10_000, + ); + + expect(updates).toContainEqual( + expect.objectContaining({ + kind: "error", + errorTitle: "Cloud run unreachable", + }), + ); + + // Initial bootstrap snapshot plus two self-heal re-bootstraps: the data event between the first + // and second budget exhaustion re-armed self-heal, so a third snapshot precedes the failure. + expect( + updates.filter( + (u) => + typeof u === "object" && + u !== null && + (u as { kind?: string }).kind === "snapshot", + ), + ).toHaveLength(3); }); }); diff --git a/packages/core/src/cloud-task/cloud-task.ts b/packages/core/src/cloud-task/cloud-task.ts index f35285a3c8..348f5e57c3 100644 --- a/packages/core/src/cloud-task/cloud-task.ts +++ b/packages/core/src/cloud-task/cloud-task.ts @@ -24,15 +24,22 @@ import { } from "./schemas"; import { type SseEvent, SseEventParser } from "./sse-parser"; -const MAX_SSE_RECONNECT_ATTEMPTS = 5; +// Reconnect backoff: flat base delay for the first SSE_RECONNECT_FLAT_ATTEMPTS attempts, then +// exponential up to the cap (0.5, 0.5, 0.5, 1, 2, 4, 8, 16, 30s), spanning ~60s before giving up. +const MAX_SSE_RECONNECT_ATTEMPTS = 9; const MAX_CUMULATIVE_RECONNECT_ATTEMPTS = 30; -const SSE_RECONNECT_BASE_DELAY_MS = 2_000; +const SSE_RECONNECT_BASE_DELAY_MS = 500; +const SSE_RECONNECT_FLAT_ATTEMPTS = 3; const SSE_RECONNECT_MAX_DELAY_MS = 30_000; const SSE_HEALTHY_CONNECTION_MS = 60_000; const EVENT_BATCH_FLUSH_MS = 16; const EVENT_BATCH_MAX_SIZE = 50; const SESSION_LOG_PAGE_LIMIT = 5_000; +// Authoritative end-of-stream sentinel, matched on the SSE event name (event.event, not data.type). +// The client stops on it without consulting run status. +const STREAM_END_EVENT_NAME = "stream-end"; + interface SessionLogsPage { entries: StoredLogEntry[]; hasMore: boolean; @@ -85,6 +92,9 @@ interface TaskRunStateEvent { completed_at?: string | null; } +// Which endpoint a connection reads from. Event ids are only meaningful within their issuing leg. +type StreamLeg = "proxy" | "django"; + interface WatcherState { taskId: string; runId: string; @@ -104,6 +114,9 @@ interface WatcherState { streamErrorAttempts: number; cumulativeReconnectAttempts: number; lastEventId: string | null; + // Leg that issued lastEventId, and the leg of the connection currently being read. + lastEventIdLeg: StreamLeg | null; + streamLeg: StreamLeg | null; lastStatus: TaskRunStatus | null; lastStage: string | null; lastOutput: Record | null; @@ -116,10 +129,22 @@ interface WatcherState { isBootstrapping: boolean; hasEmittedSnapshot: boolean; bufferedLogBatches: StoredLogEntry[][]; + // Live entries emitted since the last snapshot, retained so a re-subscribe snapshot can reconcile + // entries the server has not persisted yet. emitCurrentSnapshot trims this to the still-missing + // set; with no re-subscribe it holds the run's emitted entries until the watch ends. emittedLogEntries: StoredLogEntry[]; failed: boolean; needsPostBootstrapReconnect: boolean; needsStopAfterBootstrap: boolean; + streamEnded: boolean; + // Consumes one automatic re-bootstrap recovery; re-armed by a data event or healthy connection. + selfHealAttempted: boolean; + // Both streamBaseUrl and streamReadToken non-null => read via the agent-proxy; either null => Django. + streamTargetResolved: boolean; + streamBaseUrl: string | null; + streamReadToken: string | null; + // True once stream_token resolved. False for old servers (404), which fall back to status polling. + durableStreamEnabled: boolean; } function watcherKey(taskId: string, runId: string): string { @@ -242,6 +267,42 @@ function shouldFailWatcherForFetchStatus(status: number): boolean { return status === 401 || status === 403 || status === 404; } +// 5xx and 429 are momentary: the stream-token endpoint exists but is briefly unavailable, so the +// target stays unresolved and the next reconnect retries instead of caching a Django fallback. +function isTransientStreamTargetStatus(status: number): boolean { + return status >= 500 || status === 429; +} + +// Content-based frequency map keyed by the serialized entry. SSE ids are absent from persisted +// (historical) entries, so the payload itself is the identity used to dedup live against historical. +function buildEntryFrequencyMap( + entries: StoredLogEntry[], +): Map { + const counts = new Map(); + for (const entry of entries) { + const serialized = JSON.stringify(entry); + counts.set(serialized, (counts.get(serialized) ?? 0) + 1); + } + return counts; +} + +// Keeps only entries absent from counts, consuming one occurrence per match so a payload present N +// times in the reference set is suppressed at most N times. Mutates counts. +function filterEntriesNotInFrequencyMap( + entries: StoredLogEntry[], + counts: Map, +): StoredLogEntry[] { + return entries.filter((entry) => { + const serialized = JSON.stringify(entry); + const remaining = counts.get(serialized) ?? 0; + if (remaining <= 0) { + return true; + } + counts.set(serialized, remaining - 1); + return false; + }); +} + @injectable() export class CloudTaskService extends TypedEventEmitter { private watchers = new Map(); @@ -312,6 +373,19 @@ export class CloudTaskService extends TypedEventEmitter { watcher.batchFlushTimeoutId = null; } + this.log.info("Retrying cloud task watcher", { + key, + hasSnapshot: watcher.hasEmittedSnapshot, + }); + + // Start over from scratch: a poisoned resume position loops straight back into the same + // failure, so re-bootstrap to re-resolve the read leg and emit a fresh snapshot. + this.resetWatcherForRebootstrap(watcher); + void this.bootstrapWatcher(key); + } + + // Resets a watcher to its pre-bootstrap state so bootstrapWatcher can rebuild it from server truth. + private resetWatcherForRebootstrap(watcher: WatcherState): void { watcher.reconnectAttempts = 0; watcher.streamErrorAttempts = 0; watcher.cumulativeReconnectAttempts = 0; @@ -320,21 +394,17 @@ export class CloudTaskService extends TypedEventEmitter { watcher.bufferedLogBatches = []; watcher.needsPostBootstrapReconnect = false; watcher.needsStopAfterBootstrap = false; - - this.log.info("Retrying cloud task watcher", { - key, - hasSnapshot: watcher.hasEmittedSnapshot, - }); - - if (!watcher.hasEmittedSnapshot) { - watcher.lastEventId = null; - watcher.totalEntryCount = 0; - watcher.isBootstrapping = false; - void this.bootstrapWatcher(key); - return; - } - - void this.connectSse(key, { startLatest: !watcher.lastEventId }); + watcher.streamEnded = false; + watcher.selfHealAttempted = false; + watcher.lastEventId = null; + watcher.lastEventIdLeg = null; + watcher.streamLeg = null; + watcher.totalEntryCount = 0; + watcher.isBootstrapping = false; + watcher.streamTargetResolved = false; + watcher.streamBaseUrl = null; + watcher.streamReadToken = null; + watcher.durableStreamEnabled = false; } async sendCommand(input: SendCommandInput): Promise { @@ -444,6 +514,8 @@ export class CloudTaskService extends TypedEventEmitter { streamErrorAttempts: 0, cumulativeReconnectAttempts: 0, lastEventId: null, + lastEventIdLeg: null, + streamLeg: null, lastStatus: null, lastStage: null, lastOutput: null, @@ -460,6 +532,12 @@ export class CloudTaskService extends TypedEventEmitter { failed: false, needsPostBootstrapReconnect: false, needsStopAfterBootstrap: false, + streamEnded: false, + selfHealAttempted: false, + streamTargetResolved: false, + streamBaseUrl: null, + streamReadToken: null, + durableStreamEnabled: false, }; this.watchers.set(key, watcher); @@ -600,12 +678,9 @@ export class CloudTaskService extends TypedEventEmitter { return; } - if ( - watcher.needsStopAfterBootstrap || - isTerminalStatus(watcher.lastStatus) - ) { + if (watcher.needsStopAfterBootstrap) { watcher.needsStopAfterBootstrap = false; - this.stopWatcher(key); + await this.finalizeWatcherStop(key); return; } @@ -630,6 +705,10 @@ export class CloudTaskService extends TypedEventEmitter { if (!this.applyTaskRunState(watcher, run)) return; if (isTerminalStatus(watcher.lastStatus)) return; + this.emitStatusUpdate(watcher); + } + + private emitStatusUpdate(watcher: WatcherState): void { this.emit(CloudTaskEvent.Update, { taskId: watcher.taskId, runId: watcher.runId, @@ -653,12 +732,50 @@ export class CloudTaskService extends TypedEventEmitter { watcher.sseAbortController = controller; watcher.connStartedAt = 0; - watcher.connSentLastEventId = watcher.lastEventId; watcher.connDataEventsReceived = 0; - const startLatest = Boolean(options?.startLatest && !watcher.lastEventId); + // Resolve the read target once (proxy URL + token, or Django), reused across reconnects. + if (!watcher.streamTargetResolved) { + await this.resolveStreamTarget(watcher); + const resolvedWatcher = this.watchers.get(key); + if ( + !resolvedWatcher || + resolvedWatcher !== watcher || + controller.signal.aborted + ) { + return; + } + } + + const usingProxy = Boolean( + watcher.streamBaseUrl && watcher.streamReadToken, + ); + const base = usingProxy + ? watcher.streamBaseUrl?.replace(/\/+$/, "") + : watcher.apiHost; + const leg: StreamLeg = usingProxy ? "proxy" : "django"; + // Proxy and Django id spaces are unrelated, so drop the resume position on a leg switch and + // let start=latest plus the next snapshot cover the gap. + if (watcher.lastEventId && watcher.lastEventIdLeg !== leg) { + this.log.info("Cloud task stream leg changed, dropping resume position", { + key, + from: watcher.lastEventIdLeg, + to: leg, + }); + watcher.lastEventId = null; + watcher.lastEventIdLeg = null; + } + watcher.streamLeg = leg; + + // Captured after the leg-switch drop so they reflect what this connection actually sends. + watcher.connSentLastEventId = watcher.lastEventId; + const startLatest = Boolean(options?.startLatest && !watcher.lastEventId); const url = new URL( - `${watcher.apiHost}/api/projects/${watcher.teamId}/tasks/${watcher.taskId}/runs/${watcher.runId}/stream/`, + usingProxy + ? `${base}/v1/runs/${encodeURIComponent(watcher.runId)}/stream` + : `${base}/api/projects/${watcher.teamId}/tasks/${encodeURIComponent( + watcher.taskId, + )}/runs/${encodeURIComponent(watcher.runId)}/stream/`, ); if (startLatest) { url.searchParams.set("start", "latest"); @@ -669,26 +786,63 @@ export class CloudTaskService extends TypedEventEmitter { if (watcher.lastEventId) { headers["Last-Event-ID"] = watcher.lastEventId; } + if (usingProxy) { + headers.Authorization = `Bearer ${watcher.streamReadToken}`; + } + + // Info so every stream attempt is visible in the logs; Bearer token redacted. + this.log.info(`Opening cloud task stream via ${leg}: ${url.toString()}`, { + key, + leg, + usingProxy, + durableStream: watcher.durableStreamEnabled, + method: "GET", + streamUrl: url.toString(), + lastEventId: watcher.lastEventId, + startLatest, + headers: usingProxy + ? { ...headers, Authorization: "Bearer " } + : headers, + }); const parser = new SseEventParser((message, data) => this.log.warn(message, data), ); const decoder = new TextDecoder(); - // Tracks whether the response body was opened and how long it stayed open, - // so a long-lived connection cut by transport churn isn't penalized as a - // failed reconnect attempt (see SSE_HEALTHY_CONNECTION_MS). + // Track how long the body stayed open so healthy long-lived connections cut by churn + // aren't penalized as failed reconnects (see SSE_HEALTHY_CONNECTION_MS). let connectedAt = 0; let streamWasEstablished = false; let bytesReceived = 0; let eventsReceived = 0; try { - const response = await this.auth.authenticatedFetch(url.toString(), { - method: "GET", - headers, - signal: controller.signal, - }); + // The proxy authenticates with the run-scoped Bearer token; the Django leg uses the session. + const response = usingProxy + ? await fetch(url.toString(), { + method: "GET", + headers, + signal: controller.signal, + }) + : await this.auth.authenticatedFetch(url.toString(), { + method: "GET", + headers, + signal: controller.signal, + }); + + this.log.info( + `Cloud task stream response ${response.status} ${ + response.ok ? "ok" : "FAILED" + } via ${leg}`, + { + key, + leg, + status: response.status, + ok: response.ok, + streamUrl: url.toString(), + }, + ); if (!response.ok) { throw createStreamStatusError(response.status); @@ -702,8 +856,10 @@ export class CloudTaskService extends TypedEventEmitter { streamWasEstablished = true; watcher.connStartedAt = connectedAt; - this.log.info("Cloud task SSE connected", { + this.log.info(`Cloud task SSE connected via ${leg}: ${url.toString()}`, { key, + leg, + streamUrl: url.toString(), sentLastEventId: watcher.connSentLastEventId, startLatest, status: response.status, @@ -733,13 +889,19 @@ export class CloudTaskService extends TypedEventEmitter { const events = parser.parse(chunk); for (const event of events) { eventsReceived += 1; - this.handleSseEvent(key, event); + const backendError = this.handleSseEvent(key, event); + if (backendError) { + throw backendError; + } } } const trailingEvents = parser.parse(decoder.decode()); for (const event of trailingEvents) { - this.handleSseEvent(key, event); + const backendError = this.handleSseEvent(key, event); + if (backendError) { + throw backendError; + } } this.flushLogBatch(key); @@ -756,7 +918,20 @@ export class CloudTaskService extends TypedEventEmitter { dataEventsReceived: watcher.connDataEventsReceived, lastEventId: watcher.lastEventId, }); - await this.handleStreamCompletion(key, { reconnectIfNonTerminal: true }); + + // A long-lived clean close is healthy churn, not a loop: clear the cumulative budget so an + // idle run can ride out proxy timeout cycles, while instant-EOF loops still exhaust it. + const completedWatcher = this.watchers.get(key); + if ( + completedWatcher && + streamWasEstablished && + Date.now() - connectedAt >= SSE_HEALTHY_CONNECTION_MS + ) { + completedWatcher.cumulativeReconnectAttempts = 0; + completedWatcher.selfHealAttempted = false; + } + + await this.handleStreamCompletion(key, { reconnectOnDisconnect: true }); } catch (error) { this.flushLogBatch(key); @@ -764,6 +939,31 @@ export class CloudTaskService extends TypedEventEmitter { return; } + // Proxy-leg 401: the read token expired or its signing key rotated. Re-resolve to mint a + // fresh token (or route back to Django) instead of failing. Django-leg 401 stays fatal below. + const unauthorizedWatcher = this.watchers.get(key); + if ( + error instanceof CloudTaskStreamError && + error.status === 401 && + unauthorizedWatcher?.streamBaseUrl + ) { + // Keep durableStreamEnabled set: clearing it would route this disconnect through legacy + // status polling, which can stop the watch on a terminal status before stream-end arrives. + // The next connectSse re-resolves the target and resolveStreamTarget re-derives durability. + unauthorizedWatcher.streamTargetResolved = false; + unauthorizedWatcher.streamBaseUrl = null; + unauthorizedWatcher.streamReadToken = null; + this.log.info("Cloud task stream proxy token rejected, re-resolving", { + key, + }); + await this.handleStreamCompletion(key, { + reconnectOnDisconnect: true, + reconnectError: error, + countReconnectAttempt: true, + }); + return; + } + if ( error instanceof CloudTaskStreamError && error.details.autoRetry === false @@ -781,17 +981,22 @@ export class CloudTaskService extends TypedEventEmitter { streamWasEstablished && Date.now() - connectedAt >= SSE_HEALTHY_CONNECTION_MS; - const watcher = this.watchers.get(key); - if (watcher) { + const errorWatcher = this.watchers.get(key); + if (errorWatcher) { if (isBackendError) { - watcher.streamErrorAttempts += 1; + errorWatcher.streamErrorAttempts += 1; } else if (wasHealthyStream) { - watcher.streamErrorAttempts = 0; + errorWatcher.streamErrorAttempts = 0; + // A healthy-length connection proves timeout cycling, not a loop. + errorWatcher.cumulativeReconnectAttempts = 0; + errorWatcher.selfHealAttempted = false; } } this.log.warn("Cloud task stream error", { key, + leg, + streamUrl: url.toString(), error: errorMessage, errorDetail: serializeError(error), wasHealthyStream, @@ -802,14 +1007,15 @@ export class CloudTaskService extends TypedEventEmitter { : 0, bytesReceived, eventsReceived, - dataEventsReceived: watcher?.connDataEventsReceived ?? 0, - lastEventId: watcher?.lastEventId ?? null, - reconnectAttempts: watcher?.reconnectAttempts ?? 0, - streamErrorAttempts: watcher?.streamErrorAttempts ?? 0, - cumulativeReconnectAttempts: watcher?.cumulativeReconnectAttempts ?? 0, + dataEventsReceived: errorWatcher?.connDataEventsReceived ?? 0, + lastEventId: errorWatcher?.lastEventId ?? null, + reconnectAttempts: errorWatcher?.reconnectAttempts ?? 0, + streamErrorAttempts: errorWatcher?.streamErrorAttempts ?? 0, + cumulativeReconnectAttempts: + errorWatcher?.cumulativeReconnectAttempts ?? 0, }); await this.handleStreamCompletion(key, { - reconnectIfNonTerminal: true, + reconnectOnDisconnect: true, reconnectError: error, countReconnectAttempt: !isBackendError && !wasHealthyStream, }); @@ -821,35 +1027,48 @@ export class CloudTaskService extends TypedEventEmitter { } } - private handleSseEvent(key: string, event: SseEvent): void { + // Returns a BackendStreamError when the stream carries an error event so the caller can throw at + // the read site; returns null otherwise. It does not throw, so a single event cannot unwind the + // reader loop unexpectedly. + private handleSseEvent( + key: string, + event: SseEvent, + ): BackendStreamError | null { const watcher = this.watchers.get(key); - if (!watcher || watcher.failed) return; + if (!watcher || watcher.failed) return null; if (event.id) { watcher.lastEventId = event.id; + watcher.lastEventIdLeg = watcher.streamLeg; } if (event.event === "error") { const message = isSseErrorEvent(event.data) ? event.data.error : "Unknown stream error"; - throw new BackendStreamError(message); + return new BackendStreamError(message); + } + + if (event.event === STREAM_END_EVENT_NAME) { + // The run's stream is durably complete. Mark it so completion stops instead + // of reconnecting, independent of run status. The connection will close + // naturally (clean EOF) right after this sentinel. + watcher.streamEnded = true; + return null; } - // A keepalive or real event proves the transport recovered, so clear the - // transport reconnect budget. A keepalive stops here: it does NOT clear the - // backend-error budget, since it doesn't prove the stream itself produced - // data. + // A keepalive or real event proves the transport recovered. A keepalive does not clear the + // backend-error budget, which only a real data event below resets. watcher.reconnectAttempts = 0; if (isKeepaliveEvent(event)) { - return; + return null; } - // A real data event proves the stream materialized; clear the backend-error - // and cumulative budgets too. + // A real data event proves the stream materialized; clear the remaining budgets and re-arm self-heal. watcher.streamErrorAttempts = 0; watcher.cumulativeReconnectAttempts = 0; + watcher.selfHealAttempted = false; watcher.connDataEventsReceived += 1; if (watcher.connDataEventsReceived === 1 && watcher.connSentLastEventId) { @@ -875,7 +1094,7 @@ export class CloudTaskService extends TypedEventEmitter { }); } } - return; + return null; } if (isPermissionRequestEvent(event.data)) { @@ -887,13 +1106,13 @@ export class CloudTaskService extends TypedEventEmitter { toolCall: event.data.toolCall, options: event.data.options, }); - return; + return null; } watcher.pendingLogEntries.push(event.data as StoredLogEntry); if (watcher.pendingLogEntries.length >= EVENT_BATCH_MAX_SIZE) { this.flushLogBatch(key); - return; + return null; } if (!watcher.batchFlushTimeoutId) { @@ -902,6 +1121,8 @@ export class CloudTaskService extends TypedEventEmitter { this.flushLogBatch(key); }, EVENT_BATCH_FLUSH_MS); } + + return null; } private flushLogBatch(key: string): void { @@ -940,28 +1161,13 @@ export class CloudTaskService extends TypedEventEmitter { const watcher = this.watchers.get(key); if (!watcher || watcher.bufferedLogBatches.length === 0) return; - // Content-based dedup because SSE IDs (Redis stream IDs) don't exist in - // the S3-backed historical entries — the JSON payload is the only shared key - const historicalCounts = new Map(); - for (const entry of historicalEntries) { - const serialized = JSON.stringify(entry); - historicalCounts.set( - serialized, - (historicalCounts.get(serialized) ?? 0) + 1, - ); - } + const historicalCounts = buildEntryFrequencyMap(historicalEntries); for (const entries of watcher.bufferedLogBatches) { - const dedupedEntries = entries.filter((entry) => { - const serialized = JSON.stringify(entry); - const remaining = historicalCounts.get(serialized) ?? 0; - if (remaining <= 0) { - return true; - } - - historicalCounts.set(serialized, remaining - 1); - return false; - }); + const dedupedEntries = filterEntriesNotInFrequencyMap( + entries, + historicalCounts, + ); if (dedupedEntries.length === 0) { continue; @@ -999,25 +1205,11 @@ export class CloudTaskService extends TypedEventEmitter { return { snapshotEntries: historicalEntries, missingEmittedEntries: [] }; } - const historicalCounts = new Map(); - for (const entry of historicalEntries) { - const serialized = JSON.stringify(entry); - historicalCounts.set( - serialized, - (historicalCounts.get(serialized) ?? 0) + 1, - ); - } - - const missingEmittedEntries = emittedEntries.filter((entry) => { - const serialized = JSON.stringify(entry); - const remaining = historicalCounts.get(serialized) ?? 0; - if (remaining <= 0) { - return true; - } - - historicalCounts.set(serialized, remaining - 1); - return false; - }); + const historicalCounts = buildEntryFrequencyMap(historicalEntries); + const missingEmittedEntries = filterEntriesNotInFrequencyMap( + emittedEntries, + historicalCounts, + ); return { snapshotEntries: [...historicalEntries, ...missingEmittedEntries], @@ -1129,7 +1321,8 @@ export class CloudTaskService extends TypedEventEmitter { options: { countAttempt?: boolean } = {}, ): void { const watcher = this.watchers.get(key); - if (!watcher || watcher.failed || isTerminalStatus(watcher.lastStatus)) { + // Status-unaware: the loop only stops on the stream-end sentinel or budget exhaustion below. + if (!watcher || watcher.failed) { return; } @@ -1137,8 +1330,7 @@ export class CloudTaskService extends TypedEventEmitter { clearTimeout(watcher.reconnectTimeoutId); } - // Cumulative counter bounds runaway loops that clean-EOF (countAttempt=false) - // and would otherwise dodge `reconnectAttempts`. + // Bounds runaway loops that clean-EOF (countAttempt=false) and dodge reconnectAttempts. watcher.cumulativeReconnectAttempts += 1; const countAttempt = options.countAttempt ?? true; if (countAttempt) { @@ -1148,6 +1340,21 @@ export class CloudTaskService extends TypedEventEmitter { if ( watcher.cumulativeReconnectAttempts > MAX_CUMULATIVE_RECONNECT_ATTEMPTS ) { + // A poisoned resume position burns the budget without an error frame. Rebuild once from + // scratch (the app-restart recovery) before failing; if it loops straight back, fail for real. + if (!watcher.selfHealAttempted) { + watcher.reconnectTimeoutId = null; + this.log.warn( + "Cloud task stream looping without events, re-bootstrapping", + { key }, + ); + this.resetWatcherForRebootstrap(watcher); + // Set after the reset (which clears it): consumes the single allowed self-heal so a + // straight-back loop fails next time instead of re-bootstrapping forever. + watcher.selfHealAttempted = true; + void this.bootstrapWatcher(key); + return; + } this.failWatcher(key, { title: "Cloud run unreachable", message: @@ -1157,8 +1364,7 @@ export class CloudTaskService extends TypedEventEmitter { return; } - // The watcher fails once either budget is exhausted: transport reconnect - // failures or backend stream-error frames. + // Fail once either budget (transport reconnect or backend stream-error) is exhausted. const attemptCount = Math.max( watcher.reconnectAttempts, watcher.streamErrorAttempts, @@ -1182,7 +1388,8 @@ export class CloudTaskService extends TypedEventEmitter { ? watcher.streamErrorAttempts : watcher.reconnectAttempts; const delay = Math.min( - SSE_RECONNECT_BASE_DELAY_MS * 2 ** Math.max(backoffAttempts - 1, 0), + SSE_RECONNECT_BASE_DELAY_MS * + 2 ** Math.max(backoffAttempts - SSE_RECONNECT_FLAT_ATTEMPTS, 0), SSE_RECONNECT_MAX_DELAY_MS, ); @@ -1200,28 +1407,21 @@ export class CloudTaskService extends TypedEventEmitter { private async handleStreamCompletion( key: string, options: { - reconnectIfNonTerminal: boolean; + reconnectOnDisconnect: boolean; reconnectError?: unknown; countReconnectAttempt?: boolean; }, ): Promise { const watcher = this.watchers.get(key); if (!watcher) return; - - const { reconnectIfNonTerminal } = options; - const run = await this.fetchTaskRun(watcher); - const currentWatcher = this.watchers.get(key); - if (!currentWatcher || currentWatcher !== watcher) return; if (watcher.failed) return; - if (watcher.isBootstrapping) { - if (!run) { - watcher.needsPostBootstrapReconnect = true; - return; - } + const { reconnectOnDisconnect } = options; - this.applyTaskRunState(watcher, run); - if (isTerminalStatus(watcher.lastStatus) || !reconnectIfNonTerminal) { + // Bootstrap owns the snapshot lifecycle: stopping mid-bootstrap would discard the backlog and + // buffered live entries. Record intent and let bootstrap finish. + if (watcher.isBootstrapping) { + if (watcher.streamEnded || !reconnectOnDisconnect) { watcher.needsStopAfterBootstrap = true; } else { watcher.needsPostBootstrapReconnect = true; @@ -1229,82 +1429,63 @@ export class CloudTaskService extends TypedEventEmitter { return; } - if (!run) { - this.scheduleReconnect( - key, - new CloudTaskStreamError("Failed to fetch terminal cloud run state", { - title: "Cloud run state unavailable", - message: - "Could not fetch the latest cloud run state after the stream ended. Retry to reconnect.", - retryable: true, - }), - { countAttempt: options.countReconnectAttempt ?? true }, - ); + // The stream-end sentinel is the only signal that ends a durable watch. Any disconnect without + // it is transport churn to reconnect through; status is tracked for display only, never to stop. + if (watcher.streamEnded) { + await this.finalizeWatcherStop(key); return; } - const stateChanged = this.applyTaskRunState(watcher, run); + // Legacy mode (old server): no sentinel, so poll run status on disconnect to decide stop vs + // reconnect. The reconnect budgets keep the new semantics, so self-heal stays active here too. + if (!watcher.durableStreamEnabled && reconnectOnDisconnect) { + const run = await this.fetchTaskRun(watcher); + const legacyWatcher = this.watchers.get(key); + if (!legacyWatcher || legacyWatcher !== watcher) return; + if (watcher.failed) return; - if (!isTerminalStatus(watcher.lastStatus) && reconnectIfNonTerminal) { - if (stateChanged) { - // Polled progress proves the run is alive — reset both budgets. - watcher.reconnectAttempts = 0; - watcher.cumulativeReconnectAttempts = 0; - this.emit(CloudTaskEvent.Update, { - taskId: watcher.taskId, - runId: watcher.runId, - kind: "status", - status: watcher.lastStatus ?? undefined, - stage: watcher.lastStage, - output: watcher.lastOutput, - errorMessage: watcher.lastErrorMessage, - branch: watcher.lastBranch, - }); + if (run) { + this.applyTaskRunState(watcher, run); + } + if (isTerminalStatus(watcher.lastStatus)) { + this.emitStatusUpdate(watcher); + this.stopWatcher(key); + return; + } + if (run) { + this.emitStatusUpdate(watcher); } - this.log.warn("Cloud task stream ended before terminal status", { - key, - status: watcher.lastStatus, - stage: watcher.lastStage, - stateChanged, - lastEventId: watcher.lastEventId, - sentLastEventId: watcher.connSentLastEventId, - connDataEventsReceived: watcher.connDataEventsReceived, - reconnectAttempts: watcher.reconnectAttempts, - streamErrorAttempts: watcher.streamErrorAttempts, - cumulativeReconnectAttempts: watcher.cumulativeReconnectAttempts, - }); this.scheduleReconnect(key, options.reconnectError, { countAttempt: options.countReconnectAttempt ?? false, }); return; } - this.log.info("Cloud task terminal stop", { - key, - status: watcher.lastStatus, - reachedViaError: options.reconnectError !== undefined, - lastEventId: watcher.lastEventId, - sentLastEventId: watcher.connSentLastEventId, - totalEntryCount: watcher.totalEntryCount, - connDataEventsReceived: watcher.connDataEventsReceived, - connDurationMs: - watcher.connStartedAt !== 0 ? Date.now() - watcher.connStartedAt : 0, - }); + // All callers pass reconnectOnDisconnect, and durable watches only stop via the stream-end + // sentinel or a terminal legacy poll (both handled above); any other disconnect reconnects. + if (reconnectOnDisconnect) { + this.scheduleReconnect(key, options.reconnectError, { + countAttempt: options.countReconnectAttempt ?? false, + }); + } + } - // Always emit the latest status before stopping. Terminal states are - // intentionally deferred until stream completion; clean EOFs can also mean - // the backend has no more stream events even when the run status remains active. - this.emit(CloudTaskEvent.Update, { - taskId: watcher.taskId, - runId: watcher.runId, - kind: "status", - status: watcher.lastStatus ?? undefined, - stage: watcher.lastStage, - output: watcher.lastOutput, - errorMessage: watcher.lastErrorMessage, - branch: watcher.lastBranch, - }); + // Stops a watcher whose stream is durably complete. Repairs the displayed status if the stream + // ended non-terminal (dropped final frame); the poll never decides whether to stop. + private async finalizeWatcherStop(key: string): Promise { + const watcher = this.watchers.get(key); + if (!watcher) return; + + if (!isTerminalStatus(watcher.lastStatus)) { + const run = await this.fetchTaskRun(watcher); + const currentWatcher = this.watchers.get(key); + if (!currentWatcher || currentWatcher !== watcher) return; + if (run) { + this.applyTaskRunState(watcher, run); + } + } + this.emitStatusUpdate(watcher); this.stopWatcher(key); } @@ -1418,9 +1599,7 @@ export class CloudTaskService extends TypedEventEmitter { return null; } - for (const entry of page.entries) { - entries.push(entry); - } + entries.push(...page.entries); if (!page.hasMore || page.entries.length === 0) { return entries; } @@ -1429,6 +1608,64 @@ export class CloudTaskService extends TypedEventEmitter { } } + private async resolveStreamTarget(watcher: WatcherState): Promise { + const url = `${watcher.apiHost}/api/projects/${watcher.teamId}/tasks/${watcher.taskId}/runs/${watcher.runId}/stream_token/`; + try { + const response = await this.auth.authenticatedFetch(url, { + method: "GET", + }); + if (!response.ok) { + watcher.streamBaseUrl = null; + watcher.streamReadToken = null; + if (isTransientStreamTargetStatus(response.status)) { + // Transient: read from Django this round but leave the target unresolved so the next + // reconnect retries durable resolution instead of pinning the run to status polling. + this.log.warn("Cloud task stream target temporarily unavailable", { + taskId: watcher.taskId, + runId: watcher.runId, + status: response.status, + }); + return; + } + // Refused, or an old server without the endpoint: read from Django with status polling. + watcher.durableStreamEnabled = false; + watcher.streamTargetResolved = true; + this.log.info("Cloud task stream reading from API host", { + taskId: watcher.taskId, + runId: watcher.runId, + status: response.status, + }); + return; + } + const data = (await response.json()) as { + token?: string; + stream_base_url?: string | null; + }; + watcher.streamReadToken = data.token ?? null; + watcher.streamBaseUrl = data.stream_base_url ?? null; + // The endpoint resolving at all opts this watcher into the status-unaware contract; + // old servers 404 above and stay on legacy status polling. + watcher.durableStreamEnabled = true; + watcher.streamTargetResolved = true; + this.log.info("Cloud task stream target resolved", { + taskId: watcher.taskId, + runId: watcher.runId, + streamBaseUrl: watcher.streamBaseUrl, + hasToken: Boolean(watcher.streamReadToken), + durableStream: watcher.durableStreamEnabled, + }); + } catch (error) { + // Transient failure: leave unresolved so the next reconnect retries and falls back to Django. + watcher.streamBaseUrl = null; + watcher.streamReadToken = null; + this.log.warn("Cloud task stream target resolution failed", { + taskId: watcher.taskId, + runId: watcher.runId, + error, + }); + } + } + private async fetchTaskRun( watcher: WatcherState, ): Promise {