Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 20 additions & 103 deletions packages/agent/src/server/event-stream-sender.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,132 +208,49 @@ describe("TaskRunEventStreamSender", () => {
expect(lastCall[0]).not.toContain("/api/projects/");
});

it("uses a short stream window for agent-proxy ingest so buffered uploads commit live events", async () => {
vi.useFakeTimers();
try {
const requestBodies: string[] = [];
let activeStreamClosed = false;
const fetchMock = vi.fn(
async (_url: string | URL | Request, init?: RequestInit) => {
if (!init?.body || typeof init.body === "string") {
return responseForBody(await readRequestBody(init));
}

const body = await readRequestBody(init);
activeStreamClosed = true;
requestBodies.push(body);
return responseForBody(body);
},
);
vi.stubGlobal("fetch", fetchMock);

const sender = createSender({
eventIngestBaseUrl: "http://agent-proxy:8003/",
});

sender.enqueue({
type: "notification",
notification: { method: "first" },
});
await vi.advanceTimersByTimeAsync(0);

expect(fetchMock).toHaveBeenCalledTimes(2);
expect(activeStreamClosed).toBe(false);

await vi.advanceTimersByTimeAsync(1_000);

expect(activeStreamClosed).toBe(true);
expect(eventSequences(requestBodies[0])).toEqual([1]);

await sender.stop();
} finally {
vi.useRealTimers();
}
});

it("keeps the active ingest request open across scheduled flushes", async () => {
it("closes the active ingest upload after each drained batch on the proxy path", async () => {
const requestBodies: string[] = [];
let activeStreamClosed = false;
let contentUploads = 0;
const fetchMock = vi.fn(
async (_url: string | URL | Request, init?: RequestInit) => {
if (!init?.body || typeof init.body === "string") {
const body = await readRequestBody(init);
requestBodies.push(body);
return responseForBody(body);
return responseForBody(await readRequestBody(init));
}

// Resolves only once the sender closes the upload body.
const body = await readRequestBody(init);
activeStreamClosed = true;
contentUploads += 1;
requestBodies.push(body);
return responseForBody(body);
},
);
vi.stubGlobal("fetch", fetchMock);

const sender = createSender({ flushDelayMs: 0 });
const sender = createSender({
flushDelayMs: 0,
eventIngestBaseUrl: "http://agent-proxy:8003/",
});

// A buffering ingress only forwards the request body when the upload
// closes, so each drained batch must ride its own promptly-closed upload
// rather than one long-lived request held open until stop.
sender.enqueue({ type: "notification", notification: { method: "first" } });
await waitFor(() => fetchMock.mock.calls.length === 2);
expect(activeStreamClosed).toBe(false);
await waitFor(() => contentUploads === 1);
expect(eventSequences(requestBodies[0])).toEqual([1]);
expect(completionSequences(requestBodies[0])).toEqual([]);

sender.enqueue({
type: "notification",
notification: { method: "second" },
});
await new Promise((resolve) => setTimeout(resolve, 0));
await new Promise((resolve) => setTimeout(resolve, 0));

expect(fetchMock).toHaveBeenCalledTimes(2);
expect(activeStreamClosed).toBe(false);

await sender.stop();

expect(fetchMock).toHaveBeenCalledTimes(2);
expect(activeStreamClosed).toBe(true);
expect(parseLines(requestBodies[1])).toEqual([
{
seq: 1,
event: { type: "notification", notification: { method: "first" } },
},
{
seq: 2,
event: { type: "notification", notification: { method: "second" } },
},
{ type: STREAM_COMPLETE_CONTROL_TYPE, final_seq: 2 },
]);
});

it("closes an idle active ingest request after the stream window elapses", async () => {
const requestBodies: string[] = [];
let activeStreamClosed = false;
const fetchMock = vi.fn(
async (_url: string | URL | Request, init?: RequestInit) => {
if (!init?.body || typeof init.body === "string") {
return responseForBody(await readRequestBody(init));
}

const body = await readRequestBody(init);
activeStreamClosed = true;
requestBodies.push(body);
return responseForBody(body);
},
);
vi.stubGlobal("fetch", fetchMock);

const sender = createSender({ flushDelayMs: 0, streamWindowMs: 5 });

sender.enqueue({ type: "notification", notification: { method: "first" } });
await waitFor(() => fetchMock.mock.calls.length === 2);
expect(activeStreamClosed).toBe(false);

await waitFor(() => activeStreamClosed, 200);
expect(eventSequences(requestBodies[0])).toEqual([1]);
expect(completionSequences(requestBodies[0])).toEqual([]);
await waitFor(() => contentUploads === 2);
expect(eventSequences(requestBodies[1])).toEqual([2]);
expect(completionSequences(requestBodies[1])).toEqual([]);

await sender.stop();

expect(eventSequences(requestBodies[1])).toEqual([]);
expect(completionSequences(requestBodies[1])).toEqual([1]);
const finalBody = requestBodies.at(-1) ?? "";
expect(completionSequences(finalBody)).toEqual([2]);
});

it("aborts a stuck ingest response after closing the request body", async () => {
Expand Down
17 changes: 13 additions & 4 deletions packages/agent/src/server/event-stream-sender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ const DEFAULT_RETRY_DELAY_MS = 1_000;
const DEFAULT_REQUEST_TIMEOUT_MS = 10_000;
const DEFAULT_STOP_TIMEOUT_MS = 30_000;
const DEFAULT_STREAM_WINDOW_MS = 5 * 60 * 1_000;
const DEFAULT_PROXY_STREAM_WINDOW_MS = 1_000;
const STREAM_COMPLETE_CONTROL_TYPE = "_posthog/stream_complete";

export class TaskRunEventStreamSender {
Expand All @@ -70,6 +69,7 @@ export class TaskRunEventStreamSender {
private readonly requestTimeoutMs: number;
private readonly stopTimeoutMs: number;
private readonly streamWindowMs: number;
private readonly usingProxy: boolean;
private readonly createStreamingUpload: StreamingUploadFactory;
private readonly encoder = new TextEncoder();
private sequence = 0;
Expand Down Expand Up @@ -112,9 +112,8 @@ export class TaskRunEventStreamSender {
this.requestTimeoutMs =
config.requestTimeoutMs ?? DEFAULT_REQUEST_TIMEOUT_MS;
this.stopTimeoutMs = config.stopTimeoutMs ?? DEFAULT_STOP_TIMEOUT_MS;
this.streamWindowMs =
config.streamWindowMs ??
(usingProxy ? DEFAULT_PROXY_STREAM_WINDOW_MS : DEFAULT_STREAM_WINDOW_MS);
this.streamWindowMs = config.streamWindowMs ?? DEFAULT_STREAM_WINDOW_MS;
this.usingProxy = usingProxy;
this.createStreamingUpload =
config.createStreamingUpload ?? createNodeStreamingUpload;
}
Expand Down Expand Up @@ -211,6 +210,16 @@ export class TaskRunEventStreamSender {

try {
await flushPromise;
// On the proxy path, deliver the drained batch now instead of holding one
// long-lived upload. The ingress in front of the proxy buffers the ingest
// request body and only forwards it once the request closes, so a
// long-lived upload strands events; closing per batch forwards each within
// a round-trip. Gated to the proxy write leg so the Django path keeps its
// existing long-lived upload. The stop path leaves closing to drainForStop
// so the completion line rides the final upload.
if (!this.stopped && this.usingProxy) {
await this.closeActiveStream();
}
return this.bufferedEvents.length < previousBufferLength;
} catch (error) {
this.config.logger.warn(
Expand Down
Loading