From 38d68510e8a56b8b39e23ab82b5f17394aecfbef Mon Sep 17 00:00:00 2001 From: Ben Brandt Date: Fri, 5 Jun 2026 11:13:13 +0200 Subject: [PATCH 1/8] feat: Add session close support --- src/CodexAcpClient.ts | 21 +- src/CodexAcpServer.ts | 203 +++++++++++++++--- src/CodexAppServerClient.ts | 30 ++- .../data/session-close-active-turn.json | 21 ++ .../data/session-close-idle.json | 10 + .../CodexACPAgent/initialize.test.ts | 1 + .../CodexACPAgent/session-close.test.ts | 144 +++++++++++++ 7 files changed, 394 insertions(+), 36 deletions(-) create mode 100644 src/__tests__/CodexACPAgent/data/session-close-active-turn.json create mode 100644 src/__tests__/CodexACPAgent/data/session-close-idle.json create mode 100644 src/__tests__/CodexACPAgent/session-close.test.ts diff --git a/src/CodexAcpClient.ts b/src/CodexAcpClient.ts index 22cde0da..5757af53 100644 --- a/src/CodexAcpClient.ts +++ b/src/CodexAcpClient.ts @@ -258,6 +258,14 @@ export class CodexAcpClient { }; } + async closeSession(sessionId: string): Promise { + try { + await this.codexClient.threadUnsubscribe({threadId: sessionId}); + } finally { + this.codexClient.clearThreadHandlers(sessionId); + } + } + async awaitMcpServerStartup(serverNames: Array, afterVersion: number): Promise { return await this.codexClient.awaitMcpServerStartup(serverNames, afterVersion); } @@ -385,11 +393,16 @@ export class CodexAcpClient { serviceTier: ServiceTier | null, disableSummary: boolean, cwd: string, - ): Promise { + onTurnStarted?: (turnId: string) => void, + shouldCancel?: () => boolean, + ): Promise { const input = buildPromptItems(request.prompt); const effort = modelId.effort as ReasoningEffort | null; //TODO remove unsafe conversion await this.refreshSkills(cwd, request._meta); + if (shouldCancel?.()) { + return null; + } return await this.codexClient.runTurn({ threadId: request.sessionId, input: input, @@ -399,7 +412,11 @@ export class CodexAcpClient { effort: effort, model: modelId.model, serviceTier: serviceTier, - }); + }, onTurnStarted); + } + + resolveTurnInterrupted(params: { threadId: string, turnId: string }): void { + this.codexClient.resolveTurnInterrupted(params.threadId, params.turnId); } async listSkills(params?: SkillsListParams): Promise { diff --git a/src/CodexAcpServer.ts b/src/CodexAcpServer.ts index 058ef099..5443e3ba 100644 --- a/src/CodexAcpServer.ts +++ b/src/CodexAcpServer.ts @@ -80,6 +80,9 @@ export class CodexAcpServer implements acp.Agent { private readonly sessions: Map; private readonly pendingMcpStartupSessions: Map; + private readonly pendingTurnStarts: Map>; + private readonly activePromptCompletions: Map>; + private readonly closingSessions: Set; constructor( connection: acp.AgentSideConnection, @@ -89,6 +92,9 @@ export class CodexAcpServer implements acp.Agent { ) { this.sessions = new Map(); this.pendingMcpStartupSessions = new Map(); + this.pendingTurnStarts = new Map(); + this.activePromptCompletions = new Map(); + this.closingSessions = new Set(); this.connection = connection; this.codexAcpClient = codexAcpClient; this.defaultAuthRequest = defaultAuthRequest ?? null; @@ -123,7 +129,8 @@ export class CodexAcpServer implements acp.Agent { }, sessionCapabilities: { resume: { }, - list: { } + list: { }, + close: { }, }, mcpCapabilities: { acp: false, @@ -292,6 +299,36 @@ export class CodexAcpServer implements acp.Agent { return await this.runWithProcessCheck(() => this.codexAcpClient.listSessions(params)); } + async closeSession(params: acp.CloseSessionRequest): Promise { + logger.log("Closing session...", {sessionId: params.sessionId}); + const sessionState = this.sessions.get(params.sessionId); + this.closingSessions.add(params.sessionId); + + try { + if (sessionState) { + await this.interruptSessionTurn(sessionState, "Close", true); + } else { + logger.log("Close request received for unknown local session", {sessionId: params.sessionId}); + } + + const activePromptCompletion = this.activePromptCompletions.get(params.sessionId); + if (activePromptCompletion) { + await activePromptCompletion; + } + + await this.runWithProcessCheck(() => this.codexAcpClient.closeSession(params.sessionId)); + logger.log("Session closed", {sessionId: params.sessionId}); + } finally { + this.sessions.delete(params.sessionId); + this.pendingMcpStartupSessions.delete(params.sessionId); + this.pendingTurnStarts.delete(params.sessionId); + this.activePromptCompletions.delete(params.sessionId); + this.closingSessions.delete(params.sessionId); + } + + return {}; + } + async newSession( params: acp.NewSessionRequest, ): Promise { @@ -778,6 +815,9 @@ export class CodexAcpServer implements acp.Agent { pendingStartup.afterVersion, ) ); + if (!this.sessions.has(sessionId)) { + return; + } await this.publishMcpStartupStatus(sessionId, mcpStartup, pendingStartup.requestedServers); } catch (err) { logger.error(`Failed to publish MCP startup status for session ${sessionId}`, err); @@ -807,6 +847,81 @@ export class CodexAcpServer implements acp.Agent { } } + private trackActivePrompt(sessionId: string): () => void { + let resolveCompletion: () => void = () => {}; + const completion = new Promise((resolve) => { + resolveCompletion = resolve; + }); + this.activePromptCompletions.set(sessionId, completion); + + let completed = false; + return () => { + if (completed) { + return; + } + completed = true; + if (this.activePromptCompletions.get(sessionId) === completion) { + this.activePromptCompletions.delete(sessionId); + } + resolveCompletion(); + }; + } + + private async interruptSessionTurn( + sessionState: SessionState, + requestName: "Cancel" | "Close", + resolveInterruptedTurn: boolean, + ): Promise { + const turnId = await this.getInterruptibleTurnId(sessionState, requestName); + if (!turnId) { + return; + } + + logger.log(`${requestName} session requested`, { + sessionId: sessionState.sessionId, + currentTurnId: turnId, + }); + try { + await this.runWithProcessCheck(() => this.codexAcpClient.turnInterrupt({ + threadId: sessionState.sessionId, + turnId, + })); + if (resolveInterruptedTurn) { + this.codexAcpClient.resolveTurnInterrupted({ + threadId: sessionState.sessionId, + turnId, + }); + } + logger.log(`${requestName} - turnInterrupt succeeded`, { + sessionId: sessionState.sessionId, + currentTurnId: turnId, + }); + } catch (err) { + logger.error(`${requestName} - turnInterrupt failed`, err); + } + } + + private async getInterruptibleTurnId( + sessionState: SessionState, + requestName: "Cancel" | "Close", + ): Promise { + if (sessionState.currentTurnId) { + return sessionState.currentTurnId; + } + + const pendingTurnStart = this.pendingTurnStarts.get(sessionState.sessionId); + if (!pendingTurnStart) { + logger.log(`${requestName} request rejected: no current turn`, {sessionId: sessionState.sessionId}); + return null; + } + + const turnId = await pendingTurnStart; + if (!turnId) { + logger.log(`${requestName} request rejected: no current turn`, {sessionId: sessionState.sessionId}); + } + return turnId; + } + async prompt(params: acp.PromptRequest): Promise { logger.log("Prompt received", { sessionId: params.sessionId, @@ -815,6 +930,9 @@ export class CodexAcpServer implements acp.Agent { const sessionState = this.getSessionState(params.sessionId); sessionState.currentTurnId = null; sessionState.lastTokenUsage = null; + const completeActivePrompt = this.trackActivePrompt(params.sessionId); + let pendingTurnStart: Promise | null = null; + let resolvePendingTurnStart: (turnId: string | null) => void = () => {}; try { const eventHandler = new CodexEventHandler(this.connection, sessionState); @@ -837,6 +955,14 @@ export class CodexAcpServer implements acp.Agent { }; } + if (this.closingSessions.has(params.sessionId)) { + return { + stopReason: "cancelled", + usage: this.buildPromptUsage(sessionState.lastTokenUsage), + _meta: this.buildQuotaMeta(sessionState), + }; + } + const modelId = ModelId.fromString(sessionState.currentModelId); const modelLacksReasoning = sessionState.supportedReasoningEfforts.length > 0 && sessionState.supportedReasoningEfforts.every(e => e.reasoningEffort === "none"); @@ -857,21 +983,47 @@ export class CodexAcpServer implements acp.Agent { sessionState.fastModeEnabled, sessionState.currentModelSupportsFast, ); + pendingTurnStart = new Promise((resolve) => { + resolvePendingTurnStart = resolve; + }); + this.pendingTurnStarts.set(params.sessionId, pendingTurnStart); const turnCompleted = await this.runWithProcessCheck( - () => this.codexAcpClient.sendPrompt(params, agentMode, modelId, serviceTier, disableSummary, sessionState.cwd)); + () => this.codexAcpClient.sendPrompt( + params, + agentMode, + modelId, + serviceTier, + disableSummary, + sessionState.cwd, + (turnId) => { + sessionState.currentTurnId = turnId; + resolvePendingTurnStart(turnId); + }, + () => this.closingSessions.has(params.sessionId), + )); + + if (turnCompleted === null) { + return { + stopReason: "cancelled", + usage: this.buildPromptUsage(sessionState.lastTokenUsage), + _meta: this.buildQuotaMeta(sessionState), + }; + } // Check if turn was interrupted (cancelled) if (turnCompleted.turn.status === "interrupted") { - await this.connection.sessionUpdate({ - sessionId: params.sessionId, - update: { - sessionUpdate: "agent_message_chunk", - content: { - type: "text", - text: "*Conversation interrupted*" + if (!this.closingSessions.has(params.sessionId) && this.sessions.has(params.sessionId)) { + await this.connection.sessionUpdate({ + sessionId: params.sessionId, + update: { + sessionUpdate: "agent_message_chunk", + content: { + type: "text", + text: "*Conversation interrupted*" + } } - } - }); + }); + } return { stopReason: "cancelled", usage: this.buildPromptUsage(sessionState.lastTokenUsage), @@ -896,6 +1048,11 @@ export class CodexAcpServer implements acp.Agent { } finally { logger.log("Prompt completed", {sessionId: params.sessionId}); sessionState.currentTurnId = null; + if (pendingTurnStart !== null && this.pendingTurnStarts.get(params.sessionId) === pendingTurnStart) { + this.pendingTurnStarts.delete(params.sessionId); + } + resolvePendingTurnStart(null); + completeActivePrompt(); } } @@ -948,28 +1105,8 @@ export class CodexAcpServer implements acp.Agent { return; } - if (!sessionState.currentTurnId) { - logger.log("Cancel request rejected: no current turn", {sessionId: params.sessionId}); - return; - } - - logger.log("Cancel session requested", { - sessionId: params.sessionId, - currentTurnId: sessionState.currentTurnId - }); - try { - // After turnInterrupt(), Codex will send turn/completed event, which will naturally complete awaitTurnCompleted() - await this.codexAcpClient.turnInterrupt({ - threadId: params.sessionId, - turnId: sessionState.currentTurnId - }); - logger.log("Cancel - turnInterrupt succeeded", { - sessionId: params.sessionId, - currentTurnId: sessionState.currentTurnId - }); - } catch (err) { - logger.error(`Cancel - turnInterrupt failed`, err); - } + // After turnInterrupt(), Codex will send turn/completed, which naturally completes awaitTurnCompleted(). + await this.interruptSessionTurn(sessionState, "Cancel", false); } } diff --git a/src/CodexAppServerClient.ts b/src/CodexAppServerClient.ts index 0d2d0832..f7e2da49 100644 --- a/src/CodexAppServerClient.ts +++ b/src/CodexAppServerClient.ts @@ -33,6 +33,8 @@ import type { ThreadResumeResponse, ThreadStartParams, ThreadStartResponse, + ThreadUnsubscribeParams, + ThreadUnsubscribeResponse, TurnCompletedNotification, TurnInterruptParams, TurnInterruptResponse, @@ -151,6 +153,12 @@ export class CodexAppServerClient { this.elicitationHandlers.set(threadId, handler); } + clearThreadHandlers(threadId: string): void { + this.notificationHandlers.delete(threadId); + this.approvalHandlers.delete(threadId); + this.elicitationHandlers.delete(threadId); + } + async initialize(params: InitializeParams): Promise { return await this.sendRequest({ method: "initialize", params: params }); } @@ -159,7 +167,7 @@ export class CodexAppServerClient { return await this.sendRequest({ method: "turn/start", params: params }); } - async runTurn(params: TurnStartParams): Promise { + async runTurn(params: TurnStartParams, onTurnStarted?: (turnId: string) => void): Promise { const capturedCompletions: Array = []; const releaseCapture = this.captureTurnCompletions(params.threadId, (event) => { capturedCompletions.push(event); @@ -167,6 +175,7 @@ export class CodexAppServerClient { try { const turnStarted = await this.turnStart(params); + onTurnStarted?.(turnStarted.turn.id); const earlyCompletion = capturedCompletions.find(event => event.turn.id === turnStarted.turn.id); releaseCapture(); if (earlyCompletion) { @@ -204,6 +213,10 @@ export class CodexAppServerClient { return await this.sendRequest({ method: "thread/read", params: params }); } + async threadUnsubscribe(params: ThreadUnsubscribeParams): Promise { + return await this.sendRequest({ method: "thread/unsubscribe", params: params }); + } + async listMcpServerStatus(params: ListMcpServerStatusParams): Promise { return await this.sendRequest({ method: "mcpServerStatus/list", params }); } @@ -256,6 +269,21 @@ export class CodexAppServerClient { }); } + resolveTurnInterrupted(threadId: string, turnId: string): void { + this.recordTurnCompleted({ + threadId, + turn: { + id: turnId, + items: [], + status: "interrupted", + error: null, + startedAt: null, + completedAt: null, + durationMs: null, + }, + }); + } + async listModels(params: ModelListParams): Promise { return await this.sendRequest({ method: "model/list", params }); } diff --git a/src/__tests__/CodexACPAgent/data/session-close-active-turn.json b/src/__tests__/CodexACPAgent/data/session-close-active-turn.json new file mode 100644 index 00000000..c427fc06 --- /dev/null +++ b/src/__tests__/CodexACPAgent/data/session-close-active-turn.json @@ -0,0 +1,21 @@ +{ + "eventType": "request", + "method": "turn/interrupt", + "params": { + "threadId": "session-id", + "turnId": "turn-id" + } +} +{ + "eventType": "response" +} +{ + "eventType": "request", + "method": "thread/unsubscribe", + "params": { + "threadId": "session-id" + } +} +{ + "eventType": "response" +} \ No newline at end of file diff --git a/src/__tests__/CodexACPAgent/data/session-close-idle.json b/src/__tests__/CodexACPAgent/data/session-close-idle.json new file mode 100644 index 00000000..1cdd0eb9 --- /dev/null +++ b/src/__tests__/CodexACPAgent/data/session-close-idle.json @@ -0,0 +1,10 @@ +{ + "eventType": "request", + "method": "thread/unsubscribe", + "params": { + "threadId": "session-id" + } +} +{ + "eventType": "response" +} \ No newline at end of file diff --git a/src/__tests__/CodexACPAgent/initialize.test.ts b/src/__tests__/CodexACPAgent/initialize.test.ts index bf6e0604..0d6e3af2 100644 --- a/src/__tests__/CodexACPAgent/initialize.test.ts +++ b/src/__tests__/CodexACPAgent/initialize.test.ts @@ -49,6 +49,7 @@ describe('CodexACPAgent - initialize', () => { sessionCapabilities: { resume: {}, list: {}, + close: {}, }, mcpCapabilities: { acp: false, diff --git a/src/__tests__/CodexACPAgent/session-close.test.ts b/src/__tests__/CodexACPAgent/session-close.test.ts new file mode 100644 index 00000000..a808ad1a --- /dev/null +++ b/src/__tests__/CodexACPAgent/session-close.test.ts @@ -0,0 +1,144 @@ +import {describe, expect, it, vi} from "vitest"; +import { + createCodexMockTestFixture, + createTestModel, + mockPromptTurn, + type CodexMockTestFixture, +} from "../acp-test-utils"; +import type {CodexAcpServer} from "../../CodexAcpServer"; +import type {CodexAcpClient} from "../../CodexAcpClient"; +import type {TurnStartResponse} from "../../app-server/v2"; + +const sessionId = "session-id"; + +describe("ACP session close", () => { + it("advertises session close support", async () => { + const fixture = createCodexMockTestFixture(); + + const response = await fixture.getCodexAcpAgent().initialize({protocolVersion: 1}); + + expect(response.agentCapabilities?.sessionCapabilities?.close).toEqual({}); + }); + + it("unsubscribes idle sessions and clears local session handlers", async () => { + const {fixture, codexAcpAgent} = await createSession(); + + mockPromptTurn(fixture, sessionId); + await codexAcpAgent.prompt({ + sessionId, + prompt: [{type: "text", text: "register session handlers"}], + }); + + fixture.clearCodexConnectionDump(); + fixture.clearAcpConnectionDump(); + + await codexAcpAgent.closeSession({sessionId}); + + await expect(fixture.getCodexConnectionDump([])).toMatchFileSnapshot("data/session-close-idle.json"); + expect(() => codexAcpAgent.getSessionState(sessionId)).toThrow(`Session ${sessionId} not found`); + + fixture.sendServerNotification({ + method: "thread/name/updated", + params: { + threadId: sessionId, + threadName: "Ignored after close", + }, + }); + await waitForMicrotasks(); + + expect(fixture.getAcpConnectionEvents([])).toEqual([]); + }); + + it("interrupts active turns before unsubscribing", async () => { + const {fixture, codexAcpAgent} = await createSession(); + codexAcpAgent.getSessionState(sessionId).currentTurnId = "turn-id"; + + await codexAcpAgent.closeSession({sessionId}); + + await expect(fixture.getCodexConnectionDump([])).toMatchFileSnapshot("data/session-close-active-turn.json"); + expect(() => codexAcpAgent.getSessionState(sessionId)).toThrow(`Session ${sessionId} not found`); + }); + + it("cancels a prompt when close races with turn start", async () => { + const {fixture, codexAcpAgent} = await createSession(); + const turnStart = deferred(); + const turnStartCalled = deferred(); + + vi.spyOn(fixture.getCodexAppServerClient(), "turnStart").mockImplementation(async () => { + turnStartCalled.resolve(); + return await turnStart.promise; + }); + + const promptPromise = codexAcpAgent.prompt({ + sessionId, + prompt: [{type: "text", text: "long running prompt"}], + }); + await turnStartCalled.promise; + fixture.clearCodexConnectionDump(); + + const closePromise = codexAcpAgent.closeSession({sessionId}); + turnStart.resolve(createTurnStartResponse("turn-id")); + + await expect(promptPromise).resolves.toMatchObject({stopReason: "cancelled"}); + await closePromise; + + const requestMethods = fixture.getCodexConnectionEvents([]) + .flatMap(event => event.eventType === "request" ? [event.method] : []); + expect(requestMethods).toEqual(["turn/interrupt", "thread/unsubscribe"]); + expect(fixture.getAcpConnectionDump([])).not.toContain("Conversation interrupted"); + expect(() => codexAcpAgent.getSessionState(sessionId)).toThrow(`Session ${sessionId} not found`); + }); +}); + +async function createSession(): Promise<{ + fixture: CodexMockTestFixture, + codexAcpAgent: CodexAcpServer, + codexAcpClient: CodexAcpClient, +}> { + const fixture = createCodexMockTestFixture(); + const codexAcpAgent = fixture.getCodexAcpAgent(); + const codexAcpClient = fixture.getCodexAcpClient(); + const model = createTestModel(); + + vi.spyOn(codexAcpClient, "authRequired").mockResolvedValue(false); + vi.spyOn(codexAcpClient, "getAccount").mockResolvedValue({account: null, requiresOpenaiAuth: false}); + vi.spyOn(codexAcpClient, "listSkills").mockResolvedValue({data: []}); + vi.spyOn(codexAcpClient, "newSession").mockResolvedValue({ + sessionId, + currentModelId: "model-id[medium]", + models: [model], + currentServiceTier: null, + }); + + await codexAcpAgent.newSession({cwd: "/test/cwd", mcpServers: []}); + fixture.clearCodexConnectionDump(); + fixture.clearAcpConnectionDump(); + + return {fixture, codexAcpAgent, codexAcpClient}; +} + +function createTurnStartResponse(turnId: string): TurnStartResponse { + return { + turn: { + id: turnId, + items: [], + status: "inProgress", + error: null, + startedAt: null, + completedAt: null, + durationMs: null, + }, + }; +} + +function deferred(): {promise: Promise, resolve: (value: T) => void} { + let resolve: (value: T) => void = () => {}; + const promise = new Promise((innerResolve) => { + resolve = innerResolve; + }); + return {promise, resolve}; +} + +async function waitForMicrotasks(): Promise { + await new Promise(resolve => setTimeout(resolve, 10)); +} From ce9ed546c0530c8fc4d9c356daaef937092b4fa5 Mon Sep 17 00:00:00 2001 From: Ben Brandt Date: Fri, 5 Jun 2026 11:25:14 +0200 Subject: [PATCH 2/8] Resolve interrupted turns after failed interrupts --- src/CodexAcpServer.ts | 13 ++++---- .../data/session-close-interrupt-failed.json | 10 +++++++ .../CodexACPAgent/session-close.test.ts | 30 +++++++++++++++++++ 3 files changed, 47 insertions(+), 6 deletions(-) create mode 100644 src/__tests__/CodexACPAgent/data/session-close-interrupt-failed.json diff --git a/src/CodexAcpServer.ts b/src/CodexAcpServer.ts index 5443e3ba..da688e84 100644 --- a/src/CodexAcpServer.ts +++ b/src/CodexAcpServer.ts @@ -886,18 +886,19 @@ export class CodexAcpServer implements acp.Agent { threadId: sessionState.sessionId, turnId, })); - if (resolveInterruptedTurn) { - this.codexAcpClient.resolveTurnInterrupted({ - threadId: sessionState.sessionId, - turnId, - }); - } logger.log(`${requestName} - turnInterrupt succeeded`, { sessionId: sessionState.sessionId, currentTurnId: turnId, }); } catch (err) { logger.error(`${requestName} - turnInterrupt failed`, err); + } finally { + if (resolveInterruptedTurn) { + this.codexAcpClient.resolveTurnInterrupted({ + threadId: sessionState.sessionId, + turnId, + }); + } } } diff --git a/src/__tests__/CodexACPAgent/data/session-close-interrupt-failed.json b/src/__tests__/CodexACPAgent/data/session-close-interrupt-failed.json new file mode 100644 index 00000000..1cdd0eb9 --- /dev/null +++ b/src/__tests__/CodexACPAgent/data/session-close-interrupt-failed.json @@ -0,0 +1,10 @@ +{ + "eventType": "request", + "method": "thread/unsubscribe", + "params": { + "threadId": "session-id" + } +} +{ + "eventType": "response" +} \ No newline at end of file diff --git a/src/__tests__/CodexACPAgent/session-close.test.ts b/src/__tests__/CodexACPAgent/session-close.test.ts index a808ad1a..dd8e5ab4 100644 --- a/src/__tests__/CodexACPAgent/session-close.test.ts +++ b/src/__tests__/CodexACPAgent/session-close.test.ts @@ -88,6 +88,36 @@ describe("ACP session close", () => { expect(fixture.getAcpConnectionDump([])).not.toContain("Conversation interrupted"); expect(() => codexAcpAgent.getSessionState(sessionId)).toThrow(`Session ${sessionId} not found`); }); + + it("does not hang when close interrupt fails during an active prompt", async () => { + const {fixture, codexAcpAgent} = await createSession(); + const turnInterruptSpy = vi.spyOn(fixture.getCodexAcpClient(), "turnInterrupt") + .mockRejectedValue(new Error("interrupt failed")); + + vi.spyOn(fixture.getCodexAppServerClient(), "turnStart").mockResolvedValue(createTurnStartResponse("turn-id")); + + const promptPromise = codexAcpAgent.prompt({ + sessionId, + prompt: [{type: "text", text: "long running prompt"}], + }); + + await vi.waitFor(() => { + expect(codexAcpAgent.getSessionState(sessionId).currentTurnId).toBe("turn-id"); + }); + fixture.clearCodexConnectionDump(); + + await expect(codexAcpAgent.closeSession({sessionId})).resolves.toEqual({}); + await expect(promptPromise).resolves.toMatchObject({stopReason: "cancelled"}); + + expect(turnInterruptSpy).toHaveBeenCalledWith({ + threadId: sessionId, + turnId: "turn-id", + }); + await expect(fixture.getCodexConnectionDump([])).toMatchFileSnapshot( + "data/session-close-interrupt-failed.json" + ); + expect(() => codexAcpAgent.getSessionState(sessionId)).toThrow(`Session ${sessionId} not found`); + }); }); async function createSession(): Promise<{ From a0d8580e4310f3f7d39bd76de6febc6cc98b43a9 Mon Sep 17 00:00:00 2001 From: Ben Brandt Date: Fri, 5 Jun 2026 11:35:56 +0200 Subject: [PATCH 3/8] Suppress MCP startup updates during session close --- src/CodexAcpServer.ts | 8 ++- .../CodexACPAgent/session-close.test.ts | 55 ++++++++++++++++++- 2 files changed, 59 insertions(+), 4 deletions(-) diff --git a/src/CodexAcpServer.ts b/src/CodexAcpServer.ts index da688e84..5ca0f651 100644 --- a/src/CodexAcpServer.ts +++ b/src/CodexAcpServer.ts @@ -815,14 +815,18 @@ export class CodexAcpServer implements acp.Agent { pendingStartup.afterVersion, ) ); - if (!this.sessions.has(sessionId)) { + if (!this.sessions.has(sessionId) + || this.closingSessions.has(sessionId) + || this.pendingMcpStartupSessions.get(sessionId) !== pendingStartup) { return; } await this.publishMcpStartupStatus(sessionId, mcpStartup, pendingStartup.requestedServers); } catch (err) { logger.error(`Failed to publish MCP startup status for session ${sessionId}`, err); } finally { - this.pendingMcpStartupSessions.delete(sessionId); + if (this.pendingMcpStartupSessions.get(sessionId) === pendingStartup) { + this.pendingMcpStartupSessions.delete(sessionId); + } } } diff --git a/src/__tests__/CodexACPAgent/session-close.test.ts b/src/__tests__/CodexACPAgent/session-close.test.ts index dd8e5ab4..f6e067e7 100644 --- a/src/__tests__/CodexACPAgent/session-close.test.ts +++ b/src/__tests__/CodexACPAgent/session-close.test.ts @@ -7,7 +7,9 @@ import { } from "../acp-test-utils"; import type {CodexAcpServer} from "../../CodexAcpServer"; import type {CodexAcpClient} from "../../CodexAcpClient"; +import type {McpStartupResult} from "../../CodexAppServerClient"; import type {TurnStartResponse} from "../../app-server/v2"; +import type {McpServer} from "@agentclientprotocol/sdk"; const sessionId = "session-id"; @@ -118,9 +120,56 @@ describe("ACP session close", () => { ); expect(() => codexAcpAgent.getSessionState(sessionId)).toThrow(`Session ${sessionId} not found`); }); + + it("suppresses MCP startup updates while close is in progress", async () => { + const mcpStartup = deferred(); + const mcpServer: McpServer = { + name: "broken-mcp", + command: "npx", + args: ["broken"], + env: [], + }; + const {fixture, codexAcpAgent, codexAcpClient} = await createSession({ + mcpServers: [mcpServer], + configure: ({codexAcpClient}) => { + vi.spyOn(codexAcpClient, "awaitMcpServerStartup").mockReturnValue(mcpStartup.promise); + }, + }); + const unsubscribe = deferred(); + vi.spyOn(codexAcpClient, "closeSession").mockReturnValue(unsubscribe.promise); + + await vi.waitFor(() => { + expect(codexAcpClient.awaitMcpServerStartup).toHaveBeenCalledWith(["broken-mcp"], expect.any(Number)); + }); + fixture.clearAcpConnectionDump(); + + const closePromise = codexAcpAgent.closeSession({sessionId}); + await vi.waitFor(() => { + expect(codexAcpClient.closeSession).toHaveBeenCalledWith(sessionId); + }); + + mcpStartup.resolve({ + ready: [], + failed: [{server: "broken-mcp", error: "boom"}], + cancelled: [], + }); + await waitForMicrotasks(); + + expect(fixture.getAcpConnectionEvents([])).toEqual([]); + + unsubscribe.resolve(undefined); + await closePromise; + }); }); -async function createSession(): Promise<{ +async function createSession(options: { + mcpServers?: McpServer[], + configure?: (params: { + fixture: CodexMockTestFixture, + codexAcpAgent: CodexAcpServer, + codexAcpClient: CodexAcpClient, + }) => void, +} = {}): Promise<{ fixture: CodexMockTestFixture, codexAcpAgent: CodexAcpServer, codexAcpClient: CodexAcpClient, @@ -140,7 +189,9 @@ async function createSession(): Promise<{ currentServiceTier: null, }); - await codexAcpAgent.newSession({cwd: "/test/cwd", mcpServers: []}); + options.configure?.({fixture, codexAcpAgent, codexAcpClient}); + + await codexAcpAgent.newSession({cwd: "/test/cwd", mcpServers: options.mcpServers ?? []}); fixture.clearCodexConnectionDump(); fixture.clearAcpConnectionDump(); From 197a75218bf3e46eb55cc24ae8a4424831f2bf5f Mon Sep 17 00:00:00 2001 From: Ben Brandt Date: Fri, 5 Jun 2026 11:53:10 +0200 Subject: [PATCH 4/8] Handle session close races cleanly --- src/CodexAcpServer.ts | 171 ++++++++++++++---- .../CodexACPAgent/session-close.test.ts | 45 ++++- 2 files changed, 178 insertions(+), 38 deletions(-) diff --git a/src/CodexAcpServer.ts b/src/CodexAcpServer.ts index 5ca0f651..4190dcfc 100644 --- a/src/CodexAcpServer.ts +++ b/src/CodexAcpServer.ts @@ -65,6 +65,18 @@ interface PendingMcpStartupSession { afterVersion: number; } +interface PendingTurnStart { + promise: Promise; + resolve: (turnId: string | null) => void; +} + +interface ActivePrompt { + completion: Promise; + closeSignal: Promise; + requestClose: () => void; + complete: () => void; +} + export class CodexAcpServer implements acp.Agent { private static readonly MODEL_NAME_TOKEN_OVERRIDES: Record = { gpt: "GPT", @@ -80,9 +92,10 @@ export class CodexAcpServer implements acp.Agent { private readonly sessions: Map; private readonly pendingMcpStartupSessions: Map; - private readonly pendingTurnStarts: Map>; - private readonly activePromptCompletions: Map>; + private readonly pendingTurnStarts: Map; + private readonly activePrompts: Map; private readonly closingSessions: Set; + private readonly sessionGenerations: Map; constructor( connection: acp.AgentSideConnection, @@ -93,8 +106,9 @@ export class CodexAcpServer implements acp.Agent { this.sessions = new Map(); this.pendingMcpStartupSessions = new Map(); this.pendingTurnStarts = new Map(); - this.activePromptCompletions = new Map(); + this.activePrompts = new Map(); this.closingSessions = new Set(); + this.sessionGenerations = new Map(); this.connection = connection; this.codexAcpClient = codexAcpClient; this.defaultAuthRequest = defaultAuthRequest ?? null; @@ -191,7 +205,34 @@ export class CodexAcpServer implements acp.Agent { } } + private beginSessionOpen(sessionId: string): number { + const generation = this.getSessionGeneration(sessionId); + if (this.closingSessions.has(sessionId)) { + throw RequestError.invalidRequest(`Session ${sessionId} is closing`); + } + return generation; + } + + private assertSessionOpenCanInstall(sessionId: string, generation: number): void { + if (this.closingSessions.has(sessionId) || this.getSessionGeneration(sessionId) !== generation) { + throw RequestError.invalidRequest(`Session ${sessionId} is closing`); + } + } + + private getSessionGeneration(sessionId: string): number { + return this.sessionGenerations.get(sessionId) ?? 0; + } + + private bumpSessionGeneration(sessionId: string): number { + const generation = this.getSessionGeneration(sessionId) + 1; + this.sessionGenerations.set(sessionId, generation); + return generation; + } + async tryCreateSession(request: acp.NewSessionRequest | acp.ResumeSessionRequest): Promise<[SessionId, SessionModelState, SessionModeState]> { + const requestedSessionGeneration = "sessionId" in request + ? this.beginSessionOpen(request.sessionId) + : null; await this.checkAuthorization(); const requestedMcpServers = request.mcpServers ?? []; const mcpServerStartupVersion = requestedMcpServers.length > 0 @@ -209,6 +250,8 @@ export class CodexAcpServer implements acp.Agent { const account = await this.getActiveAccount(); const {sessionId, currentModelId, models} = sessionMetadata; + const sessionGeneration = requestedSessionGeneration ?? this.beginSessionOpen(sessionId); + this.assertSessionOpenCanInstall(sessionId, sessionGeneration); const sessionMcpServers = this.resolveSessionMcpServers(requestedMcpServers, "sessionId" in request); const currentModel = this.findCurrentModel(models, currentModelId); const currentModelSupportsFast = modelSupportsFast(currentModel); @@ -301,6 +344,7 @@ export class CodexAcpServer implements acp.Agent { async closeSession(params: acp.CloseSessionRequest): Promise { logger.log("Closing session...", {sessionId: params.sessionId}); + const closeGeneration = this.bumpSessionGeneration(params.sessionId); const sessionState = this.sessions.get(params.sessionId); this.closingSessions.add(params.sessionId); @@ -311,19 +355,22 @@ export class CodexAcpServer implements acp.Agent { logger.log("Close request received for unknown local session", {sessionId: params.sessionId}); } - const activePromptCompletion = this.activePromptCompletions.get(params.sessionId); - if (activePromptCompletion) { - await activePromptCompletion; + const activePrompt = this.activePrompts.get(params.sessionId); + if (activePrompt) { + activePrompt.requestClose(); + await activePrompt.completion; } await this.runWithProcessCheck(() => this.codexAcpClient.closeSession(params.sessionId)); logger.log("Session closed", {sessionId: params.sessionId}); } finally { - this.sessions.delete(params.sessionId); - this.pendingMcpStartupSessions.delete(params.sessionId); - this.pendingTurnStarts.delete(params.sessionId); - this.activePromptCompletions.delete(params.sessionId); - this.closingSessions.delete(params.sessionId); + if (this.getSessionGeneration(params.sessionId) === closeGeneration) { + this.sessions.delete(params.sessionId); + this.pendingMcpStartupSessions.delete(params.sessionId); + this.pendingTurnStarts.delete(params.sessionId); + this.activePrompts.delete(params.sessionId); + this.closingSessions.delete(params.sessionId); + } } return {}; @@ -493,6 +540,7 @@ export class CodexAcpServer implements acp.Agent { modeState: SessionModeState; thread: Thread; }> { + const requestedSessionGeneration = this.beginSessionOpen(request.sessionId); await this.checkAuthorization(); const requestedMcpServers = request.mcpServers ?? []; const mcpServerStartupVersion = requestedMcpServers.length > 0 @@ -506,6 +554,7 @@ export class CodexAcpServer implements acp.Agent { const account = await this.getActiveAccount(); const {sessionId, currentModelId, models, thread} = sessionMetadata; + this.assertSessionOpenCanInstall(sessionId, requestedSessionGeneration); const sessionMcpServers = this.resolveSessionMcpServers(requestedMcpServers, true); const currentModel = this.findCurrentModel(models, currentModelId); const currentModelSupportsFast = modelSupportsFast(currentModel); @@ -851,24 +900,64 @@ export class CodexAcpServer implements acp.Agent { } } - private trackActivePrompt(sessionId: string): () => void { + private trackActivePrompt(sessionId: string): ActivePrompt { let resolveCompletion: () => void = () => {}; const completion = new Promise((resolve) => { resolveCompletion = resolve; }); - this.activePromptCompletions.set(sessionId, completion); + let resolveCloseSignal: (value: null) => void = () => {}; + const closeSignal = new Promise((resolve) => { + resolveCloseSignal = resolve; + }); let completed = false; - return () => { - if (completed) { - return; - } - completed = true; - if (this.activePromptCompletions.get(sessionId) === completion) { - this.activePromptCompletions.delete(sessionId); - } - resolveCompletion(); + let closeRequested = false; + const activePrompt: ActivePrompt = { + completion, + closeSignal, + requestClose: () => { + if (closeRequested) { + return; + } + closeRequested = true; + resolveCloseSignal(null); + }, + complete: () => { + if (completed) { + return; + } + completed = true; + if (this.activePrompts.get(sessionId) === activePrompt) { + this.activePrompts.delete(sessionId); + } + resolveCompletion(); + }, }; + + this.activePrompts.set(sessionId, activePrompt); + return activePrompt; + } + + private createPendingTurnStart(): PendingTurnStart { + let resolve: (turnId: string | null) => void = () => {}; + const promise = new Promise((innerResolve) => { + resolve = innerResolve; + }); + return {promise, resolve}; + } + + private interruptLateStartedTurn(sessionId: string, turnId: string): void { + void this.runWithProcessCheck(() => this.codexAcpClient.turnInterrupt({ + threadId: sessionId, + turnId, + })).catch((err) => { + logger.error(`Close - late turnInterrupt failed`, err); + }).finally(() => { + this.codexAcpClient.resolveTurnInterrupted({ + threadId: sessionId, + turnId, + }); + }); } private async interruptSessionTurn( @@ -920,7 +1009,12 @@ export class CodexAcpServer implements acp.Agent { return null; } - const turnId = await pendingTurnStart; + if (requestName === "Close") { + pendingTurnStart.resolve(null); + return null; + } + + const turnId = await pendingTurnStart.promise; if (!turnId) { logger.log(`${requestName} request rejected: no current turn`, {sessionId: sessionState.sessionId}); } @@ -935,9 +1029,8 @@ export class CodexAcpServer implements acp.Agent { const sessionState = this.getSessionState(params.sessionId); sessionState.currentTurnId = null; sessionState.lastTokenUsage = null; - const completeActivePrompt = this.trackActivePrompt(params.sessionId); - let pendingTurnStart: Promise | null = null; - let resolvePendingTurnStart: (turnId: string | null) => void = () => {}; + const activePrompt = this.trackActivePrompt(params.sessionId); + let pendingTurnStart: PendingTurnStart | null = null; try { const eventHandler = new CodexEventHandler(this.connection, sessionState); @@ -988,11 +1081,9 @@ export class CodexAcpServer implements acp.Agent { sessionState.fastModeEnabled, sessionState.currentModelSupportsFast, ); - pendingTurnStart = new Promise((resolve) => { - resolvePendingTurnStart = resolve; - }); + pendingTurnStart = this.createPendingTurnStart(); this.pendingTurnStarts.set(params.sessionId, pendingTurnStart); - const turnCompleted = await this.runWithProcessCheck( + const sendPromptPromise = this.runWithProcessCheck( () => this.codexAcpClient.sendPrompt( params, agentMode, @@ -1001,11 +1092,25 @@ export class CodexAcpServer implements acp.Agent { disableSummary, sessionState.cwd, (turnId) => { + if (this.activePrompts.get(params.sessionId) !== activePrompt + || this.closingSessions.has(params.sessionId)) { + this.interruptLateStartedTurn(params.sessionId, turnId); + return; + } sessionState.currentTurnId = turnId; - resolvePendingTurnStart(turnId); + pendingTurnStart?.resolve(turnId); }, () => this.closingSessions.has(params.sessionId), )); + void sendPromptPromise.catch((err) => { + if (this.activePrompts.get(params.sessionId) !== activePrompt) { + logger.error(`Prompt for closed session ${params.sessionId} failed after close`, err); + } + }); + const turnCompleted = await Promise.race([ + sendPromptPromise, + activePrompt.closeSignal, + ]); if (turnCompleted === null) { return { @@ -1056,8 +1161,8 @@ export class CodexAcpServer implements acp.Agent { if (pendingTurnStart !== null && this.pendingTurnStarts.get(params.sessionId) === pendingTurnStart) { this.pendingTurnStarts.delete(params.sessionId); } - resolvePendingTurnStart(null); - completeActivePrompt(); + pendingTurnStart?.resolve(null); + activePrompt.complete(); } } diff --git a/src/__tests__/CodexACPAgent/session-close.test.ts b/src/__tests__/CodexACPAgent/session-close.test.ts index f6e067e7..cd139b7a 100644 --- a/src/__tests__/CodexACPAgent/session-close.test.ts +++ b/src/__tests__/CodexACPAgent/session-close.test.ts @@ -6,7 +6,7 @@ import { type CodexMockTestFixture, } from "../acp-test-utils"; import type {CodexAcpServer} from "../../CodexAcpServer"; -import type {CodexAcpClient} from "../../CodexAcpClient"; +import type {CodexAcpClient, SessionMetadata} from "../../CodexAcpClient"; import type {McpStartupResult} from "../../CodexAppServerClient"; import type {TurnStartResponse} from "../../app-server/v2"; import type {McpServer} from "@agentclientprotocol/sdk"; @@ -61,7 +61,7 @@ describe("ACP session close", () => { expect(() => codexAcpAgent.getSessionState(sessionId)).toThrow(`Session ${sessionId} not found`); }); - it("cancels a prompt when close races with turn start", async () => { + it("does not wait for delayed turn start before closing", async () => { const {fixture, codexAcpAgent} = await createSession(); const turnStart = deferred(); const turnStartCalled = deferred(); @@ -79,16 +79,24 @@ describe("ACP session close", () => { fixture.clearCodexConnectionDump(); const closePromise = codexAcpAgent.closeSession({sessionId}); - turnStart.resolve(createTurnStartResponse("turn-id")); + await expect(closePromise).resolves.toEqual({}); await expect(promptPromise).resolves.toMatchObject({stopReason: "cancelled"}); - await closePromise; const requestMethods = fixture.getCodexConnectionEvents([]) .flatMap(event => event.eventType === "request" ? [event.method] : []); - expect(requestMethods).toEqual(["turn/interrupt", "thread/unsubscribe"]); + expect(requestMethods).toEqual(["thread/unsubscribe"]); expect(fixture.getAcpConnectionDump([])).not.toContain("Conversation interrupted"); expect(() => codexAcpAgent.getSessionState(sessionId)).toThrow(`Session ${sessionId} not found`); + + fixture.clearCodexConnectionDump(); + turnStart.resolve(createTurnStartResponse("turn-id")); + + await vi.waitFor(() => { + const lateRequestMethods = fixture.getCodexConnectionEvents([]) + .flatMap(event => event.eventType === "request" ? [event.method] : []); + expect(lateRequestMethods).toContain("turn/interrupt"); + }); }); it("does not hang when close interrupt fails during an active prompt", async () => { @@ -160,6 +168,24 @@ describe("ACP session close", () => { unsubscribe.resolve(undefined); await closePromise; }); + + it("rejects an in-flight resume that completes after close", async () => { + const {codexAcpAgent, codexAcpClient} = await createSession(); + const resume = deferred(); + vi.spyOn(codexAcpClient, "resumeSession").mockReturnValue(resume.promise); + + const resumePromise = codexAcpAgent.resumeSession({ + sessionId, + cwd: "/test/cwd", + mcpServers: [], + }); + + await codexAcpAgent.closeSession({sessionId}); + resume.resolve(createSessionMetadata()); + + await expect(resumePromise).rejects.toThrow("Invalid request"); + expect(() => codexAcpAgent.getSessionState(sessionId)).toThrow(`Session ${sessionId} not found`); + }); }); async function createSession(options: { @@ -212,6 +238,15 @@ function createTurnStartResponse(turnId: string): TurnStartResponse { }; } +function createSessionMetadata(): SessionMetadata { + return { + sessionId, + currentModelId: "model-id[medium]", + models: [createTestModel()], + currentServiceTier: null, + }; +} + function deferred(): {promise: Promise, resolve: (value: T) => void} { let resolve: (value: T) => void = () => {}; const promise = new Promise((innerResolve) => { From 954ad80f51f4be2fa40749bee9bd5a32e970a8f5 Mon Sep 17 00:00:00 2001 From: Ben Brandt Date: Fri, 5 Jun 2026 12:11:25 +0200 Subject: [PATCH 5/8] Close stale session opens on generation mismatch --- src/CodexAcpServer.ts | 21 ++++++++++++++----- .../CodexACPAgent/session-close.test.ts | 3 +++ 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/src/CodexAcpServer.ts b/src/CodexAcpServer.ts index 4190dcfc..e8aac922 100644 --- a/src/CodexAcpServer.ts +++ b/src/CodexAcpServer.ts @@ -213,10 +213,17 @@ export class CodexAcpServer implements acp.Agent { return generation; } - private assertSessionOpenCanInstall(sessionId: string, generation: number): void { - if (this.closingSessions.has(sessionId) || this.getSessionGeneration(sessionId) !== generation) { - throw RequestError.invalidRequest(`Session ${sessionId} is closing`); + private sessionOpenCanInstall(sessionId: string, generation: number): boolean { + return !this.closingSessions.has(sessionId) && this.getSessionGeneration(sessionId) === generation; + } + + private async closeStaleSessionOpen(sessionId: string): Promise { + try { + await this.runWithProcessCheck(() => this.codexAcpClient.closeSession(sessionId)); + } catch (err) { + logger.error(`Failed to close stale session open for ${sessionId}`, err); } + throw RequestError.invalidRequest(`Session ${sessionId} is closing`); } private getSessionGeneration(sessionId: string): number { @@ -251,7 +258,9 @@ export class CodexAcpServer implements acp.Agent { const account = await this.getActiveAccount(); const {sessionId, currentModelId, models} = sessionMetadata; const sessionGeneration = requestedSessionGeneration ?? this.beginSessionOpen(sessionId); - this.assertSessionOpenCanInstall(sessionId, sessionGeneration); + if (!this.sessionOpenCanInstall(sessionId, sessionGeneration)) { + await this.closeStaleSessionOpen(sessionId); + } const sessionMcpServers = this.resolveSessionMcpServers(requestedMcpServers, "sessionId" in request); const currentModel = this.findCurrentModel(models, currentModelId); const currentModelSupportsFast = modelSupportsFast(currentModel); @@ -554,7 +563,9 @@ export class CodexAcpServer implements acp.Agent { const account = await this.getActiveAccount(); const {sessionId, currentModelId, models, thread} = sessionMetadata; - this.assertSessionOpenCanInstall(sessionId, requestedSessionGeneration); + if (!this.sessionOpenCanInstall(sessionId, requestedSessionGeneration)) { + await this.closeStaleSessionOpen(sessionId); + } const sessionMcpServers = this.resolveSessionMcpServers(requestedMcpServers, true); const currentModel = this.findCurrentModel(models, currentModelId); const currentModelSupportsFast = modelSupportsFast(currentModel); diff --git a/src/__tests__/CodexACPAgent/session-close.test.ts b/src/__tests__/CodexACPAgent/session-close.test.ts index cd139b7a..c87b2c10 100644 --- a/src/__tests__/CodexACPAgent/session-close.test.ts +++ b/src/__tests__/CodexACPAgent/session-close.test.ts @@ -173,6 +173,7 @@ describe("ACP session close", () => { const {codexAcpAgent, codexAcpClient} = await createSession(); const resume = deferred(); vi.spyOn(codexAcpClient, "resumeSession").mockReturnValue(resume.promise); + const closeSessionSpy = vi.spyOn(codexAcpClient, "closeSession"); const resumePromise = codexAcpAgent.resumeSession({ sessionId, @@ -184,6 +185,8 @@ describe("ACP session close", () => { resume.resolve(createSessionMetadata()); await expect(resumePromise).rejects.toThrow("Invalid request"); + expect(closeSessionSpy).toHaveBeenCalledTimes(2); + expect(closeSessionSpy).toHaveBeenLastCalledWith(sessionId); expect(() => codexAcpAgent.getSessionState(sessionId)).toThrow(`Session ${sessionId} not found`); }); }); From 9c7bdc6c16261a83e879a6b687e722fa18ed0bbc Mon Sep 17 00:00:00 2001 From: Ben Brandt Date: Fri, 5 Jun 2026 12:33:57 +0200 Subject: [PATCH 6/8] Prevent stale session cleanup races --- src/CodexAcpServer.ts | 28 ++++++---- .../CodexACPAgent/session-close.test.ts | 55 +++++++++++++++++++ 2 files changed, 73 insertions(+), 10 deletions(-) diff --git a/src/CodexAcpServer.ts b/src/CodexAcpServer.ts index e8aac922..4640a68b 100644 --- a/src/CodexAcpServer.ts +++ b/src/CodexAcpServer.ts @@ -96,6 +96,7 @@ export class CodexAcpServer implements acp.Agent { private readonly activePrompts: Map; private readonly closingSessions: Set; private readonly sessionGenerations: Map; + private readonly sessionOpenGenerations: Map; constructor( connection: acp.AgentSideConnection, @@ -109,6 +110,7 @@ export class CodexAcpServer implements acp.Agent { this.activePrompts = new Map(); this.closingSessions = new Set(); this.sessionGenerations = new Map(); + this.sessionOpenGenerations = new Map(); this.connection = connection; this.codexAcpClient = codexAcpClient; this.defaultAuthRequest = defaultAuthRequest ?? null; @@ -210,6 +212,7 @@ export class CodexAcpServer implements acp.Agent { if (this.closingSessions.has(sessionId)) { throw RequestError.invalidRequest(`Session ${sessionId} is closing`); } + this.sessionOpenGenerations.set(sessionId, generation); return generation; } @@ -217,11 +220,13 @@ export class CodexAcpServer implements acp.Agent { return !this.closingSessions.has(sessionId) && this.getSessionGeneration(sessionId) === generation; } - private async closeStaleSessionOpen(sessionId: string): Promise { - try { - await this.runWithProcessCheck(() => this.codexAcpClient.closeSession(sessionId)); - } catch (err) { - logger.error(`Failed to close stale session open for ${sessionId}`, err); + private async closeStaleSessionOpen(sessionId: string, generation: number): Promise { + if (this.sessionOpenGenerations.get(sessionId) === generation) { + try { + await this.runWithProcessCheck(() => this.codexAcpClient.closeSession(sessionId)); + } catch (err) { + logger.error(`Failed to close stale session open for ${sessionId}`, err); + } } throw RequestError.invalidRequest(`Session ${sessionId} is closing`); } @@ -259,7 +264,7 @@ export class CodexAcpServer implements acp.Agent { const {sessionId, currentModelId, models} = sessionMetadata; const sessionGeneration = requestedSessionGeneration ?? this.beginSessionOpen(sessionId); if (!this.sessionOpenCanInstall(sessionId, sessionGeneration)) { - await this.closeStaleSessionOpen(sessionId); + await this.closeStaleSessionOpen(sessionId, sessionGeneration); } const sessionMcpServers = this.resolveSessionMcpServers(requestedMcpServers, "sessionId" in request); const currentModel = this.findCurrentModel(models, currentModelId); @@ -564,7 +569,7 @@ export class CodexAcpServer implements acp.Agent { const account = await this.getActiveAccount(); const {sessionId, currentModelId, models, thread} = sessionMetadata; if (!this.sessionOpenCanInstall(sessionId, requestedSessionGeneration)) { - await this.closeStaleSessionOpen(sessionId); + await this.closeStaleSessionOpen(sessionId, requestedSessionGeneration); } const sessionMcpServers = this.resolveSessionMcpServers(requestedMcpServers, true); const currentModel = this.findCurrentModel(models, currentModelId); @@ -971,6 +976,10 @@ export class CodexAcpServer implements acp.Agent { }); } + private promptIsClosedOrStale(sessionId: string, activePrompt: ActivePrompt): boolean { + return this.activePrompts.get(sessionId) !== activePrompt || this.closingSessions.has(sessionId); + } + private async interruptSessionTurn( sessionState: SessionState, requestName: "Cancel" | "Close", @@ -1103,15 +1112,14 @@ export class CodexAcpServer implements acp.Agent { disableSummary, sessionState.cwd, (turnId) => { - if (this.activePrompts.get(params.sessionId) !== activePrompt - || this.closingSessions.has(params.sessionId)) { + if (this.promptIsClosedOrStale(params.sessionId, activePrompt)) { this.interruptLateStartedTurn(params.sessionId, turnId); return; } sessionState.currentTurnId = turnId; pendingTurnStart?.resolve(turnId); }, - () => this.closingSessions.has(params.sessionId), + () => this.promptIsClosedOrStale(params.sessionId, activePrompt), )); void sendPromptPromise.catch((err) => { if (this.activePrompts.get(params.sessionId) !== activePrompt) { diff --git a/src/__tests__/CodexACPAgent/session-close.test.ts b/src/__tests__/CodexACPAgent/session-close.test.ts index c87b2c10..43cd2a5c 100644 --- a/src/__tests__/CodexACPAgent/session-close.test.ts +++ b/src/__tests__/CodexACPAgent/session-close.test.ts @@ -99,6 +99,33 @@ describe("ACP session close", () => { }); }); + it("does not start a turn after close while prompt startup is still refreshing skills", async () => { + const {fixture, codexAcpAgent} = await createSession(); + const skillRefresh = deferred<{data: []}>(); + const listSkillsSpy = vi.spyOn(fixture.getCodexAppServerClient(), "listSkills") + .mockReturnValue(skillRefresh.promise); + const turnStartSpy = vi.spyOn(fixture.getCodexAppServerClient(), "turnStart") + .mockResolvedValue(createTurnStartResponse("turn-id")); + + const promptPromise = codexAcpAgent.prompt({ + sessionId, + prompt: [{type: "text", text: "prompt before turn start"}], + }); + + await vi.waitFor(() => { + expect(listSkillsSpy).toHaveBeenCalled(); + }); + + await expect(codexAcpAgent.closeSession({sessionId})).resolves.toEqual({}); + await expect(promptPromise).resolves.toMatchObject({stopReason: "cancelled"}); + + skillRefresh.resolve({data: []}); + await waitForMicrotasks(); + + expect(turnStartSpy).not.toHaveBeenCalled(); + expect(() => codexAcpAgent.getSessionState(sessionId)).toThrow(`Session ${sessionId} not found`); + }); + it("does not hang when close interrupt fails during an active prompt", async () => { const {fixture, codexAcpAgent} = await createSession(); const turnInterruptSpy = vi.spyOn(fixture.getCodexAcpClient(), "turnInterrupt") @@ -189,6 +216,34 @@ describe("ACP session close", () => { expect(closeSessionSpy).toHaveBeenLastCalledWith(sessionId); expect(() => codexAcpAgent.getSessionState(sessionId)).toThrow(`Session ${sessionId} not found`); }); + + it("does not let a stale resume unsubscribe a newer reopened session", async () => { + const {codexAcpAgent, codexAcpClient} = await createSession(); + const staleResume = deferred(); + vi.spyOn(codexAcpClient, "resumeSession") + .mockReturnValueOnce(staleResume.promise) + .mockResolvedValueOnce(createSessionMetadata()); + const closeSessionSpy = vi.spyOn(codexAcpClient, "closeSession"); + + const staleResumePromise = codexAcpAgent.resumeSession({ + sessionId, + cwd: "/test/cwd", + mcpServers: [], + }); + + await codexAcpAgent.closeSession({sessionId}); + await codexAcpAgent.resumeSession({ + sessionId, + cwd: "/test/cwd", + mcpServers: [], + }); + + staleResume.resolve(createSessionMetadata()); + + await expect(staleResumePromise).rejects.toThrow("Invalid request"); + expect(closeSessionSpy).toHaveBeenCalledTimes(1); + expect(codexAcpAgent.getSessionState(sessionId).sessionId).toBe(sessionId); + }); }); async function createSession(options: { From b5e6afb72bde2f2da38145ea12f5759835971683 Mon Sep 17 00:00:00 2001 From: Ben Brandt Date: Fri, 5 Jun 2026 12:53:28 +0200 Subject: [PATCH 7/8] Block session reopen during stale cleanup --- src/CodexAcpServer.ts | 6 +++ .../CodexACPAgent/session-close.test.ts | 45 +++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/src/CodexAcpServer.ts b/src/CodexAcpServer.ts index 4640a68b..d673b174 100644 --- a/src/CodexAcpServer.ts +++ b/src/CodexAcpServer.ts @@ -222,10 +222,16 @@ export class CodexAcpServer implements acp.Agent { private async closeStaleSessionOpen(sessionId: string, generation: number): Promise { if (this.sessionOpenGenerations.get(sessionId) === generation) { + const staleCloseGeneration = this.bumpSessionGeneration(sessionId); + this.closingSessions.add(sessionId); try { await this.runWithProcessCheck(() => this.codexAcpClient.closeSession(sessionId)); } catch (err) { logger.error(`Failed to close stale session open for ${sessionId}`, err); + } finally { + if (this.getSessionGeneration(sessionId) === staleCloseGeneration) { + this.closingSessions.delete(sessionId); + } } } throw RequestError.invalidRequest(`Session ${sessionId} is closing`); diff --git a/src/__tests__/CodexACPAgent/session-close.test.ts b/src/__tests__/CodexACPAgent/session-close.test.ts index 43cd2a5c..b55e4590 100644 --- a/src/__tests__/CodexACPAgent/session-close.test.ts +++ b/src/__tests__/CodexACPAgent/session-close.test.ts @@ -244,6 +244,51 @@ describe("ACP session close", () => { expect(closeSessionSpy).toHaveBeenCalledTimes(1); expect(codexAcpAgent.getSessionState(sessionId).sessionId).toBe(sessionId); }); + + it("blocks reopen while stale resume cleanup is unsubscribing", async () => { + const {codexAcpAgent, codexAcpClient} = await createSession(); + const staleResume = deferred(); + const staleUnsubscribe = deferred(); + const resumeSpy = vi.spyOn(codexAcpClient, "resumeSession") + .mockReturnValueOnce(staleResume.promise) + .mockResolvedValueOnce(createSessionMetadata()); + const closeSessionSpy = vi.spyOn(codexAcpClient, "closeSession") + .mockResolvedValueOnce() + .mockReturnValueOnce(staleUnsubscribe.promise); + + const staleResumePromise = codexAcpAgent.resumeSession({ + sessionId, + cwd: "/test/cwd", + mcpServers: [], + }); + + await codexAcpAgent.closeSession({sessionId}); + staleResume.resolve(createSessionMetadata()); + + await vi.waitFor(() => { + expect(closeSessionSpy).toHaveBeenCalledTimes(2); + }); + + await expect(codexAcpAgent.resumeSession({ + sessionId, + cwd: "/test/cwd", + mcpServers: [], + })).rejects.toThrow("Invalid request"); + expect(resumeSpy).toHaveBeenCalledTimes(1); + + staleUnsubscribe.resolve(undefined); + await expect(staleResumePromise).rejects.toThrow("Invalid request"); + + await expect(codexAcpAgent.resumeSession({ + sessionId, + cwd: "/test/cwd", + mcpServers: [], + })).resolves.toEqual(expect.objectContaining({ + models: expect.objectContaining({ + currentModelId: "model-id[medium]", + }), + })); + }); }); async function createSession(options: { From a4bef4e084afe4b14ee7b20f6f4f39557f748b06 Mon Sep 17 00:00:00 2001 From: Ben Brandt Date: Fri, 5 Jun 2026 13:15:51 +0200 Subject: [PATCH 8/8] Fence stale session and turn cleanup on close --- src/CodexAcpClient.ts | 10 +- src/CodexAcpServer.ts | 120 +++++++++++--- src/CodexAppServerClient.ts | 61 +++++++ .../CodexACPAgent/session-close.test.ts | 154 ++++++++++++++++++ 4 files changed, 321 insertions(+), 24 deletions(-) diff --git a/src/CodexAcpClient.ts b/src/CodexAcpClient.ts index 5757af53..0fc55a78 100644 --- a/src/CodexAcpClient.ts +++ b/src/CodexAcpClient.ts @@ -199,7 +199,7 @@ export class CodexAcpClient { return this.codexClient.accountRead({refreshToken: false}); } - async resumeSession(request: acp.ResumeSessionRequest): Promise { + async resumeSession(request: acp.ResumeSessionRequest, onSubscribed?: () => void): Promise { await this.refreshSkills(request.cwd, request._meta); const response = await this.codexClient.threadResume({ @@ -208,6 +208,7 @@ export class CodexAcpClient { modelProvider: this.getResumeModelProvider(), threadId: request.sessionId, }); + onSubscribed?.(); const codexModels = await this.fetchAvailableModels(); const currentModelId = this.createModelId(codexModels, response.model, response.reasoningEffort).toString(); return { @@ -218,13 +219,14 @@ export class CodexAcpClient { } } - async loadSession(request: acp.LoadSessionRequest): Promise { + async loadSession(request: acp.LoadSessionRequest, onSubscribed?: () => void): Promise { const response = await this.codexClient.threadResume({ config: await this.createSessionConfig(request.cwd, request.mcpServers ?? []), cwd: request.cwd, modelProvider: this.getResumeModelProvider(), threadId: request.sessionId, }); + onSubscribed?.(); const codexModels = await this.fetchAvailableModels(); const currentModelId = this.createModelId(codexModels, response.model, response.reasoningEffort).toString(); return { @@ -419,6 +421,10 @@ export class CodexAcpClient { this.codexClient.resolveTurnInterrupted(params.threadId, params.turnId); } + markTurnStale(params: { threadId: string, turnId: string }): void { + this.codexClient.markTurnStale(params.threadId, params.turnId); + } + async listSkills(params?: SkillsListParams): Promise { return this.codexClient.listSkills(params ?? {}); } diff --git a/src/CodexAcpServer.ts b/src/CodexAcpServer.ts index d673b174..a3d324c0 100644 --- a/src/CodexAcpServer.ts +++ b/src/CodexAcpServer.ts @@ -94,7 +94,7 @@ export class CodexAcpServer implements acp.Agent { private readonly pendingMcpStartupSessions: Map; private readonly pendingTurnStarts: Map; private readonly activePrompts: Map; - private readonly closingSessions: Set; + private readonly closingSessions: Map; private readonly sessionGenerations: Map; private readonly sessionOpenGenerations: Map; @@ -108,7 +108,7 @@ export class CodexAcpServer implements acp.Agent { this.pendingMcpStartupSessions = new Map(); this.pendingTurnStarts = new Map(); this.activePrompts = new Map(); - this.closingSessions = new Set(); + this.closingSessions = new Map(); this.sessionGenerations = new Map(); this.sessionOpenGenerations = new Map(); this.connection = connection; @@ -209,7 +209,7 @@ export class CodexAcpServer implements acp.Agent { private beginSessionOpen(sessionId: string): number { const generation = this.getSessionGeneration(sessionId); - if (this.closingSessions.has(sessionId)) { + if (this.sessionIsClosing(sessionId)) { throw RequestError.invalidRequest(`Session ${sessionId} is closing`); } this.sessionOpenGenerations.set(sessionId, generation); @@ -217,26 +217,49 @@ export class CodexAcpServer implements acp.Agent { } private sessionOpenCanInstall(sessionId: string, generation: number): boolean { - return !this.closingSessions.has(sessionId) && this.getSessionGeneration(sessionId) === generation; + return !this.sessionIsClosing(sessionId) && this.getSessionGeneration(sessionId) === generation; } - private async closeStaleSessionOpen(sessionId: string, generation: number): Promise { + private async cleanupStaleSessionOpen(sessionId: string, generation: number): Promise { if (this.sessionOpenGenerations.get(sessionId) === generation) { - const staleCloseGeneration = this.bumpSessionGeneration(sessionId); - this.closingSessions.add(sessionId); + if (!this.sessionIsClosing(sessionId)) { + this.bumpSessionGeneration(sessionId); + } + this.beginSessionCloseFence(sessionId); try { await this.runWithProcessCheck(() => this.codexAcpClient.closeSession(sessionId)); } catch (err) { logger.error(`Failed to close stale session open for ${sessionId}`, err); } finally { - if (this.getSessionGeneration(sessionId) === staleCloseGeneration) { - this.closingSessions.delete(sessionId); - } + this.endSessionCloseFence(sessionId); } + return true; } + return false; + } + + private async closeStaleSessionOpen(sessionId: string, generation: number): Promise { + await this.cleanupStaleSessionOpen(sessionId, generation); throw RequestError.invalidRequest(`Session ${sessionId} is closing`); } + private sessionIsClosing(sessionId: string): boolean { + return (this.closingSessions.get(sessionId) ?? 0) > 0; + } + + private beginSessionCloseFence(sessionId: string): void { + this.closingSessions.set(sessionId, (this.closingSessions.get(sessionId) ?? 0) + 1); + } + + private endSessionCloseFence(sessionId: string): void { + const count = this.closingSessions.get(sessionId) ?? 0; + if (count <= 1) { + this.closingSessions.delete(sessionId); + return; + } + this.closingSessions.set(sessionId, count - 1); + } + private getSessionGeneration(sessionId: string): number { return this.sessionGenerations.get(sessionId) ?? 0; } @@ -258,18 +281,39 @@ export class CodexAcpServer implements acp.Agent { : null; let sessionMetadata: SessionMetadata; + let resumeSubscribed = false; if ("sessionId" in request) { logger.log(`Resume existing session: ${request.sessionId}...`) - sessionMetadata = await this.runWithProcessCheck(() => this.codexAcpClient.resumeSession(request)); + try { + sessionMetadata = await this.runWithProcessCheck(() => + this.codexAcpClient.resumeSession(request, () => { + resumeSubscribed = true; + }) + ); + } catch (err) { + if (resumeSubscribed && requestedSessionGeneration !== null) { + await this.cleanupStaleSessionOpen(request.sessionId, requestedSessionGeneration); + } + throw err; + } } else { logger.log(`Create new session...`) sessionMetadata = await this.runWithProcessCheck(() => this.codexAcpClient.newSession(request)); } - const account = await this.getActiveAccount(); const {sessionId, currentModelId, models} = sessionMetadata; + let account: Account | null; + try { + account = await this.getActiveAccount(); + } catch (err) { + if (resumeSubscribed && requestedSessionGeneration !== null) { + await this.cleanupStaleSessionOpen(sessionId, requestedSessionGeneration); + } + throw err; + } const sessionGeneration = requestedSessionGeneration ?? this.beginSessionOpen(sessionId); if (!this.sessionOpenCanInstall(sessionId, sessionGeneration)) { + resumeSubscribed = false; await this.closeStaleSessionOpen(sessionId, sessionGeneration); } const sessionMcpServers = this.resolveSessionMcpServers(requestedMcpServers, "sessionId" in request); @@ -293,6 +337,7 @@ export class CodexAcpServer implements acp.Agent { sessionMcpServers: sessionMcpServers, } this.sessions.set(sessionId, sessionState); + resumeSubscribed = false; if (requestedMcpServers.length > 0 && mcpServerStartupVersion !== null) { this.pendingMcpStartupSessions.set(sessionId, { @@ -366,7 +411,7 @@ export class CodexAcpServer implements acp.Agent { logger.log("Closing session...", {sessionId: params.sessionId}); const closeGeneration = this.bumpSessionGeneration(params.sessionId); const sessionState = this.sessions.get(params.sessionId); - this.closingSessions.add(params.sessionId); + this.beginSessionCloseFence(params.sessionId); try { if (sessionState) { @@ -389,8 +434,8 @@ export class CodexAcpServer implements acp.Agent { this.pendingMcpStartupSessions.delete(params.sessionId); this.pendingTurnStarts.delete(params.sessionId); this.activePrompts.delete(params.sessionId); - this.closingSessions.delete(params.sessionId); } + this.endSessionCloseFence(params.sessionId); } return {}; @@ -568,13 +613,33 @@ export class CodexAcpServer implements acp.Agent { : null; logger.log(`Load existing session: ${request.sessionId}...`); - const sessionMetadata: SessionMetadataWithThread = await this.runWithProcessCheck(() => - this.codexAcpClient.loadSession(request) - ); + let subscribed = false; + let sessionMetadata: SessionMetadataWithThread; + try { + sessionMetadata = await this.runWithProcessCheck(() => + this.codexAcpClient.loadSession(request, () => { + subscribed = true; + }) + ); + } catch (err) { + if (subscribed) { + await this.cleanupStaleSessionOpen(request.sessionId, requestedSessionGeneration); + } + throw err; + } - const account = await this.getActiveAccount(); const {sessionId, currentModelId, models, thread} = sessionMetadata; + let account: Account | null; + try { + account = await this.getActiveAccount(); + } catch (err) { + if (subscribed) { + await this.cleanupStaleSessionOpen(request.sessionId, requestedSessionGeneration); + } + throw err; + } if (!this.sessionOpenCanInstall(sessionId, requestedSessionGeneration)) { + subscribed = false; await this.closeStaleSessionOpen(sessionId, requestedSessionGeneration); } const sessionMcpServers = this.resolveSessionMcpServers(requestedMcpServers, true); @@ -598,6 +663,7 @@ export class CodexAcpServer implements acp.Agent { sessionMcpServers: sessionMcpServers, }; this.sessions.set(sessionId, sessionState); + subscribed = false; if (requestedMcpServers.length > 0 && mcpServerStartupVersion !== null) { this.pendingMcpStartupSessions.set(sessionId, { @@ -887,7 +953,7 @@ export class CodexAcpServer implements acp.Agent { ) ); if (!this.sessions.has(sessionId) - || this.closingSessions.has(sessionId) + || this.sessionIsClosing(sessionId) || this.pendingMcpStartupSessions.get(sessionId) !== pendingStartup) { return; } @@ -969,6 +1035,10 @@ export class CodexAcpServer implements acp.Agent { } private interruptLateStartedTurn(sessionId: string, turnId: string): void { + this.codexAcpClient.markTurnStale({ + threadId: sessionId, + turnId, + }); void this.runWithProcessCheck(() => this.codexAcpClient.turnInterrupt({ threadId: sessionId, turnId, @@ -983,7 +1053,7 @@ export class CodexAcpServer implements acp.Agent { } private promptIsClosedOrStale(sessionId: string, activePrompt: ActivePrompt): boolean { - return this.activePrompts.get(sessionId) !== activePrompt || this.closingSessions.has(sessionId); + return this.activePrompts.get(sessionId) !== activePrompt || this.sessionIsClosing(sessionId); } private async interruptSessionTurn( @@ -1000,6 +1070,12 @@ export class CodexAcpServer implements acp.Agent { sessionId: sessionState.sessionId, currentTurnId: turnId, }); + if (resolveInterruptedTurn) { + this.codexAcpClient.markTurnStale({ + threadId: sessionState.sessionId, + turnId, + }); + } try { await this.runWithProcessCheck(() => this.codexAcpClient.turnInterrupt({ threadId: sessionState.sessionId, @@ -1079,7 +1155,7 @@ export class CodexAcpServer implements acp.Agent { }; } - if (this.closingSessions.has(params.sessionId)) { + if (this.sessionIsClosing(params.sessionId)) { return { stopReason: "cancelled", usage: this.buildPromptUsage(sessionState.lastTokenUsage), @@ -1147,7 +1223,7 @@ export class CodexAcpServer implements acp.Agent { // Check if turn was interrupted (cancelled) if (turnCompleted.turn.status === "interrupted") { - if (!this.closingSessions.has(params.sessionId) && this.sessions.has(params.sessionId)) { + if (!this.sessionIsClosing(params.sessionId) && this.sessions.has(params.sessionId)) { await this.connection.sessionUpdate({ sessionId: params.sessionId, update: { diff --git a/src/CodexAppServerClient.ts b/src/CodexAppServerClient.ts index f7e2da49..df9309eb 100644 --- a/src/CodexAppServerClient.ts +++ b/src/CodexAppServerClient.ts @@ -97,6 +97,7 @@ export class CodexAppServerClient { private readonly mcpServerStartupResolvers: Array = []; private readonly pendingTurnCompletionResolvers = new Map void>>(); private readonly turnCompletionCaptures = new Map void>>(); + private readonly staleTurnIds = new Map>(); constructor(connection: MessageConnection) { this.connection = connection; @@ -114,6 +115,17 @@ export class CodexAppServerClient { if (isTurnCompletedNotification(serverNotification)) { this.recordTurnCompleted(serverNotification.params); } + const routing = extractTurnRouting(serverNotification); + const staleTurnNotification = this.isStaleTurn(routing.threadId, routing.turnId); + if (staleTurnNotification) { + if (isTurnCompletedNotification(serverNotification) && routing.threadId !== null && routing.turnId !== null) { + this.clearStaleTurn(routing.threadId, routing.turnId); + } + for (const callback of this.codexEventHandlers) { + callback({ eventType: "notification", ...serverNotification }); + } + return; + } this.notify(serverNotification); for (const callback of this.codexEventHandlers) { callback({ eventType: "notification", ...serverNotification }); @@ -121,6 +133,9 @@ export class CodexAppServerClient { }); this.connection.onRequest(CommandExecutionApprovalRequest, async (params) => { + if (this.isStaleTurn(params.threadId, params.turnId)) { + return { decision: "cancel" }; + } const handler = this.approvalHandlers.get(params.threadId); if (!handler) { return { decision: "cancel" }; @@ -129,6 +144,9 @@ export class CodexAppServerClient { }); this.connection.onRequest(FileChangeApprovalRequest, async (params) => { + if (this.isStaleTurn(params.threadId, params.turnId)) { + return { decision: "cancel" }; + } const handler = this.approvalHandlers.get(params.threadId); if (!handler) { return { decision: "cancel" }; @@ -137,6 +155,9 @@ export class CodexAppServerClient { }); this.connection.onRequest(McpServerElicitationRequest, async (params) => { + if (this.isStaleTurn(params.threadId, params.turnId)) { + return { action: "cancel", content: null, _meta: null }; + } const handler = this.elicitationHandlers.get(params.threadId); if (!handler) { return { action: "cancel", content: null, _meta: null }; @@ -193,6 +214,12 @@ export class CodexAppServerClient { return await this.sendRequest({ method: "turn/interrupt", params: params }); } + markTurnStale(threadId: string, turnId: string): void { + const threadStaleTurns = this.staleTurnIds.get(threadId) ?? new Set(); + threadStaleTurns.add(turnId); + this.staleTurnIds.set(threadId, threadStaleTurns); + } + async threadStart(params: ThreadStartParams): Promise { return await this.sendRequest({ method: "thread/start", params: params }); } @@ -341,6 +368,24 @@ export class CodexAppServerClient { } } + private isStaleTurn(threadId: string | null, turnId: string | null): boolean { + if (threadId === null || turnId === null) { + return false; + } + return this.staleTurnIds.get(threadId)?.has(turnId) ?? false; + } + + private clearStaleTurn(threadId: string, turnId: string): void { + const threadStaleTurns = this.staleTurnIds.get(threadId); + if (!threadStaleTurns) { + return; + } + threadStaleTurns.delete(turnId); + if (threadStaleTurns.size === 0) { + this.staleTurnIds.delete(threadId); + } + } + private getOrCreatePendingTurnCompletionResolvers(threadId: string): Map void> { const existing = this.pendingTurnCompletionResolvers.get(threadId); if (existing) { @@ -475,3 +520,19 @@ function extractThreadId(notification: ServerNotification): string | null { } return null; } + +function extractTurnRouting(notification: ServerNotification): { threadId: string | null, turnId: string | null } { + const params = notification.params as { + threadId?: unknown, + turnId?: unknown, + turn?: { id?: unknown }, + } | undefined; + const threadId = extractThreadId(notification); + if (params && typeof params.turnId === "string") { + return {threadId, turnId: params.turnId}; + } + if (params && typeof params.turn?.id === "string") { + return {threadId, turnId: params.turn.id}; + } + return {threadId, turnId: null}; +} diff --git a/src/__tests__/CodexACPAgent/session-close.test.ts b/src/__tests__/CodexACPAgent/session-close.test.ts index b55e4590..a4d09199 100644 --- a/src/__tests__/CodexACPAgent/session-close.test.ts +++ b/src/__tests__/CodexACPAgent/session-close.test.ts @@ -289,6 +289,148 @@ describe("ACP session close", () => { }), })); }); + + it("preserves local close cleanup while stale resume cleanup overlaps close", async () => { + const {codexAcpAgent, codexAcpClient} = await createSession(); + const staleResume = deferred(); + const activeUnsubscribe = deferred(); + const staleUnsubscribe = deferred(); + vi.spyOn(codexAcpClient, "resumeSession").mockReturnValue(staleResume.promise); + const closeSessionSpy = vi.spyOn(codexAcpClient, "closeSession") + .mockReturnValueOnce(activeUnsubscribe.promise) + .mockReturnValueOnce(staleUnsubscribe.promise); + + const staleResumePromise = codexAcpAgent.resumeSession({ + sessionId, + cwd: "/test/cwd", + mcpServers: [], + }); + const closePromise = codexAcpAgent.closeSession({sessionId}); + + await vi.waitFor(() => { + expect(closeSessionSpy).toHaveBeenCalledTimes(1); + }); + + staleResume.resolve(createSessionMetadata()); + await vi.waitFor(() => { + expect(closeSessionSpy).toHaveBeenCalledTimes(2); + }); + + activeUnsubscribe.resolve(undefined); + await expect(closePromise).resolves.toEqual({}); + expect(() => codexAcpAgent.getSessionState(sessionId)).toThrow(`Session ${sessionId} not found`); + + staleUnsubscribe.resolve(undefined); + await expect(staleResumePromise).rejects.toThrow("Invalid request"); + }); + + it("unsubscribes a resume that fails after app-server subscription", async () => { + const fixture = createCodexMockTestFixture(); + const codexAcpAgent = fixture.getCodexAcpAgent(); + const codexAcpClient = fixture.getCodexAcpClient(); + vi.spyOn(codexAcpClient, "authRequired").mockResolvedValue(false); + vi.spyOn(codexAcpClient, "resumeSession").mockImplementation(async (_request, onSubscribed) => { + onSubscribed?.(); + throw new Error("model list failed"); + }); + const closeSessionSpy = vi.spyOn(codexAcpClient, "closeSession").mockResolvedValue(); + + await expect(codexAcpAgent.resumeSession({ + sessionId, + cwd: "/test/cwd", + mcpServers: [], + })).rejects.toThrow("model list failed"); + + expect(closeSessionSpy).toHaveBeenCalledWith(sessionId); + }); + + it("unsubscribes a resume that fails during account read after app-server subscription", async () => { + const fixture = createCodexMockTestFixture(); + const codexAcpAgent = fixture.getCodexAcpAgent(); + const codexAcpClient = fixture.getCodexAcpClient(); + vi.spyOn(codexAcpClient, "authRequired").mockResolvedValue(false); + vi.spyOn(codexAcpClient, "resumeSession").mockImplementation(async (_request, onSubscribed) => { + onSubscribed?.(); + return createSessionMetadata(); + }); + vi.spyOn(codexAcpClient, "getAccount").mockRejectedValue(new Error("account read failed")); + const closeSessionSpy = vi.spyOn(codexAcpClient, "closeSession").mockResolvedValue(); + + await expect(codexAcpAgent.resumeSession({ + sessionId, + cwd: "/test/cwd", + mcpServers: [], + })).rejects.toThrow("account read failed"); + + expect(closeSessionSpy).toHaveBeenCalledWith(sessionId); + }); + + it("does not route stale turn notifications into a reopened session", async () => { + const {fixture, codexAcpAgent, codexAcpClient} = await createSession(); + const oldTurnStart = deferred(); + const turnStartSpy = vi.spyOn(fixture.getCodexAppServerClient(), "turnStart") + .mockReturnValueOnce(oldTurnStart.promise) + .mockResolvedValueOnce(createTurnStartResponse("new-turn")); + vi.spyOn(fixture.getCodexAppServerClient(), "awaitTurnCompleted").mockResolvedValue({ + threadId: sessionId, + turn: createCompletedTurn("new-turn"), + }); + + const promptPromise = codexAcpAgent.prompt({ + sessionId, + prompt: [{type: "text", text: "old prompt"}], + }); + await vi.waitFor(() => { + expect(turnStartSpy).toHaveBeenCalledTimes(1); + }); + + await expect(codexAcpAgent.closeSession({sessionId})).resolves.toEqual({}); + await expect(promptPromise).resolves.toMatchObject({stopReason: "cancelled"}); + + vi.spyOn(codexAcpClient, "resumeSession").mockResolvedValue(createSessionMetadata()); + await codexAcpAgent.resumeSession({ + sessionId, + cwd: "/test/cwd", + mcpServers: [], + }); + await codexAcpAgent.prompt({ + sessionId, + prompt: [{type: "text", text: "new prompt"}], + }); + + fixture.clearAcpConnectionDump(); + oldTurnStart.resolve(createTurnStartResponse("old-turn")); + + await vi.waitFor(() => { + const requestMethods = fixture.getCodexConnectionEvents([]) + .flatMap(event => event.eventType === "request" ? [event.method] : []); + expect(requestMethods).toContain("turn/interrupt"); + }); + + fixture.sendServerNotification({ + method: "item/agentMessage/delta", + params: {threadId: sessionId, turnId: "old-turn", itemId: "old-item", delta: "stale"}, + }); + await waitForMicrotasks(); + expect(fixture.getAcpConnectionEvents([])).toEqual([]); + + fixture.sendServerNotification({ + method: "turn/completed", + params: { + threadId: sessionId, + turn: createCompletedTurn("old-turn"), + }, + }); + fixture.sendServerNotification({ + method: "item/agentMessage/delta", + params: {threadId: sessionId, turnId: "new-turn", itemId: "new-item", delta: "fresh"}, + }); + + await vi.waitFor(() => { + expect(fixture.getAcpConnectionDump([])).toContain("fresh"); + }); + expect(fixture.getAcpConnectionDump([])).not.toContain("stale"); + }); }); async function createSession(options: { @@ -341,6 +483,18 @@ function createTurnStartResponse(turnId: string): TurnStartResponse { }; } +function createCompletedTurn(turnId: string): TurnStartResponse["turn"] { + return { + id: turnId, + items: [], + status: "completed", + error: null, + startedAt: null, + completedAt: null, + durationMs: null, + }; +} + function createSessionMetadata(): SessionMetadata { return { sessionId,