From 5db94a56fabd89a421a41d595e0003ca3c180801 Mon Sep 17 00:00:00 2001 From: Steffen Deusch Date: Fri, 5 Jun 2026 18:32:04 +0200 Subject: [PATCH 1/2] Fix stale file read after patch causing empty diff content Also ensure events are always handled in order. --- src/CodexAcpClient.ts | 11 +- src/CodexToolCallMapper.ts | 13 +- .../CodexACPAgent/file-change-events.test.ts | 167 +++++++++++++++++- 3 files changed, 183 insertions(+), 8 deletions(-) diff --git a/src/CodexAcpClient.ts b/src/CodexAcpClient.ts index b06a0c9..b3ebe7c 100644 --- a/src/CodexAcpClient.ts +++ b/src/CodexAcpClient.ts @@ -384,11 +384,18 @@ export class CodexAcpClient { async subscribeToSessionEvents( sessionId: string, - eventHandler: (result: ServerNotification) => void, + eventHandler: (result: ServerNotification) => void | Promise, approvalHandler: ApprovalHandler, elicitationHandler: ElicitationHandler ) { - this.codexClient.onServerNotification(sessionId, eventHandler); + let notificationQueue = Promise.resolve(); + this.codexClient.onServerNotification(sessionId, (event) => { + notificationQueue = notificationQueue + .then(() => eventHandler(event)) + .catch((error) => { + logger.error("Error handling Codex session notification", error); + }); + }); this.codexClient.onApprovalRequest(sessionId, approvalHandler); this.codexClient.onElicitationRequest(sessionId, elicitationHandler); } diff --git a/src/CodexToolCallMapper.ts b/src/CodexToolCallMapper.ts index ad2542e..e1be3fd 100644 --- a/src/CodexToolCallMapper.ts +++ b/src/CodexToolCallMapper.ts @@ -294,7 +294,18 @@ async function createUpdateFileContent(change: FileUpdateChange): Promise { +const { mockFiles, mockReadDelays, mockFileContent, delayMockFileRead, removeMockFile, clearMockFiles } = vi.hoisted(() => { const files = new Map(); + const readDelays = new Map>(); return { mockFiles: files, + mockReadDelays: readDelays, mockFileContent: (path: string, content: string) => files.set(path, content), + delayMockFileRead: (path: string, delay: Promise) => readDelays.set(path, delay), removeMockFile: (path: string) => files.delete(path), - clearMockFiles: () => files.clear(), + clearMockFiles: () => { + files.clear(); + readDelays.clear(); + }, }; }); vi.mock('node:fs/promises', () => ({ - readFile: (path: string) => { + readFile: async (path: string) => { + const delay = mockReadDelays.get(path); + if (delay) { + await delay; + } const content = mockFiles.get(path); if (content !== undefined) { - return Promise.resolve(content); + return content; } - return Promise.reject(new Error(`ENOENT: no such file or directory, open '${path}'`)); + throw new Error(`ENOENT: no such file or directory, open '${path}'`); }, })); @@ -276,6 +286,153 @@ describe('CodexEventHandler - file change events', () => { }); }); + it('should handle update diffs when the file was already patched', async () => { + mockFileContent('/test/project/OldFile.kt', 'package test.project\n\nclass UpdatedFile {}\n'); + + const updateFileNotification: ServerNotification = { + method: 'item/started', + params: { + threadId: sessionId, + turnId: 'turn-1', + item: { + type: 'fileChange', + id: 'file-change-already-patched', + changes: [ + { + path: '/test/project/OldFile.kt', + kind: { type: 'update', move_path: null }, + diff: +`@@ -1,3 +1,3 @@ + package test.project + +-class OldFile {} ++class UpdatedFile {} +`, + }, + ], + status: 'completed', + }, + }, + }; + + await setupPromptAndSendNotifications(mockFixture, sessionId, sessionState, [updateFileNotification]); + + const updates = mockFixture.getAcpConnectionEvents(['id']).map((event) => event.args[0].update); + expect(updates).toMatchObject([ + { + sessionUpdate: 'tool_call', + toolCallId: 'file-change-already-patched', + status: 'completed', + content: [ + { + oldText: 'package test.project\n\nclass OldFile {}\n', + newText: 'package test.project\n\nclass UpdatedFile {}\n', + path: '/test/project/OldFile.kt', + }, + ], + }, + ]); + }); + + it('should not emit completion before a slow file-change start event', async () => { + mockFileContent('/test/project/OldFile.kt', 'package test.project\n\nclass OldFile {}\n'); + + let releaseRead = () => {}; + const blockedRead = new Promise((resolve) => { + releaseRead = resolve; + }); + delayMockFileRead('/test/project/OldFile.kt', blockedRead); + + const fileChange = { + type: 'fileChange', + id: 'file-change-slow-start', + changes: [ + { + path: '/test/project/OldFile.kt', + kind: { type: 'update', move_path: null }, + diff: +`@@ -1,3 +1,3 @@ + package test.project + +-class OldFile {} ++class UpdatedFile {} +`, + }, + ], + } satisfies Omit; + + const fileChangeStarted: ServerNotification = { + method: 'item/started', + params: { + threadId: sessionId, + turnId: 'turn-1', + item: { + ...fileChange, + status: 'inProgress', + }, + }, + }; + const fileChangeCompleted: ServerNotification = { + method: 'item/completed', + params: { + threadId: sessionId, + turnId: 'turn-1', + item: { + ...fileChange, + status: 'completed', + }, + }, + }; + + const codexAcpAgent = mockFixture.getCodexAcpAgent(); + const codexAppServerClient = mockFixture.getCodexAppServerClient(); + const turn = { id: 'turn-id', items: [], status: 'inProgress' as const, error: null }; + codexAppServerClient.turnStart = vi.fn().mockResolvedValue({ turn }); + codexAppServerClient.awaitTurnCompleted = vi.fn().mockResolvedValue({ + threadId: sessionId, + turn: { ...turn, status: 'completed' }, + }); + vi.spyOn(codexAcpAgent, 'getSessionState').mockReturnValue(sessionState); + + await codexAcpAgent.prompt({ + sessionId, + prompt: [{ type: 'text', text: 'test prompt' }], + }); + + mockFixture.clearAcpConnectionDump(); + mockFixture.sendServerNotification(fileChangeStarted); + mockFixture.sendServerNotification(fileChangeCompleted); + + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(mockFixture.getAcpConnectionEvents([])).toEqual([]); + + releaseRead(); + await vi.waitFor(() => { + expect(mockFixture.getAcpConnectionEvents([])).toHaveLength(2); + }); + + const updates = mockFixture.getAcpConnectionEvents([]).map((event) => event.args[0].update); + expect(updates).toMatchObject([ + { + sessionUpdate: 'tool_call', + toolCallId: 'file-change-slow-start', + status: 'in_progress', + content: [ + { + oldText: 'package test.project\n\nclass OldFile {}\n', + newText: 'package test.project\n\nclass UpdatedFile {}\n', + path: '/test/project/OldFile.kt', + }, + ], + }, + { + sessionUpdate: 'tool_call_update', + toolCallId: 'file-change-slow-start', + status: 'completed', + }, + ]); + }); + it('should parse update diffs with move metadata appended', async () => { mockFileContent('/test/project/OriginalFile.kt', 'old code line\n'); From 66658704e983c6e535e5707d4bb79c7d022af9a7 Mon Sep 17 00:00:00 2001 From: Steffen Deusch Date: Fri, 5 Jun 2026 19:36:00 +0200 Subject: [PATCH 2/2] queue per session, drain --- src/CodexAcpClient.ts | 53 ++++++++++++++++--- src/CodexAcpServer.ts | 2 + .../CodexACPAgent/CodexAcpClient.test.ts | 1 + .../data/mcp-tool-completed-with-logs.json | 50 ++++++++--------- .../data/mcp-tool-repeated-progress.json | 44 +++++++-------- .../data/terminal-full-flow.json | 52 +++++++++--------- .../CodexACPAgent/elicitation-events.test.ts | 2 + src/__tests__/acp-test-utils.ts | 1 + 8 files changed, 124 insertions(+), 81 deletions(-) diff --git a/src/CodexAcpClient.ts b/src/CodexAcpClient.ts index b3ebe7c..45fc59b 100644 --- a/src/CodexAcpClient.ts +++ b/src/CodexAcpClient.ts @@ -49,6 +49,7 @@ export class CodexAcpClient { private gatewayConfig: GatewayConfig | null; private pendingLoginCompleted: Promise | null = null; private pendingAccountUpdated: Promise | null = null; + private readonly sessionNotificationQueues = new Map>(); constructor(codexClient: CodexAppServerClient, codexConfig?: JsonObject, modelProvider?: string) { @@ -388,16 +389,52 @@ export class CodexAcpClient { approvalHandler: ApprovalHandler, elicitationHandler: ElicitationHandler ) { - let notificationQueue = Promise.resolve(); this.codexClient.onServerNotification(sessionId, (event) => { - notificationQueue = notificationQueue - .then(() => eventHandler(event)) - .catch((error) => { - logger.error("Error handling Codex session notification", error); - }); + this.enqueueSessionNotification(sessionId, () => eventHandler(event)); + }); + this.codexClient.onApprovalRequest(sessionId, { + handleCommandExecution: async (params) => { + await this.waitForSessionNotifications(sessionId); + return await approvalHandler.handleCommandExecution(params); + }, + handleFileChange: async (params) => { + await this.waitForSessionNotifications(sessionId); + return await approvalHandler.handleFileChange(params); + }, + }); + this.codexClient.onElicitationRequest(sessionId, { + handleElicitation: async (params) => { + await this.waitForSessionNotifications(sessionId); + return await elicitationHandler.handleElicitation(params); + }, + }); + } + + async waitForSessionNotifications(sessionId: string): Promise { + while (true) { + const queue = this.sessionNotificationQueues.get(sessionId); + if (!queue) return; + await queue; + } + } + + private enqueueSessionNotification(sessionId: string, operation: () => void | Promise): void { + const run = async () => { + try { + await operation(); + } catch (error) { + logger.error("Error handling Codex session notification", error); + } + }; + + const previous = this.sessionNotificationQueues.get(sessionId); + const next = previous ? previous.then(run, run) : run(); + this.sessionNotificationQueues.set(sessionId, next); + void next.finally(() => { + if (this.sessionNotificationQueues.get(sessionId) === next) { + this.sessionNotificationQueues.delete(sessionId); + } }); - this.codexClient.onApprovalRequest(sessionId, approvalHandler); - this.codexClient.onElicitationRequest(sessionId, elicitationHandler); } async sendPrompt( diff --git a/src/CodexAcpServer.ts b/src/CodexAcpServer.ts index d27fc15..360fda6 100644 --- a/src/CodexAcpServer.ts +++ b/src/CodexAcpServer.ts @@ -1222,6 +1222,8 @@ export class CodexAcpServer implements acp.Agent { }; } + await this.codexAcpClient.waitForSessionNotifications(params.sessionId); + // Check if turn was interrupted (cancelled) if (turnCompleted.turn.status === "interrupted") { if (!this.sessionIsClosing(params.sessionId) && this.sessions.has(params.sessionId)) { diff --git a/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts b/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts index 804d2a0..9f09d95 100644 --- a/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts +++ b/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts @@ -1154,6 +1154,7 @@ describe('ACP server test', { timeout: 40_000 }, () => { } } }); + await mockFixture.getCodexAcpClient().waitForSessionNotifications(sessionId); expect(sessionState.rateLimits).not.toBeNull(); expect(sessionState.rateLimits!.size).toBe(2); diff --git a/src/__tests__/CodexACPAgent/data/mcp-tool-completed-with-logs.json b/src/__tests__/CodexACPAgent/data/mcp-tool-completed-with-logs.json index 0395df1..07c4eb6 100644 --- a/src/__tests__/CodexACPAgent/data/mcp-tool-completed-with-logs.json +++ b/src/__tests__/CodexACPAgent/data/mcp-tool-completed-with-logs.json @@ -4,12 +4,23 @@ { "sessionId": "test-session-id", "update": { - "sessionUpdate": "tool_call_update", + "sessionUpdate": "tool_call", "toolCallId": "call-id", - "_meta": { - "mcp_output_delta": { - "data": "File /Users/aleksandr.slapoguzov/Projects/ultimate/.ai/local.md doesn't exist or can't be opened" + "kind": "execute", + "title": "mcp.ijproxy.read_file", + "status": "in_progress", + "rawInput": { + "server": "ijproxy", + "tool": "read_file", + "arguments": { + "file_path": ".ai/local.md", + "mode": "slice", + "start_line": 1, + "max_lines": 200 } + }, + "_meta": { + "is_mcp_tool_call": true } } } @@ -23,21 +34,9 @@ "update": { "sessionUpdate": "tool_call_update", "toolCallId": "call-id", - "status": "failed", - "rawInput": { - "server": "ijproxy", - "tool": "read_file", - "arguments": { - "file_path": ".ai/local.md", - "mode": "slice", - "start_line": 1, - "max_lines": 200 - } - }, - "rawOutput": { - "result": null, - "error": { - "message": "File /Users/aleksandr.slapoguzov/Projects/ultimate/.ai/local.md doesn't exist or can't be opened" + "_meta": { + "mcp_output_delta": { + "data": "File /Users/aleksandr.slapoguzov/Projects/ultimate/.ai/local.md doesn't exist or can't be opened" } } } @@ -50,11 +49,9 @@ { "sessionId": "test-session-id", "update": { - "sessionUpdate": "tool_call", + "sessionUpdate": "tool_call_update", "toolCallId": "call-id", - "kind": "execute", - "title": "mcp.ijproxy.read_file", - "status": "in_progress", + "status": "failed", "rawInput": { "server": "ijproxy", "tool": "read_file", @@ -65,8 +62,11 @@ "max_lines": 200 } }, - "_meta": { - "is_mcp_tool_call": true + "rawOutput": { + "result": null, + "error": { + "message": "File /Users/aleksandr.slapoguzov/Projects/ultimate/.ai/local.md doesn't exist or can't be opened" + } } } } diff --git a/src/__tests__/CodexACPAgent/data/mcp-tool-repeated-progress.json b/src/__tests__/CodexACPAgent/data/mcp-tool-repeated-progress.json index 395404f..01fe22d 100644 --- a/src/__tests__/CodexACPAgent/data/mcp-tool-repeated-progress.json +++ b/src/__tests__/CodexACPAgent/data/mcp-tool-repeated-progress.json @@ -4,12 +4,20 @@ { "sessionId": "test-session-id", "update": { - "sessionUpdate": "tool_call_update", + "sessionUpdate": "tool_call", "toolCallId": "call-id", - "_meta": { - "mcp_output_delta": { - "data": "Polling for status" + "kind": "execute", + "title": "mcp.server-name.tool-name", + "status": "in_progress", + "rawInput": { + "server": "server-name", + "tool": "tool-name", + "arguments": { + "argument": "example" } + }, + "_meta": { + "is_mcp_tool_call": true } } } @@ -40,18 +48,9 @@ "update": { "sessionUpdate": "tool_call_update", "toolCallId": "call-id", - "status": "failed", - "rawInput": { - "server": "server-name", - "tool": "tool-name", - "arguments": { - "argument": "example" - } - }, - "rawOutput": { - "result": null, - "error": { - "message": "Polling for status" + "_meta": { + "mcp_output_delta": { + "data": "Polling for status" } } } @@ -64,11 +63,9 @@ { "sessionId": "test-session-id", "update": { - "sessionUpdate": "tool_call", + "sessionUpdate": "tool_call_update", "toolCallId": "call-id", - "kind": "execute", - "title": "mcp.server-name.tool-name", - "status": "in_progress", + "status": "failed", "rawInput": { "server": "server-name", "tool": "tool-name", @@ -76,8 +73,11 @@ "argument": "example" } }, - "_meta": { - "is_mcp_tool_call": true + "rawOutput": { + "result": null, + "error": { + "message": "Polling for status" + } } } } diff --git a/src/__tests__/CodexACPAgent/data/terminal-full-flow.json b/src/__tests__/CodexACPAgent/data/terminal-full-flow.json index ff6ef29..d69f63e 100644 --- a/src/__tests__/CodexACPAgent/data/terminal-full-flow.json +++ b/src/__tests__/CodexACPAgent/data/terminal-full-flow.json @@ -4,11 +4,24 @@ { "sessionId": "test-session-id", "update": { - "sessionUpdate": "tool_call_update", + "sessionUpdate": "tool_call", "toolCallId": "command-flow", + "kind": "execute", + "title": "echo hello", + "status": "in_progress", + "content": [ + { + "type": "terminal", + "terminalId": "command-flow" + } + ], + "rawInput": { + "command": "echo hello", + "cwd": "/test/project" + }, "_meta": { - "terminal_output_delta": { - "data": "hello\n", + "terminal_info": { + "cwd": "/test/project", "terminal_id": "command-flow" } } @@ -24,15 +37,9 @@ "update": { "sessionUpdate": "tool_call_update", "toolCallId": "command-flow", - "status": "completed", - "rawOutput": { - "formatted_output": "hello\n", - "exit_code": 0 - }, "_meta": { - "terminal_exit": { - "exit_code": 0, - "signal": null, + "terminal_output_delta": { + "data": "hello\n", "terminal_id": "command-flow" } } @@ -46,24 +53,17 @@ { "sessionId": "test-session-id", "update": { - "sessionUpdate": "tool_call", + "sessionUpdate": "tool_call_update", "toolCallId": "command-flow", - "kind": "execute", - "title": "echo hello", - "status": "in_progress", - "content": [ - { - "type": "terminal", - "terminalId": "command-flow" - } - ], - "rawInput": { - "command": "echo hello", - "cwd": "/test/project" + "status": "completed", + "rawOutput": { + "formatted_output": "hello\n", + "exit_code": 0 }, "_meta": { - "terminal_info": { - "cwd": "/test/project", + "terminal_exit": { + "exit_code": 0, + "signal": null, "terminal_id": "command-flow" } } diff --git a/src/__tests__/CodexACPAgent/elicitation-events.test.ts b/src/__tests__/CodexACPAgent/elicitation-events.test.ts index 82d14a0..1421cc1 100644 --- a/src/__tests__/CodexACPAgent/elicitation-events.test.ts +++ b/src/__tests__/CodexACPAgent/elicitation-events.test.ts @@ -288,6 +288,7 @@ describe('Elicitation Events', () => { fixture.sendServerNotification(startedNotification); fixture.sendServerNotification(completedNotification); + await fixture.getCodexAcpClient().waitForSessionNotifications(sessionId); fixture.clearAcpConnectionDump(); const params: McpServerElicitationRequestParams = { @@ -340,6 +341,7 @@ describe('Elicitation Events', () => { fixture.sendServerNotification(startedNotification); fixture.sendServerNotification(resolvedNotification); + await fixture.getCodexAcpClient().waitForSessionNotifications(sessionId); fixture.clearAcpConnectionDump(); const params: McpServerElicitationRequestParams = { diff --git a/src/__tests__/acp-test-utils.ts b/src/__tests__/acp-test-utils.ts index 9f82f16..47389c1 100644 --- a/src/__tests__/acp-test-utils.ts +++ b/src/__tests__/acp-test-utils.ts @@ -440,6 +440,7 @@ export async function setupPromptAndSendNotifications( for (const notification of notifications) { fixture.sendServerNotification(notification); } + await fixture.getCodexAcpClient().waitForSessionNotifications(sessionId); await vi.waitFor(() => { const dump = fixture.getAcpConnectionDump([]);