From 70c509209194deda1590aca93bf34347694fa4f8 Mon Sep 17 00:00:00 2001 From: Alessandro Pogliaghi Date: Fri, 8 May 2026 15:07:07 +0100 Subject: [PATCH] feat(tasks): add task run event stream sender --- .../src/server/event-stream-sender.test.ts | 675 ++++++++++++++++++ .../agent/src/server/event-stream-sender.ts | 453 ++++++++++++ 2 files changed, 1128 insertions(+) create mode 100644 packages/agent/src/server/event-stream-sender.test.ts create mode 100644 packages/agent/src/server/event-stream-sender.ts diff --git a/packages/agent/src/server/event-stream-sender.test.ts b/packages/agent/src/server/event-stream-sender.test.ts new file mode 100644 index 000000000..3dcfc73e4 --- /dev/null +++ b/packages/agent/src/server/event-stream-sender.test.ts @@ -0,0 +1,675 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { Logger } from "../utils/logger"; +import { TaskRunEventStreamSender } from "./event-stream-sender"; + +describe("TaskRunEventStreamSender", () => { + afterEach(() => { + vi.unstubAllGlobals(); + }); + + it("posts ordered NDJSON batches with the run-scoped token", async () => { + let requestBody = ""; + const fetchMock = vi.fn( + async (_url: string | URL | Request, init?: RequestInit) => { + const body = String(init?.body ?? ""); + if (!body) { + return new Response(JSON.stringify({ last_accepted_seq: 0 }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + } + requestBody = body; + return new Response(JSON.stringify({ last_accepted_seq: 2 }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + }, + ); + vi.stubGlobal("fetch", fetchMock); + + const sender = new TaskRunEventStreamSender({ + apiUrl: "http://localhost:8000/", + projectId: 1, + taskId: "task-1", + runId: "run-1", + token: "ingest-token", + logger: new Logger({ debug: false }), + }); + + sender.enqueue({ type: "notification", notification: { method: "first" } }); + sender.enqueue({ + type: "notification", + notification: { method: "second" }, + }); + await sender.stop(); + + expect(fetchMock).toHaveBeenCalledTimes(3); + expect(fetchMock.mock.calls[1][0]).toBe( + "http://localhost:8000/api/projects/1/tasks/task-1/runs/run-1/event_stream/", + ); + expect(fetchMock.mock.calls[1][1]?.headers).toEqual({ + Authorization: "Bearer ingest-token", + "Content-Type": "application/x-ndjson", + }); + expect(fetchMock.mock.calls[2][1]?.headers).toEqual({ + Authorization: "Bearer ingest-token", + "Content-Type": "application/x-ndjson", + "X-PostHog-Event-Stream-Complete": "true", + "X-PostHog-Event-Stream-Final-Seq": "2", + }); + expect(fetchMock.mock.calls[2][1]?.body).toBe(""); + + const lines = requestBody + .trim() + .split("\n") + .map((line) => JSON.parse(line)); + expect(lines).toEqual([ + { + seq: 1, + event: { type: "notification", notification: { method: "first" } }, + }, + { + seq: 2, + event: { type: "notification", notification: { method: "second" } }, + }, + ]); + }); + + it("aborts a stuck ingest request", async () => { + let aborted = false; + const fetchMock = vi.fn( + async (_url: string | URL | Request, init?: RequestInit) => { + if (!String(init?.body ?? "")) { + return new Response(JSON.stringify({ last_accepted_seq: 0 }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + } + return new Promise((_resolve, reject) => { + init?.signal?.addEventListener("abort", () => { + aborted = true; + reject(new DOMException("aborted", "AbortError")); + }); + }); + }, + ); + vi.stubGlobal("fetch", fetchMock); + + const sender = new TaskRunEventStreamSender({ + apiUrl: "http://localhost:8000/", + projectId: 1, + taskId: "task-1", + runId: "run-1", + token: "ingest-token", + logger: new Logger({ debug: false }), + flushDelayMs: 0, + requestTimeoutMs: 1, + retryDelayMs: 1, + stopTimeoutMs: 1, + }); + + sender.enqueue({ type: "notification", notification: { method: "first" } }); + await sender.stop(); + + expect(fetchMock.mock.calls.length).toBeGreaterThanOrEqual(2); + expect(aborted).toBe(true); + }); + + it("waits for the final ingest request before stop resolves", async () => { + const ingestRequest: { resolve?: (response: Response) => void } = {}; + let stopped = false; + const fetchMock = vi.fn( + async (_url: string | URL | Request, init?: RequestInit) => { + const body = String(init?.body ?? ""); + if (!body) { + return new Response(JSON.stringify({ last_accepted_seq: 0 }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + } + return new Promise((resolve) => { + ingestRequest.resolve = resolve; + }); + }, + ); + vi.stubGlobal("fetch", fetchMock); + + const sender = new TaskRunEventStreamSender({ + apiUrl: "http://localhost:8000/", + projectId: 1, + taskId: "task-1", + runId: "run-1", + token: "ingest-token", + logger: new Logger({ debug: false }), + }); + + sender.enqueue({ type: "notification", notification: { method: "first" } }); + const stopPromise = sender.stop().then(() => { + stopped = true; + }); + + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(stopped).toBe(false); + + const resolveIngest = ingestRequest.resolve; + if (!resolveIngest) { + throw new Error("expected ingest request to be in flight"); + } + resolveIngest( + new Response(JSON.stringify({ last_accepted_seq: 1 }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }), + ); + await stopPromise; + + expect(stopped).toBe(true); + }); + + it("posts an empty completion request on shutdown without buffered events", async () => { + const fetchMock = vi.fn( + async (_url: string | URL | Request, _init?: RequestInit) => { + return new Response(JSON.stringify({ last_accepted_seq: 0 }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + }, + ); + vi.stubGlobal("fetch", fetchMock); + + const sender = new TaskRunEventStreamSender({ + apiUrl: "http://localhost:8000/", + projectId: 1, + taskId: "task-1", + runId: "run-1", + token: "ingest-token", + logger: new Logger({ debug: false }), + }); + + await sender.stop(); + + expect(fetchMock).toHaveBeenCalledTimes(2); + expect(fetchMock.mock.calls[0][1]?.body).toBe(""); + expect(fetchMock.mock.calls[1][1]?.body).toBe(""); + expect(fetchMock.mock.calls[0][1]?.headers).toEqual({ + Authorization: "Bearer ingest-token", + "Content-Type": "application/x-ndjson", + }); + expect(fetchMock.mock.calls[1][1]?.headers).toEqual({ + Authorization: "Bearer ingest-token", + "Content-Type": "application/x-ndjson", + "X-PostHog-Event-Stream-Complete": "true", + "X-PostHog-Event-Stream-Final-Seq": "0", + }); + }); + + it.each([ + { + name: "when the buffer is full", + senderOptions: { maxBufferedEvents: 1 }, + events: [ + { type: "notification", notification: { method: "first" } }, + { type: "notification", notification: { method: "second" } }, + ], + acceptedMethod: "first", + }, + { + name: "when an event is oversized", + senderOptions: { maxEventBytes: 120 }, + events: [ + { + type: "notification", + notification: { + method: "oversized", + params: { message: "x".repeat(200) }, + }, + }, + { type: "notification", notification: { method: "small" } }, + ], + acceptedMethod: "small", + }, + ])( + "drops events before assigning sequence $name", + async ({ senderOptions, events, acceptedMethod }) => { + let requestBody = ""; + const fetchMock = vi.fn( + async (_url: string | URL | Request, init?: RequestInit) => { + const body = String(init?.body ?? ""); + if (!body) { + return new Response(JSON.stringify({ last_accepted_seq: 0 }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + } + requestBody = body; + return new Response(JSON.stringify({ last_accepted_seq: 1 }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + }, + ); + vi.stubGlobal("fetch", fetchMock); + + const sender = new TaskRunEventStreamSender({ + apiUrl: "http://localhost:8000/", + projectId: 1, + taskId: "task-1", + runId: "run-1", + token: "ingest-token", + logger: new Logger({ debug: false }), + ...senderOptions, + }); + + for (const event of events) { + sender.enqueue(event); + } + await sender.stop(); + + const lines = requestBody + .trim() + .split("\n") + .map((line) => JSON.parse(line)); + expect(lines).toEqual([ + { + seq: 1, + event: { + type: "notification", + notification: { method: acceptedMethod }, + }, + }, + ]); + }, + ); + + it("accepts an event at the next sequence size boundary", async () => { + let requestBody = ""; + const fetchMock = vi.fn( + async (_url: string | URL | Request, init?: RequestInit) => { + const body = String(init?.body ?? ""); + if (!body) { + return new Response(JSON.stringify({ last_accepted_seq: 0 }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + } + requestBody = body; + return new Response(JSON.stringify({ last_accepted_seq: 1 }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + }, + ); + vi.stubGlobal("fetch", fetchMock); + + const event = { + type: "notification", + notification: { method: "boundary" }, + }; + const maxEventBytes = new TextEncoder().encode( + JSON.stringify({ seq: 1, event }), + ).length; + + const sender = new TaskRunEventStreamSender({ + apiUrl: "http://localhost:8000/", + projectId: 1, + taskId: "task-1", + runId: "run-1", + token: "ingest-token", + logger: new Logger({ debug: false }), + maxEventBytes, + }); + + sender.enqueue(event); + await sender.stop(); + + const lines = requestBody + .trim() + .split("\n") + .map((line) => JSON.parse(line)); + expect(lines).toEqual([ + { + seq: 1, + event: { + type: "notification", + notification: { method: "boundary" }, + }, + }, + ]); + }); + + it("drains multiple capped batches on stop", async () => { + const requestBodies: string[] = []; + const completionBodies: string[] = []; + const completionFinalSequences: string[] = []; + const fetchMock = vi.fn( + async (_url: string | URL | Request, init?: RequestInit) => { + const body = String(init?.body ?? ""); + const headers = init?.headers as Record | undefined; + if (headers?.["X-PostHog-Event-Stream-Complete"] === "true") { + completionBodies.push(body); + completionFinalSequences.push( + headers["X-PostHog-Event-Stream-Final-Seq"], + ); + return new Response(JSON.stringify({ last_accepted_seq: 2 }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + } + if (!body) { + return new Response(JSON.stringify({ last_accepted_seq: 0 }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + } + requestBodies.push(body); + const lastSeq = JSON.parse(body.trim()).seq; + return new Response(JSON.stringify({ last_accepted_seq: lastSeq }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + }, + ); + vi.stubGlobal("fetch", fetchMock); + + const sender = new TaskRunEventStreamSender({ + apiUrl: "http://localhost:8000/", + projectId: 1, + taskId: "task-1", + runId: "run-1", + token: "ingest-token", + logger: new Logger({ debug: false }), + maxBatchEvents: 1, + }); + + sender.enqueue({ type: "notification", notification: { method: "first" } }); + sender.enqueue({ + type: "notification", + notification: { method: "second" }, + }); + await sender.stop(); + + expect(requestBodies).toHaveLength(2); + expect(requestBodies.map((body) => JSON.parse(body.trim()).seq)).toEqual([ + 1, 2, + ]); + const ingestHeaders = fetchMock.mock.calls + .filter(([, init]) => String(init?.body ?? "") !== "") + .map(([, init]) => init?.headers as Record | undefined); + expect( + ingestHeaders.map( + (headers) => headers?.["X-PostHog-Event-Stream-Complete"], + ), + ).toEqual([undefined, undefined]); + expect(completionBodies).toEqual([""]); + expect(completionFinalSequences).toEqual(["2"]); + }); + + it("retries stop drain after a transient ingest failure", async () => { + const requestBodies: string[] = []; + const fetchMock = vi.fn( + async (_url: string | URL | Request, init?: RequestInit) => { + const body = String(init?.body ?? ""); + if (!body) { + return new Response(JSON.stringify({ last_accepted_seq: 0 }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + } + + requestBodies.push(body); + if (requestBodies.length === 1) { + return new Response("temporary failure", { status: 503 }); + } + + return new Response(JSON.stringify({ last_accepted_seq: 1 }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + }, + ); + vi.stubGlobal("fetch", fetchMock); + + const sender = new TaskRunEventStreamSender({ + apiUrl: "http://localhost:8000/", + projectId: 1, + taskId: "task-1", + runId: "run-1", + token: "ingest-token", + logger: new Logger({ debug: false }), + retryDelayMs: 1, + stopTimeoutMs: 100, + }); + + sender.enqueue({ type: "notification", notification: { method: "first" } }); + await sender.stop(); + + expect(requestBodies.map((body) => JSON.parse(body.trim()).seq)).toEqual([ + 1, 1, + ]); + }); + + it("stops draining buffered events after the stop deadline", async () => { + const requestBodies: string[] = []; + const warnings: string[] = []; + const fetchMock = vi.fn( + async (_url: string | URL | Request, init?: RequestInit) => { + const body = String(init?.body ?? ""); + if (!body) { + return new Response(JSON.stringify({ last_accepted_seq: 0 }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + } + + await new Promise((resolve) => setTimeout(resolve, 5)); + requestBodies.push(body); + const lastSeq = JSON.parse(body.trim()).seq; + return new Response(JSON.stringify({ last_accepted_seq: lastSeq }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + }, + ); + vi.stubGlobal("fetch", fetchMock); + + const sender = new TaskRunEventStreamSender({ + apiUrl: "http://localhost:8000/", + projectId: 1, + taskId: "task-1", + runId: "run-1", + token: "ingest-token", + logger: new Logger({ + debug: false, + onLog: (level, _scope, message) => { + if (level === "warn") { + warnings.push(message); + } + }, + }), + maxBatchEvents: 1, + stopTimeoutMs: 1, + }); + + sender.enqueue({ type: "notification", notification: { method: "first" } }); + sender.enqueue({ + type: "notification", + notification: { method: "second" }, + }); + await sender.stop(); + + expect(requestBodies).toHaveLength(1); + expect(JSON.parse(requestBodies[0].trim()).seq).toBe(1); + expect(warnings).toContain( + "Task run event ingest stop deadline reached before fully draining buffered events", + ); + }); + + it("continues after a payload error acknowledges a valid prefix", async () => { + const requestBodies: string[] = []; + const fetchMock = vi.fn( + async (_url: string | URL | Request, init?: RequestInit) => { + const body = String(init?.body ?? ""); + if (!body) { + return new Response(JSON.stringify({ last_accepted_seq: 0 }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + } + requestBodies.push(body); + + if (requestBodies.length === 1) { + return new Response( + JSON.stringify({ + error: "Too many events in request", + last_accepted_seq: 1, + }), + { + status: 413, + headers: { "Content-Type": "application/json" }, + }, + ); + } + + return new Response(JSON.stringify({ last_accepted_seq: 2 }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + }, + ); + vi.stubGlobal("fetch", fetchMock); + + const sender = new TaskRunEventStreamSender({ + apiUrl: "http://localhost:8000/", + projectId: 1, + taskId: "task-1", + runId: "run-1", + token: "ingest-token", + logger: new Logger({ debug: false }), + maxBatchEvents: 2, + }); + + sender.enqueue({ type: "notification", notification: { method: "first" } }); + sender.enqueue({ + type: "notification", + notification: { method: "second" }, + }); + await sender.stop(); + + expect(requestBodies).toHaveLength(2); + expect( + requestBodies.map((body) => + body + .trim() + .split("\n") + .map((line) => JSON.parse(line).seq), + ), + ).toEqual([[1, 2], [2]]); + }); + + it("starts after the server's last accepted sequence on restart", async () => { + const requestBodies: string[] = []; + const fetchMock = vi.fn( + async (_url: string | URL | Request, init?: RequestInit) => { + const body = String(init?.body ?? ""); + if (!body) { + return new Response(JSON.stringify({ last_accepted_seq: 42 }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + } + requestBodies.push(body); + return new Response(JSON.stringify({ last_accepted_seq: 44 }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + }, + ); + vi.stubGlobal("fetch", fetchMock); + + const sender = new TaskRunEventStreamSender({ + apiUrl: "http://localhost:8000/", + projectId: 1, + taskId: "task-1", + runId: "run-1", + token: "ingest-token", + logger: new Logger({ debug: false }), + }); + + sender.enqueue({ + type: "notification", + notification: { method: "after-restart" }, + }); + sender.enqueue({ type: "notification", notification: { method: "next" } }); + await sender.stop(); + + expect( + requestBodies[0] + .trim() + .split("\n") + .map((line) => JSON.parse(line).seq), + ).toEqual([43, 44]); + }); + + it("rebases buffered events after a sequence gap response", async () => { + const requestBodies: string[] = []; + const fetchMock = vi.fn( + async (_url: string | URL | Request, init?: RequestInit) => { + const body = String(init?.body ?? ""); + if (!body) { + return new Response(JSON.stringify({ last_accepted_seq: 42 }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + } + + requestBodies.push(body); + if (requestBodies.length === 1) { + return new Response( + JSON.stringify({ + error: "Expected sequence 1, got 43", + last_accepted_seq: 0, + }), + { + status: 409, + headers: { "Content-Type": "application/json" }, + }, + ); + } + + return new Response(JSON.stringify({ last_accepted_seq: 2 }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + }, + ); + vi.stubGlobal("fetch", fetchMock); + + const sender = new TaskRunEventStreamSender({ + apiUrl: "http://localhost:8000/", + projectId: 1, + taskId: "task-1", + runId: "run-1", + token: "ingest-token", + logger: new Logger({ debug: false }), + }); + + sender.enqueue({ + type: "notification", + notification: { method: "after-expiry" }, + }); + sender.enqueue({ type: "notification", notification: { method: "next" } }); + await sender.stop(); + + expect( + requestBodies.map((body) => + body + .trim() + .split("\n") + .map((line) => JSON.parse(line).seq), + ), + ).toEqual([ + [43, 44], + [1, 2], + ]); + }); +}); diff --git a/packages/agent/src/server/event-stream-sender.ts b/packages/agent/src/server/event-stream-sender.ts new file mode 100644 index 000000000..dd2df0163 --- /dev/null +++ b/packages/agent/src/server/event-stream-sender.ts @@ -0,0 +1,453 @@ +import { Buffer } from "node:buffer"; +import type { Logger } from "../utils/logger"; + +interface TaskRunEventStreamSenderConfig { + apiUrl: string; + projectId: number; + taskId: string; + runId: string; + token: string; + logger: Logger; + maxBufferedEvents?: number; + flushDelayMs?: number; + retryDelayMs?: number; + requestTimeoutMs?: number; + stopTimeoutMs?: number; + maxBatchEvents?: number; + maxBatchBytes?: number; + maxEventBytes?: number; +} + +interface EventEnvelope { + seq: number; + event: Record; +} + +interface IngestResponse { + last_accepted_seq?: unknown; +} + +const DEFAULT_MAX_BUFFERED_EVENTS = 20_000; +const DEFAULT_MAX_BATCH_EVENTS = 500; +const DEFAULT_MAX_BATCH_BYTES = 4_000_000; +const DEFAULT_MAX_EVENT_BYTES = 900_000; +const DEFAULT_FLUSH_DELAY_MS = 50; +const DEFAULT_RETRY_DELAY_MS = 1_000; +const DEFAULT_REQUEST_TIMEOUT_MS = 10_000; +const DEFAULT_STOP_TIMEOUT_MS = 30_000; + +export class TaskRunEventStreamSender { + private readonly ingestUrl: string; + private readonly maxBufferedEvents: number; + private readonly maxBatchEvents: number; + private readonly maxBatchBytes: number; + private readonly maxEventBytes: number; + private readonly flushDelayMs: number; + private readonly retryDelayMs: number; + private readonly requestTimeoutMs: number; + private readonly stopTimeoutMs: number; + private sequence = 0; + private bufferedEvents: EventEnvelope[] = []; + private flushTimer: ReturnType | null = null; + private flushPromise: Promise | null = null; + private stopPromise: Promise | null = null; + private stopped = false; + private sequenceSynced = false; + private transportCompleted = false; + private droppedBeforeSequenceCount = 0; + private bufferRevision = 0; + + constructor(private readonly config: TaskRunEventStreamSenderConfig) { + const apiUrl = config.apiUrl.replace(/\/$/, ""); + this.ingestUrl = `${apiUrl}/api/projects/${config.projectId}/tasks/${encodeURIComponent( + config.taskId, + )}/runs/${encodeURIComponent(config.runId)}/event_stream/`; + this.maxBufferedEvents = + config.maxBufferedEvents ?? DEFAULT_MAX_BUFFERED_EVENTS; + this.maxBatchEvents = config.maxBatchEvents ?? DEFAULT_MAX_BATCH_EVENTS; + this.maxBatchBytes = config.maxBatchBytes ?? DEFAULT_MAX_BATCH_BYTES; + this.maxEventBytes = config.maxEventBytes ?? DEFAULT_MAX_EVENT_BYTES; + this.flushDelayMs = config.flushDelayMs ?? DEFAULT_FLUSH_DELAY_MS; + this.retryDelayMs = config.retryDelayMs ?? DEFAULT_RETRY_DELAY_MS; + this.requestTimeoutMs = + config.requestTimeoutMs ?? DEFAULT_REQUEST_TIMEOUT_MS; + this.stopTimeoutMs = config.stopTimeoutMs ?? DEFAULT_STOP_TIMEOUT_MS; + } + + enqueue(event: Record): void { + if (this.stopped) return; + + if (!this.canAcceptEvent(event)) { + return; + } + + const envelope: EventEnvelope = { + seq: ++this.sequence, + event, + }; + this.bufferedEvents.push(envelope); + this.scheduleFlush(); + } + + async stop(): Promise { + if (this.stopPromise) { + await this.stopPromise; + return; + } + + this.stopped = true; + + if (this.flushTimer) { + clearTimeout(this.flushTimer); + this.flushTimer = null; + } + + this.stopPromise = this.drainForStop(); + await this.stopPromise; + } + + private scheduleFlush(delayMs = this.flushDelayMs): void { + if (this.flushTimer || this.flushPromise || this.stopped) return; + + this.flushTimer = setTimeout(() => { + this.flushTimer = null; + void this.flush(); + }, delayMs); + } + + private async drainForStop(): Promise { + const startedAtMs = Date.now(); + const deadlineAtMs = startedAtMs + this.stopTimeoutMs; + while (this.bufferedEvents.length > 0) { + const previousLength = this.bufferedEvents.length; + const previousRevision = this.bufferRevision; + + await this.flush(); + const madeProgress = + this.bufferedEvents.length < previousLength || + this.bufferRevision !== previousRevision; + + if (!madeProgress && !(await this.waitBeforeStopRetry(deadlineAtMs))) { + this.warnStopDeadlineReached(startedAtMs); + return; + } + + if (Date.now() >= deadlineAtMs && this.bufferedEvents.length > 0) { + this.warnStopDeadlineReached(startedAtMs); + return; + } + } + + while (!this.transportCompleted) { + try { + await this.completeTransport(); + return; + } catch (error) { + this.config.logger.warn( + "Task run event ingest completion request failed", + this.describeError(error), + ); + } + + if (!(await this.waitBeforeStopRetry(deadlineAtMs))) { + this.config.logger.warn( + "Task run event ingest stop deadline reached before completing transport", + { + stopTimeoutMs: this.stopTimeoutMs, + elapsedMs: Date.now() - startedAtMs, + }, + ); + return; + } + } + } + + private async flush(): Promise { + if (this.flushPromise) { + await this.flushPromise.catch(() => undefined); + } + + if (this.bufferedEvents.length === 0) { + return true; + } + + const previousBufferLength = this.bufferedEvents.length; + const flushPromise = this.sendBufferedEvents(); + this.flushPromise = flushPromise; + + try { + await flushPromise; + return this.bufferedEvents.length < previousBufferLength; + } catch (error) { + this.config.logger.warn( + "Task run event ingest request failed", + this.describeError(error), + ); + if (!this.stopped) { + this.scheduleFlush(this.retryDelayMs); + } + return false; + } finally { + if (this.flushPromise === flushPromise) { + this.flushPromise = null; + } + if (!this.stopped && this.bufferedEvents.length > 0) { + this.scheduleFlush(0); + } + } + } + + private async sendBufferedEvents(): Promise { + await this.syncSequenceWithServer(); + + const body = this.buildBatchBody(); + const abortController = new AbortController(); + const timeout = setTimeout(() => { + abortController.abort(); + }, this.requestTimeoutMs); + + let response: Response; + try { + response = await fetch(this.ingestUrl, { + method: "POST", + headers: this.buildHeaders(), + body, + signal: abortController.signal, + }); + } finally { + clearTimeout(timeout); + } + + const responseBody = await this.parseResponse(response); + const lastAcceptedSeq = responseBody.parsed?.last_accepted_seq; + if (typeof lastAcceptedSeq === "number") { + const previousLength = this.bufferedEvents.length; + this.bufferedEvents = this.bufferedEvents.filter( + (event) => event.seq > lastAcceptedSeq, + ); + if (this.bufferedEvents.length !== previousLength) { + this.bufferRevision += 1; + } + if (response.status === 409) { + this.rebaseBufferedEvents(lastAcceptedSeq); + } + } + + if (!response.ok) { + throw new Error( + `Event ingest returned HTTP ${response.status}: ${responseBody.text.slice(0, 300)}`, + ); + } + } + + private async completeTransport(): Promise { + await this.syncSequenceWithServer(); + + const abortController = new AbortController(); + const timeout = setTimeout(() => { + abortController.abort(); + }, this.requestTimeoutMs); + + let response: Response; + try { + response = await fetch(this.ingestUrl, { + method: "POST", + headers: { + ...this.buildHeaders(), + "X-PostHog-Event-Stream-Complete": "true", + "X-PostHog-Event-Stream-Final-Seq": String(this.sequence), + }, + body: "", + signal: abortController.signal, + }); + } finally { + clearTimeout(timeout); + } + + const responseBody = await this.parseResponse(response); + const lastAcceptedSeq = responseBody.parsed?.last_accepted_seq; + if ( + typeof lastAcceptedSeq === "number" && + lastAcceptedSeq > this.sequence + ) { + this.sequence = lastAcceptedSeq; + } + + if (!response.ok) { + throw new Error( + `Event ingest completion returned HTTP ${response.status}: ${responseBody.text.slice(0, 300)}`, + ); + } + this.transportCompleted = true; + } + + private async waitBeforeStopRetry(deadlineAtMs: number): Promise { + const remainingMs = deadlineAtMs - Date.now(); + if (remainingMs <= 0) { + return false; + } + + await new Promise((resolve) => + setTimeout(resolve, Math.min(this.retryDelayMs, remainingMs)), + ); + return Date.now() < deadlineAtMs; + } + + private warnStopDeadlineReached(startedAtMs: number): void { + this.config.logger.warn( + "Task run event ingest stop deadline reached before fully draining buffered events", + { + remaining: this.bufferedEvents.length, + stopTimeoutMs: this.stopTimeoutMs, + elapsedMs: Date.now() - startedAtMs, + }, + ); + } + + private async syncSequenceWithServer(): Promise { + if (this.sequenceSynced) return; + + const abortController = new AbortController(); + const timeout = setTimeout(() => { + abortController.abort(); + }, this.requestTimeoutMs); + + let response: Response; + try { + response = await fetch(this.ingestUrl, { + method: "POST", + headers: this.buildHeaders(), + body: "", + signal: abortController.signal, + }); + } finally { + clearTimeout(timeout); + } + + const responseBody = await this.parseResponse(response); + + if (!response.ok) { + throw new Error( + `Event ingest sequence sync returned HTTP ${response.status}: ${responseBody.text.slice(0, 300)}`, + ); + } + + const lastAcceptedSeq = responseBody.parsed?.last_accepted_seq; + if (typeof lastAcceptedSeq === "number" && lastAcceptedSeq > 0) { + const offset = lastAcceptedSeq; + this.bufferedEvents = this.bufferedEvents.map((event) => ({ + ...event, + seq: event.seq + offset, + })); + this.sequence += offset; + } + + this.sequenceSynced = true; + } + + private buildHeaders(): Record { + return { + Authorization: `Bearer ${this.config.token}`, + "Content-Type": "application/x-ndjson", + }; + } + + private buildBatchBody(): string { + const lines: string[] = []; + let batchBytes = 0; + + for (const envelope of this.bufferedEvents) { + if (lines.length >= this.maxBatchEvents) { + break; + } + + const line = `${this.serializeEnvelope(envelope)}\n`; + const lineBytes = Buffer.byteLength(line, "utf8"); + if (lines.length > 0 && batchBytes + lineBytes > this.maxBatchBytes) { + break; + } + + lines.push(line); + batchBytes += lineBytes; + } + + return lines.join(""); + } + + private rebaseBufferedEvents(lastAcceptedSeq: number): void { + let nextSeq = lastAcceptedSeq + 1; + this.bufferedEvents = this.bufferedEvents.map((event) => ({ + ...event, + seq: nextSeq++, + })); + this.sequence = nextSeq - 1; + this.sequenceSynced = true; + this.bufferRevision += 1; + } + + private async parseResponse( + response: Response, + ): Promise<{ parsed: IngestResponse | null; text: string }> { + const text = await response.text(); + if (!text) { + return { parsed: null, text }; + } + + try { + return { parsed: JSON.parse(text) as IngestResponse, text }; + } catch { + return { parsed: null, text }; + } + } + + private canAcceptEvent(event: Record): boolean { + const eventBytes = Buffer.byteLength( + this.serializeEnvelope({ seq: this.sequence + 1, event }), + "utf8", + ); + if (eventBytes > this.maxEventBytes) { + this.config.logger.warn("Dropped oversized task run event", { + eventBytes, + maxEventBytes: this.maxEventBytes, + }); + return false; + } + + if (this.bufferedEvents.length >= this.maxBufferedEvents) { + this.droppedBeforeSequenceCount += 1; + if ( + this.droppedBeforeSequenceCount === 1 || + this.droppedBeforeSequenceCount % 100 === 0 + ) { + this.config.logger.warn( + "Dropped task run event before assigning sequence due to backpressure", + { + dropped: this.droppedBeforeSequenceCount, + maxBufferedEvents: this.maxBufferedEvents, + }, + ); + } + return false; + } + + if (this.droppedBeforeSequenceCount > 0) { + this.config.logger.warn("Task run event ingest recovered after drops", { + dropped: this.droppedBeforeSequenceCount, + }); + this.droppedBeforeSequenceCount = 0; + } + + return true; + } + + private serializeEnvelope(envelope: EventEnvelope): string { + return JSON.stringify({ seq: envelope.seq, event: envelope.event }); + } + + private describeError(error: unknown): unknown { + if (error instanceof Error) { + return { message: error.message, stack: error.stack }; + } + return error; + } +}