From 083f5f7be1ba61932033e6d0a818e02e753629b2 Mon Sep 17 00:00:00 2001 From: Charles Vien Date: Tue, 30 Jun 2026 18:52:41 -0700 Subject: [PATCH] flush proxy ingest uploads sooner --- .../src/server/event-stream-sender.test.ts | 43 +++++++++++++++++++ .../agent/src/server/event-stream-sender.ts | 5 ++- 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/packages/agent/src/server/event-stream-sender.test.ts b/packages/agent/src/server/event-stream-sender.test.ts index c57825d31..37dfd1575 100644 --- a/packages/agent/src/server/event-stream-sender.test.ts +++ b/packages/agent/src/server/event-stream-sender.test.ts @@ -208,6 +208,49 @@ describe("TaskRunEventStreamSender", () => { expect(lastCall[0]).not.toContain("/api/projects/"); }); + it("uses a short stream window for agent-proxy ingest so buffered uploads commit live events", async () => { + vi.useFakeTimers(); + try { + const requestBodies: string[] = []; + let activeStreamClosed = false; + const fetchMock = vi.fn( + async (_url: string | URL | Request, init?: RequestInit) => { + if (!init?.body || typeof init.body === "string") { + return responseForBody(await readRequestBody(init)); + } + + const body = await readRequestBody(init); + activeStreamClosed = true; + requestBodies.push(body); + return responseForBody(body); + }, + ); + vi.stubGlobal("fetch", fetchMock); + + const sender = createSender({ + eventIngestBaseUrl: "http://agent-proxy:8003/", + }); + + sender.enqueue({ + type: "notification", + notification: { method: "first" }, + }); + await vi.advanceTimersByTimeAsync(0); + + expect(fetchMock).toHaveBeenCalledTimes(2); + expect(activeStreamClosed).toBe(false); + + await vi.advanceTimersByTimeAsync(1_000); + + expect(activeStreamClosed).toBe(true); + expect(eventSequences(requestBodies[0])).toEqual([1]); + + await sender.stop(); + } finally { + vi.useRealTimers(); + } + }); + it("keeps the active ingest request open across scheduled flushes", async () => { const requestBodies: string[] = []; let activeStreamClosed = false; diff --git a/packages/agent/src/server/event-stream-sender.ts b/packages/agent/src/server/event-stream-sender.ts index 5e254ded4..83985de02 100644 --- a/packages/agent/src/server/event-stream-sender.ts +++ b/packages/agent/src/server/event-stream-sender.ts @@ -56,6 +56,7 @@ const DEFAULT_RETRY_DELAY_MS = 1_000; const DEFAULT_REQUEST_TIMEOUT_MS = 10_000; const DEFAULT_STOP_TIMEOUT_MS = 30_000; const DEFAULT_STREAM_WINDOW_MS = 5 * 60 * 1_000; +const DEFAULT_PROXY_STREAM_WINDOW_MS = 1_000; const STREAM_COMPLETE_CONTROL_TYPE = "_posthog/stream_complete"; export class TaskRunEventStreamSender { @@ -111,7 +112,9 @@ export class TaskRunEventStreamSender { this.requestTimeoutMs = config.requestTimeoutMs ?? DEFAULT_REQUEST_TIMEOUT_MS; this.stopTimeoutMs = config.stopTimeoutMs ?? DEFAULT_STOP_TIMEOUT_MS; - this.streamWindowMs = config.streamWindowMs ?? DEFAULT_STREAM_WINDOW_MS; + this.streamWindowMs = + config.streamWindowMs ?? + (usingProxy ? DEFAULT_PROXY_STREAM_WINDOW_MS : DEFAULT_STREAM_WINDOW_MS); this.createStreamingUpload = config.createStreamingUpload ?? createNodeStreamingUpload; }