diff --git a/packages/agent/src/server/agent-server.test.ts b/packages/agent/src/server/agent-server.test.ts index 88bae5505..ca2ed2364 100644 --- a/packages/agent/src/server/agent-server.test.ts +++ b/packages/agent/src/server/agent-server.test.ts @@ -203,6 +203,247 @@ describe("AgentServer HTTP Mode", () => { }); describe("turn completion", () => { + function stubSessionCleanup(testServer: unknown): { + cleanupSession: (options?: { + completeEventStream?: boolean; + }) => Promise; + eventStreamSender: { + enqueue: ReturnType; + stop: ReturnType; + }; + } { + const cleanupServer = testServer as { + session: unknown; + eventStreamSender: { + enqueue: ReturnType; + stop: ReturnType; + }; + captureCheckpointState: ReturnType; + cleanupSession: (options?: { + completeEventStream?: boolean; + }) => Promise; + }; + cleanupServer.captureCheckpointState = vi.fn(async () => {}); + cleanupServer.eventStreamSender = { + enqueue: vi.fn(), + stop: vi.fn(async () => {}), + }; + cleanupServer.session = { + payload: { run_id: "run-1" }, + pendingHandoffGitState: undefined, + logWriter: { flush: vi.fn(async () => {}) }, + acpConnection: { cleanup: vi.fn(async () => {}) }, + sseController: { close: vi.fn() }, + }; + return cleanupServer; + } + + it("keeps event ingest open for non-terminal session cleanup", async () => { + const testServer = stubSessionCleanup(createServer()); + + await testServer.cleanupSession(); + + expect(testServer.eventStreamSender.enqueue).not.toHaveBeenCalled(); + expect(testServer.eventStreamSender.stop).not.toHaveBeenCalled(); + }); + + it("stops event ingest for terminal session cleanup without fake task completion", async () => { + const testServer = stubSessionCleanup(createServer()); + + await testServer.cleanupSession({ completeEventStream: true }); + + expect(testServer.eventStreamSender.enqueue).not.toHaveBeenCalled(); + expect(testServer.eventStreamSender.stop).toHaveBeenCalledOnce(); + }); + + it("writes terminal failure status before completing event ingest", async () => { + const order: string[] = []; + const testServer = new AgentServer({ + port, + jwtPublicKey: TEST_PUBLIC_KEY, + repositoryPath: repo.path, + apiUrl: "http://localhost:8000", + apiKey: "test-api-key", + projectId: 1, + mode: "interactive", + taskId: "test-task-id", + runId: "test-run-id", + }) as unknown as { + eventStreamSender: { + enqueue: (event: Record) => void; + stop: () => Promise; + }; + posthogAPI: { + updateTaskRun: ( + taskId: string, + runId: string, + payload: Record, + ) => Promise; + }; + signalTaskComplete( + payload: JwtPayload, + stopReason: string, + errorMessage?: string, + ): Promise; + }; + testServer.eventStreamSender = { + enqueue: vi.fn(() => { + order.push("enqueue"); + }), + stop: vi.fn(async () => { + order.push("stop"); + }), + }; + testServer.posthogAPI = { + updateTaskRun: vi.fn(async () => { + order.push("update"); + return {}; + }), + }; + + await testServer.signalTaskComplete( + { + run_id: "run-1", + task_id: "task-1", + team_id: 1, + user_id: 1, + distinct_id: "distinct-id", + mode: "interactive", + }, + "error", + "boom", + ); + + expect(order).toEqual(["enqueue", "update", "stop"]); + expect(testServer.eventStreamSender.enqueue).toHaveBeenCalledWith( + expect.objectContaining({ + type: "notification", + notification: expect.objectContaining({ + method: "_posthog/error", + params: expect.objectContaining({ error: "boom" }), + }), + }), + ); + expect(testServer.posthogAPI.updateTaskRun).toHaveBeenCalledWith( + "task-1", + "run-1", + { + status: "failed", + error_message: "boom", + }, + ); + }); + + it("still stops event ingest when terminal failure status update fails", async () => { + const testServer = new AgentServer({ + port, + jwtPublicKey: TEST_PUBLIC_KEY, + repositoryPath: repo.path, + apiUrl: "http://localhost:8000", + apiKey: "test-api-key", + projectId: 1, + mode: "interactive", + taskId: "test-task-id", + runId: "test-run-id", + }) as unknown as { + eventStreamSender: { + enqueue: (event: Record) => void; + stop: () => Promise; + }; + posthogAPI: { + updateTaskRun: ( + taskId: string, + runId: string, + payload: Record, + ) => Promise; + }; + signalTaskComplete( + payload: JwtPayload, + stopReason: string, + errorMessage?: string, + ): Promise; + }; + testServer.eventStreamSender = { + enqueue: vi.fn(), + stop: vi.fn(async () => {}), + }; + testServer.posthogAPI = { + updateTaskRun: vi.fn(async () => { + throw new Error("update failed"); + }), + }; + + await testServer.signalTaskComplete( + { + run_id: "run-1", + task_id: "task-1", + team_id: 1, + user_id: 1, + distinct_id: "distinct-id", + mode: "interactive", + }, + "error", + "boom", + ); + + expect(testServer.eventStreamSender.enqueue).toHaveBeenCalledOnce(); + expect(testServer.posthogAPI.updateTaskRun).toHaveBeenCalledOnce(); + expect(testServer.eventStreamSender.stop).toHaveBeenCalledOnce(); + }); + + it("leaves event ingest open for non-error stop reasons", async () => { + const testServer = new AgentServer({ + port, + jwtPublicKey: TEST_PUBLIC_KEY, + repositoryPath: repo.path, + apiUrl: "http://localhost:8000", + apiKey: "test-api-key", + projectId: 1, + mode: "interactive", + taskId: "test-task-id", + runId: "test-run-id", + }) as unknown as { + eventStreamSender: { + enqueue: (event: Record) => void; + stop: () => Promise; + }; + posthogAPI: { + updateTaskRun: ( + taskId: string, + runId: string, + payload: Record, + ) => Promise; + }; + signalTaskComplete( + payload: JwtPayload, + stopReason: string, + ): Promise; + }; + testServer.eventStreamSender = { + enqueue: vi.fn(), + stop: vi.fn(async () => {}), + }; + testServer.posthogAPI = { + updateTaskRun: vi.fn(async () => ({})), + }; + + await testServer.signalTaskComplete( + { + run_id: "run-1", + task_id: "task-1", + team_id: 1, + user_id: 1, + distinct_id: "distinct-id", + mode: "interactive", + }, + "end_turn", + ); + + expect(testServer.eventStreamSender.enqueue).not.toHaveBeenCalled(); + expect(testServer.eventStreamSender.stop).not.toHaveBeenCalled(); + expect(testServer.posthogAPI.updateTaskRun).not.toHaveBeenCalled(); + }); + it("persists structured turn completion notifications", () => { const appendRawLine = vi.fn(); const testServer = new AgentServer({ diff --git a/packages/agent/src/server/agent-server.ts b/packages/agent/src/server/agent-server.ts index 12ea84c47..dbe7ab794 100644 --- a/packages/agent/src/server/agent-server.ts +++ b/packages/agent/src/server/agent-server.ts @@ -54,6 +54,7 @@ import { normalizeCloudPromptContent, promptBlocksToText, } from "./cloud-prompt"; +import { TaskRunEventStreamSender } from "./event-stream-sender"; import { type JwtPayload, JwtValidationError, validateJwt } from "./jwt"; import { handoffLocalGitStateSchema, @@ -217,6 +218,7 @@ export class AgentServer { private session: ActiveSession | null = null; private app: Hono; private posthogAPI: PostHogAPIClient; + private eventStreamSender: TaskRunEventStreamSender | null = null; private questionRelayedToSlack = false; private detectedPrUrl: string | null = null; private lastReportedBranch: string | null = null; @@ -281,6 +283,16 @@ export class AgentServer { getApiKey: () => config.apiKey, userAgent: `posthog/cloud.hog.dev; version: ${config.version ?? packageJson.version}`, }); + if (config.eventIngestToken) { + this.eventStreamSender = new TaskRunEventStreamSender({ + apiUrl: config.apiUrl, + projectId: config.projectId, + taskId: config.taskId, + runId: config.runId, + token: config.eventIngestToken, + logger: this.logger.child("EventIngest"), + }); + } this.app = this.createApp(); } @@ -544,7 +556,9 @@ export class AgentServer { this.logger.debug("Stopping agent server..."); if (this.session) { - await this.cleanupSession(); + await this.cleanupSession({ completeEventStream: true }); + } else { + await this.eventStreamSender?.stop(); } if (this.server) { @@ -1772,6 +1786,12 @@ ${attributionInstructions} const status = "failed"; + this.enqueueTaskTerminalEvent(POSTHOG_NOTIFICATIONS.ERROR, { + source: "agent_server", + stopReason, + error: errorMessage ?? "Agent error", + }); + try { await this.posthogAPI.updateTaskRun(payload.task_id, payload.run_id, { status, @@ -1780,9 +1800,28 @@ ${attributionInstructions} this.logger.debug("Task completion signaled", { status, stopReason }); } catch (error) { this.logger.error("Failed to signal task completion", error); + } finally { + await this.eventStreamSender?.stop(); } } + private enqueueTaskTerminalEvent( + method: + | typeof POSTHOG_NOTIFICATIONS.TASK_COMPLETE + | typeof POSTHOG_NOTIFICATIONS.ERROR, + params: Record, + ): void { + this.eventStreamSender?.enqueue({ + type: "notification", + timestamp: new Date().toISOString(), + notification: { + jsonrpc: "2.0", + method, + params, + }, + }); + } + private configureEnvironment({ isInternal = false, }: { @@ -2180,7 +2219,11 @@ ${attributionInstructions} } } - private async cleanupSession(): Promise { + private async cleanupSession({ + completeEventStream = false, + }: { + completeEventStream?: boolean; + } = {}): Promise { if (!this.session) return; this.logger.debug("Cleaning up session"); @@ -2219,6 +2262,10 @@ ${attributionInstructions} this.session.sseController.close(); } + if (completeEventStream) { + await this.eventStreamSender?.stop(); + } + this.pendingEvents = []; this.lastReportedBranch = null; this.session = null; @@ -2302,9 +2349,13 @@ ${attributionInstructions} } private broadcastEvent(event: Record): void { + if (!this.session) return; + + this.eventStreamSender?.enqueue(event); + if (this.session?.sseController) { this.sendSseEvent(this.session.sseController, event); - } else if (this.session) { + } else { // Buffer events during initialization (sseController not yet attached) this.pendingEvents.push(event); } diff --git a/packages/agent/src/server/bin.ts b/packages/agent/src/server/bin.ts index d72a91ba3..d22608828 100644 --- a/packages/agent/src/server/bin.ts +++ b/packages/agent/src/server/bin.ts @@ -32,6 +32,7 @@ const envSchema = z.object({ POSTHOG_CODE_REASONING_EFFORT: z .enum(["low", "medium", "high", "xhigh", "max"]) .optional(), + POSTHOG_TASK_RUN_EVENT_INGEST_TOKEN: z.string().min(1).optional(), }); const program = new Command(); @@ -148,6 +149,7 @@ program const server = new AgentServer({ port: parseInt(options.port, 10), jwtPublicKey: env.JWT_PUBLIC_KEY, + eventIngestToken: env.POSTHOG_TASK_RUN_EVENT_INGEST_TOKEN, repositoryPath: options.repositoryPath, apiUrl: env.POSTHOG_API_URL, apiKey: env.POSTHOG_PERSONAL_API_KEY, diff --git a/packages/agent/src/server/types.ts b/packages/agent/src/server/types.ts index 10cf96fc7..c982a9854 100644 --- a/packages/agent/src/server/types.ts +++ b/packages/agent/src/server/types.ts @@ -15,6 +15,7 @@ export interface AgentServerConfig { apiKey: string; projectId: number; jwtPublicKey: string; // RS256 public key for JWT verification + eventIngestToken?: string; mode: AgentMode; taskId: string; runId: string;