From e8c610d323aa31c21333338854ca97f0a070fac8 Mon Sep 17 00:00:00 2001 From: Charles Vien Date: Tue, 2 Jun 2026 17:50:37 -0700 Subject: [PATCH 01/20] Implement agent proxy --- packages/agent/src/server/agent-server.ts | 1 + packages/agent/src/server/bin.ts | 4 + .../src/server/event-stream-sender.test.ts | 20 ++ .../agent/src/server/event-stream-sender.ts | 23 +- packages/agent/src/server/types.ts | 3 + .../core/src/cloud-task/cloud-task.test.ts | 257 +++++++++++------- packages/core/src/cloud-task/cloud-task.ts | 243 +++++++++++------ 7 files changed, 358 insertions(+), 193 deletions(-) diff --git a/packages/agent/src/server/agent-server.ts b/packages/agent/src/server/agent-server.ts index 9875db0047..aa5777f0a3 100644 --- a/packages/agent/src/server/agent-server.ts +++ b/packages/agent/src/server/agent-server.ts @@ -357,6 +357,7 @@ export class AgentServer { if (config.eventIngestToken) { this.eventStreamSender = new TaskRunEventStreamSender({ apiUrl: config.apiUrl, + eventIngestBaseUrl: config.eventIngestBaseUrl, projectId: config.projectId, taskId: config.taskId, runId: config.runId, diff --git a/packages/agent/src/server/bin.ts b/packages/agent/src/server/bin.ts index 1987249968..2ac73e8426 100644 --- a/packages/agent/src/server/bin.ts +++ b/packages/agent/src/server/bin.ts @@ -33,6 +33,9 @@ const envSchema = z.object({ .enum(["low", "medium", "high", "xhigh", "max"]) .optional(), POSTHOG_TASK_RUN_EVENT_INGEST_TOKEN: z.string().min(1).optional(), + // Routes the event-ingest POST to the standalone agent-proxy; other API calls keep using + // POSTHOG_API_URL. Falls back to POSTHOG_API_URL when unset. 
+ POSTHOG_TASK_RUN_EVENT_INGEST_URL: z.url().optional(), POSTHOG_TASK_RUN_EVENT_INGEST_STREAM_WINDOW_MS: z .string() .regex( @@ -162,6 +165,7 @@ program port: parseInt(options.port, 10), jwtPublicKey: env.JWT_PUBLIC_KEY, eventIngestToken: env.POSTHOG_TASK_RUN_EVENT_INGEST_TOKEN, + eventIngestBaseUrl: env.POSTHOG_TASK_RUN_EVENT_INGEST_URL, eventIngestStreamWindowMs: env.POSTHOG_TASK_RUN_EVENT_INGEST_STREAM_WINDOW_MS, repositoryPath: options.repositoryPath, diff --git a/packages/agent/src/server/event-stream-sender.test.ts b/packages/agent/src/server/event-stream-sender.test.ts index e4e936362d..5eb8d3527a 100644 --- a/packages/agent/src/server/event-stream-sender.test.ts +++ b/packages/agent/src/server/event-stream-sender.test.ts @@ -187,6 +187,26 @@ describe("TaskRunEventStreamSender", () => { ]); }); + it("routes the ingest POST to the agent-proxy run-scoped path when eventIngestBaseUrl is set", async () => { + const fetchMock = vi.fn( + async (_url: string | URL | Request, init?: RequestInit) => { + const body = await readRequestBody(init); + return responseForBody(body); + }, + ); + vi.stubGlobal("fetch", fetchMock); + + const sender = createSender({ + eventIngestBaseUrl: "http://agent-proxy:8003/", + }); + sender.enqueue({ type: "notification", notification: { method: "first" } }); + await sender.stop(); + + expect(fetchMock).toHaveBeenCalled(); + const lastCall = fetchMock.mock.calls[fetchMock.mock.calls.length - 1]; + expect(lastCall[0]).toBe("http://agent-proxy:8003/v1/runs/run-1/ingest"); + }); + it("keeps the active ingest request open across scheduled flushes", async () => { const requestBodies: string[] = []; let activeStreamClosed = false; diff --git a/packages/agent/src/server/event-stream-sender.ts b/packages/agent/src/server/event-stream-sender.ts index 0620bd758e..1ea82b7e77 100644 --- a/packages/agent/src/server/event-stream-sender.ts +++ b/packages/agent/src/server/event-stream-sender.ts @@ -8,6 +8,9 @@ import { interface 
TaskRunEventStreamSenderConfig { apiUrl: string; + // Base URL for the event-ingest POST only. Lets the deployment route ingest to the + // standalone agent-proxy while the rest of the agent's API calls stay on apiUrl. + eventIngestBaseUrl?: string; projectId: number; taskId: string; runId: string; @@ -85,10 +88,22 @@ export class TaskRunEventStreamSender { private bufferRevision = 0; constructor(private readonly config: TaskRunEventStreamSenderConfig) { - const apiUrl = config.apiUrl.replace(/\/$/, ""); - this.ingestUrl = `${apiUrl}/api/projects/${config.projectId}/tasks/${encodeURIComponent( - config.taskId, - )}/runs/${encodeURIComponent(config.runId)}/event_stream/`; + // When routed to the agent-proxy, use its clean run-scoped path; the run-scoped + // token carries team and task. Falling back to apiUrl keeps the Django path. + const usingProxy = Boolean(config.eventIngestBaseUrl); + const ingestBase = (config.eventIngestBaseUrl ?? config.apiUrl).replace( + /\/$/, + "", + ); + this.ingestUrl = usingProxy + ? `${ingestBase}/v1/runs/${encodeURIComponent(config.runId)}/ingest` + : `${ingestBase}/api/projects/${config.projectId}/tasks/${encodeURIComponent( + config.taskId, + )}/runs/${encodeURIComponent(config.runId)}/event_stream/`; + config.logger.info("[agent-proxy debug] event ingest target resolved", { + ingestUrl: this.ingestUrl, + routedToProxy: usingProxy, + }); this.maxBufferedEvents = config.maxBufferedEvents ?? DEFAULT_MAX_BUFFERED_EVENTS; this.maxStreamEvents = config.maxStreamEvents ?? 
DEFAULT_MAX_STREAM_EVENTS; diff --git a/packages/agent/src/server/types.ts b/packages/agent/src/server/types.ts index 478a080b0a..7402730b1a 100644 --- a/packages/agent/src/server/types.ts +++ b/packages/agent/src/server/types.ts @@ -17,6 +17,9 @@ export interface AgentServerConfig { projectId: number; jwtPublicKey: string; // RS256 public key for JWT verification eventIngestToken?: string; + // Optional base URL for the event-ingest POST only (routes ingest to the agent-proxy); + // all other agent API calls keep using apiUrl. Falls back to apiUrl when unset. + eventIngestBaseUrl?: string; eventIngestStreamWindowMs?: number; mode: AgentMode; taskId: string; diff --git a/packages/core/src/cloud-task/cloud-task.test.ts b/packages/core/src/cloud-task/cloud-task.test.ts index 97f8595f49..b90e0602df 100644 --- a/packages/core/src/cloud-task/cloud-task.test.ts +++ b/packages/core/src/cloud-task/cloud-task.test.ts @@ -3,15 +3,21 @@ import { CloudTaskEvent } from "./schemas"; const mockNetFetch = vi.hoisted(() => vi.fn()); const mockStreamFetch = vi.hoisted(() => vi.fn()); +const mockStreamTokenFetch = vi.hoisted(() => vi.fn()); // The service now uses global fetch for BOTH authenticated API calls (JSON) // and SSE streaming. The two used to be distinct (net.fetch vs global fetch). -// To preserve the existing test fixtures, route by URL: /stream/ → stream mock, -// everything else → API mock. +// Route by URL: /stream_token/ → token mock (read-leg resolution), the stream leg +// (Django /stream/ or proxy /v1/runs/:run/stream) → stream mock, everything else → API mock. +// The token mock has a Django-path default so existing fixtures (which never set it) are untouched. const fetchRouter = vi.hoisted(() => vi.fn((input: string | Request, init?: RequestInit) => { const url = typeof input === "string" ? input : input.url; - const impl = url.includes("/stream/") ? mockStreamFetch : mockNetFetch; + const impl = url.includes("/stream_token/") + ? 
mockStreamTokenFetch + : /\/stream(\/|\?|$)/.test(url) + ? mockStreamFetch + : mockNetFetch; return impl(input, init); }), ); @@ -98,6 +104,13 @@ describe("CloudTaskService", () => { ); mockNetFetch.mockReset(); mockStreamFetch.mockReset(); + mockStreamTokenFetch.mockReset(); + // Default read-leg resolution: no proxy URL, so the stream reads from Django directly. + mockStreamTokenFetch.mockImplementation(() => + Promise.resolve( + createJsonResponse({ token: "test-token", stream_base_url: null }), + ), + ); mockAuthService.authenticatedFetch.mockReset(); vi.stubGlobal("fetch", fetchRouter); @@ -601,6 +614,113 @@ describe("CloudTaskService", () => { ).toBe(true); }); + it("stops without reconnecting when the server emits stream-end on a non-terminal run", async () => { + vi.useFakeTimers(); + + // Run status stays non-terminal the whole time. Pre-durable-contract, a clean EOF on a + // non-terminal run reconnects (see the test above); the stream-end sentinel must override that. + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? 
input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + return Promise.resolve( + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: "build", + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }), + ); + }); + + mockStreamFetch.mockImplementation(() => + Promise.resolve( + createSseResponse( + 'id: 1\ndata: {"type":"notification","timestamp":"2026-01-01T00:00:01Z","notification":{"jsonrpc":"2.0","method":"_posthog/console","params":{"sessionId":"run-1","level":"info","message":"hi"}}}\n\nevent: stream-end\ndata: {}\n\n', + ), + ), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + const hasWatcher = (): boolean => + (service as unknown as { watchers: Map }).watchers.has( + "task-1:run-1", + ); + + await waitFor(() => mockStreamFetch.mock.calls.length === 1); + // Let the reconnect delay (2s base) elapse; with stream-end honored, none is scheduled. + await vi.advanceTimersByTimeAsync(10_000); + + expect(mockStreamFetch.mock.calls.length).toBe(1); + await waitFor(() => !hasWatcher()); + }); + + it("reads via the agent-proxy with a Bearer token when the server resolves a base url", async () => { + vi.useFakeTimers(); + + mockStreamTokenFetch.mockImplementation(() => + Promise.resolve( + createJsonResponse({ + token: "proxy-token", + stream_base_url: "https://proxy.example", + }), + ), + ); + + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? 
input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + return Promise.resolve( + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: "build", + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }), + ); + }); + + mockStreamFetch.mockImplementation(() => + Promise.resolve(createSseResponse("event: stream-end\ndata: {}\n\n")), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await waitFor(() => mockStreamFetch.mock.calls.length === 1); + + const [calledUrl, init] = mockStreamFetch.mock.calls[0]; + expect(String(calledUrl)).toMatch( + /^https:\/\/proxy\.example\/v1\/runs\/run-1\/stream(\?|$)/, + ); + expect((init?.headers as Record)?.Authorization).toBe( + "Bearer proxy-token", + ); + }); + it("fails the watcher after exhausting the cumulative reconnect budget on clean-EOF loops", async () => { vi.useFakeTimers(); @@ -717,10 +837,10 @@ describe("CloudTaskService", () => { 10_000, ); - expect(mockStreamFetch.mock.calls.length).toBe(6); - // 2 bootstrap calls + 1 post-bootstrap status verification + 6 - // handleStreamCompletion calls (one per stream error) - expect(mockNetFetch).toHaveBeenCalledTimes(9); + expect(mockStreamFetch.mock.calls.length).toBe(10); + // Status is no longer polled per reconnect. Only the 2 bootstrap calls plus the single + // post-bootstrap verification touch the status endpoint; reconnects never do. 
+ expect(mockNetFetch.mock.calls.length).toBeLessThanOrEqual(3); expect(updates).toContainEqual({ taskId: "task-1", runId: "run-1", @@ -889,12 +1009,12 @@ describe("CloudTaskService", () => { }); }); - it("stops the watcher without reconnecting once the run is terminal", async () => { + it("reconnects on a clean EOF even after the run status goes terminal (status-unaware)", async () => { vi.useFakeTimers(); - const updates: unknown[] = []; - service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); - + // Bootstrap sees an active run (so it streams); every later status fetch reports terminal. + // Pre-decoupling, a clean EOF on a terminal run stopped the watch. Now run status is never + // consulted to decide reconnects, so the clean EOFs keep reconnecting until stream-end. let statusFetchCount = 0; mockNetFetch.mockImplementation((input: string | Request) => { const url = typeof input === "string" ? input : input.url; @@ -904,7 +1024,6 @@ describe("CloudTaskService", () => { ); } statusFetchCount += 1; - // Bootstrap sees an active run; the post-stream status check sees terminal. return Promise.resolve( createJsonResponse({ id: "run-1", @@ -933,22 +1052,14 @@ describe("CloudTaskService", () => { }); await waitFor(() => mockStreamFetch.mock.calls.length === 1); - await vi.advanceTimersByTimeAsync(10_000); + await waitFor(() => mockStreamFetch.mock.calls.length >= 3, 20_000); - expect(updates).toContainEqual( - expect.objectContaining({ - taskId: "task-1", - runId: "run-1", - kind: "status", - status: "completed", - }), - ); - expect(mockStreamFetch.mock.calls.length).toBe(1); + // Terminal status did not stop the watch; the watcher is still reconnecting. 
expect( (service as unknown as { watchers: Map }).watchers.has( "task-1:run-1", ), - ).toBe(false); + ).toBe(true); }); it("surfaces a retryable error when the backend errors even on a long-lived stream", async () => { @@ -1010,8 +1121,8 @@ describe("CloudTaskService", () => { }); await waitFor(() => mockStreamFetch.mock.calls.length === 1); - // Drive >= 6 long-lived-then-backend-error cycles (65s open + backoff each). - await vi.advanceTimersByTimeAsync(65_000 * 7 + 70_000); + // Drive >= 10 long-lived-then-backend-error cycles (65s open + backoff each). + await vi.advanceTimersByTimeAsync(65_000 * 11 + 70_000); await waitFor( () => updates.some( @@ -1319,16 +1430,19 @@ describe("CloudTaskService", () => { expect(getWatcher()?.failed).toBe(false); }); - it("surfaces an error instead of retrying forever when run-state fetch keeps failing after a clean stream end", async () => { + it("does not poll run status per reconnect on clean EOFs (status-unaware)", async () => { vi.useFakeTimers(); - const updates: unknown[] = []; - service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); - - // Bootstrap succeeds (run + empty backlog); every subsequent run-state - // fetch returns 500 (a non-fatal status -> fetchTaskRun resolves null). - mockNetFetch - .mockResolvedValueOnce( + let statusFetchCount = 0; + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? 
input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + statusFetchCount += 1; + return Promise.resolve( createJsonResponse({ id: "run-1", status: "in_progress", @@ -1338,40 +1452,14 @@ describe("CloudTaskService", () => { branch: "main", updated_at: "2026-01-01T00:00:00Z", }), - ) // bootstrap: fetchTaskRun - .mockResolvedValueOnce( - createJsonResponse([], 200, { "X-Has-More": "false" }), - ) // bootstrap: fetchSessionLogs - .mockImplementation(() => - Promise.resolve(createJsonResponse({ detail: "boom" }, 500)), - ); - - // First connection is held open so bootstrap can finish; the test then - // closes it cleanly. Every later connection ends cleanly on its own, so the - // only thing that can fail is the post-stream run-state fetch (500). - let streamCall = 0; - const firstControllerRef: { - current: ReadableStreamDefaultController | null; - } = { current: null }; - mockStreamFetch.mockImplementation(() => { - streamCall += 1; - const stream = new ReadableStream({ - start(controller) { - if (streamCall === 1) { - firstControllerRef.current = controller; - } else { - controller.close(); - } - }, - }); - return Promise.resolve( - new Response(stream, { - status: 200, - headers: { "Content-Type": "text/event-stream" }, - }), ); }); + // Every connection ends cleanly with no stream-end sentinel, forcing reconnect after reconnect. + mockStreamFetch.mockImplementation(() => + Promise.resolve(createSseResponse("")), + ); + service.watch({ taskId: "task-1", runId: "run-1", @@ -1379,44 +1467,11 @@ describe("CloudTaskService", () => { teamId: 2, }); - // Wait for bootstrap to emit its snapshot and hold the live connection open. 
- await waitFor( - () => - !!firstControllerRef.current && - updates.some( - (u) => - typeof u === "object" && - u !== null && - (u as { kind?: string }).kind === "snapshot", - ), - ); - - // Close the live stream cleanly: each clean end now fetches run state, which - // 500s. The reconnect must charge the budget so it eventually gives up. - firstControllerRef.current?.close(); + await waitFor(() => mockStreamFetch.mock.calls.length >= 5, 20_000); - // Budget is 5 attempts (2s + 4s + 8s + 16s + 30s + 30s of backoff). - await vi.advanceTimersByTimeAsync(120_000); - await waitFor( - () => - updates.some( - (u) => - typeof u === "object" && - u !== null && - (u as { kind?: string }).kind === "error", - ), - 20_000, - ); - - expect(updates).toContainEqual({ - taskId: "task-1", - runId: "run-1", - kind: "error", - errorTitle: "Cloud run state unavailable", - errorMessage: - "Could not fetch the latest cloud run state after the stream ended. Retry to reconnect.", - retryable: true, - }); + // Bootstrap fetches status once and the post-bootstrap verification once more; reconnects add + // none. Pre-decoupling, every clean EOF polled status, so this count would climb with reconnects. + expect(statusFetchCount).toBeLessThanOrEqual(2); }); const guardedFetchStatusExpectations = [ diff --git a/packages/core/src/cloud-task/cloud-task.ts b/packages/core/src/cloud-task/cloud-task.ts index f35285a3c8..6ed00b8635 100644 --- a/packages/core/src/cloud-task/cloud-task.ts +++ b/packages/core/src/cloud-task/cloud-task.ts @@ -24,15 +24,27 @@ import { } from "./schemas"; import { type SseEvent, SseEventParser } from "./sse-parser"; -const MAX_SSE_RECONNECT_ATTEMPTS = 5; +// Transport reconnect backoff. The first SSE_RECONNECT_FLAT_ATTEMPTS retries fire at the base +// delay (a stopped agent-proxy is usually reachable again within a couple seconds), then the +// delay grows exponentially up to the cap: 0.5, 0.5, 0.5, 1, 2, 4, 8, 16, 30s. 
MAX_SSE_RECONNECT_ +// ATTEMPTS is sized so this schedule still spans ~60s of wall-clock before the watcher gives up, +// matching the prior give-up window while reconnecting an order of magnitude faster in the common +// case (a quick proxy restart now recovers in ~1.5s instead of ~16s). +const MAX_SSE_RECONNECT_ATTEMPTS = 9; const MAX_CUMULATIVE_RECONNECT_ATTEMPTS = 30; -const SSE_RECONNECT_BASE_DELAY_MS = 2_000; +const SSE_RECONNECT_BASE_DELAY_MS = 500; +const SSE_RECONNECT_FLAT_ATTEMPTS = 3; const SSE_RECONNECT_MAX_DELAY_MS = 30_000; const SSE_HEALTHY_CONNECTION_MS = 60_000; const EVENT_BATCH_FLUSH_MS = 16; const EVENT_BATCH_MAX_SIZE = 50; const SESSION_LOG_PAGE_LIMIT = 5_000; +// Durable end-of-stream sentinel emitted by the server (and the agent-proxy) once a run's +// event stream is complete. It is the authoritative "no more events, ever" signal — the +// client stops on it without consulting run status (status-unaware durable-stream contract). +const STREAM_END_EVENT_NAME = "stream-end"; + interface SessionLogsPage { entries: StoredLogEntry[]; hasMore: boolean; @@ -120,6 +132,12 @@ interface WatcherState { failed: boolean; needsPostBootstrapReconnect: boolean; needsStopAfterBootstrap: boolean; + streamEnded: boolean; + // Read-leg routing, resolved once from the stream_token endpoint and reused across reconnects. + // streamBaseUrl set => read via the agent-proxy with streamReadToken; null => read from Django. 
+ streamTargetResolved: boolean; + streamBaseUrl: string | null; + streamReadToken: string | null; } function watcherKey(taskId: string, runId: string): string { @@ -460,6 +478,10 @@ export class CloudTaskService extends TypedEventEmitter { failed: false, needsPostBootstrapReconnect: false, needsStopAfterBootstrap: false, + streamEnded: false, + streamTargetResolved: false, + streamBaseUrl: null, + streamReadToken: null, }; this.watchers.set(key, watcher); @@ -600,10 +622,7 @@ export class CloudTaskService extends TypedEventEmitter { return; } - if ( - watcher.needsStopAfterBootstrap || - isTerminalStatus(watcher.lastStatus) - ) { + if (watcher.needsStopAfterBootstrap) { watcher.needsStopAfterBootstrap = false; this.stopWatcher(key); return; @@ -630,6 +649,10 @@ export class CloudTaskService extends TypedEventEmitter { if (!this.applyTaskRunState(watcher, run)) return; if (isTerminalStatus(watcher.lastStatus)) return; + this.emitStatusUpdate(watcher); + } + + private emitStatusUpdate(watcher: WatcherState): void { this.emit(CloudTaskEvent.Update, { taskId: watcher.taskId, runId: watcher.runId, @@ -657,8 +680,31 @@ export class CloudTaskService extends TypedEventEmitter { watcher.connDataEventsReceived = 0; const startLatest = Boolean(options?.startLatest && !watcher.lastEventId); + // Resolve the read target once (proxy URL + token, or Django). The server owns the decision; + // reused across reconnects so transport churn never re-mints a token. + if (!watcher.streamTargetResolved) { + await this.resolveStreamTarget(watcher); + const resolvedWatcher = this.watchers.get(key); + if ( + !resolvedWatcher || + resolvedWatcher !== watcher || + controller.signal.aborted + ) { + return; + } + } + + const usingProxy = Boolean( + watcher.streamBaseUrl && watcher.streamReadToken, + ); + const base = usingProxy + ? watcher.streamBaseUrl?.replace(/\/+$/, "") + : watcher.apiHost; + // The proxy exposes a clean run-scoped path; the run-scoped token carries team and task. 
const url = new URL( - `${watcher.apiHost}/api/projects/${watcher.teamId}/tasks/${watcher.taskId}/runs/${watcher.runId}/stream/`, + usingProxy + ? `${base}/v1/runs/${watcher.runId}/stream` + : `${base}/api/projects/${watcher.teamId}/tasks/${watcher.taskId}/runs/${watcher.runId}/stream/`, ); if (startLatest) { url.searchParams.set("start", "latest"); @@ -669,6 +715,14 @@ export class CloudTaskService extends TypedEventEmitter { if (watcher.lastEventId) { headers["Last-Event-ID"] = watcher.lastEventId; } + if (usingProxy) { + headers.Authorization = `Bearer ${watcher.streamReadToken}`; + } + + this.log.info("[agent-proxy debug] opening SSE stream", { + usingProxy, + streamUrl: url.toString(), + }); const parser = new SseEventParser((message, data) => this.log.warn(message, data), @@ -684,11 +738,19 @@ export class CloudTaskService extends TypedEventEmitter { let eventsReceived = 0; try { - const response = await this.auth.authenticatedFetch(url.toString(), { - method: "GET", - headers, - signal: controller.signal, - }); + // The proxy authenticates with the run-scoped Bearer token, not the user session, so it + // takes a plain fetch. The Django leg still goes through authenticatedFetch. + const response = usingProxy + ? 
await fetch(url.toString(), { + method: "GET", + headers, + signal: controller.signal, + }) + : await this.auth.authenticatedFetch(url.toString(), { + method: "GET", + headers, + signal: controller.signal, + }); if (!response.ok) { throw createStreamStatusError(response.status); @@ -756,7 +818,7 @@ export class CloudTaskService extends TypedEventEmitter { dataEventsReceived: watcher.connDataEventsReceived, lastEventId: watcher.lastEventId, }); - await this.handleStreamCompletion(key, { reconnectIfNonTerminal: true }); + await this.handleStreamCompletion(key, { reconnectOnDisconnect: true }); } catch (error) { this.flushLogBatch(key); @@ -809,7 +871,7 @@ export class CloudTaskService extends TypedEventEmitter { cumulativeReconnectAttempts: watcher?.cumulativeReconnectAttempts ?? 0, }); await this.handleStreamCompletion(key, { - reconnectIfNonTerminal: true, + reconnectOnDisconnect: true, reconnectError: error, countReconnectAttempt: !isBackendError && !wasHealthyStream, }); @@ -836,6 +898,14 @@ export class CloudTaskService extends TypedEventEmitter { throw new BackendStreamError(message); } + if (event.event === STREAM_END_EVENT_NAME) { + // The run's stream is durably complete. Mark it so completion stops instead + // of reconnecting, independent of run status. The connection will close + // naturally (clean EOF) right after this sentinel. + watcher.streamEnded = true; + return; + } + // A keepalive or real event proves the transport recovered, so clear the // transport reconnect budget. 
A keepalive stops here: it does NOT clear the // backend-error budget, since it doesn't prove the stream itself produced @@ -1129,7 +1199,9 @@ export class CloudTaskService extends TypedEventEmitter { options: { countAttempt?: boolean } = {}, ): void { const watcher = this.watchers.get(key); - if (!watcher || watcher.failed || isTerminalStatus(watcher.lastStatus)) { + // No isTerminalStatus gate: the reconnect loop is status-unaware and only stops on the + // stream-end sentinel (handled in handleStreamCompletion) or the budget exhaustion below. + if (!watcher || watcher.failed) { return; } @@ -1182,7 +1254,8 @@ export class CloudTaskService extends TypedEventEmitter { ? watcher.streamErrorAttempts : watcher.reconnectAttempts; const delay = Math.min( - SSE_RECONNECT_BASE_DELAY_MS * 2 ** Math.max(backoffAttempts - 1, 0), + SSE_RECONNECT_BASE_DELAY_MS * + 2 ** Math.max(backoffAttempts - SSE_RECONNECT_FLAT_ATTEMPTS, 0), SSE_RECONNECT_MAX_DELAY_MS, ); @@ -1200,79 +1273,40 @@ export class CloudTaskService extends TypedEventEmitter { private async handleStreamCompletion( key: string, options: { - reconnectIfNonTerminal: boolean; + reconnectOnDisconnect: boolean; reconnectError?: unknown; countReconnectAttempt?: boolean; }, ): Promise { const watcher = this.watchers.get(key); if (!watcher) return; - - const { reconnectIfNonTerminal } = options; - const run = await this.fetchTaskRun(watcher); - const currentWatcher = this.watchers.get(key); - if (!currentWatcher || currentWatcher !== watcher) return; if (watcher.failed) return; + // The durable end-of-stream sentinel is the ONLY signal that ends the watch. The client is + // unaware of sandbox/run status and assumes the transport breaks mid-message constantly, so any + // disconnect without stream-end is just transport churn to ride out by reconnecting. We never + // poll run status to decide whether to stop — that would drop the tail if the stream cut right as + // the run went terminal but before stream-end arrived. 
Status is updated for display only, from + // the task_run_state events the stream itself carries. + if (watcher.streamEnded) { + this.emitStatusUpdate(watcher); + this.stopWatcher(key); + return; + } + + const { reconnectOnDisconnect } = options; + if (watcher.isBootstrapping) { - if (!run) { + // Bootstrap still owns the snapshot lifecycle; record reconnect intent and let it finish. + if (reconnectOnDisconnect) { watcher.needsPostBootstrapReconnect = true; - return; - } - - this.applyTaskRunState(watcher, run); - if (isTerminalStatus(watcher.lastStatus) || !reconnectIfNonTerminal) { - watcher.needsStopAfterBootstrap = true; } else { - watcher.needsPostBootstrapReconnect = true; + watcher.needsStopAfterBootstrap = true; } return; } - if (!run) { - this.scheduleReconnect( - key, - new CloudTaskStreamError("Failed to fetch terminal cloud run state", { - title: "Cloud run state unavailable", - message: - "Could not fetch the latest cloud run state after the stream ended. Retry to reconnect.", - retryable: true, - }), - { countAttempt: options.countReconnectAttempt ?? true }, - ); - return; - } - - const stateChanged = this.applyTaskRunState(watcher, run); - - if (!isTerminalStatus(watcher.lastStatus) && reconnectIfNonTerminal) { - if (stateChanged) { - // Polled progress proves the run is alive — reset both budgets. - watcher.reconnectAttempts = 0; - watcher.cumulativeReconnectAttempts = 0; - this.emit(CloudTaskEvent.Update, { - taskId: watcher.taskId, - runId: watcher.runId, - kind: "status", - status: watcher.lastStatus ?? 
undefined, - stage: watcher.lastStage, - output: watcher.lastOutput, - errorMessage: watcher.lastErrorMessage, - branch: watcher.lastBranch, - }); - } - this.log.warn("Cloud task stream ended before terminal status", { - key, - status: watcher.lastStatus, - stage: watcher.lastStage, - stateChanged, - lastEventId: watcher.lastEventId, - sentLastEventId: watcher.connSentLastEventId, - connDataEventsReceived: watcher.connDataEventsReceived, - reconnectAttempts: watcher.reconnectAttempts, - streamErrorAttempts: watcher.streamErrorAttempts, - cumulativeReconnectAttempts: watcher.cumulativeReconnectAttempts, - }); + if (reconnectOnDisconnect) { this.scheduleReconnect(key, options.reconnectError, { countAttempt: options.countReconnectAttempt ?? false, }); @@ -1291,20 +1325,7 @@ export class CloudTaskService extends TypedEventEmitter { watcher.connStartedAt !== 0 ? Date.now() - watcher.connStartedAt : 0, }); - // Always emit the latest status before stopping. Terminal states are - // intentionally deferred until stream completion; clean EOFs can also mean - // the backend has no more stream events even when the run status remains active. - this.emit(CloudTaskEvent.Update, { - taskId: watcher.taskId, - runId: watcher.runId, - kind: "status", - status: watcher.lastStatus ?? 
undefined, - stage: watcher.lastStage, - output: watcher.lastOutput, - errorMessage: watcher.lastErrorMessage, - branch: watcher.lastBranch, - }); - + this.emitStatusUpdate(watcher); this.stopWatcher(key); } @@ -1429,6 +1450,52 @@ export class CloudTaskService extends TypedEventEmitter { } } + private async resolveStreamTarget(watcher: WatcherState): Promise { + const url = `${watcher.apiHost}/api/projects/${watcher.teamId}/tasks/${watcher.taskId}/runs/${watcher.runId}/stream_token/`; + this.log.info("[agent-proxy debug] resolveStreamTarget → GET stream_token", { + url, + taskId: watcher.taskId, + runId: watcher.runId, + }); + try { + const response = await this.auth.authenticatedFetch(url, { + method: "GET", + }); + if (!response.ok) { + // Reachable but refused: read from Django and don't retry resolution. + watcher.streamBaseUrl = null; + watcher.streamReadToken = null; + watcher.streamTargetResolved = true; + this.log.info( + "[agent-proxy debug] stream_token refused → reading from Django", + { status: response.status }, + ); + return; + } + const data = (await response.json()) as { + token?: string; + stream_base_url?: string | null; + }; + watcher.streamReadToken = data.token ?? null; + watcher.streamBaseUrl = data.stream_base_url ?? null; + watcher.streamTargetResolved = true; + this.log.info("[agent-proxy debug] stream_token resolved", { + streamBaseUrl: watcher.streamBaseUrl, + hasToken: Boolean(watcher.streamReadToken), + }); + } catch (error) { + // Transient failure: leave unresolved so the next reconnect retries; connectSse falls back + // to the Django leg meanwhile since streamBaseUrl stays null. 
+ watcher.streamBaseUrl = null; + watcher.streamReadToken = null; + this.log.warn("Cloud task stream target resolution failed", { + taskId: watcher.taskId, + runId: watcher.runId, + error, + }); + } + } + private async fetchTaskRun( watcher: WatcherState, ): Promise { From a137ac43f2b042be8f64513c8a61de37e7e2b013 Mon Sep 17 00:00:00 2001 From: Charles Vien Date: Wed, 10 Jun 2026 23:25:06 -0700 Subject: [PATCH 02/20] format cloud-task log call --- packages/core/src/cloud-task/cloud-task.ts | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/packages/core/src/cloud-task/cloud-task.ts b/packages/core/src/cloud-task/cloud-task.ts index 6ed00b8635..ea8cea1f90 100644 --- a/packages/core/src/cloud-task/cloud-task.ts +++ b/packages/core/src/cloud-task/cloud-task.ts @@ -1452,11 +1452,14 @@ export class CloudTaskService extends TypedEventEmitter { private async resolveStreamTarget(watcher: WatcherState): Promise { const url = `${watcher.apiHost}/api/projects/${watcher.teamId}/tasks/${watcher.taskId}/runs/${watcher.runId}/stream_token/`; - this.log.info("[agent-proxy debug] resolveStreamTarget → GET stream_token", { - url, - taskId: watcher.taskId, - runId: watcher.runId, - }); + this.log.info( + "[agent-proxy debug] resolveStreamTarget → GET stream_token", + { + url, + taskId: watcher.taskId, + runId: watcher.runId, + }, + ); try { const response = await this.auth.authenticatedFetch(url, { method: "GET", From 05d5703400b9b94dd6ae59cfbf57066698009624 Mon Sep 17 00:00:00 2001 From: Charles Vien Date: Thu, 11 Jun 2026 17:03:04 -0700 Subject: [PATCH 03/20] fix stream-end during bootstrap dropping snapshot --- .../core/src/cloud-task/cloud-task.test.ts | 80 +++++++++++++++++++ packages/core/src/cloud-task/cloud-task.ts | 27 ++++--- 2 files changed, 95 insertions(+), 12 deletions(-) diff --git a/packages/core/src/cloud-task/cloud-task.test.ts b/packages/core/src/cloud-task/cloud-task.test.ts index b90e0602df..e4b78b040c 100644 --- 
a/packages/core/src/cloud-task/cloud-task.test.ts +++ b/packages/core/src/cloud-task/cloud-task.test.ts @@ -667,6 +667,86 @@ describe("CloudTaskService", () => { await waitFor(() => !hasWatcher()); }); + it("emits the bootstrap snapshot when stream-end arrives mid-bootstrap", async () => { + vi.useFakeTimers(); + + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + const historicalEntry = { + type: "notification", + timestamp: "2026-01-01T00:00:01Z", + notification: { + jsonrpc: "2.0", + method: "_posthog/console", + params: { sessionId: "run-1", level: "info", message: "backlog" }, + }, + }; + + // Hold the session_logs fetch open until the stream has already delivered + // stream-end and closed, so completion races the bootstrap snapshot. + let releaseSessionLogs: (() => void) | undefined; + const sessionLogsGate = new Promise((resolve) => { + releaseSessionLogs = resolve; + }); + + mockNetFetch.mockImplementation(async (input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { + await sessionLogsGate; + return createJsonResponse([historicalEntry], 200, { + "X-Has-More": "false", + }); + } + return createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: "build", + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }); + }); + + mockStreamFetch.mockImplementation(() => + Promise.resolve(createSseResponse("event: stream-end\ndata: {}\n\n")), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + // Stream connects, delivers stream-end and EOFs while session_logs hangs. 
+ await waitFor(() => mockStreamFetch.mock.calls.length === 1); + await vi.advanceTimersByTimeAsync(1_000); + + releaseSessionLogs?.(); + + const hasWatcher = (): boolean => + (service as unknown as { watchers: Map }).watchers.has( + "task-1:run-1", + ); + await waitFor(() => !hasWatcher()); + + const snapshots = updates.filter( + (u) => + typeof u === "object" && + u !== null && + (u as { kind?: string }).kind === "snapshot", + ); + expect(snapshots).toHaveLength(1); + expect( + (snapshots[0] as { newEntries: unknown[] }).newEntries, + ).toContainEqual(historicalEntry); + // The stream-end stop must not schedule another connection. + await vi.advanceTimersByTimeAsync(10_000); + expect(mockStreamFetch.mock.calls.length).toBe(1); + }); + it("reads via the agent-proxy with a Bearer token when the server resolves a base url", async () => { vi.useFakeTimers(); diff --git a/packages/core/src/cloud-task/cloud-task.ts b/packages/core/src/cloud-task/cloud-task.ts index ea8cea1f90..e439a659ba 100644 --- a/packages/core/src/cloud-task/cloud-task.ts +++ b/packages/core/src/cloud-task/cloud-task.ts @@ -1282,6 +1282,21 @@ export class CloudTaskService extends TypedEventEmitter { if (!watcher) return; if (watcher.failed) return; + const { reconnectOnDisconnect } = options; + + // Bootstrap owns the snapshot lifecycle, so it must be consulted before the + // stream-end stop: stopping mid-bootstrap deletes the watcher before the + // snapshot is emitted and silently discards the backlog plus any live + // entries buffered during bootstrap. Record intent and let bootstrap finish. + if (watcher.isBootstrapping) { + if (watcher.streamEnded || !reconnectOnDisconnect) { + watcher.needsStopAfterBootstrap = true; + } else { + watcher.needsPostBootstrapReconnect = true; + } + return; + } + // The durable end-of-stream sentinel is the ONLY signal that ends the watch. 
The client is // unaware of sandbox/run status and assumes the transport breaks mid-message constantly, so any // disconnect without stream-end is just transport churn to ride out by reconnecting. We never @@ -1294,18 +1309,6 @@ export class CloudTaskService extends TypedEventEmitter { return; } - const { reconnectOnDisconnect } = options; - - if (watcher.isBootstrapping) { - // Bootstrap still owns the snapshot lifecycle; record reconnect intent and let it finish. - if (reconnectOnDisconnect) { - watcher.needsPostBootstrapReconnect = true; - } else { - watcher.needsStopAfterBootstrap = true; - } - return; - } - if (reconnectOnDisconnect) { this.scheduleReconnect(key, options.reconnectError, { countAttempt: options.countReconnectAttempt ?? false, From 76027a5f55b3992b7d2f9e9b74c7dc18683bb404 Mon Sep 17 00:00:00 2001 From: Charles Vien Date: Thu, 11 Jun 2026 17:04:41 -0700 Subject: [PATCH 04/20] verify run status before stream-end stop --- .../core/src/cloud-task/cloud-task.test.ts | 69 +++++++++++++++++++ packages/core/src/cloud-task/cloud-task.ts | 26 ++++++- 2 files changed, 92 insertions(+), 3 deletions(-) diff --git a/packages/core/src/cloud-task/cloud-task.test.ts b/packages/core/src/cloud-task/cloud-task.test.ts index e4b78b040c..85b50a32e1 100644 --- a/packages/core/src/cloud-task/cloud-task.test.ts +++ b/packages/core/src/cloud-task/cloud-task.test.ts @@ -747,6 +747,75 @@ describe("CloudTaskService", () => { expect(mockStreamFetch.mock.calls.length).toBe(1); }); + it("repairs the final status when stream-end arrives without a terminal state event", async () => { + vi.useFakeTimers(); + + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + let runFetchCount = 0; + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? 
input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + runFetchCount += 1; + // First fetch bootstraps an active run; the stream then ends without ever + // carrying a terminal task_run_state event, so the stop path must fetch + // the completed status itself. + return Promise.resolve( + createJsonResponse( + runFetchCount === 1 + ? { + id: "run-1", + status: "in_progress", + stage: "build", + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + } + : { + id: "run-1", + status: "completed", + stage: null, + output: { pr_url: "https://github.com/PostHog/code/pull/9" }, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:05Z", + }, + ), + ); + }); + + mockStreamFetch.mockImplementation(() => + Promise.resolve(createSseResponse("event: stream-end\ndata: {}\n\n")), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + const hasWatcher = (): boolean => + (service as unknown as { watchers: Map }).watchers.has( + "task-1:run-1", + ); + await waitFor(() => !hasWatcher()); + + expect(updates).toContainEqual( + expect.objectContaining({ + kind: "status", + status: "completed", + output: { pr_url: "https://github.com/PostHog/code/pull/9" }, + }), + ); + }); + it("reads via the agent-proxy with a Bearer token when the server resolves a base url", async () => { vi.useFakeTimers(); diff --git a/packages/core/src/cloud-task/cloud-task.ts b/packages/core/src/cloud-task/cloud-task.ts index e439a659ba..e5a9731957 100644 --- a/packages/core/src/cloud-task/cloud-task.ts +++ b/packages/core/src/cloud-task/cloud-task.ts @@ -624,7 +624,7 @@ export class CloudTaskService extends TypedEventEmitter { if (watcher.needsStopAfterBootstrap) { watcher.needsStopAfterBootstrap = false; - this.stopWatcher(key); + await this.finalizeWatcherStop(key); return; } @@ -1304,8 
+1304,7 @@ export class CloudTaskService extends TypedEventEmitter { // the run went terminal but before stream-end arrived. Status is updated for display only, from // the task_run_state events the stream itself carries. if (watcher.streamEnded) { - this.emitStatusUpdate(watcher); - this.stopWatcher(key); + await this.finalizeWatcherStop(key); return; } @@ -1328,6 +1327,27 @@ export class CloudTaskService extends TypedEventEmitter { watcher.connStartedAt !== 0 ? Date.now() - watcher.connStartedAt : 0, }); + await this.finalizeWatcherStop(key); + } + + // Stops a watcher whose stream is durably complete. Status normally arrives via the + // task_run_state events the stream carries, but if the stream ended without a terminal + // status (dropped final frame or a server bug), fetch it once so the session is not left + // permanently "in progress" with no watcher behind it. The poll never decides WHETHER to + // stop — stream-end already did — it only repairs the displayed status. + private async finalizeWatcherStop(key: string): Promise { + const watcher = this.watchers.get(key); + if (!watcher) return; + + if (!isTerminalStatus(watcher.lastStatus)) { + const run = await this.fetchTaskRun(watcher); + const currentWatcher = this.watchers.get(key); + if (!currentWatcher || currentWatcher !== watcher) return; + if (run) { + this.applyTaskRunState(watcher, run); + } + } + this.emitStatusUpdate(watcher); this.stopWatcher(key); } From 3c7ba6a657da07b207548a522b550cc850359ff1 Mon Sep 17 00:00:00 2001 From: Charles Vien Date: Thu, 11 Jun 2026 17:06:35 -0700 Subject: [PATCH 05/20] reset resume state on cloud task retry --- .../core/src/cloud-task/cloud-task.test.ts | 94 +++++++++++++++++++ packages/core/src/cloud-task/cloud-task.ts | 39 +++++--- 2 files changed, 118 insertions(+), 15 deletions(-) diff --git a/packages/core/src/cloud-task/cloud-task.test.ts b/packages/core/src/cloud-task/cloud-task.test.ts index 85b50a32e1..a065aa5f7d 100644 --- 
a/packages/core/src/cloud-task/cloud-task.test.ts +++ b/packages/core/src/cloud-task/cloud-task.test.ts @@ -933,6 +933,100 @@ describe("CloudTaskService", () => { }); }); + it("retry rebuilds the watcher from scratch after a failure", async () => { + vi.useFakeTimers(); + + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + return Promise.resolve( + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }), + ); + }); + + // Every connection records a resume position, then dies on a backend error + // frame until the error budget fails the watcher. + mockStreamFetch.mockImplementation(() => + Promise.resolve( + createSseResponse( + 'id: 42\nevent: keepalive\ndata: {"type":"keepalive"}\n\nevent: error\ndata: {"error":"boom"}\n\n', + ), + ), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await waitFor(() => mockStreamFetch.mock.calls.length === 1); + await vi.advanceTimersByTimeAsync(70_000); + await waitFor( + () => + updates.some( + (u) => + typeof u === "object" && + u !== null && + (u as { kind?: string }).kind === "error", + ), + 10_000, + ); + + const countSnapshots = (): number => + updates.filter( + (u) => + typeof u === "object" && + u !== null && + (u as { kind?: string }).kind === "snapshot", + ).length; + const tokenCallsBeforeRetry = mockStreamTokenFetch.mock.calls.length; + const snapshotsBeforeRetry = countSnapshots(); + const streamCallsBeforeRetry = mockStreamFetch.mock.calls.length; + + mockStreamFetch.mockImplementation(() => + Promise.resolve( + 
createOpenSseResponse( + 'event: keepalive\ndata: {"type":"keepalive"}\n\n', + ), + ), + ); + + service.retry("task-1", "run-1"); + + await waitFor( + () => mockStreamFetch.mock.calls.length === streamCallsBeforeRetry + 1, + ); + await waitFor(() => countSnapshots() === snapshotsBeforeRetry + 1); + + // Retry must rebuild from server truth: re-resolve the read leg, re-emit a + // fresh snapshot and drop the poisoned resume position instead of resuming + // from the failed stream's Last-Event-ID. + expect(mockStreamTokenFetch.mock.calls.length).toBe( + tokenCallsBeforeRetry + 1, + ); + const [, init] = mockStreamFetch.mock.calls[streamCallsBeforeRetry]; + expect( + (init?.headers as Record)?.["Last-Event-ID"], + ).toBeUndefined(); + }); + it("emits a retryable cloud error after repeated stream failures", async () => { vi.useFakeTimers(); diff --git a/packages/core/src/cloud-task/cloud-task.ts b/packages/core/src/cloud-task/cloud-task.ts index e5a9731957..85b3b6480e 100644 --- a/packages/core/src/cloud-task/cloud-task.ts +++ b/packages/core/src/cloud-task/cloud-task.ts @@ -330,6 +330,23 @@ export class CloudTaskService extends TypedEventEmitter { watcher.batchFlushTimeoutId = null; } + this.log.info("Retrying cloud task watcher", { + key, + hasSnapshot: watcher.hasEmittedSnapshot, + }); + + // Retry means "start over from scratch". A watcher usually fails because its resume + // state is poisoned (e.g. a Last-Event-ID the server answers with instant empty + // streams); reconnecting with that state preserved loops straight back into the same + // failure. Re-bootstrapping re-resolves the read leg and re-emits a fresh snapshot, + // which also heals any entries missed while the stream was broken. + this.resetWatcherForRebootstrap(watcher); + void this.bootstrapWatcher(key); + } + + // Returns a watcher to its pre-bootstrap state so bootstrapWatcher can rebuild it from + // server truth: budgets, buffers, resume position and read-leg routing all reset. 
+ private resetWatcherForRebootstrap(watcher: WatcherState): void { watcher.reconnectAttempts = 0; watcher.streamErrorAttempts = 0; watcher.cumulativeReconnectAttempts = 0; @@ -338,21 +355,13 @@ export class CloudTaskService extends TypedEventEmitter { watcher.bufferedLogBatches = []; watcher.needsPostBootstrapReconnect = false; watcher.needsStopAfterBootstrap = false; - - this.log.info("Retrying cloud task watcher", { - key, - hasSnapshot: watcher.hasEmittedSnapshot, - }); - - if (!watcher.hasEmittedSnapshot) { - watcher.lastEventId = null; - watcher.totalEntryCount = 0; - watcher.isBootstrapping = false; - void this.bootstrapWatcher(key); - return; - } - - void this.connectSse(key, { startLatest: !watcher.lastEventId }); + watcher.streamEnded = false; + watcher.lastEventId = null; + watcher.totalEntryCount = 0; + watcher.isBootstrapping = false; + watcher.streamTargetResolved = false; + watcher.streamBaseUrl = null; + watcher.streamReadToken = null; } async sendCommand(input: SendCommandInput): Promise { From 063ff929e2bb1358334c5c1a05b74f4f089d0a89 Mon Sep 17 00:00:00 2001 From: Charles Vien Date: Thu, 11 Jun 2026 17:07:55 -0700 Subject: [PATCH 06/20] reset reconnect budget after healthy streams --- .../core/src/cloud-task/cloud-task.test.ts | 74 +++++++++++++++++++ packages/core/src/cloud-task/cloud-task.ts | 18 +++++ 2 files changed, 92 insertions(+) diff --git a/packages/core/src/cloud-task/cloud-task.test.ts b/packages/core/src/cloud-task/cloud-task.test.ts index a065aa5f7d..92d03e9a49 100644 --- a/packages/core/src/cloud-task/cloud-task.test.ts +++ b/packages/core/src/cloud-task/cloud-task.test.ts @@ -1459,6 +1459,80 @@ describe("CloudTaskService", () => { expect(watcher?.reconnectAttempts).toBe(0); }); + it("never fails an idle run riding healthy clean-EOF reconnect cycles", async () => { + vi.useFakeTimers(); + + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + 
mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + return Promise.resolve( + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }), + ); + }); + + // An idle run: every connection delivers only a keepalive, lives a healthy + // 65s, then the server closes it cleanly. No data events ever arrive, so + // nothing else resets the cumulative budget across the cycles. + mockStreamFetch.mockImplementation(() => { + const encoder = new TextEncoder(); + const stream = new ReadableStream({ + start(controller) { + controller.enqueue( + encoder.encode('event: keepalive\ndata: {"type":"keepalive"}\n\n'), + ); + setTimeout(() => controller.close(), 65_000); + }, + }); + return Promise.resolve( + new Response(stream, { + status: 200, + headers: { "Content-Type": "text/event-stream" }, + }), + ); + }); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await waitFor(() => mockStreamFetch.mock.calls.length === 1); + // Ride out more cycles than the cumulative budget allows for loops. 
+ await vi.advanceTimersByTimeAsync(67_000 * 35); + await waitFor(() => mockStreamFetch.mock.calls.length >= 32, 20_000); + + expect( + updates.some( + (u) => + typeof u === "object" && + u !== null && + (u as { kind?: string }).kind === "error", + ), + ).toBe(false); + expect( + (service as unknown as { watchers: Map }).watchers.has( + "task-1:run-1", + ), + ).toBe(true); + }); + it("resets the transport reconnect budget once a keepalive proves recovery", async () => { vi.useFakeTimers(); diff --git a/packages/core/src/cloud-task/cloud-task.ts b/packages/core/src/cloud-task/cloud-task.ts index 85b3b6480e..e8b5053fbd 100644 --- a/packages/core/src/cloud-task/cloud-task.ts +++ b/packages/core/src/cloud-task/cloud-task.ts @@ -827,6 +827,21 @@ export class CloudTaskService extends TypedEventEmitter { dataEventsReceived: watcher.connDataEventsReceived, lastEventId: watcher.lastEventId, }); + + // A long-lived connection that closed cleanly is healthy transport churn, not a + // reconnect loop. Clear the cumulative budget so an idle run (keepalives only — + // nothing else resets it) can ride out proxy timeout cycles indefinitely, while + // instant-EOF loops still exhaust it. + const completedWatcher = this.watchers.get(key); + if ( + completedWatcher && + streamWasEstablished && + Date.now() - connectedAt >= SSE_HEALTHY_CONNECTION_MS + ) { + completedWatcher.cumulativeReconnectAttempts = 0; + } + + await this.handleStreamCompletion(key, { reconnectOnDisconnect: true }); } catch (error) { this.flushLogBatch(key); @@ -858,6 +873,9 @@ export class CloudTaskService extends TypedEventEmitter { watcher.streamErrorAttempts += 1; } else if (wasHealthyStream) { watcher.streamErrorAttempts = 0; + // Same as the clean-EOF path: a healthy-length connection proves this is + // timeout cycling, not a loop, so an idle run never exhausts the budget. 
+ watcher.cumulativeReconnectAttempts = 0; } } From 75a2c245eaf70b15426c7e57834affe5ab7e055f Mon Sep 17 00:00:00 2001 From: Charles Vien Date: Thu, 11 Jun 2026 17:11:01 -0700 Subject: [PATCH 07/20] self-heal looping cloud streams via re-bootstrap --- .../core/src/cloud-task/cloud-task.test.ts | 14 +++++++++- packages/core/src/cloud-task/cloud-task.ts | 27 ++++++++++++++++++- 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/packages/core/src/cloud-task/cloud-task.test.ts b/packages/core/src/cloud-task/cloud-task.test.ts index 92d03e9a49..0dd9e41d6d 100644 --- a/packages/core/src/cloud-task/cloud-task.test.ts +++ b/packages/core/src/cloud-task/cloud-task.test.ts @@ -870,7 +870,7 @@ describe("CloudTaskService", () => { ); }); - it("fails the watcher after exhausting the cumulative reconnect budget on clean-EOF loops", async () => { + it("re-bootstraps once on a clean-EOF loop and fails when it persists", async () => { vi.useFakeTimers(); const updates: unknown[] = []; @@ -931,6 +931,18 @@ describe("CloudTaskService", () => { "Could not maintain a connection to the cloud run after many attempts. Click retry once the issue is resolved.", retryable: true, }); + + // The first budget exhaustion self-heals with a full rebuild (fresh read-leg + // resolution and a second snapshot); only the second exhaustion fails. 
+ expect( + updates.filter( + (u) => + typeof u === "object" && + u !== null && + (u as { kind?: string }).kind === "snapshot", + ), + ).toHaveLength(2); + expect(mockStreamTokenFetch.mock.calls.length).toBe(2); }); it("retry rebuilds the watcher from scratch after a failure", async () => { diff --git a/packages/core/src/cloud-task/cloud-task.ts b/packages/core/src/cloud-task/cloud-task.ts index e8b5053fbd..df14323751 100644 --- a/packages/core/src/cloud-task/cloud-task.ts +++ b/packages/core/src/cloud-task/cloud-task.ts @@ -133,6 +133,9 @@ interface WatcherState { needsPostBootstrapReconnect: boolean; needsStopAfterBootstrap: boolean; streamEnded: boolean; + // True once the watcher has consumed its one automatic re-bootstrap recovery for the + // current trouble window; re-armed by a real data event or a healthy-length connection. + selfHealAttempted: boolean; // Read-leg routing, resolved once from the stream_token endpoint and reused across reconnects. // streamBaseUrl set => read via the agent-proxy with streamReadToken; null => read from Django. 
streamTargetResolved: boolean; @@ -356,6 +359,7 @@ export class CloudTaskService extends TypedEventEmitter { watcher.needsPostBootstrapReconnect = false; watcher.needsStopAfterBootstrap = false; watcher.streamEnded = false; + watcher.selfHealAttempted = false; watcher.lastEventId = null; watcher.totalEntryCount = 0; watcher.isBootstrapping = false; @@ -488,6 +492,7 @@ export class CloudTaskService extends TypedEventEmitter { needsPostBootstrapReconnect: false, needsStopAfterBootstrap: false, streamEnded: false, + selfHealAttempted: false, streamTargetResolved: false, streamBaseUrl: null, streamReadToken: null, @@ -839,6 +844,7 @@ export class CloudTaskService extends TypedEventEmitter { Date.now() - connectedAt >= SSE_HEALTHY_CONNECTION_MS ) { completedWatcher.cumulativeReconnectAttempts = 0; + completedWatcher.selfHealAttempted = false; } @@ -876,6 +882,7 @@ export class CloudTaskService extends TypedEventEmitter { // Same as the clean-EOF path: a healthy-length connection proves this is // timeout cycling, not a loop, so an idle run never exhausts the budget. watcher.cumulativeReconnectAttempts = 0; + watcher.selfHealAttempted = false; } } @@ -944,9 +951,10 @@ export class CloudTaskService extends TypedEventEmitter { } // A real data event proves the stream materialized; clear the backend-error - // and cumulative budgets too. + // and cumulative budgets too, and re-arm the one-shot self-heal recovery. watcher.streamErrorAttempts = 0; watcher.cumulativeReconnectAttempts = 0; + watcher.selfHealAttempted = false; watcher.connDataEventsReceived += 1; if (watcher.connDataEventsReceived === 1 && watcher.connSentLastEventId) { @@ -1247,6 +1255,23 @@ export class CloudTaskService extends TypedEventEmitter { if ( watcher.cumulativeReconnectAttempts > MAX_CUMULATIVE_RECONNECT_ATTEMPTS ) { + // A poisoned resume position (a Last-Event-ID the server answers with instant + // empty streams) burns through the budget without a single error frame. 
Before + // surfacing a failure, rebuild once from scratch — fresh read leg, fresh snapshot, + // no resume position — the same recovery an app restart performs. A data event or + // a healthy-length connection re-arms the attempt; if the rebuilt watcher loops + // straight back here, fail for real. + if (!watcher.selfHealAttempted) { + watcher.reconnectTimeoutId = null; + this.log.warn( + "Cloud task stream looping without events, re-bootstrapping", + { key }, + ); + this.resetWatcherForRebootstrap(watcher); + watcher.selfHealAttempted = true; + void this.bootstrapWatcher(key); + return; + } this.failWatcher(key, { title: "Cloud run unreachable", message: From 285c002280945cc92b90a8018dd43f9dd002042a Mon Sep 17 00:00:00 2001 From: Charles Vien Date: Thu, 11 Jun 2026 17:13:08 -0700 Subject: [PATCH 08/20] clear last event id when stream leg changes --- .../core/src/cloud-task/cloud-task.test.ts | 75 +++++++++++++++++++ packages/core/src/cloud-task/cloud-task.ts | 26 +++++++ 2 files changed, 101 insertions(+) diff --git a/packages/core/src/cloud-task/cloud-task.test.ts b/packages/core/src/cloud-task/cloud-task.test.ts index 0dd9e41d6d..a37fac6720 100644 --- a/packages/core/src/cloud-task/cloud-task.test.ts +++ b/packages/core/src/cloud-task/cloud-task.test.ts @@ -870,6 +870,81 @@ describe("CloudTaskService", () => { ); }); + it("drops the resume position when the stream leg changes", async () => { + vi.useFakeTimers(); + + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? 
input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + return Promise.resolve( + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }), + ); + }); + + // First resolution fails transiently (connection falls back to Django and + // records a Django-id-space resume position); the retried resolution routes + // to the proxy, whose id space is unrelated. + mockStreamTokenFetch + .mockRejectedValueOnce(new Error("network blip")) + .mockImplementation(() => + Promise.resolve( + createJsonResponse({ + token: "proxy-token", + stream_base_url: "https://proxy.example", + }), + ), + ); + + mockStreamFetch + .mockImplementationOnce(() => + Promise.resolve( + createSseResponse( + 'id: 7\nevent: keepalive\ndata: {"type":"keepalive"}\n\n', + ), + ), + ) + .mockImplementation(() => + Promise.resolve( + createOpenSseResponse( + 'event: keepalive\ndata: {"type":"keepalive"}\n\n', + ), + ), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await waitFor(() => mockStreamFetch.mock.calls.length >= 2, 10_000); + + const [firstUrl] = mockStreamFetch.mock.calls[0]; + const [secondUrl, secondInit] = mockStreamFetch.mock.calls[1]; + expect(String(firstUrl)).toContain("https://app.example.com/api/"); + expect(String(secondUrl)).toMatch( + /^https:\/\/proxy\.example\/v1\/runs\/run-1\/stream/, + ); + // The Django resume position must not leak into the proxy leg. 
+ expect( + (secondInit?.headers as Record)?.["Last-Event-ID"], + ).toBeUndefined(); + expect(String(secondUrl)).toContain("start=latest"); + }); + it("re-bootstraps once on a clean-EOF loop and fails when it persists", async () => { vi.useFakeTimers(); diff --git a/packages/core/src/cloud-task/cloud-task.ts b/packages/core/src/cloud-task/cloud-task.ts index df14323751..a8fe345c4a 100644 --- a/packages/core/src/cloud-task/cloud-task.ts +++ b/packages/core/src/cloud-task/cloud-task.ts @@ -97,6 +97,10 @@ interface TaskRunStateEvent { completed_at?: string | null; } +// Which endpoint a stream connection reads from. Event ids are only meaningful within +// the leg that issued them: the proxy and Django id spaces are unrelated. +type StreamLeg = "proxy" | "django"; + interface WatcherState { taskId: string; runId: string; @@ -116,6 +120,9 @@ interface WatcherState { streamErrorAttempts: number; cumulativeReconnectAttempts: number; lastEventId: string | null; + // Leg that issued lastEventId, and the leg of the connection currently being read. + lastEventIdLeg: StreamLeg | null; + streamLeg: StreamLeg | null; lastStatus: TaskRunStatus | null; lastStage: string | null; lastOutput: Record | null; @@ -361,6 +368,8 @@ export class CloudTaskService extends TypedEventEmitter { watcher.streamEnded = false; watcher.selfHealAttempted = false; watcher.lastEventId = null; + watcher.lastEventIdLeg = null; + watcher.streamLeg = null; watcher.totalEntryCount = 0; watcher.isBootstrapping = false; watcher.streamTargetResolved = false; @@ -475,6 +484,8 @@ export class CloudTaskService extends TypedEventEmitter { streamErrorAttempts: 0, cumulativeReconnectAttempts: 0, lastEventId: null, + lastEventIdLeg: null, + streamLeg: null, lastStatus: null, lastStage: null, lastOutput: null, @@ -714,6 +725,20 @@ export class CloudTaskService extends TypedEventEmitter { const base = usingProxy ? watcher.streamBaseUrl?.replace(/\/+$/, "") : watcher.apiHost; + const leg: StreamLeg = usingProxy ? 
"proxy" : "django"; + // Resuming a proxy stream from a Django id (or vice versa) is undefined: the id + // spaces are unrelated. Drop the resume position on a leg switch and let + // start=latest plus the next snapshot cover the gap. + if (watcher.lastEventId && watcher.lastEventIdLeg !== leg) { + this.log.info("Cloud task stream leg changed, dropping resume position", { + key, + from: watcher.lastEventIdLeg, + to: leg, + }); + watcher.lastEventId = null; + watcher.lastEventIdLeg = null; + } + watcher.streamLeg = leg; // The proxy exposes a clean run-scoped path; the run-scoped token carries team and task. const url = new URL( usingProxy @@ -923,6 +948,7 @@ export class CloudTaskService extends TypedEventEmitter { if (event.id) { watcher.lastEventId = event.id; + watcher.lastEventIdLeg = watcher.streamLeg; } if (event.event === "error") { From 29b030adc283de2b593a055648ff74a6bef4532d Mon Sep 17 00:00:00 2001 From: Charles Vien Date: Thu, 11 Jun 2026 17:14:58 -0700 Subject: [PATCH 09/20] clean up agent proxy debug logging --- .../agent/src/server/event-stream-sender.ts | 2 +- packages/core/src/cloud-task/cloud-task.ts | 25 ++++++++----------- 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/packages/agent/src/server/event-stream-sender.ts b/packages/agent/src/server/event-stream-sender.ts index 1ea82b7e77..a65f7effbb 100644 --- a/packages/agent/src/server/event-stream-sender.ts +++ b/packages/agent/src/server/event-stream-sender.ts @@ -100,7 +100,7 @@ export class TaskRunEventStreamSender { : `${ingestBase}/api/projects/${config.projectId}/tasks/${encodeURIComponent( config.taskId, )}/runs/${encodeURIComponent(config.runId)}/event_stream/`; - config.logger.info("[agent-proxy debug] event ingest target resolved", { + config.logger.info("Event ingest target resolved", { ingestUrl: this.ingestUrl, routedToProxy: usingProxy, }); diff --git a/packages/core/src/cloud-task/cloud-task.ts b/packages/core/src/cloud-task/cloud-task.ts index 
a8fe345c4a..ace82d3344 100644 --- a/packages/core/src/cloud-task/cloud-task.ts +++ b/packages/core/src/cloud-task/cloud-task.ts @@ -758,7 +758,9 @@ export class CloudTaskService extends TypedEventEmitter { headers.Authorization = `Bearer ${watcher.streamReadToken}`; } - this.log.info("[agent-proxy debug] opening SSE stream", { + // Debug level: this fires on every reconnect, which during transport churn is + // multiple times per second and would otherwise dominate the log file. + this.log.debug("Opening cloud task stream", { usingProxy, streamUrl: url.toString(), }); @@ -1553,14 +1555,6 @@ export class CloudTaskService extends TypedEventEmitter { private async resolveStreamTarget(watcher: WatcherState): Promise { const url = `${watcher.apiHost}/api/projects/${watcher.teamId}/tasks/${watcher.taskId}/runs/${watcher.runId}/stream_token/`; - this.log.info( - "[agent-proxy debug] resolveStreamTarget → GET stream_token", - { - url, - taskId: watcher.taskId, - runId: watcher.runId, - }, - ); try { const response = await this.auth.authenticatedFetch(url, { method: "GET", @@ -1570,10 +1564,11 @@ export class CloudTaskService extends TypedEventEmitter { watcher.streamBaseUrl = null; watcher.streamReadToken = null; watcher.streamTargetResolved = true; - this.log.info( - "[agent-proxy debug] stream_token refused → reading from Django", - { status: response.status }, - ); + this.log.info("Cloud task stream reading from API host", { + taskId: watcher.taskId, + runId: watcher.runId, + status: response.status, + }); return; } const data = (await response.json()) as { @@ -1583,7 +1578,9 @@ export class CloudTaskService extends TypedEventEmitter { watcher.streamReadToken = data.token ?? null; watcher.streamBaseUrl = data.stream_base_url ?? 
null; watcher.streamTargetResolved = true; - this.log.info("[agent-proxy debug] stream_token resolved", { + this.log.info("Cloud task stream target resolved", { + taskId: watcher.taskId, + runId: watcher.runId, streamBaseUrl: watcher.streamBaseUrl, hasToken: Boolean(watcher.streamReadToken), }); From e0a97ac0ebf34ad7cf00077e149da0a4c7473d55 Mon Sep 17 00:00:00 2001 From: Charles Vien Date: Thu, 11 Jun 2026 18:35:04 -0700 Subject: [PATCH 10/20] fall back to status polling on old servers --- .../core/src/cloud-task/cloud-task.test.ts | 128 ++++++++++++++++++ packages/core/src/cloud-task/cloud-task.ts | 55 +++++++- 2 files changed, 176 insertions(+), 7 deletions(-) diff --git a/packages/core/src/cloud-task/cloud-task.test.ts b/packages/core/src/cloud-task/cloud-task.test.ts index a37fac6720..c307a807fb 100644 --- a/packages/core/src/cloud-task/cloud-task.test.ts +++ b/packages/core/src/cloud-task/cloud-task.test.ts @@ -106,6 +106,8 @@ describe("CloudTaskService", () => { mockStreamFetch.mockReset(); mockStreamTokenFetch.mockReset(); // Default read-leg resolution: no proxy URL, so the stream reads from Django directly. + // A resolving stream_token endpoint implies the durable-stream contract (stream-end); + // legacy-mode tests override this with a 404 to model old servers. mockStreamTokenFetch.mockImplementation(() => Promise.resolve( createJsonResponse({ token: "test-token", stream_base_url: null }), @@ -945,6 +947,132 @@ describe("CloudTaskService", () => { expect(String(secondUrl)).toContain("start=latest"); }); + it("old servers without stream_token use legacy polling to reconnect and stop", async () => { + vi.useFakeTimers(); + + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + // Old server: the endpoint does not exist, so no stream-end ever arrives and the + // client must poll run status on each clean EOF to decide stop vs reconnect. 
+ mockStreamTokenFetch.mockImplementation(() => + Promise.resolve(createJsonResponse({ detail: "Not found" }, 404)), + ); + + let runFetchCount = 0; + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + runFetchCount += 1; + // Calls 1-3 (bootstrap, post-bootstrap verify, first legacy poll) report an + // active run; the second legacy poll reports terminal. + const terminal = runFetchCount >= 4; + return Promise.resolve( + createJsonResponse({ + id: "run-1", + status: terminal ? "completed" : "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: terminal + ? "2026-01-01T00:00:05Z" + : "2026-01-01T00:00:00Z", + }), + ); + }); + + // The flag-off server never emits stream-end; every connection just EOFs. + mockStreamFetch.mockImplementation(() => + Promise.resolve(createSseResponse("")), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + const hasWatcher = (): boolean => + (service as unknown as { watchers: Map }).watchers.has( + "task-1:run-1", + ); + await waitFor(() => !hasWatcher(), 10_000); + + // At least one legacy reconnect happened while the run was active, then the + // terminal poll stopped the watcher; no further connections after that. + const connectionsAtStop = mockStreamFetch.mock.calls.length; + expect(connectionsAtStop).toBeGreaterThanOrEqual(2); + expect(updates).toContainEqual( + expect.objectContaining({ kind: "status", status: "completed" }), + ); + await vi.advanceTimersByTimeAsync(10_000); + expect(mockStreamFetch.mock.calls.length).toBe(connectionsAtStop); + // The refused resolution is cached: one stream_token call for the whole watch. 
+ expect(mockStreamTokenFetch.mock.calls.length).toBe(1); + }); + + it("stream-end still stops the watcher in legacy mode", async () => { + vi.useFakeTimers(); + + // Old server detected via 404, yet the stream delivers stream-end anyway (e.g. a + // server upgraded mid-watch). The sentinel is honored in both modes. + mockStreamTokenFetch.mockImplementation(() => + Promise.resolve(createJsonResponse({ detail: "Not found" }, 404)), + ); + + let runFetchCount = 0; + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + runFetchCount += 1; + // Bootstrap sees an active run so the stream actually opens; the + // stream-end stop path then repairs the status to terminal. + const terminal = runFetchCount >= 2; + return Promise.resolve( + createJsonResponse({ + id: "run-1", + status: terminal ? "completed" : "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: terminal + ? 
"2026-01-01T00:00:05Z" + : "2026-01-01T00:00:00Z", + }), + ); + }); + + mockStreamFetch.mockImplementation(() => + Promise.resolve(createSseResponse("event: stream-end\ndata: {}\n\n")), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + const hasWatcher = (): boolean => + (service as unknown as { watchers: Map }).watchers.has( + "task-1:run-1", + ); + await waitFor(() => !hasWatcher(), 10_000); + + expect(mockStreamFetch.mock.calls.length).toBe(1); + }); + it("re-bootstraps once on a clean-EOF loop and fails when it persists", async () => { vi.useFakeTimers(); diff --git a/packages/core/src/cloud-task/cloud-task.ts b/packages/core/src/cloud-task/cloud-task.ts index ace82d3344..c3cfd39e20 100644 --- a/packages/core/src/cloud-task/cloud-task.ts +++ b/packages/core/src/cloud-task/cloud-task.ts @@ -148,6 +148,10 @@ interface WatcherState { streamTargetResolved: boolean; streamBaseUrl: string | null; streamReadToken: string | null; + // True once stream_token resolved OK: servers with that endpoint always emit the stream-end + // sentinel. False for old servers (404), where the client must fall back to legacy status + // polling to detect the end of a run. 
+ durableStreamEnabled: boolean; } function watcherKey(taskId: string, runId: string): string { @@ -375,6 +379,7 @@ export class CloudTaskService extends TypedEventEmitter { watcher.streamTargetResolved = false; watcher.streamBaseUrl = null; watcher.streamReadToken = null; + watcher.durableStreamEnabled = false; } async sendCommand(input: SendCommandInput): Promise { @@ -507,6 +512,7 @@ export class CloudTaskService extends TypedEventEmitter { streamTargetResolved: false, streamBaseUrl: null, streamReadToken: null, + durableStreamEnabled: false, }; this.watchers.set(key, watcher); @@ -1377,17 +1383,45 @@ export class CloudTaskService extends TypedEventEmitter { return; } - // The durable end-of-stream sentinel is the ONLY signal that ends the watch. The client is - // unaware of sandbox/run status and assumes the transport breaks mid-message constantly, so any - // disconnect without stream-end is just transport churn to ride out by reconnecting. We never - // poll run status to decide whether to stop — that would drop the tail if the stream cut right as - // the run went terminal but before stream-end arrived. Status is updated for display only, from - // the task_run_state events the stream itself carries. + // On a durable stream the end-of-stream sentinel is the ONLY signal that ends the watch. The + // client is unaware of sandbox/run status and assumes the transport breaks mid-message + // constantly, so any disconnect without stream-end is just transport churn to ride out by + // reconnecting. We never poll run status to decide whether to stop — that would drop the tail + // if the stream cut right as the run went terminal but before stream-end arrived. Status is + // updated for display only, from the task_run_state events the stream itself carries. 
if (watcher.streamEnded) { await this.finalizeWatcherStop(key); return; } + // Legacy mode (old server or rollout flag off): the stream carries no end-of-run sentinel, + // so a disconnect is the old contract — poll run status to decide between stopping and + // reconnecting. The reconnect budgets deliberately keep the new semantics (no reset on + // polled progress; that defeated the runaway-loop cap and let a poisoned stream churn for + // hours), so the self-heal recovery stays active here too. + if (!watcher.durableStreamEnabled && reconnectOnDisconnect) { + const run = await this.fetchTaskRun(watcher); + const legacyWatcher = this.watchers.get(key); + if (!legacyWatcher || legacyWatcher !== watcher) return; + if (watcher.failed) return; + + if (run) { + this.applyTaskRunState(watcher, run); + } + if (isTerminalStatus(watcher.lastStatus)) { + this.emitStatusUpdate(watcher); + this.stopWatcher(key); + return; + } + if (run) { + this.emitStatusUpdate(watcher); + } + this.scheduleReconnect(key, options.reconnectError, { + countAttempt: options.countReconnectAttempt ?? false, + }); + return; + } + if (reconnectOnDisconnect) { this.scheduleReconnect(key, options.reconnectError, { countAttempt: options.countReconnectAttempt ?? false, @@ -1560,9 +1594,11 @@ export class CloudTaskService extends TypedEventEmitter { method: "GET", }); if (!response.ok) { - // Reachable but refused: read from Django and don't retry resolution. + // Reachable but refused (or an old server without the endpoint): read from Django + // with legacy status-polling semantics and don't retry resolution. watcher.streamBaseUrl = null; watcher.streamReadToken = null; + watcher.durableStreamEnabled = false; watcher.streamTargetResolved = true; this.log.info("Cloud task stream reading from API host", { taskId: watcher.taskId, @@ -1577,12 +1613,17 @@ export class CloudTaskService extends TypedEventEmitter { }; watcher.streamReadToken = data.token ?? 
null; watcher.streamBaseUrl = data.stream_base_url ?? null; + // Every server with the stream_token endpoint emits the stream-end sentinel, so the + // endpoint resolving at all is the capability signal that opts this watcher into the + // status-unaware contract. Old servers 404 above and stay on legacy status polling. + watcher.durableStreamEnabled = true; watcher.streamTargetResolved = true; this.log.info("Cloud task stream target resolved", { taskId: watcher.taskId, runId: watcher.runId, streamBaseUrl: watcher.streamBaseUrl, hasToken: Boolean(watcher.streamReadToken), + durableStream: watcher.durableStreamEnabled, }); } catch (error) { // Transient failure: leave unresolved so the next reconnect retries; connectSse falls back From 3affe09f2fc8b7ffeb8a3b8ecc034e7801d3e341 Mon Sep 17 00:00:00 2001 From: Charles Vien Date: Thu, 11 Jun 2026 18:35:53 -0700 Subject: [PATCH 11/20] re-resolve stream target on proxy 401 --- .../core/src/cloud-task/cloud-task.test.ts | 149 ++++++++++++++++++ packages/core/src/cloud-task/cloud-task.ts | 26 +++ 2 files changed, 175 insertions(+) diff --git a/packages/core/src/cloud-task/cloud-task.test.ts b/packages/core/src/cloud-task/cloud-task.test.ts index c307a807fb..b60059bd4c 100644 --- a/packages/core/src/cloud-task/cloud-task.test.ts +++ b/packages/core/src/cloud-task/cloud-task.test.ts @@ -1017,6 +1017,155 @@ describe("CloudTaskService", () => { expect(mockStreamTokenFetch.mock.calls.length).toBe(1); }); + it("proxy 401 re-resolves the read target and resumes with a fresh token", async () => { + vi.useFakeTimers(); + + mockStreamTokenFetch + .mockImplementationOnce(() => + Promise.resolve( + createJsonResponse({ + token: "expired-token", + stream_base_url: "https://proxy.example", + }), + ), + ) + .mockImplementation(() => + Promise.resolve( + createJsonResponse({ + token: "fresh-token", + stream_base_url: "https://proxy.example", + }), + ), + ); + + mockNetFetch.mockImplementation((input: string | Request) => { + const url = 
typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + return Promise.resolve( + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }), + ); + }); + + mockStreamFetch + .mockImplementationOnce(() => + Promise.resolve(createJsonResponse({ detail: "expired" }, 401)), + ) + .mockImplementation(() => + Promise.resolve( + createOpenSseResponse( + 'event: keepalive\ndata: {"type":"keepalive"}\n\n', + ), + ), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await waitFor(() => mockStreamFetch.mock.calls.length >= 2, 10_000); + + expect(mockStreamTokenFetch.mock.calls.length).toBe(2); + const [secondUrl, secondInit] = mockStreamFetch.mock.calls[1]; + expect(String(secondUrl)).toMatch( + /^https:\/\/proxy\.example\/v1\/runs\/run-1\/stream/, + ); + expect((secondInit?.headers as Record)?.Authorization).toBe( + "Bearer fresh-token", + ); + expect( + (service as unknown as { watchers: Map }).watchers.has( + "task-1:run-1", + ), + ).toBe(true); + }); + + it("proxy 401 falls back to Django when the proxy is withdrawn", async () => { + vi.useFakeTimers(); + + // The re-resolution after the 401 no longer offers a proxy (rollout flag turned + // off mid-run); the watcher continues on the Django leg, which still emits the + // stream-end sentinel. 
+ mockStreamTokenFetch + .mockImplementationOnce(() => + Promise.resolve( + createJsonResponse({ + token: "expired-token", + stream_base_url: "https://proxy.example", + }), + ), + ) + .mockImplementation(() => + Promise.resolve( + createJsonResponse({ token: "django-token", stream_base_url: null }), + ), + ); + + let runFetchCount = 0; + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + runFetchCount += 1; + const terminal = runFetchCount >= 2; + return Promise.resolve( + createJsonResponse({ + id: "run-1", + status: terminal ? "completed" : "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: terminal + ? "2026-01-01T00:00:05Z" + : "2026-01-01T00:00:00Z", + }), + ); + }); + + mockStreamFetch + .mockImplementationOnce(() => + Promise.resolve(createJsonResponse({ detail: "expired" }, 401)), + ) + .mockImplementation(() => + Promise.resolve(createSseResponse("event: stream-end\ndata: {}\n\n")), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + const hasWatcher = (): boolean => + (service as unknown as { watchers: Map }).watchers.has( + "task-1:run-1", + ); + await waitFor(() => !hasWatcher(), 10_000); + + expect(mockStreamTokenFetch.mock.calls.length).toBe(2); + const [secondUrl] = mockStreamFetch.mock.calls[1]; + expect(String(secondUrl)).toContain("https://app.example.com/api/"); + }); + it("stream-end still stops the watcher in legacy mode", async () => { vi.useFakeTimers(); diff --git a/packages/core/src/cloud-task/cloud-task.ts b/packages/core/src/cloud-task/cloud-task.ts index c3cfd39e20..657f14ac99 100644 --- a/packages/core/src/cloud-task/cloud-task.ts +++ b/packages/core/src/cloud-task/cloud-task.ts @@ -889,6 +889,32 @@ export class 
CloudTaskService extends TypedEventEmitter { return; } + // Proxy-leg 401: the run-scoped read token expired mid-watch (short TTL) or its signing + // key rotated. Re-resolve the read target — which mints a fresh token, or routes back to + // Django if the rollout flag was turned off meanwhile — instead of failing. Django-leg + // 401 stays fatal below (the user session expired). Counting the attempt bounds + // persistent proxy 401s by the transport reconnect budget. + const unauthorizedWatcher = this.watchers.get(key); + if ( + error instanceof CloudTaskStreamError && + error.status === 401 && + unauthorizedWatcher?.streamBaseUrl + ) { + unauthorizedWatcher.streamTargetResolved = false; + unauthorizedWatcher.streamBaseUrl = null; + unauthorizedWatcher.streamReadToken = null; + unauthorizedWatcher.durableStreamEnabled = false; + this.log.info("Cloud task stream proxy token rejected, re-resolving", { + key, + }); + await this.handleStreamCompletion(key, { + reconnectOnDisconnect: true, + reconnectError: error, + countReconnectAttempt: true, + }); + return; + } + if ( error instanceof CloudTaskStreamError && error.details.autoRetry === false From ea8ed995f4902c961780401ffbe7461cc94b9531 Mon Sep 17 00:00:00 2001 From: Charles Vien Date: Mon, 22 Jun 2026 09:24:23 -0700 Subject: [PATCH 12/20] compute start-latest after stream leg drop --- packages/core/src/cloud-task/cloud-task.ts | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/packages/core/src/cloud-task/cloud-task.ts b/packages/core/src/cloud-task/cloud-task.ts index 657f14ac99..b07ffd5537 100644 --- a/packages/core/src/cloud-task/cloud-task.ts +++ b/packages/core/src/cloud-task/cloud-task.ts @@ -707,9 +707,7 @@ export class CloudTaskService extends TypedEventEmitter { watcher.sseAbortController = controller; watcher.connStartedAt = 0; - watcher.connSentLastEventId = watcher.lastEventId; watcher.connDataEventsReceived = 0; - const startLatest = Boolean(options?.startLatest && 
!watcher.lastEventId); // Resolve the read target once (proxy URL + token, or Django). The server owns the decision; // reused across reconnects so transport churn never re-mints a token. @@ -745,6 +743,11 @@ export class CloudTaskService extends TypedEventEmitter { watcher.lastEventIdLeg = null; } watcher.streamLeg = leg; + + // Captured after the leg-switch drop so they reflect what this connection actually sends: + // a dropped resume position means no Last-Event-ID and a start=latest reconnect. + watcher.connSentLastEventId = watcher.lastEventId; + const startLatest = Boolean(options?.startLatest && !watcher.lastEventId); // The proxy exposes a clean run-scoped path; the run-scoped token carries team and task. const url = new URL( usingProxy @@ -880,7 +883,6 @@ export class CloudTaskService extends TypedEventEmitter { completedWatcher.selfHealAttempted = false; } - await this.handleStreamCompletion(key, { reconnectOnDisconnect: true }); } catch (error) { this.flushLogBatch(key); From 80aca6e1cf92dfbc50a81e99685462797d2b3c7b Mon Sep 17 00:00:00 2001 From: Charles Vien Date: Tue, 23 Jun 2026 15:34:20 -0700 Subject: [PATCH 13/20] comments --- packages/agent/src/server/bin.ts | 10 +- .../agent/src/server/event-stream-sender.ts | 8 +- packages/agent/src/server/types.ts | 3 +- .../core/src/cloud-task/cloud-task.test.ts | 21 +-- packages/core/src/cloud-task/cloud-task.ts | 140 ++++++------------ 5 files changed, 56 insertions(+), 126 deletions(-) diff --git a/packages/agent/src/server/bin.ts b/packages/agent/src/server/bin.ts index 2ac73e8426..d840ddadc0 100644 --- a/packages/agent/src/server/bin.ts +++ b/packages/agent/src/server/bin.ts @@ -33,8 +33,7 @@ const envSchema = z.object({ .enum(["low", "medium", "high", "xhigh", "max"]) .optional(), POSTHOG_TASK_RUN_EVENT_INGEST_TOKEN: z.string().min(1).optional(), - // Routes the event-ingest POST to the standalone agent-proxy; other API calls keep using - // POSTHOG_API_URL. Falls back to POSTHOG_API_URL when unset. 
+ // Base URL for the event-ingest POST only; falls back to POSTHOG_API_URL when unset. POSTHOG_TASK_RUN_EVENT_INGEST_URL: z.url().optional(), POSTHOG_TASK_RUN_EVENT_INGEST_STREAM_WINDOW_MS: z .string() @@ -196,11 +195,8 @@ program process.exit(0); }); - // A hard crash would otherwise leave the run non-terminal and the user staring - // at a generic "Cloud stream disconnected". Mark the run failed before exiting - // so the desktop surfaces a real error instead of a silent stall. The deadline - // guarantees we exit even if reportFatalError's network calls hang at crash time - // (e.g. API unreachable during a restart), so we never block pod shutdown. + // Mark the run failed before exiting so a hard crash surfaces a real error instead of a + // silent stall. The deadline guarantees we exit even if the report hangs at crash time. const FATAL_ERROR_REPORT_DEADLINE_MS = 5_000; const handleFatalError = async (error: unknown) => { try { diff --git a/packages/agent/src/server/event-stream-sender.ts b/packages/agent/src/server/event-stream-sender.ts index a65f7effbb..ce2b37609e 100644 --- a/packages/agent/src/server/event-stream-sender.ts +++ b/packages/agent/src/server/event-stream-sender.ts @@ -8,8 +8,7 @@ import { interface TaskRunEventStreamSenderConfig { apiUrl: string; - // Base URL for the event-ingest POST only. Lets the deployment route ingest to the - // standalone agent-proxy while the rest of the agent's API calls stay on apiUrl. + // Base URL for the event-ingest POST only; falls back to apiUrl (Django path) when unset. eventIngestBaseUrl?: string; projectId: number; taskId: string; @@ -88,8 +87,6 @@ export class TaskRunEventStreamSender { private bufferRevision = 0; constructor(private readonly config: TaskRunEventStreamSenderConfig) { - // When routed to the agent-proxy, use its clean run-scoped path; the run-scoped - // token carries team and task. Falling back to apiUrl keeps the Django path. 
const usingProxy = Boolean(config.eventIngestBaseUrl); const ingestBase = (config.eventIngestBaseUrl ?? config.apiUrl).replace( /\/$/, @@ -356,8 +353,7 @@ export class TaskRunEventStreamSender { delayOverrideMs?: number, ): void { this.clearStreamWindowClose(stream); - // Rotate long-lived uploads even when the agent goes idle; this is a - // transport boundary, not a batching window. + // Rotate long-lived uploads even when idle: a transport boundary, not a batching window. const delayMs = delayOverrideMs ?? Math.max(0, stream.startedAtMs + this.streamWindowMs - Date.now()); diff --git a/packages/agent/src/server/types.ts b/packages/agent/src/server/types.ts index 7402730b1a..f0ece6174f 100644 --- a/packages/agent/src/server/types.ts +++ b/packages/agent/src/server/types.ts @@ -17,8 +17,7 @@ export interface AgentServerConfig { projectId: number; jwtPublicKey: string; // RS256 public key for JWT verification eventIngestToken?: string; - // Optional base URL for the event-ingest POST only (routes ingest to the agent-proxy); - // all other agent API calls keep using apiUrl. Falls back to apiUrl when unset. + // Base URL for the event-ingest POST only; falls back to apiUrl when unset. eventIngestBaseUrl?: string; eventIngestStreamWindowMs?: number; mode: AgentMode; diff --git a/packages/core/src/cloud-task/cloud-task.test.ts b/packages/core/src/cloud-task/cloud-task.test.ts index b60059bd4c..52e988ee9b 100644 --- a/packages/core/src/cloud-task/cloud-task.test.ts +++ b/packages/core/src/cloud-task/cloud-task.test.ts @@ -1776,10 +1776,8 @@ describe("CloudTaskService", () => { ) .mockImplementation(() => Promise.resolve(makeInProgressRun())); - // Each connection opens but delivers NOTHING, then is transport-cut at 65s. - // Healthiness is duration-only on purpose — it must NOT depend on keepalive - // frames surviving the proxy — so even a frame-less long-lived cut is healthy - // and never exhausts the budget. 
+ // Each connection opens, delivers nothing, then is transport-cut at 65s. Healthiness is + // duration-only (not keepalive frames), so even a frame-less long-lived cut never exhausts the budget. mockStreamFetch.mockImplementation(() => { const stream = new ReadableStream({ start(controller) { @@ -1921,10 +1919,8 @@ describe("CloudTaskService", () => { ) .mockImplementation(() => Promise.resolve(makeInProgressRun())); - // First 3 connections fail fast at the transport level (established, then - // errored immediately, no frame) and accrue reconnect attempts. The 4th - // delivers a keepalive and stays open — proving the transport recovered, so - // the accrued attempts must reset rather than carry forward into the budget. + // First 3 connections fail fast at the transport level and accrue reconnect attempts. The 4th + // delivers a keepalive and stays open, proving recovery, so the accrued attempts must reset. let streamCall = 0; const keepaliveControllerRef: { current: ReadableStreamDefaultController | null; @@ -2028,12 +2024,9 @@ describe("CloudTaskService", () => { ) // bootstrap: fetchSessionLogs .mockImplementation(() => Promise.resolve(makeInProgressRun())); - // Connections 1-4 each emit a backend `event: error` frame, building the - // backend-error budget to 4 — those reconnects correctly pace on - // streamErrorAttempts. Connection 5 is held open until the test injects a - // quick TRANSPORT cut, which must pace its reconnect on the just-incremented - // transport budget (1 -> ~2s), NOT on the stale backend-error budget - // (4 -> ~16s). Math.max(both) for the delay would wrongly use the latter. + // Connections 1-4 each emit a backend error frame, pacing reconnects on streamErrorAttempts. + // Connection 5 is held open until a quick transport cut, which must pace on the transport budget + // (1 -> ~2s), not the stale backend-error budget (4 -> ~16s). Math.max(both) would use the latter. 
let streamCall = 0; const transportControllerRef: { current: ReadableStreamDefaultController | null; diff --git a/packages/core/src/cloud-task/cloud-task.ts b/packages/core/src/cloud-task/cloud-task.ts index b07ffd5537..c1ac091dfb 100644 --- a/packages/core/src/cloud-task/cloud-task.ts +++ b/packages/core/src/cloud-task/cloud-task.ts @@ -24,12 +24,8 @@ import { } from "./schemas"; import { type SseEvent, SseEventParser } from "./sse-parser"; -// Transport reconnect backoff. The first SSE_RECONNECT_FLAT_ATTEMPTS retries fire at the base -// delay (a stopped agent-proxy is usually reachable again within a couple seconds), then the -// delay grows exponentially up to the cap: 0.5, 0.5, 0.5, 1, 2, 4, 8, 16, 30s. MAX_SSE_RECONNECT_ -// ATTEMPTS is sized so this schedule still spans ~60s of wall-clock before the watcher gives up, -// matching the prior give-up window while reconnecting an order of magnitude faster in the common -// case (a quick proxy restart now recovers in ~1.5s instead of ~16s). +// Reconnect backoff: flat base delay for the first SSE_RECONNECT_FLAT_ATTEMPTS attempts, then +// exponential up to the cap (0.5, 0.5, 0.5, 1, 2, 4, 8, 16, 30s), spanning ~60s before giving up. const MAX_SSE_RECONNECT_ATTEMPTS = 9; const MAX_CUMULATIVE_RECONNECT_ATTEMPTS = 30; const SSE_RECONNECT_BASE_DELAY_MS = 500; @@ -40,9 +36,7 @@ const EVENT_BATCH_FLUSH_MS = 16; const EVENT_BATCH_MAX_SIZE = 50; const SESSION_LOG_PAGE_LIMIT = 5_000; -// Durable end-of-stream sentinel emitted by the server (and the agent-proxy) once a run's -// event stream is complete. It is the authoritative "no more events, ever" signal — the -// client stops on it without consulting run status (status-unaware durable-stream contract). +// Authoritative end-of-stream sentinel. The client stops on it without consulting run status. 
const STREAM_END_EVENT_NAME = "stream-end"; interface SessionLogsPage { @@ -97,8 +91,7 @@ interface TaskRunStateEvent { completed_at?: string | null; } -// Which endpoint a stream connection reads from. Event ids are only meaningful within -// the leg that issued them: the proxy and Django id spaces are unrelated. +// Which endpoint a connection reads from. Event ids are only meaningful within their issuing leg. type StreamLeg = "proxy" | "django"; interface WatcherState { @@ -140,17 +133,13 @@ interface WatcherState { needsPostBootstrapReconnect: boolean; needsStopAfterBootstrap: boolean; streamEnded: boolean; - // True once the watcher has consumed its one automatic re-bootstrap recovery for the - // current trouble window; re-armed by a real data event or a healthy-length connection. + // Consumes one automatic re-bootstrap recovery; re-armed by a data event or healthy connection. selfHealAttempted: boolean; - // Read-leg routing, resolved once from the stream_token endpoint and reused across reconnects. // streamBaseUrl set => read via the agent-proxy with streamReadToken; null => read from Django. streamTargetResolved: boolean; streamBaseUrl: string | null; streamReadToken: string | null; - // True once stream_token resolved OK: servers with that endpoint always emit the stream-end - // sentinel. False for old servers (404), where the client must fall back to legacy status - // polling to detect the end of a run. + // True once stream_token resolved. False for old servers (404), which fall back to status polling. durableStreamEnabled: boolean; } @@ -349,17 +338,13 @@ export class CloudTaskService extends TypedEventEmitter { hasSnapshot: watcher.hasEmittedSnapshot, }); - // Retry means "start over from scratch". A watcher usually fails because its resume - // state is poisoned (e.g. a Last-Event-ID the server answers with instant empty - // streams); reconnecting with that state preserved loops straight back into the same - // failure. 
Re-bootstrapping re-resolves the read leg and re-emits a fresh snapshot, - // which also heals any entries missed while the stream was broken. + // Start over from scratch: a poisoned resume position loops straight back into the same + // failure, so re-bootstrap to re-resolve the read leg and emit a fresh snapshot. this.resetWatcherForRebootstrap(watcher); void this.bootstrapWatcher(key); } - // Returns a watcher to its pre-bootstrap state so bootstrapWatcher can rebuild it from - // server truth: budgets, buffers, resume position and read-leg routing all reset. + // Resets a watcher to its pre-bootstrap state so bootstrapWatcher can rebuild it from server truth. private resetWatcherForRebootstrap(watcher: WatcherState): void { watcher.reconnectAttempts = 0; watcher.streamErrorAttempts = 0; @@ -709,8 +694,7 @@ export class CloudTaskService extends TypedEventEmitter { watcher.connStartedAt = 0; watcher.connDataEventsReceived = 0; - // Resolve the read target once (proxy URL + token, or Django). The server owns the decision; - // reused across reconnects so transport churn never re-mints a token. + // Resolve the read target once (proxy URL + token, or Django), reused across reconnects. if (!watcher.streamTargetResolved) { await this.resolveStreamTarget(watcher); const resolvedWatcher = this.watchers.get(key); @@ -730,9 +714,8 @@ export class CloudTaskService extends TypedEventEmitter { ? watcher.streamBaseUrl?.replace(/\/+$/, "") : watcher.apiHost; const leg: StreamLeg = usingProxy ? "proxy" : "django"; - // Resuming a proxy stream from a Django id (or vice versa) is undefined: the id - // spaces are unrelated. Drop the resume position on a leg switch and let - // start=latest plus the next snapshot cover the gap. + // Proxy and Django id spaces are unrelated, so drop the resume position on a leg switch and + // let start=latest plus the next snapshot cover the gap. 
if (watcher.lastEventId && watcher.lastEventIdLeg !== leg) { this.log.info("Cloud task stream leg changed, dropping resume position", { key, @@ -744,11 +727,9 @@ export class CloudTaskService extends TypedEventEmitter { } watcher.streamLeg = leg; - // Captured after the leg-switch drop so they reflect what this connection actually sends: - // a dropped resume position means no Last-Event-ID and a start=latest reconnect. + // Captured after the leg-switch drop so they reflect what this connection actually sends. watcher.connSentLastEventId = watcher.lastEventId; const startLatest = Boolean(options?.startLatest && !watcher.lastEventId); - // The proxy exposes a clean run-scoped path; the run-scoped token carries team and task. const url = new URL( usingProxy ? `${base}/v1/runs/${watcher.runId}/stream` @@ -767,8 +748,7 @@ export class CloudTaskService extends TypedEventEmitter { headers.Authorization = `Bearer ${watcher.streamReadToken}`; } - // Debug level: this fires on every reconnect, which during transport churn is - // multiple times per second and would otherwise dominate the log file. + // Debug level: fires on every reconnect and would otherwise dominate the log file. this.log.debug("Opening cloud task stream", { usingProxy, streamUrl: url.toString(), @@ -779,17 +759,15 @@ export class CloudTaskService extends TypedEventEmitter { ); const decoder = new TextDecoder(); - // Tracks whether the response body was opened and how long it stayed open, - // so a long-lived connection cut by transport churn isn't penalized as a - // failed reconnect attempt (see SSE_HEALTHY_CONNECTION_MS). + // Track how long the body stayed open so healthy long-lived connections cut by churn + // aren't penalized as failed reconnects (see SSE_HEALTHY_CONNECTION_MS). 
let connectedAt = 0; let streamWasEstablished = false; let bytesReceived = 0; let eventsReceived = 0; try { - // The proxy authenticates with the run-scoped Bearer token, not the user session, so it - // takes a plain fetch. The Django leg still goes through authenticatedFetch. + // The proxy authenticates with the run-scoped Bearer token; the Django leg uses the session. const response = usingProxy ? await fetch(url.toString(), { method: "GET", @@ -869,10 +847,8 @@ export class CloudTaskService extends TypedEventEmitter { lastEventId: watcher.lastEventId, }); - // A long-lived connection that closed cleanly is healthy transport churn, not a - // reconnect loop. Clear the cumulative budget so an idle run (keepalives only — - // nothing else resets it) can ride out proxy timeout cycles indefinitely, while - // instant-EOF loops still exhaust it. + // A long-lived clean close is healthy churn, not a loop: clear the cumulative budget so an + // idle run can ride out proxy timeout cycles, while instant-EOF loops still exhaust it. const completedWatcher = this.watchers.get(key); if ( completedWatcher && @@ -891,11 +867,8 @@ export class CloudTaskService extends TypedEventEmitter { return; } - // Proxy-leg 401: the run-scoped read token expired mid-watch (short TTL) or its signing - // key rotated. Re-resolve the read target — which mints a fresh token, or routes back to - // Django if the rollout flag was turned off meanwhile — instead of failing. Django-leg - // 401 stays fatal below (the user session expired). Counting the attempt bounds - // persistent proxy 401s by the transport reconnect budget. + // Proxy-leg 401: the read token expired or its signing key rotated. Re-resolve to mint a + // fresh token (or route back to Django) instead of failing. Django-leg 401 stays fatal below. 
const unauthorizedWatcher = this.watchers.get(key); if ( error instanceof CloudTaskStreamError && @@ -940,8 +913,7 @@ export class CloudTaskService extends TypedEventEmitter { watcher.streamErrorAttempts += 1; } else if (wasHealthyStream) { watcher.streamErrorAttempts = 0; - // Same as the clean-EOF path: a healthy-length connection proves this is - // timeout cycling, not a loop, so an idle run never exhausts the budget. + // A healthy-length connection proves timeout cycling, not a loop. watcher.cumulativeReconnectAttempts = 0; watcher.selfHealAttempted = false; } @@ -1002,18 +974,15 @@ export class CloudTaskService extends TypedEventEmitter { return; } - // A keepalive or real event proves the transport recovered, so clear the - // transport reconnect budget. A keepalive stops here: it does NOT clear the - // backend-error budget, since it doesn't prove the stream itself produced - // data. + // A keepalive or real event proves the transport recovered. A keepalive does not clear the + // backend-error budget, which only a real data event below resets. watcher.reconnectAttempts = 0; if (isKeepaliveEvent(event)) { return; } - // A real data event proves the stream materialized; clear the backend-error - // and cumulative budgets too, and re-arm the one-shot self-heal recovery. + // A real data event proves the stream materialized; clear the remaining budgets and re-arm self-heal. watcher.streamErrorAttempts = 0; watcher.cumulativeReconnectAttempts = 0; watcher.selfHealAttempted = false; @@ -1107,8 +1076,7 @@ export class CloudTaskService extends TypedEventEmitter { const watcher = this.watchers.get(key); if (!watcher || watcher.bufferedLogBatches.length === 0) return; - // Content-based dedup because SSE IDs (Redis stream IDs) don't exist in - // the S3-backed historical entries — the JSON payload is the only shared key + // Content-based dedup: SSE ids don't exist in the historical entries, so the payload is the key. 
const historicalCounts = new Map(); for (const entry of historicalEntries) { const serialized = JSON.stringify(entry); @@ -1296,8 +1264,7 @@ export class CloudTaskService extends TypedEventEmitter { options: { countAttempt?: boolean } = {}, ): void { const watcher = this.watchers.get(key); - // No isTerminalStatus gate: the reconnect loop is status-unaware and only stops on the - // stream-end sentinel (handled in handleStreamCompletion) or the budget exhaustion below. + // Status-unaware: the loop only stops on the stream-end sentinel or budget exhaustion below. if (!watcher || watcher.failed) { return; } @@ -1306,8 +1273,7 @@ export class CloudTaskService extends TypedEventEmitter { clearTimeout(watcher.reconnectTimeoutId); } - // Cumulative counter bounds runaway loops that clean-EOF (countAttempt=false) - // and would otherwise dodge `reconnectAttempts`. + // Bounds runaway loops that clean-EOF (countAttempt=false) and dodge reconnectAttempts. watcher.cumulativeReconnectAttempts += 1; const countAttempt = options.countAttempt ?? true; if (countAttempt) { @@ -1317,12 +1283,8 @@ export class CloudTaskService extends TypedEventEmitter { if ( watcher.cumulativeReconnectAttempts > MAX_CUMULATIVE_RECONNECT_ATTEMPTS ) { - // A poisoned resume position (a Last-Event-ID the server answers with instant - // empty streams) burns through the budget without a single error frame. Before - // surfacing a failure, rebuild once from scratch — fresh read leg, fresh snapshot, - // no resume position — the same recovery an app restart performs. A data event or - // a healthy-length connection re-arms the attempt; if the rebuilt watcher loops - // straight back here, fail for real. + // A poisoned resume position burns the budget without an error frame. Rebuild once from + // scratch (the app-restart recovery) before failing; if it loops straight back, fail for real. 
if (!watcher.selfHealAttempted) { watcher.reconnectTimeoutId = null; this.log.warn( @@ -1343,8 +1305,7 @@ export class CloudTaskService extends TypedEventEmitter { return; } - // The watcher fails once either budget is exhausted: transport reconnect - // failures or backend stream-error frames. + // Fail once either budget (transport reconnect or backend stream-error) is exhausted. const attemptCount = Math.max( watcher.reconnectAttempts, watcher.streamErrorAttempts, @@ -1398,10 +1359,8 @@ export class CloudTaskService extends TypedEventEmitter { const { reconnectOnDisconnect } = options; - // Bootstrap owns the snapshot lifecycle, so it must be consulted before the - // stream-end stop: stopping mid-bootstrap deletes the watcher before the - // snapshot is emitted and silently discards the backlog plus any live - // entries buffered during bootstrap. Record intent and let bootstrap finish. + // Bootstrap owns the snapshot lifecycle: stopping mid-bootstrap would discard the backlog and + // buffered live entries. Record intent and let bootstrap finish. if (watcher.isBootstrapping) { if (watcher.streamEnded || !reconnectOnDisconnect) { watcher.needsStopAfterBootstrap = true; @@ -1411,22 +1370,15 @@ export class CloudTaskService extends TypedEventEmitter { return; } - // On a durable stream the end-of-stream sentinel is the ONLY signal that ends the watch. The - // client is unaware of sandbox/run status and assumes the transport breaks mid-message - // constantly, so any disconnect without stream-end is just transport churn to ride out by - // reconnecting. We never poll run status to decide whether to stop — that would drop the tail - // if the stream cut right as the run went terminal but before stream-end arrived. Status is - // updated for display only, from the task_run_state events the stream itself carries. + // The stream-end sentinel is the only signal that ends a durable watch. 
Any disconnect without + // it is transport churn to reconnect through; status is tracked for display only, never to stop. if (watcher.streamEnded) { await this.finalizeWatcherStop(key); return; } - // Legacy mode (old server or rollout flag off): the stream carries no end-of-run sentinel, - // so a disconnect is the old contract — poll run status to decide between stopping and - // reconnecting. The reconnect budgets deliberately keep the new semantics (no reset on - // polled progress; that defeated the runaway-loop cap and let a poisoned stream churn for - // hours), so the self-heal recovery stays active here too. + // Legacy mode (old server): no sentinel, so poll run status on disconnect to decide stop vs + // reconnect. The reconnect budgets keep the new semantics, so self-heal stays active here too. if (!watcher.durableStreamEnabled && reconnectOnDisconnect) { const run = await this.fetchTaskRun(watcher); const legacyWatcher = this.watchers.get(key); @@ -1472,11 +1424,8 @@ export class CloudTaskService extends TypedEventEmitter { await this.finalizeWatcherStop(key); } - // Stops a watcher whose stream is durably complete. Status normally arrives via the - // task_run_state events the stream carries, but if the stream ended without a terminal - // status (dropped final frame or a server bug), fetch it once so the session is not left - // permanently "in progress" with no watcher behind it. The poll never decides WHETHER to - // stop — stream-end already did — it only repairs the displayed status. + // Stops a watcher whose stream is durably complete. Repairs the displayed status if the stream + // ended non-terminal (dropped final frame); the poll never decides whether to stop. 
private async finalizeWatcherStop(key: string): Promise<void> { const watcher = this.watchers.get(key); if (!watcher) return; @@ -1622,8 +1571,7 @@ export class CloudTaskService extends TypedEventEmitter { method: "GET", }); if (!response.ok) { - // Reachable but refused (or an old server without the endpoint): read from Django - // with legacy status-polling semantics and don't retry resolution. + // Refused, or an old server without the endpoint: read from Django with status polling. watcher.streamBaseUrl = null; watcher.streamReadToken = null; watcher.durableStreamEnabled = false; @@ -1641,9 +1589,8 @@ export class CloudTaskService extends TypedEventEmitter { }; watcher.streamReadToken = data.token ?? null; watcher.streamBaseUrl = data.stream_base_url ?? null; - // Every server with the stream_token endpoint emits the stream-end sentinel, so the - // endpoint resolving at all is the capability signal that opts this watcher into the - // status-unaware contract. Old servers 404 above and stay on legacy status polling. + // The endpoint resolving at all opts this watcher into the status-unaware contract; + // old servers 404 above and stay on legacy status polling. watcher.durableStreamEnabled = true; watcher.streamTargetResolved = true; this.log.info("Cloud task stream target resolved", { @@ -1654,8 +1601,7 @@ export class CloudTaskService extends TypedEventEmitter { durableStream: watcher.durableStreamEnabled, }); } catch (error) { - // Transient failure: leave unresolved so the next reconnect retries; connectSse falls back - // to the Django leg meanwhile since streamBaseUrl stays null. + // Transient failure: leave unresolved so the next reconnect retries and falls back to Django.
watcher.streamBaseUrl = null; watcher.streamReadToken = null; this.log.warn("Cloud task stream target resolution failed", { From 7ac68e6c0a1af26c0a1c6d5bfa8df1128390e8ff Mon Sep 17 00:00:00 2001 From: Charles Vien Date: Tue, 23 Jun 2026 15:37:07 -0700 Subject: [PATCH 14/20] Update cloud-task.ts --- packages/core/src/cloud-task/cloud-task.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/core/src/cloud-task/cloud-task.ts b/packages/core/src/cloud-task/cloud-task.ts index c1ac091dfb..2be75835d3 100644 --- a/packages/core/src/cloud-task/cloud-task.ts +++ b/packages/core/src/cloud-task/cloud-task.ts @@ -732,8 +732,10 @@ export class CloudTaskService extends TypedEventEmitter { const startLatest = Boolean(options?.startLatest && !watcher.lastEventId); const url = new URL( usingProxy - ? `${base}/v1/runs/${watcher.runId}/stream` - : `${base}/api/projects/${watcher.teamId}/tasks/${watcher.taskId}/runs/${watcher.runId}/stream/`, + ? `${base}/v1/runs/${encodeURIComponent(watcher.runId)}/stream` + : `${base}/api/projects/${watcher.teamId}/tasks/${encodeURIComponent( + watcher.taskId, + )}/runs/${encodeURIComponent(watcher.runId)}/stream/`, ); if (startLatest) { url.searchParams.set("start", "latest"); From e8ff7cd2d291ea60c71f162bdbe3aab7a23adf2c Mon Sep 17 00:00:00 2001 From: Charles Vien Date: Mon, 29 Jun 2026 03:40:14 -0700 Subject: [PATCH 15/20] retry stream target resolution on transient errors --- .../core/src/cloud-task/cloud-task.test.ts | 72 +++++++++++++++++++ packages/core/src/cloud-task/cloud-task.ts | 18 ++++- 2 files changed, 89 insertions(+), 1 deletion(-) diff --git a/packages/core/src/cloud-task/cloud-task.test.ts b/packages/core/src/cloud-task/cloud-task.test.ts index 52e988ee9b..3d0946f136 100644 --- a/packages/core/src/cloud-task/cloud-task.test.ts +++ b/packages/core/src/cloud-task/cloud-task.test.ts @@ -1017,6 +1017,78 @@ describe("CloudTaskService", () => { expect(mockStreamTokenFetch.mock.calls.length).toBe(1); }); + 
it("a transient stream_token failure retries resolution instead of pinning to Django", async () => { + vi.useFakeTimers(); + + // The endpoint is momentarily down (503): unlike a 404, this must not cache a Django fallback. + // The next reconnect re-resolves and the watch upgrades to the durable proxy leg. + mockStreamTokenFetch + .mockImplementationOnce(() => + Promise.resolve(createJsonResponse({ detail: "unavailable" }, 503)), + ) + .mockImplementation(() => + Promise.resolve( + createJsonResponse({ + token: "fresh-token", + stream_base_url: "https://proxy.example", + }), + ), + ); + + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + return Promise.resolve( + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }), + ); + }); + + // Django leg (transient round) just EOFs to force a reconnect; once the proxy resolves it + // emits stream-end so the watch completes, proving durable streaming engaged after the retry. + const usedProxyLeg = (input: string | Request): boolean => { + const url = typeof input === "string" ? input : input.url; + return url.includes("proxy.example"); + }; + mockStreamFetch.mockImplementation((input: string | Request) => + Promise.resolve( + usedProxyLeg(input) + ? 
createSseResponse("event: stream-end\ndata: {}\n\n") + : createSseResponse(""), + ), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + const hasWatcher = (): boolean => + (service as unknown as { watchers: Map }).watchers.has( + "task-1:run-1", + ); + await waitFor(() => !hasWatcher(), 10_000); + + // The 503 was not cached: resolution retried and the stream switched to the durable proxy leg. + expect(mockStreamTokenFetch.mock.calls.length).toBeGreaterThanOrEqual(2); + expect( + mockStreamFetch.mock.calls.some(([input]) => usedProxyLeg(input)), + ).toBe(true); + }); + it("proxy 401 re-resolves the read target and resumes with a fresh token", async () => { vi.useFakeTimers(); diff --git a/packages/core/src/cloud-task/cloud-task.ts b/packages/core/src/cloud-task/cloud-task.ts index 2be75835d3..6e0e0e578e 100644 --- a/packages/core/src/cloud-task/cloud-task.ts +++ b/packages/core/src/cloud-task/cloud-task.ts @@ -263,6 +263,12 @@ function shouldFailWatcherForFetchStatus(status: number): boolean { return status === 401 || status === 403 || status === 404; } +// 5xx and 429 are momentary: the stream-token endpoint exists but is briefly unavailable, so the +// target stays unresolved and the next reconnect retries instead of caching a Django fallback. +function isTransientStreamTargetStatus(status: number): boolean { + return status >= 500 || status === 429; +} + @injectable() export class CloudTaskService extends TypedEventEmitter { private watchers = new Map(); @@ -1573,9 +1579,19 @@ export class CloudTaskService extends TypedEventEmitter { method: "GET", }); if (!response.ok) { - // Refused, or an old server without the endpoint: read from Django with status polling. 
watcher.streamBaseUrl = null; watcher.streamReadToken = null; + if (isTransientStreamTargetStatus(response.status)) { + // Transient: read from Django this round but leave the target unresolved so the next + // reconnect retries durable resolution instead of pinning the run to status polling. + this.log.warn("Cloud task stream target temporarily unavailable", { + taskId: watcher.taskId, + runId: watcher.runId, + status: response.status, + }); + return; + } + // Refused, or an old server without the endpoint: read from Django with status polling. watcher.durableStreamEnabled = false; watcher.streamTargetResolved = true; this.log.info("Cloud task stream reading from API host", { From c0e82213a138b7b56d5633520f16717a161dadba Mon Sep 17 00:00:00 2001 From: Charles Vien Date: Tue, 30 Jun 2026 11:19:50 -0700 Subject: [PATCH 16/20] fix premature cloud stream stop on proxy 401 --- .../core/src/cloud-task/cloud-task.test.ts | 348 ++++++++++++++++++ packages/core/src/cloud-task/cloud-task.ts | 170 +++++---- 2 files changed, 438 insertions(+), 80 deletions(-) diff --git a/packages/core/src/cloud-task/cloud-task.test.ts b/packages/core/src/cloud-task/cloud-task.test.ts index 3d0946f136..4b285db2f9 100644 --- a/packages/core/src/cloud-task/cloud-task.test.ts +++ b/packages/core/src/cloud-task/cloud-task.test.ts @@ -2419,4 +2419,352 @@ describe("CloudTaskService", () => { ]); expect(mockNetFetch).toHaveBeenCalledTimes(3); }); + + it("fails a Django-leg watcher on a stream 401 without re-resolving the read target", async () => { + vi.useFakeTimers(); + + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + // Default stream_token resolves with stream_base_url: null, so the read leg is Django. + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? 
input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + return Promise.resolve( + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }), + ); + }); + + // A Django-leg 401 is fatal (autoRetry: false). The proxy re-resolve path is guarded on a + // non-null streamBaseUrl, so a Django leg must fail rather than re-mint a stream_token. + mockStreamFetch.mockImplementation(() => + Promise.resolve(createJsonResponse({ detail: "expired" }, 401)), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await waitFor( + () => + updates.some( + (u) => + typeof u === "object" && + u !== null && + (u as { kind?: string }).kind === "error", + ), + 10_000, + ); + + expect(updates).toContainEqual({ + taskId: "task-1", + runId: "run-1", + kind: "error", + errorTitle: "Cloud authentication expired", + errorMessage: "Please reauthenticate and retry the cloud run stream.", + retryable: true, + }); + + // The Django leg did not re-resolve, and the fatal failure schedules no reconnect. + expect(mockStreamTokenFetch.mock.calls.length).toBe(1); + const streamCallsAtFailure = mockStreamFetch.mock.calls.length; + await vi.advanceTimersByTimeAsync(10_000); + expect(mockStreamFetch.mock.calls.length).toBe(streamCallsAtFailure); + expect(mockStreamTokenFetch.mock.calls.length).toBe(1); + }); + + it("treats a 429 from stream_token as transient and retries the read-target resolution", async () => { + vi.useFakeTimers(); + + // 429 is momentary like a 503: it must not cache a Django fallback. The next reconnect + // re-resolves and the watch upgrades to the durable proxy leg. 
+ mockStreamTokenFetch + .mockImplementationOnce(() => + Promise.resolve(createJsonResponse({ detail: "slow down" }, 429)), + ) + .mockImplementation(() => + Promise.resolve( + createJsonResponse({ + token: "fresh-token", + stream_base_url: "https://proxy.example", + }), + ), + ); + + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + return Promise.resolve( + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }), + ); + }); + + const usedProxyLeg = (input: string | Request): boolean => { + const url = typeof input === "string" ? input : input.url; + return url.includes("proxy.example"); + }; + mockStreamFetch.mockImplementation((input: string | Request) => + Promise.resolve( + usedProxyLeg(input) + ? createSseResponse("event: stream-end\ndata: {}\n\n") + : createSseResponse(""), + ), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + const hasWatcher = (): boolean => + (service as unknown as { watchers: Map }).watchers.has( + "task-1:run-1", + ); + await waitFor(() => !hasWatcher(), 10_000); + + // The 429 was not cached: resolution retried and the stream switched to the durable proxy leg. 
+ expect(mockStreamTokenFetch.mock.calls.length).toBeGreaterThanOrEqual(2); + expect( + mockStreamFetch.mock.calls.some(([input]) => usedProxyLeg(input)), + ).toBe(true); + }); + + it("caches a 403 from stream_token and falls back to Django legacy polling", async () => { + vi.useFakeTimers(); + + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + // 403 is not transient: like a 404 it pins the watch to the Django leg with status polling. + mockStreamTokenFetch.mockImplementation(() => + Promise.resolve(createJsonResponse({ detail: "forbidden" }, 403)), + ); + + let runFetchCount = 0; + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + runFetchCount += 1; + // Calls 1-3 (bootstrap, post-bootstrap verify, first legacy poll) report an active run; the + // second legacy poll reports terminal. + const terminal = runFetchCount >= 4; + return Promise.resolve( + createJsonResponse({ + id: "run-1", + status: terminal ? "completed" : "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: terminal + ? "2026-01-01T00:00:05Z" + : "2026-01-01T00:00:00Z", + }), + ); + }); + + // Legacy mode: no stream-end ever arrives, so each clean EOF triggers a status poll. 
+ mockStreamFetch.mockImplementation(() => + Promise.resolve(createSseResponse("")), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + const hasWatcher = (): boolean => + (service as unknown as { watchers: Map }).watchers.has( + "task-1:run-1", + ); + await waitFor(() => !hasWatcher(), 10_000); + + // Every stream read used the Django leg (no proxy URL), the watcher stopped on the terminal + // poll, and the refused resolution was cached: one stream_token call for the whole watch. + expect( + mockStreamFetch.mock.calls.every(([input]) => { + const url = typeof input === "string" ? input : (input as Request).url; + return url.includes("https://app.example.com/api/"); + }), + ).toBe(true); + expect(updates).toContainEqual( + expect.objectContaining({ kind: "status", status: "completed" }), + ); + expect(mockStreamTokenFetch.mock.calls.length).toBe(1); + }); + + it("stops on the last-known status when the post-stream-end repair fetch fails", async () => { + vi.useFakeTimers(); + + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + let runFetchCount = 0; + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + runFetchCount += 1; + // Bootstrap (call 1) reports an active run so the stream opens; the stream-end stop path's + // status-repair fetch (call 2) fails the network instead of returning a terminal run. 
+ if (runFetchCount >= 2) { + return Promise.reject(new Error("network down")); + } + return Promise.resolve( + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: "build", + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }), + ); + }); + + mockStreamFetch.mockImplementation(() => + Promise.resolve(createSseResponse("event: stream-end\ndata: {}\n\n")), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + const hasWatcher = (): boolean => + (service as unknown as { watchers: Map }).watchers.has( + "task-1:run-1", + ); + await waitFor(() => !hasWatcher(), 10_000); + + // The failed repair fetch must not strand the watcher; it stops on the last-known status. + expect(updates).toContainEqual( + expect.objectContaining({ kind: "status", status: "in_progress" }), + ); + // Exactly one stream connection (the bootstrap stream-end); no reconnect after the clean stop. + expect(mockStreamFetch.mock.calls.length).toBe(1); + }); + + it("re-arms self-heal after a data event and re-bootstraps a second time before failing", async () => { + vi.useFakeTimers(); + + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + return Promise.resolve( + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }), + ); + }); + + // Clean-EOF loop by default. Each re-bootstrap re-resolves the read target, so a second + // stream_token call marks the first self-heal. 
Deliver exactly one real data event on that + // re-bootstrap's connection to re-arm self-heal, then resume clean-EOF looping. + let dataEventDelivered = false; + mockStreamFetch.mockImplementation(() => { + if (mockStreamTokenFetch.mock.calls.length >= 2 && !dataEventDelivered) { + dataEventDelivered = true; + return Promise.resolve( + createSseResponse( + 'id: 1\ndata: {"type":"notification","timestamp":"2026-01-01T00:00:02Z","notification":{"jsonrpc":"2.0","method":"_posthog/console","params":{"sessionId":"run-1","level":"info","message":"alive"}}}\n\n', + ), + ); + } + return Promise.resolve(createSseResponse("")); + }); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await waitFor(() => mockStreamFetch.mock.calls.length === 1); + await vi.advanceTimersByTimeAsync(2 * 60 * 60_000); + + await waitFor( + () => + updates.some( + (u) => + typeof u === "object" && + u !== null && + (u as { kind?: string }).kind === "error", + ), + 10_000, + ); + + expect(updates).toContainEqual( + expect.objectContaining({ + kind: "error", + errorTitle: "Cloud run unreachable", + }), + ); + + // Initial bootstrap snapshot plus two self-heal re-bootstraps: the data event between the first + // and second budget exhaustion re-armed self-heal, so a third snapshot precedes the failure. + expect( + updates.filter( + (u) => + typeof u === "object" && + u !== null && + (u as { kind?: string }).kind === "snapshot", + ), + ).toHaveLength(3); + }); }); diff --git a/packages/core/src/cloud-task/cloud-task.ts b/packages/core/src/cloud-task/cloud-task.ts index 6e0e0e578e..f7a5cbcece 100644 --- a/packages/core/src/cloud-task/cloud-task.ts +++ b/packages/core/src/cloud-task/cloud-task.ts @@ -36,7 +36,8 @@ const EVENT_BATCH_FLUSH_MS = 16; const EVENT_BATCH_MAX_SIZE = 50; const SESSION_LOG_PAGE_LIMIT = 5_000; -// Authoritative end-of-stream sentinel. The client stops on it without consulting run status. 
+// Authoritative end-of-stream sentinel, matched on the SSE event name (event.event, not data.type). +// The client stops on it without consulting run status. const STREAM_END_EVENT_NAME = "stream-end"; interface SessionLogsPage { @@ -128,6 +129,9 @@ interface WatcherState { isBootstrapping: boolean; hasEmittedSnapshot: boolean; bufferedLogBatches: StoredLogEntry[][]; + // Live entries emitted since the last snapshot, retained so a re-subscribe snapshot can reconcile + // entries the server has not persisted yet. emitCurrentSnapshot trims this to the still-missing + // set; with no re-subscribe it holds the run's emitted entries until the watch ends. emittedLogEntries: StoredLogEntry[]; failed: boolean; needsPostBootstrapReconnect: boolean; @@ -135,7 +139,7 @@ interface WatcherState { streamEnded: boolean; // Consumes one automatic re-bootstrap recovery; re-armed by a data event or healthy connection. selfHealAttempted: boolean; - // streamBaseUrl set => read via the agent-proxy with streamReadToken; null => read from Django. + // Both streamBaseUrl and streamReadToken non-null => read via the agent-proxy; either null => Django. streamTargetResolved: boolean; streamBaseUrl: string | null; streamReadToken: string | null; @@ -269,6 +273,36 @@ function isTransientStreamTargetStatus(status: number): boolean { return status >= 500 || status === 429; } +// Content-based frequency map keyed by the serialized entry. SSE ids are absent from persisted +// (historical) entries, so the payload itself is the identity used to dedup live against historical. +function buildEntryFrequencyMap( + entries: StoredLogEntry[], +): Map<string, number> { + const counts = new Map(); + for (const entry of entries) { + const serialized = JSON.stringify(entry); + counts.set(serialized, (counts.get(serialized) ?? 0) + 1); + } + return counts; +} + +// Keeps only entries absent from counts, consuming one occurrence per match so a payload present N +// times in the reference set is suppressed at most N times.
Mutates counts. +function filterEntriesNotInFrequencyMap( + entries: StoredLogEntry[], + counts: Map<string, number>, +): StoredLogEntry[] { + return entries.filter((entry) => { + const serialized = JSON.stringify(entry); + const remaining = counts.get(serialized) ?? 0; + if (remaining <= 0) { + return true; + } + counts.set(serialized, remaining - 1); + return false; + }); +} + @injectable() export class CloudTaskService extends TypedEventEmitter { private watchers = new Map(); @@ -831,13 +865,19 @@ export class CloudTaskService extends TypedEventEmitter { const events = parser.parse(chunk); for (const event of events) { eventsReceived += 1; - this.handleSseEvent(key, event); + const backendError = this.handleSseEvent(key, event); + if (backendError) { + throw backendError; + } } } const trailingEvents = parser.parse(decoder.decode()); for (const event of trailingEvents) { - this.handleSseEvent(key, event); + const backendError = this.handleSseEvent(key, event); + if (backendError) { + throw backendError; + } } this.flushLogBatch(key); @@ -883,10 +923,12 @@ export class CloudTaskService extends TypedEventEmitter { error.status === 401 && unauthorizedWatcher?.streamBaseUrl ) { + // Keep durableStreamEnabled set: clearing it would route this disconnect through legacy + // status polling, which can stop the watch on a terminal status before stream-end arrives. + // The next connectSse re-resolves the target and resolveStreamTarget re-derives durability.
unauthorizedWatcher.streamTargetResolved = false; unauthorizedWatcher.streamBaseUrl = null; unauthorizedWatcher.streamReadToken = null; - unauthorizedWatcher.durableStreamEnabled = false; this.log.info("Cloud task stream proxy token rejected, re-resolving", { key, }); @@ -915,15 +957,15 @@ export class CloudTaskService extends TypedEventEmitter { streamWasEstablished && Date.now() - connectedAt >= SSE_HEALTHY_CONNECTION_MS; - const watcher = this.watchers.get(key); - if (watcher) { + const errorWatcher = this.watchers.get(key); + if (errorWatcher) { if (isBackendError) { - watcher.streamErrorAttempts += 1; + errorWatcher.streamErrorAttempts += 1; } else if (wasHealthyStream) { - watcher.streamErrorAttempts = 0; + errorWatcher.streamErrorAttempts = 0; // A healthy-length connection proves timeout cycling, not a loop. - watcher.cumulativeReconnectAttempts = 0; - watcher.selfHealAttempted = false; + errorWatcher.cumulativeReconnectAttempts = 0; + errorWatcher.selfHealAttempted = false; } } @@ -939,11 +981,12 @@ export class CloudTaskService extends TypedEventEmitter { : 0, bytesReceived, eventsReceived, - dataEventsReceived: watcher?.connDataEventsReceived ?? 0, - lastEventId: watcher?.lastEventId ?? null, - reconnectAttempts: watcher?.reconnectAttempts ?? 0, - streamErrorAttempts: watcher?.streamErrorAttempts ?? 0, - cumulativeReconnectAttempts: watcher?.cumulativeReconnectAttempts ?? 0, + dataEventsReceived: errorWatcher?.connDataEventsReceived ?? 0, + lastEventId: errorWatcher?.lastEventId ?? null, + reconnectAttempts: errorWatcher?.reconnectAttempts ?? 0, + streamErrorAttempts: errorWatcher?.streamErrorAttempts ?? 0, + cumulativeReconnectAttempts: + errorWatcher?.cumulativeReconnectAttempts ?? 
0, }); await this.handleStreamCompletion(key, { reconnectOnDisconnect: true, @@ -958,9 +1001,15 @@ export class CloudTaskService extends TypedEventEmitter { } } - private handleSseEvent(key: string, event: SseEvent): void { + // Returns a BackendStreamError when the stream carries an error event so the caller can throw at + // the read site; returns null otherwise. It does not throw, so a single event cannot unwind the + // reader loop unexpectedly. + private handleSseEvent( + key: string, + event: SseEvent, + ): BackendStreamError | null { const watcher = this.watchers.get(key); - if (!watcher || watcher.failed) return; + if (!watcher || watcher.failed) return null; if (event.id) { watcher.lastEventId = event.id; @@ -971,7 +1020,7 @@ export class CloudTaskService extends TypedEventEmitter { const message = isSseErrorEvent(event.data) ? event.data.error : "Unknown stream error"; - throw new BackendStreamError(message); + return new BackendStreamError(message); } if (event.event === STREAM_END_EVENT_NAME) { @@ -979,7 +1028,7 @@ export class CloudTaskService extends TypedEventEmitter { // of reconnecting, independent of run status. The connection will close // naturally (clean EOF) right after this sentinel. watcher.streamEnded = true; - return; + return null; } // A keepalive or real event proves the transport recovered. A keepalive does not clear the @@ -987,7 +1036,7 @@ export class CloudTaskService extends TypedEventEmitter { watcher.reconnectAttempts = 0; if (isKeepaliveEvent(event)) { - return; + return null; } // A real data event proves the stream materialized; clear the remaining budgets and re-arm self-heal. 
@@ -1019,7 +1068,7 @@ export class CloudTaskService extends TypedEventEmitter { }); } } - return; + return null; } if (isPermissionRequestEvent(event.data)) { @@ -1031,13 +1080,13 @@ export class CloudTaskService extends TypedEventEmitter { toolCall: event.data.toolCall, options: event.data.options, }); - return; + return null; } watcher.pendingLogEntries.push(event.data as StoredLogEntry); if (watcher.pendingLogEntries.length >= EVENT_BATCH_MAX_SIZE) { this.flushLogBatch(key); - return; + return null; } if (!watcher.batchFlushTimeoutId) { @@ -1046,6 +1095,8 @@ export class CloudTaskService extends TypedEventEmitter { this.flushLogBatch(key); }, EVENT_BATCH_FLUSH_MS); } + + return null; } private flushLogBatch(key: string): void { @@ -1084,27 +1135,13 @@ export class CloudTaskService extends TypedEventEmitter { const watcher = this.watchers.get(key); if (!watcher || watcher.bufferedLogBatches.length === 0) return; - // Content-based dedup: SSE ids don't exist in the historical entries, so the payload is the key. - const historicalCounts = new Map(); - for (const entry of historicalEntries) { - const serialized = JSON.stringify(entry); - historicalCounts.set( - serialized, - (historicalCounts.get(serialized) ?? 0) + 1, - ); - } + const historicalCounts = buildEntryFrequencyMap(historicalEntries); for (const entries of watcher.bufferedLogBatches) { - const dedupedEntries = entries.filter((entry) => { - const serialized = JSON.stringify(entry); - const remaining = historicalCounts.get(serialized) ?? 
0; - if (remaining <= 0) { - return true; - } - - historicalCounts.set(serialized, remaining - 1); - return false; - }); + const dedupedEntries = filterEntriesNotInFrequencyMap( + entries, + historicalCounts, + ); if (dedupedEntries.length === 0) { continue; @@ -1142,25 +1179,11 @@ export class CloudTaskService extends TypedEventEmitter { return { snapshotEntries: historicalEntries, missingEmittedEntries: [] }; } - const historicalCounts = new Map(); - for (const entry of historicalEntries) { - const serialized = JSON.stringify(entry); - historicalCounts.set( - serialized, - (historicalCounts.get(serialized) ?? 0) + 1, - ); - } - - const missingEmittedEntries = emittedEntries.filter((entry) => { - const serialized = JSON.stringify(entry); - const remaining = historicalCounts.get(serialized) ?? 0; - if (remaining <= 0) { - return true; - } - - historicalCounts.set(serialized, remaining - 1); - return false; - }); + const historicalCounts = buildEntryFrequencyMap(historicalEntries); + const missingEmittedEntries = filterEntriesNotInFrequencyMap( + emittedEntries, + historicalCounts, + ); return { snapshotEntries: [...historicalEntries, ...missingEmittedEntries], @@ -1300,6 +1323,8 @@ export class CloudTaskService extends TypedEventEmitter { { key }, ); this.resetWatcherForRebootstrap(watcher); + // Set after the reset (which clears it): consumes the single allowed self-heal so a + // straight-back loop fails next time instead of re-bootstrapping forever. watcher.selfHealAttempted = true; void this.bootstrapWatcher(key); return; @@ -1410,26 +1435,13 @@ export class CloudTaskService extends TypedEventEmitter { return; } + // All callers pass reconnectOnDisconnect, and durable watches only stop via the stream-end + // sentinel or a terminal legacy poll (both handled above); any other disconnect reconnects. if (reconnectOnDisconnect) { this.scheduleReconnect(key, options.reconnectError, { countAttempt: options.countReconnectAttempt ?? 
false, }); - return; } - - this.log.info("Cloud task terminal stop", { - key, - status: watcher.lastStatus, - reachedViaError: options.reconnectError !== undefined, - lastEventId: watcher.lastEventId, - sentLastEventId: watcher.connSentLastEventId, - totalEntryCount: watcher.totalEntryCount, - connDataEventsReceived: watcher.connDataEventsReceived, - connDurationMs: - watcher.connStartedAt !== 0 ? Date.now() - watcher.connStartedAt : 0, - }); - - await this.finalizeWatcherStop(key); } // Stops a watcher whose stream is durably complete. Repairs the displayed status if the stream @@ -1561,9 +1573,7 @@ export class CloudTaskService extends TypedEventEmitter { return null; } - for (const entry of page.entries) { - entries.push(entry); - } + entries.push(...page.entries); if (!page.hasMore || page.entries.length === 0) { return entries; } From 1cbc34444af79116affc41c97378f9893d38a17c Mon Sep 17 00:00:00 2001 From: Charles Vien Date: Tue, 30 Jun 2026 11:20:20 -0700 Subject: [PATCH 17/20] log ingest recovery at info not warn --- packages/agent/src/server/event-stream-sender.test.ts | 1 + packages/agent/src/server/event-stream-sender.ts | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/agent/src/server/event-stream-sender.test.ts b/packages/agent/src/server/event-stream-sender.test.ts index 5eb8d3527a..c57825d318 100644 --- a/packages/agent/src/server/event-stream-sender.test.ts +++ b/packages/agent/src/server/event-stream-sender.test.ts @@ -205,6 +205,7 @@ describe("TaskRunEventStreamSender", () => { expect(fetchMock).toHaveBeenCalled(); const lastCall = fetchMock.mock.calls[fetchMock.mock.calls.length - 1]; expect(lastCall[0]).toBe("http://agent-proxy:8003/v1/runs/run-1/ingest"); + expect(lastCall[0]).not.toContain("/api/projects/"); }); it("keeps the active ingest request open across scheduled flushes", async () => { diff --git a/packages/agent/src/server/event-stream-sender.ts b/packages/agent/src/server/event-stream-sender.ts index 
ce2b37609e..5e254ded40 100644 --- a/packages/agent/src/server/event-stream-sender.ts +++ b/packages/agent/src/server/event-stream-sender.ts @@ -682,7 +682,7 @@ export class TaskRunEventStreamSender { } if (this.droppedBeforeSequenceCount > 0) { - this.config.logger.warn("Task run event ingest recovered after drops", { + this.config.logger.info("Task run event ingest recovered after drops", { dropped: this.droppedBeforeSequenceCount, }); this.droppedBeforeSequenceCount = 0; From dd07722dfd7ad626cfb63c0bdf5fa58b4dc4bc19 Mon Sep 17 00:00:00 2001 From: Charles Vien Date: Tue, 30 Jun 2026 13:45:53 -0700 Subject: [PATCH 18/20] fix codeql url substring alert in cloud test --- packages/core/src/cloud-task/cloud-task.test.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/core/src/cloud-task/cloud-task.test.ts b/packages/core/src/cloud-task/cloud-task.test.ts index 4b285db2f9..0acd9ee5fe 100644 --- a/packages/core/src/cloud-task/cloud-task.test.ts +++ b/packages/core/src/cloud-task/cloud-task.test.ts @@ -2619,7 +2619,10 @@ describe("CloudTaskService", () => { expect( mockStreamFetch.mock.calls.every(([input]) => { const url = typeof input === "string" ? 
input : (input as Request).url; - return url.includes("https://app.example.com/api/"); + const { origin, pathname } = new URL(url); + return ( + origin === "https://app.example.com" && pathname.startsWith("/api/") + ); }), ).toBe(true); expect(updates).toContainEqual( From d0dd64903f9d416cdff124933b2ec47b46c5e7dc Mon Sep 17 00:00:00 2001 From: Charles Vien Date: Tue, 30 Jun 2026 14:08:42 -0700 Subject: [PATCH 19/20] log cloud stream attempts and results at info --- packages/core/src/cloud-task/cloud-task.ts | 23 ++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/packages/core/src/cloud-task/cloud-task.ts b/packages/core/src/cloud-task/cloud-task.ts index f7a5cbcece..2779ea4dcb 100644 --- a/packages/core/src/cloud-task/cloud-task.ts +++ b/packages/core/src/cloud-task/cloud-task.ts @@ -790,10 +790,19 @@ export class CloudTaskService extends TypedEventEmitter { headers.Authorization = `Bearer ${watcher.streamReadToken}`; } - // Debug level: fires on every reconnect and would otherwise dominate the log file. - this.log.debug("Opening cloud task stream", { + // Info so every stream attempt is visible in the logs; Bearer token redacted. + this.log.info("Opening cloud task stream", { + key, + leg, usingProxy, + durableStream: watcher.durableStreamEnabled, + method: "GET", streamUrl: url.toString(), + lastEventId: watcher.lastEventId, + startLatest, + headers: usingProxy + ? 
{ ...headers, Authorization: "Bearer " } + : headers, }); const parser = new SseEventParser((message, data) => @@ -822,6 +831,14 @@ export class CloudTaskService extends TypedEventEmitter { signal: controller.signal, }); + this.log.info("Cloud task stream response", { + key, + leg, + status: response.status, + ok: response.ok, + streamUrl: url.toString(), + }); + if (!response.ok) { throw createStreamStatusError(response.status); } @@ -971,6 +988,8 @@ export class CloudTaskService extends TypedEventEmitter { this.log.warn("Cloud task stream error", { key, + leg, + streamUrl: url.toString(), error: errorMessage, errorDetail: serializeError(error), wasHealthyStream, From 7f791ced9904543341fcdbe248e64727f589ef94 Mon Sep 17 00:00:00 2001 From: Charles Vien Date: Tue, 30 Jun 2026 14:21:38 -0700 Subject: [PATCH 20/20] surface stream leg and url in log headlines --- packages/core/src/cloud-task/cloud-task.ts | 25 ++++++++++++++-------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/packages/core/src/cloud-task/cloud-task.ts b/packages/core/src/cloud-task/cloud-task.ts index 2779ea4dcb..348f5e57c3 100644 --- a/packages/core/src/cloud-task/cloud-task.ts +++ b/packages/core/src/cloud-task/cloud-task.ts @@ -791,7 +791,7 @@ export class CloudTaskService extends TypedEventEmitter { } // Info so every stream attempt is visible in the logs; Bearer token redacted. - this.log.info("Opening cloud task stream", { + this.log.info(`Opening cloud task stream via ${leg}: ${url.toString()}`, { key, leg, usingProxy, @@ -831,13 +831,18 @@ export class CloudTaskService extends TypedEventEmitter { signal: controller.signal, }); - this.log.info("Cloud task stream response", { - key, - leg, - status: response.status, - ok: response.ok, - streamUrl: url.toString(), - }); + this.log.info( + `Cloud task stream response ${response.status} ${ + response.ok ? 
"ok" : "FAILED" + } via ${leg}`, + { + key, + leg, + status: response.status, + ok: response.ok, + streamUrl: url.toString(), + }, + ); if (!response.ok) { throw createStreamStatusError(response.status); @@ -851,8 +856,10 @@ export class CloudTaskService extends TypedEventEmitter { streamWasEstablished = true; watcher.connStartedAt = connectedAt; - this.log.info("Cloud task SSE connected", { + this.log.info(`Cloud task SSE connected via ${leg}: ${url.toString()}`, { key, + leg, + streamUrl: url.toString(), sentLastEventId: watcher.connSentLastEventId, startLatest, status: response.status,