diff --git a/apps/code/src/renderer/features/sessions/service/service.test.ts b/apps/code/src/renderer/features/sessions/service/service.test.ts index 953c1f3df..e046cea59 100644 --- a/apps/code/src/renderer/features/sessions/service/service.test.ts +++ b/apps/code/src/renderer/features/sessions/service/service.test.ts @@ -1163,6 +1163,125 @@ describe("SessionService", () => { }); }); + it("flushes queued cloud messages when cloudStatus flips to in_progress on a connected session", async () => { + const service = getSessionService(); + mockBuildAuthenticatedClient.mockReturnValue(mockAuthenticatedClient); + const queuedMessage = { + id: "q-1", + content: "follow up", + queuedAt: 1700000000, + }; + const sessionWithQueue = createMockSession({ + taskRunId: "run-123", + taskId: "task-123", + status: "connected", + isCloud: true, + cloudStatus: "in_progress", + events: [], + messageQueue: [queuedMessage], + }); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + sessionWithQueue, + ); + mockSessionStoreSetters.getSessions.mockReturnValue({ + "run-123": sessionWithQueue, + }); + mockSessionStoreSetters.dequeueMessages.mockReturnValue([queuedMessage]); + mockTrpcCloudTask.sendCommand.mutate.mockResolvedValue({ + success: true, + result: { stopReason: "end_turn" }, + }); + + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + undefined, + "https://logs.example.com/run-123", + ); + + const subscribeOptions = mockTrpcCloudTask.onUpdate.subscribe.mock + .calls[0][1] as { + onData: (update: { + kind: "status"; + taskId: string; + runId: string; + status: "in_progress"; + }) => void; + }; + subscribeOptions.onData({ + kind: "status", + taskId: "task-123", + runId: "run-123", + status: "in_progress", + }); + + await vi.waitFor(() => { + expect(mockTrpcCloudTask.sendCommand.mutate).toHaveBeenCalledWith( + expect.objectContaining({ + taskId: "task-123", + method: "user_message", + params: expect.objectContaining({ content: "follow up" }), + }), + ); + }); + }); + + it("does not flush queued cloud messages when cloudStatus flips to in_progress while still connecting", async () => { + const service = getSessionService(); + mockBuildAuthenticatedClient.mockReturnValue(mockAuthenticatedClient); + const queuedMessage = { + id: "q-1", + content: "follow up", + queuedAt: 1700000000, + }; + const sessionWithQueue = createMockSession({ + taskRunId: "run-123", + taskId: "task-123", + status: "connecting", + isCloud: true, + cloudStatus: "queued", + events: [], + messageQueue: [queuedMessage], + }); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + sessionWithQueue, + ); + mockSessionStoreSetters.getSessions.mockReturnValue({ + "run-123": sessionWithQueue, + }); + + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + undefined, + "https://logs.example.com/run-123", + ); + + const subscribeOptions = mockTrpcCloudTask.onUpdate.subscribe.mock + .calls[0][1] as { + onData: (update: { + kind: "status"; + taskId: string; + runId: string; + status: "in_progress"; + }) => void; + }; + subscribeOptions.onData({ + kind: "status", + taskId: "task-123", + runId: "run-123", + status: "in_progress", + }); + + // Give the setTimeout(0) microtask time to resolve had it been scheduled. + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(mockTrpcCloudTask.sendCommand.mutate).not.toHaveBeenCalled(); + }); + it("re-enqueues queued cloud messages when the dispatch fails", async () => { const service = getSessionService(); const queuedMessage = { @@ -2359,6 +2478,95 @@ describe("SessionService", () => { expect(mockTrpcCloudTask.sendCommand.mutate).not.toHaveBeenCalled(); }); + it("kicks an SSE retry when queueing on a disconnected cloud session", async () => { + const service = getSessionService(); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + createMockSession({ + isCloud: true, + cloudStatus: "in_progress", + status: "disconnected", + isPromptPending: false, + }), + ); + + const prompt: ContentBlock[] = [{ type: "text", text: "wake me up" }]; + await service.sendPrompt("task-123", prompt); + + await vi.waitFor(() => { + expect(mockTrpcCloudTask.retry.mutate).toHaveBeenCalledWith({ + taskId: "task-123", + runId: "run-123", + }); + }); + }); + + it("kicks an SSE retry when queueing on an errored cloud session", async () => { + const service = getSessionService(); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + createMockSession({ + isCloud: true, + cloudStatus: "in_progress", + status: "error", + errorMessage: "Lost connection", + isPromptPending: false, + }), + ); + + const prompt: ContentBlock[] = [{ type: "text", text: "wake me up" }]; + await service.sendPrompt("task-123", prompt); + + await vi.waitFor(() => { + expect(mockTrpcCloudTask.retry.mutate).toHaveBeenCalledWith({ + taskId: "task-123", + runId: "run-123", + }); + }); + }); + + it("does not kick an SSE retry when queueing on a still-connecting cloud session", async () => { + const service = getSessionService(); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + createMockSession({ + isCloud: true, + cloudStatus: "in_progress", + status: "connecting", + isPromptPending: false, + }), + ); + + const prompt: ContentBlock[] = [{ type: "text", text: "wake me up" }]; + const result = await service.sendPrompt("task-123", prompt); + + expect(result.stopReason).toBe("queued"); + expect(mockTrpcCloudTask.retry.mutate).not.toHaveBeenCalled(); + }); + + it("does not pin isPromptPending when queueing during sandbox boot", async () => { + const service = getSessionService(); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + createMockSession({ + isCloud: true, + cloudStatus: "queued", + status: "connecting", + isPromptPending: false, + }), + ); + + const prompt: ContentBlock[] = [{ type: "text", text: "before boot" }]; + const result = await service.sendPrompt("task-123", prompt); + + expect(result.stopReason).toBe("queued"); + expect(mockSessionStoreSetters.enqueueMessage).toHaveBeenCalledWith( + "task-123", + "before boot", + ); + const wroteIsPromptPendingTrue = + mockSessionStoreSetters.updateSession.mock.calls.some( + ([, patch]) => patch?.isPromptPending === true, + ); + expect(wroteIsPromptPendingTrue).toBe(false); + }); + it("preserves cloud attachment prompts when queueing a follow-up", async () => { const service = getSessionService(); mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index 519f444f9..2703d3e3b 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -1682,9 +1682,6 @@ export class SessionService { if (session.cloudStatus !== "in_progress") { sessionStoreSetters.enqueueMessage(session.taskId, transport.promptText); - sessionStoreSetters.updateSession(session.taskRunId, { - isPromptPending: true, - }); log.info("Cloud message queued (sandbox not ready)", { taskId: session.taskId, cloudStatus: session.cloudStatus, @@ -1714,6 +1711,19 @@ export class SessionService { sessionStatus: session.status, queueLength: session.messageQueue.length + 1, }); + // The watcher may have exhausted its reconnect budget and been left in a + // failed state — without an SSE stream, no `turn_complete` will arrive + // to drain the queue. Kick a retry so the stream comes back online; the + // queued message dispatches naturally once `run_started`/`turn_complete` + // is observed. + if (session.status === "disconnected" || session.status === "error") { + this.retryCloudTaskWatch(session.taskId).catch((err) => { + log.warn("Auto-retry of cloud task watch from queue gate failed", { + taskId: session.taskId, + error: String(err), + }); + }); + } return { stopReason: "queued" }; } @@ -3250,10 +3260,31 @@ export class SessionService { branch: update.branch, }); - // No cloudStatus="in_progress" auto-flush here. `run_started` from - // the agent-server is the canonical "agent is ready" trigger and - // handles both initial boot and post-restart recovery; firing - // earlier would race with `sendInitialTaskMessage`. + // Recovery path for missed `turn_complete` notifications. `run_started` + // is normally the canonical "agent is ready" trigger and would race with + // `sendInitialTaskMessage` — but only while `session.status` is not yet + // "connected". Once status is "connected", the agent's handshake is + // done; if the run becomes `in_progress` and we still hold queued + // messages, attempt to drain. `sendQueuedCloudMessages` itself bails + // when `isPromptPending` is true, preserving the race protection. + if (update.status === "in_progress") { + const sessionAfter = sessionStoreSetters.getSessions()[taskRunId]; + if ( + sessionAfter?.isCloud && + sessionAfter.status === "connected" && + sessionAfter.messageQueue.length > 0 + ) { + const taskId = sessionAfter.taskId; + setTimeout(() => { + this.sendQueuedCloudMessages(taskId).catch((err) => + log.error("status-driven cloud queue flush failed", { + taskId, + error: err, + }), + ); + }, 0); + } + } if (isTerminalStatus(update.status)) { // Clean up any pending resume messages that couldn't be sent