diff --git a/src/CodexAcpClient.ts b/src/CodexAcpClient.ts index 22cde0da..0fc55a78 100644 --- a/src/CodexAcpClient.ts +++ b/src/CodexAcpClient.ts @@ -199,7 +199,7 @@ export class CodexAcpClient { return this.codexClient.accountRead({refreshToken: false}); } - async resumeSession(request: acp.ResumeSessionRequest): Promise { + async resumeSession(request: acp.ResumeSessionRequest, onSubscribed?: () => void): Promise { await this.refreshSkills(request.cwd, request._meta); const response = await this.codexClient.threadResume({ @@ -208,6 +208,7 @@ export class CodexAcpClient { modelProvider: this.getResumeModelProvider(), threadId: request.sessionId, }); + onSubscribed?.(); const codexModels = await this.fetchAvailableModels(); const currentModelId = this.createModelId(codexModels, response.model, response.reasoningEffort).toString(); return { @@ -218,13 +219,14 @@ export class CodexAcpClient { } } - async loadSession(request: acp.LoadSessionRequest): Promise { + async loadSession(request: acp.LoadSessionRequest, onSubscribed?: () => void): Promise { const response = await this.codexClient.threadResume({ config: await this.createSessionConfig(request.cwd, request.mcpServers ?? []), cwd: request.cwd, modelProvider: this.getResumeModelProvider(), threadId: request.sessionId, }); + onSubscribed?.(); const codexModels = await this.fetchAvailableModels(); const currentModelId = this.createModelId(codexModels, response.model, response.reasoningEffort).toString(); return { @@ -258,6 +260,14 @@ export class CodexAcpClient { }; } + async closeSession(sessionId: string): Promise { + try { + await this.codexClient.threadUnsubscribe({threadId: sessionId}); + } finally { + this.codexClient.clearThreadHandlers(sessionId); + } + } + async awaitMcpServerStartup(serverNames: Array, afterVersion: number): Promise { return await this.codexClient.awaitMcpServerStartup(serverNames, afterVersion); } @@ -385,11 +395,16 @@ export class CodexAcpClient { serviceTier: ServiceTier | null, disableSummary: boolean, cwd: string, - ): Promise { + onTurnStarted?: (turnId: string) => void, + shouldCancel?: () => boolean, + ): Promise { const input = buildPromptItems(request.prompt); const effort = modelId.effort as ReasoningEffort | null; //TODO remove unsafe conversion await this.refreshSkills(cwd, request._meta); + if (shouldCancel?.()) { + return null; + } return await this.codexClient.runTurn({ threadId: request.sessionId, input: input, @@ -399,7 +414,15 @@ export class CodexAcpClient { effort: effort, model: modelId.model, serviceTier: serviceTier, - }); + }, onTurnStarted); + } + + resolveTurnInterrupted(params: { threadId: string, turnId: string }): void { + this.codexClient.resolveTurnInterrupted(params.threadId, params.turnId); + } + + markTurnStale(params: { threadId: string, turnId: string }): void { + this.codexClient.markTurnStale(params.threadId, params.turnId); } async listSkills(params?: SkillsListParams): Promise { diff --git a/src/CodexAcpServer.ts b/src/CodexAcpServer.ts index 058ef099..a3d324c0 100644 --- a/src/CodexAcpServer.ts +++ b/src/CodexAcpServer.ts @@ -65,6 +65,18 @@ interface PendingMcpStartupSession { afterVersion: number; } +interface PendingTurnStart { + promise: Promise; + resolve: (turnId: string | null) => void; +} + +interface ActivePrompt { + completion: Promise; + closeSignal: Promise; + requestClose: () => void; + complete: () => void; +} + export class CodexAcpServer implements acp.Agent { private static readonly MODEL_NAME_TOKEN_OVERRIDES: Record = { gpt: "GPT", @@ -80,6 +92,11 @@ export class CodexAcpServer implements acp.Agent { private readonly sessions: Map; private readonly pendingMcpStartupSessions: Map; + private readonly pendingTurnStarts: Map; + private readonly activePrompts: Map; + private readonly closingSessions: Map; + private readonly sessionGenerations: Map; + private readonly sessionOpenGenerations: Map; constructor( connection: acp.AgentSideConnection, @@ -89,6 +106,11 @@ export class CodexAcpServer implements acp.Agent { ) { this.sessions = new Map(); this.pendingMcpStartupSessions = new Map(); + this.pendingTurnStarts = new Map(); + this.activePrompts = new Map(); + this.closingSessions = new Map(); + this.sessionGenerations = new Map(); + this.sessionOpenGenerations = new Map(); this.connection = connection; this.codexAcpClient = codexAcpClient; this.defaultAuthRequest = defaultAuthRequest ?? null; @@ -123,7 +145,8 @@ export class CodexAcpServer implements acp.Agent { }, sessionCapabilities: { resume: { }, - list: { } + list: { }, + close: { }, }, mcpCapabilities: { acp: false, @@ -184,7 +207,73 @@ export class CodexAcpServer implements acp.Agent { } } + private beginSessionOpen(sessionId: string): number { + const generation = this.getSessionGeneration(sessionId); + if (this.sessionIsClosing(sessionId)) { + throw RequestError.invalidRequest(`Session ${sessionId} is closing`); + } + this.sessionOpenGenerations.set(sessionId, generation); + return generation; + } + + private sessionOpenCanInstall(sessionId: string, generation: number): boolean { + return !this.sessionIsClosing(sessionId) && this.getSessionGeneration(sessionId) === generation; + } + + private async cleanupStaleSessionOpen(sessionId: string, generation: number): Promise { + if (this.sessionOpenGenerations.get(sessionId) === generation) { + if (!this.sessionIsClosing(sessionId)) { + this.bumpSessionGeneration(sessionId); + } + this.beginSessionCloseFence(sessionId); + try { + await this.runWithProcessCheck(() => this.codexAcpClient.closeSession(sessionId)); + } catch (err) { + logger.error(`Failed to close stale session open for ${sessionId}`, err); + } finally { + this.endSessionCloseFence(sessionId); + } + return true; + } + return false; + } + + private async closeStaleSessionOpen(sessionId: string, generation: number): Promise { + await this.cleanupStaleSessionOpen(sessionId, generation); + throw RequestError.invalidRequest(`Session ${sessionId} is closing`); + } + + private sessionIsClosing(sessionId: string): boolean { + return (this.closingSessions.get(sessionId) ?? 0) > 0; + } + + private beginSessionCloseFence(sessionId: string): void { + this.closingSessions.set(sessionId, (this.closingSessions.get(sessionId) ?? 0) + 1); + } + + private endSessionCloseFence(sessionId: string): void { + const count = this.closingSessions.get(sessionId) ?? 0; + if (count <= 1) { + this.closingSessions.delete(sessionId); + return; + } + this.closingSessions.set(sessionId, count - 1); + } + + private getSessionGeneration(sessionId: string): number { + return this.sessionGenerations.get(sessionId) ?? 0; + } + + private bumpSessionGeneration(sessionId: string): number { + const generation = this.getSessionGeneration(sessionId) + 1; + this.sessionGenerations.set(sessionId, generation); + return generation; + } + async tryCreateSession(request: acp.NewSessionRequest | acp.ResumeSessionRequest): Promise<[SessionId, SessionModelState, SessionModeState]> { + const requestedSessionGeneration = "sessionId" in request + ? this.beginSessionOpen(request.sessionId) + : null; await this.checkAuthorization(); const requestedMcpServers = request.mcpServers ?? []; const mcpServerStartupVersion = requestedMcpServers.length > 0 @@ -192,16 +281,41 @@ export class CodexAcpServer implements acp.Agent { : null; let sessionMetadata: SessionMetadata; + let resumeSubscribed = false; if ("sessionId" in request) { logger.log(`Resume existing session: ${request.sessionId}...`) - sessionMetadata = await this.runWithProcessCheck(() => this.codexAcpClient.resumeSession(request)); + try { + sessionMetadata = await this.runWithProcessCheck(() => + this.codexAcpClient.resumeSession(request, () => { + resumeSubscribed = true; + }) + ); + } catch (err) { + if (resumeSubscribed && requestedSessionGeneration !== null) { + await this.cleanupStaleSessionOpen(request.sessionId, requestedSessionGeneration); + } + throw err; + } } else { logger.log(`Create new session...`) sessionMetadata = await this.runWithProcessCheck(() => this.codexAcpClient.newSession(request)); } - const account = await this.getActiveAccount(); const {sessionId, currentModelId, models} = sessionMetadata; + let account: Account | null; + try { + account = await this.getActiveAccount(); + } catch (err) { + if (resumeSubscribed && requestedSessionGeneration !== null) { + await this.cleanupStaleSessionOpen(sessionId, requestedSessionGeneration); + } + throw err; + } + const sessionGeneration = requestedSessionGeneration ?? this.beginSessionOpen(sessionId); + if (!this.sessionOpenCanInstall(sessionId, sessionGeneration)) { + resumeSubscribed = false; + await this.closeStaleSessionOpen(sessionId, sessionGeneration); + } const sessionMcpServers = this.resolveSessionMcpServers(requestedMcpServers, "sessionId" in request); const currentModel = this.findCurrentModel(models, currentModelId); const currentModelSupportsFast = modelSupportsFast(currentModel); @@ -223,6 +337,7 @@ export class CodexAcpServer implements acp.Agent { sessionMcpServers: sessionMcpServers, } this.sessions.set(sessionId, sessionState); + resumeSubscribed = false; if (requestedMcpServers.length > 0 && mcpServerStartupVersion !== null) { this.pendingMcpStartupSessions.set(sessionId, { @@ -292,6 +407,40 @@ export class CodexAcpServer implements acp.Agent { return await this.runWithProcessCheck(() => this.codexAcpClient.listSessions(params)); } + async closeSession(params: acp.CloseSessionRequest): Promise { + logger.log("Closing session...", {sessionId: params.sessionId}); + const closeGeneration = this.bumpSessionGeneration(params.sessionId); + const sessionState = this.sessions.get(params.sessionId); + this.beginSessionCloseFence(params.sessionId); + + try { + if (sessionState) { + await this.interruptSessionTurn(sessionState, "Close", true); + } else { + logger.log("Close request received for unknown local session", {sessionId: params.sessionId}); + } + + const activePrompt = this.activePrompts.get(params.sessionId); + if (activePrompt) { + activePrompt.requestClose(); + await activePrompt.completion; + } + + await this.runWithProcessCheck(() => this.codexAcpClient.closeSession(params.sessionId)); + logger.log("Session closed", {sessionId: params.sessionId}); + } finally { + if (this.getSessionGeneration(params.sessionId) === closeGeneration) { + this.sessions.delete(params.sessionId); + this.pendingMcpStartupSessions.delete(params.sessionId); + this.pendingTurnStarts.delete(params.sessionId); + this.activePrompts.delete(params.sessionId); + } + this.endSessionCloseFence(params.sessionId); + } + + return {}; + } + async newSession( params: acp.NewSessionRequest, ): Promise { @@ -456,6 +605,7 @@ export class CodexAcpServer implements acp.Agent { modeState: SessionModeState; thread: Thread; }> { + const requestedSessionGeneration = this.beginSessionOpen(request.sessionId); await this.checkAuthorization(); const requestedMcpServers = request.mcpServers ?? []; const mcpServerStartupVersion = requestedMcpServers.length > 0 @@ -463,12 +613,35 @@ export class CodexAcpServer implements acp.Agent { : null; logger.log(`Load existing session: ${request.sessionId}...`); - const sessionMetadata: SessionMetadataWithThread = await this.runWithProcessCheck(() => - this.codexAcpClient.loadSession(request) - ); + let subscribed = false; + let sessionMetadata: SessionMetadataWithThread; + try { + sessionMetadata = await this.runWithProcessCheck(() => + this.codexAcpClient.loadSession(request, () => { + subscribed = true; + }) + ); + } catch (err) { + if (subscribed) { + await this.cleanupStaleSessionOpen(request.sessionId, requestedSessionGeneration); + } + throw err; + } - const account = await this.getActiveAccount(); const {sessionId, currentModelId, models, thread} = sessionMetadata; + let account: Account | null; + try { + account = await this.getActiveAccount(); + } catch (err) { + if (subscribed) { + await this.cleanupStaleSessionOpen(request.sessionId, requestedSessionGeneration); + } + throw err; + } + if (!this.sessionOpenCanInstall(sessionId, requestedSessionGeneration)) { + subscribed = false; + await this.closeStaleSessionOpen(sessionId, requestedSessionGeneration); + } const sessionMcpServers = this.resolveSessionMcpServers(requestedMcpServers, true); const currentModel = this.findCurrentModel(models, currentModelId); const currentModelSupportsFast = modelSupportsFast(currentModel); @@ -490,6 +663,7 @@ export class CodexAcpServer implements acp.Agent { sessionMcpServers: sessionMcpServers, }; this.sessions.set(sessionId, sessionState); + subscribed = false; if (requestedMcpServers.length > 0 && mcpServerStartupVersion !== null) { this.pendingMcpStartupSessions.set(sessionId, { @@ -778,11 +952,18 @@ export class CodexAcpServer implements acp.Agent { pendingStartup.afterVersion, ) ); + if (!this.sessions.has(sessionId) + || this.sessionIsClosing(sessionId) + || this.pendingMcpStartupSessions.get(sessionId) !== pendingStartup) { + return; + } await this.publishMcpStartupStatus(sessionId, mcpStartup, pendingStartup.requestedServers); } catch (err) { logger.error(`Failed to publish MCP startup status for session ${sessionId}`, err); } finally { - this.pendingMcpStartupSessions.delete(sessionId); + if (this.pendingMcpStartupSessions.get(sessionId) === pendingStartup) { + this.pendingMcpStartupSessions.delete(sessionId); + } } } @@ -807,6 +988,141 @@ export class CodexAcpServer implements acp.Agent { } } + private trackActivePrompt(sessionId: string): ActivePrompt { + let resolveCompletion: () => void = () => {}; + const completion = new Promise((resolve) => { + resolveCompletion = resolve; + }); + let resolveCloseSignal: (value: null) => void = () => {}; + const closeSignal = new Promise((resolve) => { + resolveCloseSignal = resolve; + }); + + let completed = false; + let closeRequested = false; + const activePrompt: ActivePrompt = { + completion, + closeSignal, + requestClose: () => { + if (closeRequested) { + return; + } + closeRequested = true; + resolveCloseSignal(null); + }, + complete: () => { + if (completed) { + return; + } + completed = true; + if (this.activePrompts.get(sessionId) === activePrompt) { + this.activePrompts.delete(sessionId); + } + resolveCompletion(); + }, + }; + + this.activePrompts.set(sessionId, activePrompt); + return activePrompt; + } + + private createPendingTurnStart(): PendingTurnStart { + let resolve: (turnId: string | null) => void = () => {}; + const promise = new Promise((innerResolve) => { + resolve = innerResolve; + }); + return {promise, resolve}; + } + + private interruptLateStartedTurn(sessionId: string, turnId: string): void { + this.codexAcpClient.markTurnStale({ + threadId: sessionId, + turnId, + }); + void this.runWithProcessCheck(() => this.codexAcpClient.turnInterrupt({ + threadId: sessionId, + turnId, + })).catch((err) => { + logger.error(`Close - late turnInterrupt failed`, err); + }).finally(() => { + this.codexAcpClient.resolveTurnInterrupted({ + threadId: sessionId, + turnId, + }); + }); + } + + private promptIsClosedOrStale(sessionId: string, activePrompt: ActivePrompt): boolean { + return this.activePrompts.get(sessionId) !== activePrompt || this.sessionIsClosing(sessionId); + } + + private async interruptSessionTurn( + sessionState: SessionState, + requestName: "Cancel" | "Close", + resolveInterruptedTurn: boolean, + ): Promise { + const turnId = await this.getInterruptibleTurnId(sessionState, requestName); + if (!turnId) { + return; + } + + logger.log(`${requestName} session requested`, { + sessionId: sessionState.sessionId, + currentTurnId: turnId, + }); + if (resolveInterruptedTurn) { + this.codexAcpClient.markTurnStale({ + threadId: sessionState.sessionId, + turnId, + }); + } + try { + await this.runWithProcessCheck(() => this.codexAcpClient.turnInterrupt({ + threadId: sessionState.sessionId, + turnId, + })); + logger.log(`${requestName} - turnInterrupt succeeded`, { + sessionId: sessionState.sessionId, + currentTurnId: turnId, + }); + } catch (err) { + logger.error(`${requestName} - turnInterrupt failed`, err); + } finally { + if (resolveInterruptedTurn) { + this.codexAcpClient.resolveTurnInterrupted({ + threadId: sessionState.sessionId, + turnId, + }); + } + } + } + + private async getInterruptibleTurnId( + sessionState: SessionState, + requestName: "Cancel" | "Close", + ): Promise { + if (sessionState.currentTurnId) { + return sessionState.currentTurnId; + } + + const pendingTurnStart = this.pendingTurnStarts.get(sessionState.sessionId); + if (!pendingTurnStart) { + logger.log(`${requestName} request rejected: no current turn`, {sessionId: sessionState.sessionId}); + return null; + } + + if (requestName === "Close") { + pendingTurnStart.resolve(null); + return null; + } + + const turnId = await pendingTurnStart.promise; + if (!turnId) { + logger.log(`${requestName} request rejected: no current turn`, {sessionId: sessionState.sessionId}); + } + return turnId; + } + async prompt(params: acp.PromptRequest): Promise { logger.log("Prompt received", { sessionId: params.sessionId, @@ -815,6 +1131,8 @@ export class CodexAcpServer implements acp.Agent { const sessionState = this.getSessionState(params.sessionId); sessionState.currentTurnId = null; sessionState.lastTokenUsage = null; + const activePrompt = this.trackActivePrompt(params.sessionId); + let pendingTurnStart: PendingTurnStart | null = null; try { const eventHandler = new CodexEventHandler(this.connection, sessionState); @@ -837,6 +1155,14 @@ export class CodexAcpServer implements acp.Agent { }; } + if (this.sessionIsClosing(params.sessionId)) { + return { + stopReason: "cancelled", + usage: this.buildPromptUsage(sessionState.lastTokenUsage), + _meta: this.buildQuotaMeta(sessionState), + }; + } + const modelId = ModelId.fromString(sessionState.currentModelId); const modelLacksReasoning = sessionState.supportedReasoningEfforts.length > 0 && sessionState.supportedReasoningEfforts.every(e => e.reasoningEffort === "none"); @@ -857,21 +1183,58 @@ export class CodexAcpServer implements acp.Agent { sessionState.fastModeEnabled, sessionState.currentModelSupportsFast, ); - const turnCompleted = await this.runWithProcessCheck( - () => this.codexAcpClient.sendPrompt(params, agentMode, modelId, serviceTier, disableSummary, sessionState.cwd)); + pendingTurnStart = this.createPendingTurnStart(); + this.pendingTurnStarts.set(params.sessionId, pendingTurnStart); + const sendPromptPromise = this.runWithProcessCheck( + () => this.codexAcpClient.sendPrompt( + params, + agentMode, + modelId, + serviceTier, + disableSummary, + sessionState.cwd, + (turnId) => { + if (this.promptIsClosedOrStale(params.sessionId, activePrompt)) { + this.interruptLateStartedTurn(params.sessionId, turnId); + return; + } + sessionState.currentTurnId = turnId; + pendingTurnStart?.resolve(turnId); + }, + () => this.promptIsClosedOrStale(params.sessionId, activePrompt), + )); + void sendPromptPromise.catch((err) => { + if (this.activePrompts.get(params.sessionId) !== activePrompt) { + logger.error(`Prompt for closed session ${params.sessionId} failed after close`, err); + } + }); + const turnCompleted = await Promise.race([ + sendPromptPromise, + activePrompt.closeSignal, + ]); + + if (turnCompleted === null) { + return { + stopReason: "cancelled", + usage: this.buildPromptUsage(sessionState.lastTokenUsage), + _meta: this.buildQuotaMeta(sessionState), + }; + } // Check if turn was interrupted (cancelled) if (turnCompleted.turn.status === "interrupted") { - await this.connection.sessionUpdate({ - sessionId: params.sessionId, - update: { - sessionUpdate: "agent_message_chunk", - content: { - type: "text", - text: "*Conversation interrupted*" + if (!this.sessionIsClosing(params.sessionId) && this.sessions.has(params.sessionId)) { + await this.connection.sessionUpdate({ + sessionId: params.sessionId, + update: { + sessionUpdate: "agent_message_chunk", + content: { + type: "text", + text: "*Conversation interrupted*" + } } - } - }); + }); + } return { stopReason: "cancelled", usage: this.buildPromptUsage(sessionState.lastTokenUsage), @@ -896,6 +1259,11 @@ export class CodexAcpServer implements acp.Agent { } finally { logger.log("Prompt completed", {sessionId: params.sessionId}); sessionState.currentTurnId = null; + if (pendingTurnStart !== null && this.pendingTurnStarts.get(params.sessionId) === pendingTurnStart) { + this.pendingTurnStarts.delete(params.sessionId); + } + pendingTurnStart?.resolve(null); + activePrompt.complete(); } } @@ -948,28 +1316,8 @@ export class CodexAcpServer implements acp.Agent { return; } - if (!sessionState.currentTurnId) { - logger.log("Cancel request rejected: no current turn", {sessionId: params.sessionId}); - return; - } - - logger.log("Cancel session requested", { - sessionId: params.sessionId, - currentTurnId: sessionState.currentTurnId - }); - try { - // After turnInterrupt(), Codex will send turn/completed event, which will naturally complete awaitTurnCompleted() - await this.codexAcpClient.turnInterrupt({ - threadId: params.sessionId, - turnId: sessionState.currentTurnId - }); - logger.log("Cancel - turnInterrupt succeeded", { - sessionId: params.sessionId, - currentTurnId: sessionState.currentTurnId - }); - } catch (err) { - logger.error(`Cancel - turnInterrupt failed`, err); - } + // After turnInterrupt(), Codex will send turn/completed, which naturally completes awaitTurnCompleted(). + await this.interruptSessionTurn(sessionState, "Cancel", false); } } diff --git a/src/CodexAppServerClient.ts b/src/CodexAppServerClient.ts index 0d2d0832..df9309eb 100644 --- a/src/CodexAppServerClient.ts +++ b/src/CodexAppServerClient.ts @@ -33,6 +33,8 @@ import type { ThreadResumeResponse, ThreadStartParams, ThreadStartResponse, + ThreadUnsubscribeParams, + ThreadUnsubscribeResponse, TurnCompletedNotification, TurnInterruptParams, TurnInterruptResponse, @@ -95,6 +97,7 @@ export class CodexAppServerClient { private readonly mcpServerStartupResolvers: Array = []; private readonly pendingTurnCompletionResolvers = new Map void>>(); private readonly turnCompletionCaptures = new Map void>>(); + private readonly staleTurnIds = new Map>(); constructor(connection: MessageConnection) { this.connection = connection; @@ -112,6 +115,17 @@ export class CodexAppServerClient { if (isTurnCompletedNotification(serverNotification)) { this.recordTurnCompleted(serverNotification.params); } + const routing = extractTurnRouting(serverNotification); + const staleTurnNotification = this.isStaleTurn(routing.threadId, routing.turnId); + if (staleTurnNotification) { + if (isTurnCompletedNotification(serverNotification) && routing.threadId !== null && routing.turnId !== null) { + this.clearStaleTurn(routing.threadId, routing.turnId); + } + for (const callback of this.codexEventHandlers) { + callback({ eventType: "notification", ...serverNotification }); + } + return; + } this.notify(serverNotification); for (const callback of this.codexEventHandlers) { callback({ eventType: "notification", ...serverNotification }); @@ -119,6 +133,9 @@ export class CodexAppServerClient { }); this.connection.onRequest(CommandExecutionApprovalRequest, async (params) => { + if (this.isStaleTurn(params.threadId, params.turnId)) { + return { decision: "cancel" }; + } const handler = this.approvalHandlers.get(params.threadId); if (!handler) { return { decision: "cancel" }; @@ -127,6 +144,9 @@ export class CodexAppServerClient { }); this.connection.onRequest(FileChangeApprovalRequest, async (params) => { + if (this.isStaleTurn(params.threadId, params.turnId)) { + return { decision: "cancel" }; + } const handler = this.approvalHandlers.get(params.threadId); if (!handler) { return { decision: "cancel" }; @@ -135,6 +155,9 @@ export class CodexAppServerClient { }); this.connection.onRequest(McpServerElicitationRequest, async (params) => { + if (this.isStaleTurn(params.threadId, params.turnId)) { + return { action: "cancel", content: null, _meta: null }; + } const handler = this.elicitationHandlers.get(params.threadId); if (!handler) { return { action: "cancel", content: null, _meta: null }; @@ -151,6 +174,12 @@ export class CodexAppServerClient { this.elicitationHandlers.set(threadId, handler); } + clearThreadHandlers(threadId: string): void { + this.notificationHandlers.delete(threadId); + this.approvalHandlers.delete(threadId); + this.elicitationHandlers.delete(threadId); + } + async initialize(params: InitializeParams): Promise { return await this.sendRequest({ method: "initialize", params: params }); } @@ -159,7 +188,7 @@ export class CodexAppServerClient { return await this.sendRequest({ method: "turn/start", params: params }); } - async runTurn(params: TurnStartParams): Promise { + async runTurn(params: TurnStartParams, onTurnStarted?: (turnId: string) => void): Promise { const capturedCompletions: Array = []; const releaseCapture = this.captureTurnCompletions(params.threadId, (event) => { capturedCompletions.push(event); @@ -167,6 +196,7 @@ export class CodexAppServerClient { try { const turnStarted = await this.turnStart(params); + onTurnStarted?.(turnStarted.turn.id); const earlyCompletion = capturedCompletions.find(event => event.turn.id === turnStarted.turn.id); releaseCapture(); if (earlyCompletion) { @@ -184,6 +214,12 @@ export class CodexAppServerClient { return await this.sendRequest({ method: "turn/interrupt", params: params }); } + markTurnStale(threadId: string, turnId: string): void { + const threadStaleTurns = this.staleTurnIds.get(threadId) ?? new Set(); + threadStaleTurns.add(turnId); + this.staleTurnIds.set(threadId, threadStaleTurns); + } + async threadStart(params: ThreadStartParams): Promise { return await this.sendRequest({ method: "thread/start", params: params }); } @@ -204,6 +240,10 @@ export class CodexAppServerClient { return await this.sendRequest({ method: "thread/read", params: params }); } + async threadUnsubscribe(params: ThreadUnsubscribeParams): Promise { + return await this.sendRequest({ method: "thread/unsubscribe", params: params }); + } + async listMcpServerStatus(params: ListMcpServerStatusParams): Promise { return await this.sendRequest({ method: "mcpServerStatus/list", params }); } @@ -256,6 +296,21 @@ export class CodexAppServerClient { }); } + resolveTurnInterrupted(threadId: string, turnId: string): void { + this.recordTurnCompleted({ + threadId, + turn: { + id: turnId, + items: [], + status: "interrupted", + error: null, + startedAt: null, + completedAt: null, + durationMs: null, + }, + }); + } + async listModels(params: ModelListParams): Promise { return await this.sendRequest({ method: "model/list", params }); } @@ -313,6 +368,24 @@ export class CodexAppServerClient { } } + private isStaleTurn(threadId: string | null, turnId: string | null): boolean { + if (threadId === null || turnId === null) { + return false; + } + return this.staleTurnIds.get(threadId)?.has(turnId) ?? false; + } + + private clearStaleTurn(threadId: string, turnId: string): void { + const threadStaleTurns = this.staleTurnIds.get(threadId); + if (!threadStaleTurns) { + return; + } + threadStaleTurns.delete(turnId); + if (threadStaleTurns.size === 0) { + this.staleTurnIds.delete(threadId); + } + } + private getOrCreatePendingTurnCompletionResolvers(threadId: string): Map void> { const existing = this.pendingTurnCompletionResolvers.get(threadId); if (existing) { @@ -447,3 +520,19 @@ function extractThreadId(notification: ServerNotification): string | null { } return null; } + +function extractTurnRouting(notification: ServerNotification): { threadId: string | null, turnId: string | null } { + const params = notification.params as { + threadId?: unknown, + turnId?: unknown, + turn?: { id?: unknown }, + } | undefined; + const threadId = extractThreadId(notification); + if (params && typeof params.turnId === "string") { + return {threadId, turnId: params.turnId}; + } + if (params && typeof params.turn?.id === "string") { + return {threadId, turnId: params.turn.id}; + } + return {threadId, turnId: null}; +} diff --git a/src/__tests__/CodexACPAgent/data/session-close-active-turn.json b/src/__tests__/CodexACPAgent/data/session-close-active-turn.json new file mode 100644 index 00000000..c427fc06 --- /dev/null +++ b/src/__tests__/CodexACPAgent/data/session-close-active-turn.json @@ -0,0 +1,21 @@ +{ + "eventType": "request", + "method": "turn/interrupt", + "params": { + "threadId": "session-id", + "turnId": "turn-id" + } +} +{ + "eventType": "response" +} +{ + "eventType": "request", + "method": "thread/unsubscribe", + "params": { + "threadId": "session-id" + } +} +{ + "eventType": "response" +} \ No newline at end of file diff --git a/src/__tests__/CodexACPAgent/data/session-close-idle.json b/src/__tests__/CodexACPAgent/data/session-close-idle.json new file mode 100644 index 00000000..1cdd0eb9 --- /dev/null +++ b/src/__tests__/CodexACPAgent/data/session-close-idle.json @@ -0,0 +1,10 @@ +{ + "eventType": "request", + "method": "thread/unsubscribe", + "params": { + "threadId": "session-id" + } +} +{ + "eventType": "response" +} \ No newline at end of file diff --git a/src/__tests__/CodexACPAgent/data/session-close-interrupt-failed.json b/src/__tests__/CodexACPAgent/data/session-close-interrupt-failed.json new file mode 100644 index 00000000..1cdd0eb9 --- /dev/null +++ b/src/__tests__/CodexACPAgent/data/session-close-interrupt-failed.json @@ -0,0 +1,10 @@ +{ + "eventType": "request", + "method": "thread/unsubscribe", + "params": { + "threadId": "session-id" + } +} +{ + "eventType": "response" +} \ No newline at end of file diff --git a/src/__tests__/CodexACPAgent/initialize.test.ts b/src/__tests__/CodexACPAgent/initialize.test.ts index bf6e0604..0d6e3af2 100644 --- a/src/__tests__/CodexACPAgent/initialize.test.ts +++ b/src/__tests__/CodexACPAgent/initialize.test.ts @@ -49,6 +49,7 @@ describe('CodexACPAgent - initialize', () => { sessionCapabilities: { resume: {}, list: {}, + close: {}, }, mcpCapabilities: { acp: false, diff --git a/src/__tests__/CodexACPAgent/session-close.test.ts b/src/__tests__/CodexACPAgent/session-close.test.ts new file mode 100644 index 00000000..a4d09199 --- /dev/null +++ b/src/__tests__/CodexACPAgent/session-close.test.ts @@ -0,0 +1,517 @@ +import {describe, expect, it, vi} from "vitest"; +import { + createCodexMockTestFixture, + createTestModel, + mockPromptTurn, + type CodexMockTestFixture, +} from "../acp-test-utils"; +import type {CodexAcpServer} from "../../CodexAcpServer"; +import type {CodexAcpClient, SessionMetadata} from "../../CodexAcpClient"; +import type {McpStartupResult} from "../../CodexAppServerClient"; +import type {TurnStartResponse} from "../../app-server/v2"; +import type {McpServer} from "@agentclientprotocol/sdk"; + +const sessionId = "session-id"; + +describe("ACP session close", () => { + it("advertises session close support", async () => { + const fixture = createCodexMockTestFixture(); + + const response = await fixture.getCodexAcpAgent().initialize({protocolVersion: 1}); + + expect(response.agentCapabilities?.sessionCapabilities?.close).toEqual({}); + }); + + it("unsubscribes idle sessions and clears local session handlers", async () => { + const {fixture, codexAcpAgent} = await createSession(); + + mockPromptTurn(fixture, sessionId); + await codexAcpAgent.prompt({ + sessionId, + prompt: [{type: "text", text: "register session handlers"}], + }); + + fixture.clearCodexConnectionDump(); + fixture.clearAcpConnectionDump(); + + await codexAcpAgent.closeSession({sessionId}); + + await expect(fixture.getCodexConnectionDump([])).toMatchFileSnapshot("data/session-close-idle.json"); + expect(() => codexAcpAgent.getSessionState(sessionId)).toThrow(`Session ${sessionId} not found`); + + fixture.sendServerNotification({ + method: "thread/name/updated", + params: { + threadId: sessionId, + threadName: "Ignored after close", + }, + }); + await waitForMicrotasks(); + + expect(fixture.getAcpConnectionEvents([])).toEqual([]); + }); + + it("interrupts active turns before unsubscribing", async () => { + const {fixture, codexAcpAgent} = await createSession(); + codexAcpAgent.getSessionState(sessionId).currentTurnId = "turn-id"; + + await codexAcpAgent.closeSession({sessionId}); + + await expect(fixture.getCodexConnectionDump([])).toMatchFileSnapshot("data/session-close-active-turn.json"); + expect(() => codexAcpAgent.getSessionState(sessionId)).toThrow(`Session ${sessionId} not found`); + }); + + it("does not wait for delayed turn start before closing", async () => { + const {fixture, codexAcpAgent} = await createSession(); + const turnStart = deferred(); + const turnStartCalled = deferred(); + + vi.spyOn(fixture.getCodexAppServerClient(), "turnStart").mockImplementation(async () => { + turnStartCalled.resolve(); + return await turnStart.promise; + }); + + const promptPromise = codexAcpAgent.prompt({ + sessionId, + prompt: [{type: "text", text: "long running prompt"}], + }); + await turnStartCalled.promise; + fixture.clearCodexConnectionDump(); + + const closePromise = codexAcpAgent.closeSession({sessionId}); + + await expect(closePromise).resolves.toEqual({}); + await expect(promptPromise).resolves.toMatchObject({stopReason: "cancelled"}); + + const requestMethods = fixture.getCodexConnectionEvents([]) + .flatMap(event => event.eventType === "request" ? [event.method] : []); + expect(requestMethods).toEqual(["thread/unsubscribe"]); + expect(fixture.getAcpConnectionDump([])).not.toContain("Conversation interrupted"); + expect(() => codexAcpAgent.getSessionState(sessionId)).toThrow(`Session ${sessionId} not found`); + + fixture.clearCodexConnectionDump(); + turnStart.resolve(createTurnStartResponse("turn-id")); + + await vi.waitFor(() => { + const lateRequestMethods = fixture.getCodexConnectionEvents([]) + .flatMap(event => event.eventType === "request" ? [event.method] : []); + expect(lateRequestMethods).toContain("turn/interrupt"); + }); + }); + + it("does not start a turn after close while prompt startup is still refreshing skills", async () => { + const {fixture, codexAcpAgent} = await createSession(); + const skillRefresh = deferred<{data: []}>(); + const listSkillsSpy = vi.spyOn(fixture.getCodexAppServerClient(), "listSkills") + .mockReturnValue(skillRefresh.promise); + const turnStartSpy = vi.spyOn(fixture.getCodexAppServerClient(), "turnStart") + .mockResolvedValue(createTurnStartResponse("turn-id")); + + const promptPromise = codexAcpAgent.prompt({ + sessionId, + prompt: [{type: "text", text: "prompt before turn start"}], + }); + + await vi.waitFor(() => { + expect(listSkillsSpy).toHaveBeenCalled(); + }); + + await expect(codexAcpAgent.closeSession({sessionId})).resolves.toEqual({}); + await expect(promptPromise).resolves.toMatchObject({stopReason: "cancelled"}); + + skillRefresh.resolve({data: []}); + await waitForMicrotasks(); + + expect(turnStartSpy).not.toHaveBeenCalled(); + expect(() => codexAcpAgent.getSessionState(sessionId)).toThrow(`Session ${sessionId} not found`); + }); + + it("does not hang when close interrupt fails during an active prompt", async () => { + const {fixture, codexAcpAgent} = await createSession(); + const turnInterruptSpy = vi.spyOn(fixture.getCodexAcpClient(), "turnInterrupt") + .mockRejectedValue(new Error("interrupt failed")); + + vi.spyOn(fixture.getCodexAppServerClient(), "turnStart").mockResolvedValue(createTurnStartResponse("turn-id")); + + const promptPromise = codexAcpAgent.prompt({ + sessionId, + prompt: [{type: "text", text: "long running prompt"}], + }); + + await vi.waitFor(() => { + expect(codexAcpAgent.getSessionState(sessionId).currentTurnId).toBe("turn-id"); + }); + fixture.clearCodexConnectionDump(); + + await expect(codexAcpAgent.closeSession({sessionId})).resolves.toEqual({}); + await expect(promptPromise).resolves.toMatchObject({stopReason: "cancelled"}); + + expect(turnInterruptSpy).toHaveBeenCalledWith({ + threadId: sessionId, + turnId: "turn-id", + }); + await expect(fixture.getCodexConnectionDump([])).toMatchFileSnapshot( + "data/session-close-interrupt-failed.json" + ); + expect(() => codexAcpAgent.getSessionState(sessionId)).toThrow(`Session ${sessionId} not found`); + }); + + it("suppresses MCP startup updates while close is in progress", async () => { + const mcpStartup = deferred(); + const mcpServer: McpServer = { + name: "broken-mcp", + command: "npx", + args: ["broken"], + env: [], + }; + const {fixture, codexAcpAgent, codexAcpClient} = await createSession({ + mcpServers: [mcpServer], + configure: ({codexAcpClient}) => { + vi.spyOn(codexAcpClient, "awaitMcpServerStartup").mockReturnValue(mcpStartup.promise); + }, + }); + const unsubscribe = deferred(); + vi.spyOn(codexAcpClient, "closeSession").mockReturnValue(unsubscribe.promise); + + await vi.waitFor(() => { + expect(codexAcpClient.awaitMcpServerStartup).toHaveBeenCalledWith(["broken-mcp"], expect.any(Number)); + }); + fixture.clearAcpConnectionDump(); + + const closePromise = codexAcpAgent.closeSession({sessionId}); + await vi.waitFor(() => { + expect(codexAcpClient.closeSession).toHaveBeenCalledWith(sessionId); + }); + + mcpStartup.resolve({ + ready: [], + failed: [{server: "broken-mcp", error: "boom"}], + cancelled: [], + }); + await waitForMicrotasks(); + + expect(fixture.getAcpConnectionEvents([])).toEqual([]); + + unsubscribe.resolve(undefined); + await closePromise; + }); + + it("rejects an in-flight resume that completes after close", async () => { + const {codexAcpAgent, codexAcpClient} = await createSession(); + const resume = deferred(); + vi.spyOn(codexAcpClient, "resumeSession").mockReturnValue(resume.promise); + const closeSessionSpy = vi.spyOn(codexAcpClient, "closeSession"); + + const resumePromise = codexAcpAgent.resumeSession({ + sessionId, + cwd: "/test/cwd", + mcpServers: [], + }); + + await codexAcpAgent.closeSession({sessionId}); + resume.resolve(createSessionMetadata()); + + await expect(resumePromise).rejects.toThrow("Invalid request"); + expect(closeSessionSpy).toHaveBeenCalledTimes(2); + expect(closeSessionSpy).toHaveBeenLastCalledWith(sessionId); + expect(() => codexAcpAgent.getSessionState(sessionId)).toThrow(`Session ${sessionId} not found`); + }); + + it("does not let a stale resume unsubscribe a newer reopened session", async () => { + const {codexAcpAgent, codexAcpClient} = await createSession(); + const staleResume = deferred(); + vi.spyOn(codexAcpClient, "resumeSession") + .mockReturnValueOnce(staleResume.promise) + .mockResolvedValueOnce(createSessionMetadata()); + const closeSessionSpy = vi.spyOn(codexAcpClient, "closeSession"); + + const staleResumePromise = codexAcpAgent.resumeSession({ + sessionId, + cwd: "/test/cwd", + mcpServers: [], + }); + + await codexAcpAgent.closeSession({sessionId}); + await codexAcpAgent.resumeSession({ + sessionId, + cwd: "/test/cwd", + mcpServers: [], + }); + + staleResume.resolve(createSessionMetadata()); + + await expect(staleResumePromise).rejects.toThrow("Invalid request"); + expect(closeSessionSpy).toHaveBeenCalledTimes(1); + expect(codexAcpAgent.getSessionState(sessionId).sessionId).toBe(sessionId); + }); + + it("blocks reopen while stale resume cleanup is unsubscribing", async () => { + const {codexAcpAgent, codexAcpClient} = await createSession(); + const staleResume = deferred(); + const staleUnsubscribe = deferred(); + const resumeSpy = vi.spyOn(codexAcpClient, "resumeSession") + .mockReturnValueOnce(staleResume.promise) + .mockResolvedValueOnce(createSessionMetadata()); + const closeSessionSpy = vi.spyOn(codexAcpClient, "closeSession") + .mockResolvedValueOnce() + .mockReturnValueOnce(staleUnsubscribe.promise); + + const staleResumePromise = codexAcpAgent.resumeSession({ + sessionId, + cwd: "/test/cwd", + mcpServers: [], + }); + + await codexAcpAgent.closeSession({sessionId}); + staleResume.resolve(createSessionMetadata()); + + await vi.waitFor(() => { + expect(closeSessionSpy).toHaveBeenCalledTimes(2); + }); + + await expect(codexAcpAgent.resumeSession({ + sessionId, + cwd: "/test/cwd", + mcpServers: [], + })).rejects.toThrow("Invalid request"); + expect(resumeSpy).toHaveBeenCalledTimes(1); + + staleUnsubscribe.resolve(undefined); + await expect(staleResumePromise).rejects.toThrow("Invalid request"); + + await expect(codexAcpAgent.resumeSession({ + sessionId, + cwd: "/test/cwd", + mcpServers: [], + })).resolves.toEqual(expect.objectContaining({ + models: expect.objectContaining({ + currentModelId: "model-id[medium]", + }), + })); + }); + + it("preserves local close cleanup while stale resume cleanup overlaps close", async () => { + const {codexAcpAgent, codexAcpClient} = await createSession(); + const staleResume = deferred(); + const activeUnsubscribe = deferred(); + const staleUnsubscribe = deferred(); + vi.spyOn(codexAcpClient, "resumeSession").mockReturnValue(staleResume.promise); + const closeSessionSpy = vi.spyOn(codexAcpClient, "closeSession") + .mockReturnValueOnce(activeUnsubscribe.promise) + .mockReturnValueOnce(staleUnsubscribe.promise); + + const staleResumePromise = codexAcpAgent.resumeSession({ + sessionId, + cwd: "/test/cwd", + mcpServers: [], + }); + const closePromise = codexAcpAgent.closeSession({sessionId}); + + await vi.waitFor(() => { + expect(closeSessionSpy).toHaveBeenCalledTimes(1); + }); + + staleResume.resolve(createSessionMetadata()); + await vi.waitFor(() => { + expect(closeSessionSpy).toHaveBeenCalledTimes(2); + }); + + activeUnsubscribe.resolve(undefined); + await expect(closePromise).resolves.toEqual({}); + expect(() => codexAcpAgent.getSessionState(sessionId)).toThrow(`Session ${sessionId} not found`); + + staleUnsubscribe.resolve(undefined); + await expect(staleResumePromise).rejects.toThrow("Invalid request"); + }); + + it("unsubscribes a resume that fails after app-server subscription", async () => { + const fixture = createCodexMockTestFixture(); + const codexAcpAgent = fixture.getCodexAcpAgent(); + const codexAcpClient = fixture.getCodexAcpClient(); + vi.spyOn(codexAcpClient, "authRequired").mockResolvedValue(false); + vi.spyOn(codexAcpClient, "resumeSession").mockImplementation(async (_request, onSubscribed) => { + onSubscribed?.(); + throw new Error("model list failed"); + }); + const closeSessionSpy = vi.spyOn(codexAcpClient, "closeSession").mockResolvedValue(); + + await expect(codexAcpAgent.resumeSession({ + sessionId, + cwd: "/test/cwd", + mcpServers: [], + })).rejects.toThrow("model list failed"); + + expect(closeSessionSpy).toHaveBeenCalledWith(sessionId); + }); + + it("unsubscribes a resume that fails during account read after app-server subscription", async () => { + const fixture = createCodexMockTestFixture(); + const codexAcpAgent = fixture.getCodexAcpAgent(); + const codexAcpClient = fixture.getCodexAcpClient(); + vi.spyOn(codexAcpClient, "authRequired").mockResolvedValue(false); + vi.spyOn(codexAcpClient, "resumeSession").mockImplementation(async (_request, onSubscribed) => { + onSubscribed?.(); + return createSessionMetadata(); + }); + vi.spyOn(codexAcpClient, "getAccount").mockRejectedValue(new Error("account read failed")); + const closeSessionSpy = vi.spyOn(codexAcpClient, "closeSession").mockResolvedValue(); + + await expect(codexAcpAgent.resumeSession({ + sessionId, + cwd: "/test/cwd", + mcpServers: [], + })).rejects.toThrow("account read failed"); + + expect(closeSessionSpy).toHaveBeenCalledWith(sessionId); + }); + + it("does not route stale turn notifications into a reopened session", async () => { + const {fixture, codexAcpAgent, codexAcpClient} = await createSession(); + const oldTurnStart = deferred(); + const turnStartSpy = vi.spyOn(fixture.getCodexAppServerClient(), "turnStart") + .mockReturnValueOnce(oldTurnStart.promise) + .mockResolvedValueOnce(createTurnStartResponse("new-turn")); + vi.spyOn(fixture.getCodexAppServerClient(), "awaitTurnCompleted").mockResolvedValue({ + threadId: sessionId, + turn: createCompletedTurn("new-turn"), + }); + + const promptPromise = codexAcpAgent.prompt({ + sessionId, + prompt: [{type: "text", text: "old prompt"}], + }); + await vi.waitFor(() => { + expect(turnStartSpy).toHaveBeenCalledTimes(1); + }); + + await expect(codexAcpAgent.closeSession({sessionId})).resolves.toEqual({}); + await expect(promptPromise).resolves.toMatchObject({stopReason: "cancelled"}); + + vi.spyOn(codexAcpClient, "resumeSession").mockResolvedValue(createSessionMetadata()); + await codexAcpAgent.resumeSession({ + sessionId, + cwd: "/test/cwd", + mcpServers: [], + }); + await codexAcpAgent.prompt({ + sessionId, + prompt: [{type: "text", text: "new prompt"}], + }); + + fixture.clearAcpConnectionDump(); + oldTurnStart.resolve(createTurnStartResponse("old-turn")); + + await vi.waitFor(() => { + const requestMethods = fixture.getCodexConnectionEvents([]) + .flatMap(event => event.eventType === "request" ? [event.method] : []); + expect(requestMethods).toContain("turn/interrupt"); + }); + + fixture.sendServerNotification({ + method: "item/agentMessage/delta", + params: {threadId: sessionId, turnId: "old-turn", itemId: "old-item", delta: "stale"}, + }); + await waitForMicrotasks(); + expect(fixture.getAcpConnectionEvents([])).toEqual([]); + + fixture.sendServerNotification({ + method: "turn/completed", + params: { + threadId: sessionId, + turn: createCompletedTurn("old-turn"), + }, + }); + fixture.sendServerNotification({ + method: "item/agentMessage/delta", + params: {threadId: sessionId, turnId: "new-turn", itemId: "new-item", delta: "fresh"}, + }); + + await vi.waitFor(() => { + expect(fixture.getAcpConnectionDump([])).toContain("fresh"); + }); + expect(fixture.getAcpConnectionDump([])).not.toContain("stale"); + }); +}); + +async function createSession(options: { + mcpServers?: McpServer[], + configure?: (params: { + fixture: CodexMockTestFixture, + codexAcpAgent: CodexAcpServer, + codexAcpClient: CodexAcpClient, + }) => void, +} = {}): Promise<{ + fixture: CodexMockTestFixture, + codexAcpAgent: CodexAcpServer, + codexAcpClient: CodexAcpClient, +}> { + const fixture = createCodexMockTestFixture(); + const codexAcpAgent = fixture.getCodexAcpAgent(); + const codexAcpClient = fixture.getCodexAcpClient(); + const model = createTestModel(); + + vi.spyOn(codexAcpClient, "authRequired").mockResolvedValue(false); + vi.spyOn(codexAcpClient, "getAccount").mockResolvedValue({account: null, requiresOpenaiAuth: false}); + vi.spyOn(codexAcpClient, "listSkills").mockResolvedValue({data: []}); + vi.spyOn(codexAcpClient, "newSession").mockResolvedValue({ + sessionId, + currentModelId: "model-id[medium]", + models: [model], + currentServiceTier: null, + }); + + options.configure?.({fixture, codexAcpAgent, codexAcpClient}); + + await codexAcpAgent.newSession({cwd: "/test/cwd", mcpServers: options.mcpServers ?? []}); + fixture.clearCodexConnectionDump(); + fixture.clearAcpConnectionDump(); + + return {fixture, codexAcpAgent, codexAcpClient}; +} + +function createTurnStartResponse(turnId: string): TurnStartResponse { + return { + turn: { + id: turnId, + items: [], + status: "inProgress", + error: null, + startedAt: null, + completedAt: null, + durationMs: null, + }, + }; +} + +function createCompletedTurn(turnId: string): TurnStartResponse["turn"] { + return { + id: turnId, + items: [], + status: "completed", + error: null, + startedAt: null, + completedAt: null, + durationMs: null, + }; +} + +function createSessionMetadata(): SessionMetadata { + return { + sessionId, + currentModelId: "model-id[medium]", + models: [createTestModel()], + currentServiceTier: null, + }; +} + +function deferred(): {promise: Promise, resolve: (value: T) => void} { + let resolve: (value: T) => void = () => {}; + const promise = new Promise((innerResolve) => { + resolve = innerResolve; + }); + return {promise, resolve}; +} + +async function waitForMicrotasks(): Promise { + await new Promise(resolve => setTimeout(resolve, 10)); +}