From 3623e13781b2ba8d882b9672a8e6b2b2c1c195fa Mon Sep 17 00:00:00 2001 From: Charles Vien Date: Tue, 30 Jun 2026 19:08:53 -0700 Subject: [PATCH] close ingest upload after each drained batch --- .../src/server/event-stream-sender.test.ts | 123 +++--------------- .../agent/src/server/event-stream-sender.ts | 17 ++- 2 files changed, 33 insertions(+), 107 deletions(-) diff --git a/packages/agent/src/server/event-stream-sender.test.ts b/packages/agent/src/server/event-stream-sender.test.ts index 37dfd1575..f351fa0cc 100644 --- a/packages/agent/src/server/event-stream-sender.test.ts +++ b/packages/agent/src/server/event-stream-sender.test.ts @@ -208,132 +208,49 @@ describe("TaskRunEventStreamSender", () => { expect(lastCall[0]).not.toContain("/api/projects/"); }); - it("uses a short stream window for agent-proxy ingest so buffered uploads commit live events", async () => { - vi.useFakeTimers(); - try { - const requestBodies: string[] = []; - let activeStreamClosed = false; - const fetchMock = vi.fn( - async (_url: string | URL | Request, init?: RequestInit) => { - if (!init?.body || typeof init.body === "string") { - return responseForBody(await readRequestBody(init)); - } - - const body = await readRequestBody(init); - activeStreamClosed = true; - requestBodies.push(body); - return responseForBody(body); - }, - ); - vi.stubGlobal("fetch", fetchMock); - - const sender = createSender({ - eventIngestBaseUrl: "http://agent-proxy:8003/", - }); - - sender.enqueue({ - type: "notification", - notification: { method: "first" }, - }); - await vi.advanceTimersByTimeAsync(0); - - expect(fetchMock).toHaveBeenCalledTimes(2); - expect(activeStreamClosed).toBe(false); - - await vi.advanceTimersByTimeAsync(1_000); - - expect(activeStreamClosed).toBe(true); - expect(eventSequences(requestBodies[0])).toEqual([1]); - - await sender.stop(); - } finally { - vi.useRealTimers(); - } - }); - - it("keeps the active ingest request open across scheduled flushes", async () => { + it("closes the active ingest upload after each drained batch on the proxy path", async () => { const requestBodies: string[] = []; - let activeStreamClosed = false; + let contentUploads = 0; const fetchMock = vi.fn( async (_url: string | URL | Request, init?: RequestInit) => { if (!init?.body || typeof init.body === "string") { - const body = await readRequestBody(init); - requestBodies.push(body); - return responseForBody(body); + return responseForBody(await readRequestBody(init)); } + // Resolves only once the sender closes the upload body. const body = await readRequestBody(init); - activeStreamClosed = true; + contentUploads += 1; requestBodies.push(body); return responseForBody(body); }, ); vi.stubGlobal("fetch", fetchMock); - const sender = createSender({ flushDelayMs: 0 }); + const sender = createSender({ + flushDelayMs: 0, + eventIngestBaseUrl: "http://agent-proxy:8003/", + }); + // A buffering ingress only forwards the request body when the upload + // closes, so each drained batch must ride its own promptly-closed upload + // rather than one long-lived request held open until stop. sender.enqueue({ type: "notification", notification: { method: "first" } }); - await waitFor(() => fetchMock.mock.calls.length === 2); - expect(activeStreamClosed).toBe(false); + await waitFor(() => contentUploads === 1); + expect(eventSequences(requestBodies[0])).toEqual([1]); + expect(completionSequences(requestBodies[0])).toEqual([]); sender.enqueue({ type: "notification", notification: { method: "second" }, }); - await new Promise((resolve) => setTimeout(resolve, 0)); - await new Promise((resolve) => setTimeout(resolve, 0)); - - expect(fetchMock).toHaveBeenCalledTimes(2); - expect(activeStreamClosed).toBe(false); - - await sender.stop(); - - expect(fetchMock).toHaveBeenCalledTimes(2); - expect(activeStreamClosed).toBe(true); - expect(parseLines(requestBodies[1])).toEqual([ - { - seq: 1, - event: { type: "notification", notification: { method: "first" } }, - }, - { - seq: 2, - event: { type: "notification", notification: { method: "second" } }, - }, - { type: STREAM_COMPLETE_CONTROL_TYPE, final_seq: 2 }, - ]); - }); - - it("closes an idle active ingest request after the stream window elapses", async () => { - const requestBodies: string[] = []; - let activeStreamClosed = false; - const fetchMock = vi.fn( - async (_url: string | URL | Request, init?: RequestInit) => { - if (!init?.body || typeof init.body === "string") { - return responseForBody(await readRequestBody(init)); - } - - const body = await readRequestBody(init); - activeStreamClosed = true; - requestBodies.push(body); - return responseForBody(body); - }, - ); - vi.stubGlobal("fetch", fetchMock); - - const sender = createSender({ flushDelayMs: 0, streamWindowMs: 5 }); - - sender.enqueue({ type: "notification", notification: { method: "first" } }); - await waitFor(() => fetchMock.mock.calls.length === 2); - expect(activeStreamClosed).toBe(false); - - await waitFor(() => activeStreamClosed, 200); - expect(eventSequences(requestBodies[0])).toEqual([1]); - expect(completionSequences(requestBodies[0])).toEqual([]); + await waitFor(() => contentUploads === 2); + expect(eventSequences(requestBodies[1])).toEqual([2]); + expect(completionSequences(requestBodies[1])).toEqual([]); await sender.stop(); - expect(eventSequences(requestBodies[1])).toEqual([]); - expect(completionSequences(requestBodies[1])).toEqual([1]); + const finalBody = requestBodies.at(-1) ?? ""; + expect(completionSequences(finalBody)).toEqual([2]); }); it("aborts a stuck ingest response after closing the request body", async () => { diff --git a/packages/agent/src/server/event-stream-sender.ts b/packages/agent/src/server/event-stream-sender.ts index 83985de02..437cb5e98 100644 --- a/packages/agent/src/server/event-stream-sender.ts +++ b/packages/agent/src/server/event-stream-sender.ts @@ -56,7 +56,6 @@ const DEFAULT_RETRY_DELAY_MS = 1_000; const DEFAULT_REQUEST_TIMEOUT_MS = 10_000; const DEFAULT_STOP_TIMEOUT_MS = 30_000; const DEFAULT_STREAM_WINDOW_MS = 5 * 60 * 1_000; -const DEFAULT_PROXY_STREAM_WINDOW_MS = 1_000; const STREAM_COMPLETE_CONTROL_TYPE = "_posthog/stream_complete"; export class TaskRunEventStreamSender { @@ -70,6 +69,7 @@ export class TaskRunEventStreamSender { private readonly requestTimeoutMs: number; private readonly stopTimeoutMs: number; private readonly streamWindowMs: number; + private readonly usingProxy: boolean; private readonly createStreamingUpload: StreamingUploadFactory; private readonly encoder = new TextEncoder(); private sequence = 0; @@ -112,9 +112,8 @@ export class TaskRunEventStreamSender { this.requestTimeoutMs = config.requestTimeoutMs ?? DEFAULT_REQUEST_TIMEOUT_MS; this.stopTimeoutMs = config.stopTimeoutMs ?? DEFAULT_STOP_TIMEOUT_MS; - this.streamWindowMs = - config.streamWindowMs ?? - (usingProxy ? DEFAULT_PROXY_STREAM_WINDOW_MS : DEFAULT_STREAM_WINDOW_MS); + this.streamWindowMs = config.streamWindowMs ?? DEFAULT_STREAM_WINDOW_MS; + this.usingProxy = usingProxy; this.createStreamingUpload = config.createStreamingUpload ?? createNodeStreamingUpload; } @@ -211,6 +210,16 @@ export class TaskRunEventStreamSender { try { await flushPromise; + // On the proxy path, deliver the drained batch now instead of holding one + // long-lived upload. The ingress in front of the proxy buffers the ingest + // request body and only forwards it once the request closes, so a + // long-lived upload strands events; closing per batch forwards each within + // a round-trip. Gated to the proxy write leg so the Django path keeps its + // existing long-lived upload. The stop path leaves closing to drainForStop + // so the completion line rides the final upload. + if (!this.stopped && this.usingProxy) { + await this.closeActiveStream(); + } return this.bufferedEvents.length < previousBufferLength; } catch (error) { this.config.logger.warn(