Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/agent/src/server/agent-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ export class AgentServer {
if (config.eventIngestToken) {
this.eventStreamSender = new TaskRunEventStreamSender({
apiUrl: config.apiUrl,
eventIngestBaseUrl: config.eventIngestBaseUrl,
projectId: config.projectId,
taskId: config.taskId,
runId: config.runId,
Expand Down
10 changes: 5 additions & 5 deletions packages/agent/src/server/bin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ const envSchema = z.object({
.enum(["low", "medium", "high", "xhigh", "max"])
.optional(),
POSTHOG_TASK_RUN_EVENT_INGEST_TOKEN: z.string().min(1).optional(),
// Base URL for the event-ingest POST only; falls back to POSTHOG_API_URL when unset.
POSTHOG_TASK_RUN_EVENT_INGEST_URL: z.url().optional(),
POSTHOG_TASK_RUN_EVENT_INGEST_STREAM_WINDOW_MS: z
.string()
.regex(
Expand Down Expand Up @@ -162,6 +164,7 @@ program
port: parseInt(options.port, 10),
jwtPublicKey: env.JWT_PUBLIC_KEY,
eventIngestToken: env.POSTHOG_TASK_RUN_EVENT_INGEST_TOKEN,
eventIngestBaseUrl: env.POSTHOG_TASK_RUN_EVENT_INGEST_URL,
eventIngestStreamWindowMs:
env.POSTHOG_TASK_RUN_EVENT_INGEST_STREAM_WINDOW_MS,
repositoryPath: options.repositoryPath,
Expand Down Expand Up @@ -192,11 +195,8 @@ program
process.exit(0);
});

// A hard crash would otherwise leave the run non-terminal and the user staring
// at a generic "Cloud stream disconnected". Mark the run failed before exiting
// so the desktop surfaces a real error instead of a silent stall. The deadline
// guarantees we exit even if reportFatalError's network calls hang at crash time
// (e.g. API unreachable during a restart), so we never block pod shutdown.
// Mark the run failed before exiting so a hard crash surfaces a real error instead of a
// silent stall. The deadline guarantees we exit even if the report hangs at crash time.
const FATAL_ERROR_REPORT_DEADLINE_MS = 5_000;
const handleFatalError = async (error: unknown) => {
try {
Expand Down
21 changes: 21 additions & 0 deletions packages/agent/src/server/event-stream-sender.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,27 @@ describe("TaskRunEventStreamSender", () => {
]);
});

it("routes the ingest POST to the agent-proxy run-scoped path when eventIngestBaseUrl is set", async () => {
const fetchMock = vi.fn(
async (_url: string | URL | Request, init?: RequestInit) => {
const body = await readRequestBody(init);
return responseForBody(body);
},
);
vi.stubGlobal("fetch", fetchMock);

const sender = createSender({
eventIngestBaseUrl: "http://agent-proxy:8003/",
});
sender.enqueue({ type: "notification", notification: { method: "first" } });
await sender.stop();

expect(fetchMock).toHaveBeenCalled();
const lastCall = fetchMock.mock.calls[fetchMock.mock.calls.length - 1];
expect(lastCall[0]).toBe("http://agent-proxy:8003/v1/runs/run-1/ingest");
expect(lastCall[0]).not.toContain("/api/projects/");
});

it("keeps the active ingest request open across scheduled flushes", async () => {
const requestBodies: string[] = [];
let activeStreamClosed = false;
Expand Down
25 changes: 18 additions & 7 deletions packages/agent/src/server/event-stream-sender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import {

interface TaskRunEventStreamSenderConfig {
apiUrl: string;
// Base URL for the event-ingest POST only; falls back to apiUrl (Django path) when unset.
eventIngestBaseUrl?: string;
projectId: number;
taskId: string;
runId: string;
Expand Down Expand Up @@ -85,10 +87,20 @@ export class TaskRunEventStreamSender {
private bufferRevision = 0;

constructor(private readonly config: TaskRunEventStreamSenderConfig) {
const apiUrl = config.apiUrl.replace(/\/$/, "");
this.ingestUrl = `${apiUrl}/api/projects/${config.projectId}/tasks/${encodeURIComponent(
config.taskId,
)}/runs/${encodeURIComponent(config.runId)}/event_stream/`;
const usingProxy = Boolean(config.eventIngestBaseUrl);
const ingestBase = (config.eventIngestBaseUrl ?? config.apiUrl).replace(
/\/$/,
"",
);
this.ingestUrl = usingProxy
? `${ingestBase}/v1/runs/${encodeURIComponent(config.runId)}/ingest`
: `${ingestBase}/api/projects/${config.projectId}/tasks/${encodeURIComponent(
config.taskId,
)}/runs/${encodeURIComponent(config.runId)}/event_stream/`;
config.logger.info("Event ingest target resolved", {
ingestUrl: this.ingestUrl,
routedToProxy: usingProxy,
});
this.maxBufferedEvents =
config.maxBufferedEvents ?? DEFAULT_MAX_BUFFERED_EVENTS;
this.maxStreamEvents = config.maxStreamEvents ?? DEFAULT_MAX_STREAM_EVENTS;
Expand Down Expand Up @@ -341,8 +353,7 @@ export class TaskRunEventStreamSender {
delayOverrideMs?: number,
): void {
this.clearStreamWindowClose(stream);
// Rotate long-lived uploads even when the agent goes idle; this is a
// transport boundary, not a batching window.
// Rotate long-lived uploads even when idle: a transport boundary, not a batching window.
const delayMs =
delayOverrideMs ??
Math.max(0, stream.startedAtMs + this.streamWindowMs - Date.now());
Expand Down Expand Up @@ -671,7 +682,7 @@ export class TaskRunEventStreamSender {
}

if (this.droppedBeforeSequenceCount > 0) {
this.config.logger.warn("Task run event ingest recovered after drops", {
this.config.logger.info("Task run event ingest recovered after drops", {
dropped: this.droppedBeforeSequenceCount,
});
this.droppedBeforeSequenceCount = 0;
Expand Down
2 changes: 2 additions & 0 deletions packages/agent/src/server/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ export interface AgentServerConfig {
projectId: number;
jwtPublicKey: string; // RS256 public key for JWT verification
eventIngestToken?: string;
// Base URL for the event-ingest POST only; falls back to apiUrl when unset.
eventIngestBaseUrl?: string;
eventIngestStreamWindowMs?: number;
mode: AgentMode;
taskId: string;
Expand Down
Loading
Loading