Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 48 additions & 4 deletions src/CodexAcpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ export class CodexAcpClient {
private gatewayConfig: GatewayConfig | null;
private pendingLoginCompleted: Promise<AccountLoginCompletedNotification> | null = null;
private pendingAccountUpdated: Promise<AccountUpdatedNotification> | null = null;
private readonly sessionNotificationQueues = new Map<string, Promise<void>>();


constructor(codexClient: CodexAppServerClient, codexConfig?: JsonObject, modelProvider?: string) {
Expand Down Expand Up @@ -384,13 +385,56 @@ export class CodexAcpClient {

async subscribeToSessionEvents(
sessionId: string,
eventHandler: (result: ServerNotification) => void,
eventHandler: (result: ServerNotification) => void | Promise<void>,
approvalHandler: ApprovalHandler,
elicitationHandler: ElicitationHandler
) {
this.codexClient.onServerNotification(sessionId, eventHandler);
this.codexClient.onApprovalRequest(sessionId, approvalHandler);
this.codexClient.onElicitationRequest(sessionId, elicitationHandler);
this.codexClient.onServerNotification(sessionId, (event) => {
this.enqueueSessionNotification(sessionId, () => eventHandler(event));
});
this.codexClient.onApprovalRequest(sessionId, {
handleCommandExecution: async (params) => {
await this.waitForSessionNotifications(sessionId);
return await approvalHandler.handleCommandExecution(params);
},
handleFileChange: async (params) => {
await this.waitForSessionNotifications(sessionId);
return await approvalHandler.handleFileChange(params);
},
});
this.codexClient.onElicitationRequest(sessionId, {
handleElicitation: async (params) => {
await this.waitForSessionNotifications(sessionId);
return await elicitationHandler.handleElicitation(params);
},
});
}

async waitForSessionNotifications(sessionId: string): Promise<void> {
while (true) {
const queue = this.sessionNotificationQueues.get(sessionId);
if (!queue) return;
await queue;
}
}

private enqueueSessionNotification(sessionId: string, operation: () => void | Promise<void>): void {
const run = async () => {
try {
await operation();
} catch (error) {
logger.error("Error handling Codex session notification", error);
}
};

const previous = this.sessionNotificationQueues.get(sessionId);
const next = previous ? previous.then(run, run) : run();
this.sessionNotificationQueues.set(sessionId, next);
void next.finally(() => {
if (this.sessionNotificationQueues.get(sessionId) === next) {
this.sessionNotificationQueues.delete(sessionId);
}
});
}

async sendPrompt(
Expand Down
2 changes: 2 additions & 0 deletions src/CodexAcpServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1222,6 +1222,8 @@ export class CodexAcpServer implements acp.Agent {
};
}

await this.codexAcpClient.waitForSessionNotifications(params.sessionId);

// Check if turn was interrupted (cancelled)
if (turnCompleted.turn.status === "interrupted") {
if (!this.sessionIsClosing(params.sessionId) && this.sessions.has(params.sessionId)) {
Expand Down
13 changes: 12 additions & 1 deletion src/CodexToolCallMapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,18 @@ async function createUpdateFileContent(change: FileUpdateChange): Promise<ToolCa
const oldContent = await readFileContent(change.path);
if (oldContent !== null) {
const patchedContent = applyPatch(oldContent, unifiedDiff);
if (patchedContent === false) return null;
if (patchedContent === false) {
// If Codex runs in full access mode, the file might already be patched.
// we can verify this by checking if the reverted patch applies.
const revertedPatch = revertPatch(unifiedDiff);
if (revertedPatch) {
const revertedContent = applyPatch(oldContent, revertedPatch);
if (revertedContent !== false) {
return createUpdateDiffContent(change.path, revertedContent, oldContent);
}
}
return null;
}
return createUpdateDiffContent(movePath ?? change.path, oldContent, patchedContent);
}

Expand Down
1 change: 1 addition & 0 deletions src/__tests__/CodexACPAgent/CodexAcpClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1154,6 +1154,7 @@ describe('ACP server test', { timeout: 40_000 }, () => {
}
}
});
await mockFixture.getCodexAcpClient().waitForSessionNotifications(sessionId);

expect(sessionState.rateLimits).not.toBeNull();
expect(sessionState.rateLimits!.size).toBe(2);
Expand Down
50 changes: 25 additions & 25 deletions src/__tests__/CodexACPAgent/data/mcp-tool-completed-with-logs.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,23 @@
{
"sessionId": "test-session-id",
"update": {
"sessionUpdate": "tool_call_update",
"sessionUpdate": "tool_call",
"toolCallId": "call-id",
"_meta": {
"mcp_output_delta": {
"data": "File /Users/aleksandr.slapoguzov/Projects/ultimate/.ai/local.md doesn't exist or can't be opened"
"kind": "execute",
"title": "mcp.ijproxy.read_file",
"status": "in_progress",
"rawInput": {
"server": "ijproxy",
"tool": "read_file",
"arguments": {
"file_path": ".ai/local.md",
"mode": "slice",
"start_line": 1,
"max_lines": 200
}
},
"_meta": {
"is_mcp_tool_call": true
}
}
}
Expand All @@ -23,21 +34,9 @@
"update": {
"sessionUpdate": "tool_call_update",
"toolCallId": "call-id",
"status": "failed",
"rawInput": {
"server": "ijproxy",
"tool": "read_file",
"arguments": {
"file_path": ".ai/local.md",
"mode": "slice",
"start_line": 1,
"max_lines": 200
}
},
"rawOutput": {
"result": null,
"error": {
"message": "File /Users/aleksandr.slapoguzov/Projects/ultimate/.ai/local.md doesn't exist or can't be opened"
"_meta": {
"mcp_output_delta": {
"data": "File /Users/aleksandr.slapoguzov/Projects/ultimate/.ai/local.md doesn't exist or can't be opened"
}
}
}
Expand All @@ -50,11 +49,9 @@
{
"sessionId": "test-session-id",
"update": {
"sessionUpdate": "tool_call",
"sessionUpdate": "tool_call_update",
"toolCallId": "call-id",
"kind": "execute",
"title": "mcp.ijproxy.read_file",
"status": "in_progress",
"status": "failed",
"rawInput": {
"server": "ijproxy",
"tool": "read_file",
Expand All @@ -65,8 +62,11 @@
"max_lines": 200
}
},
"_meta": {
"is_mcp_tool_call": true
"rawOutput": {
"result": null,
"error": {
"message": "File /Users/aleksandr.slapoguzov/Projects/ultimate/.ai/local.md doesn't exist or can't be opened"
}
}
}
}
Expand Down
44 changes: 22 additions & 22 deletions src/__tests__/CodexACPAgent/data/mcp-tool-repeated-progress.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,20 @@
{
"sessionId": "test-session-id",
"update": {
"sessionUpdate": "tool_call_update",
"sessionUpdate": "tool_call",
"toolCallId": "call-id",
"_meta": {
"mcp_output_delta": {
"data": "Polling for status"
"kind": "execute",
"title": "mcp.server-name.tool-name",
"status": "in_progress",
"rawInput": {
"server": "server-name",
"tool": "tool-name",
"arguments": {
"argument": "example"
}
},
"_meta": {
"is_mcp_tool_call": true
}
}
}
Expand Down Expand Up @@ -40,18 +48,9 @@
"update": {
"sessionUpdate": "tool_call_update",
"toolCallId": "call-id",
"status": "failed",
"rawInput": {
"server": "server-name",
"tool": "tool-name",
"arguments": {
"argument": "example"
}
},
"rawOutput": {
"result": null,
"error": {
"message": "Polling for status"
"_meta": {
"mcp_output_delta": {
"data": "Polling for status"
}
}
}
Expand All @@ -64,20 +63,21 @@
{
"sessionId": "test-session-id",
"update": {
"sessionUpdate": "tool_call",
"sessionUpdate": "tool_call_update",
"toolCallId": "call-id",
"kind": "execute",
"title": "mcp.server-name.tool-name",
"status": "in_progress",
"status": "failed",
"rawInput": {
"server": "server-name",
"tool": "tool-name",
"arguments": {
"argument": "example"
}
},
"_meta": {
"is_mcp_tool_call": true
"rawOutput": {
"result": null,
"error": {
"message": "Polling for status"
}
}
}
}
Expand Down
52 changes: 26 additions & 26 deletions src/__tests__/CodexACPAgent/data/terminal-full-flow.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,24 @@
{
"sessionId": "test-session-id",
"update": {
"sessionUpdate": "tool_call_update",
"sessionUpdate": "tool_call",
"toolCallId": "command-flow",
"kind": "execute",
"title": "echo hello",
"status": "in_progress",
"content": [
{
"type": "terminal",
"terminalId": "command-flow"
}
],
"rawInput": {
"command": "echo hello",
"cwd": "/test/project"
},
"_meta": {
"terminal_output_delta": {
"data": "hello\n",
"terminal_info": {
"cwd": "/test/project",
"terminal_id": "command-flow"
}
}
Expand All @@ -24,15 +37,9 @@
"update": {
"sessionUpdate": "tool_call_update",
"toolCallId": "command-flow",
"status": "completed",
"rawOutput": {
"formatted_output": "hello\n",
"exit_code": 0
},
"_meta": {
"terminal_exit": {
"exit_code": 0,
"signal": null,
"terminal_output_delta": {
"data": "hello\n",
"terminal_id": "command-flow"
}
}
Expand All @@ -46,24 +53,17 @@
{
"sessionId": "test-session-id",
"update": {
"sessionUpdate": "tool_call",
"sessionUpdate": "tool_call_update",
"toolCallId": "command-flow",
"kind": "execute",
"title": "echo hello",
"status": "in_progress",
"content": [
{
"type": "terminal",
"terminalId": "command-flow"
}
],
"rawInput": {
"command": "echo hello",
"cwd": "/test/project"
"status": "completed",
"rawOutput": {
"formatted_output": "hello\n",
"exit_code": 0
},
"_meta": {
"terminal_info": {
"cwd": "/test/project",
"terminal_exit": {
"exit_code": 0,
"signal": null,
"terminal_id": "command-flow"
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/__tests__/CodexACPAgent/elicitation-events.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ describe('Elicitation Events', () => {

fixture.sendServerNotification(startedNotification);
fixture.sendServerNotification(completedNotification);
await fixture.getCodexAcpClient().waitForSessionNotifications(sessionId);
fixture.clearAcpConnectionDump();

const params: McpServerElicitationRequestParams = {
Expand Down Expand Up @@ -340,6 +341,7 @@ describe('Elicitation Events', () => {

fixture.sendServerNotification(startedNotification);
fixture.sendServerNotification(resolvedNotification);
await fixture.getCodexAcpClient().waitForSessionNotifications(sessionId);
fixture.clearAcpConnectionDump();

const params: McpServerElicitationRequestParams = {
Expand Down
Loading
Loading