Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 208 additions & 0 deletions apps/code/src/renderer/features/sessions/service/service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1163,6 +1163,125 @@ describe("SessionService", () => {
});
});

it("flushes queued cloud messages when cloudStatus flips to in_progress on a connected session", async () => {
const service = getSessionService();
mockBuildAuthenticatedClient.mockReturnValue(mockAuthenticatedClient);
const queuedMessage = {
id: "q-1",
content: "follow up",
queuedAt: 1700000000,
};
const sessionWithQueue = createMockSession({
taskRunId: "run-123",
taskId: "task-123",
status: "connected",
isCloud: true,
cloudStatus: "in_progress",
events: [],
messageQueue: [queuedMessage],
});
mockSessionStoreSetters.getSessionByTaskId.mockReturnValue(
sessionWithQueue,
);
mockSessionStoreSetters.getSessions.mockReturnValue({
"run-123": sessionWithQueue,
});
mockSessionStoreSetters.dequeueMessages.mockReturnValue([queuedMessage]);
mockTrpcCloudTask.sendCommand.mutate.mockResolvedValue({
success: true,
result: { stopReason: "end_turn" },
});

service.watchCloudTask(
"task-123",
"run-123",
"https://api.anthropic.com",
123,
undefined,
"https://logs.example.com/run-123",
);

const subscribeOptions = mockTrpcCloudTask.onUpdate.subscribe.mock
.calls[0][1] as {
onData: (update: {
kind: "status";
taskId: string;
runId: string;
status: "in_progress";
}) => void;
};
subscribeOptions.onData({
kind: "status",
taskId: "task-123",
runId: "run-123",
status: "in_progress",
});

await vi.waitFor(() => {
expect(mockTrpcCloudTask.sendCommand.mutate).toHaveBeenCalledWith(
expect.objectContaining({
taskId: "task-123",
method: "user_message",
params: expect.objectContaining({ content: "follow up" }),
}),
);
});
});

it("does not flush queued cloud messages when cloudStatus flips to in_progress while still connecting", async () => {
const service = getSessionService();
mockBuildAuthenticatedClient.mockReturnValue(mockAuthenticatedClient);
const queuedMessage = {
id: "q-1",
content: "follow up",
queuedAt: 1700000000,
};
const sessionWithQueue = createMockSession({
taskRunId: "run-123",
taskId: "task-123",
status: "connecting",
isCloud: true,
cloudStatus: "queued",
events: [],
messageQueue: [queuedMessage],
});
mockSessionStoreSetters.getSessionByTaskId.mockReturnValue(
sessionWithQueue,
);
mockSessionStoreSetters.getSessions.mockReturnValue({
"run-123": sessionWithQueue,
});

service.watchCloudTask(
"task-123",
"run-123",
"https://api.anthropic.com",
123,
undefined,
"https://logs.example.com/run-123",
);

const subscribeOptions = mockTrpcCloudTask.onUpdate.subscribe.mock
.calls[0][1] as {
onData: (update: {
kind: "status";
taskId: string;
runId: string;
status: "in_progress";
}) => void;
};
subscribeOptions.onData({
kind: "status",
taskId: "task-123",
runId: "run-123",
status: "in_progress",
});

// Give the setTimeout(0) microtask time to resolve had it been scheduled.
await new Promise((resolve) => setTimeout(resolve, 0));
expect(mockTrpcCloudTask.sendCommand.mutate).not.toHaveBeenCalled();
});

it("re-enqueues queued cloud messages when the dispatch fails", async () => {
const service = getSessionService();
const queuedMessage = {
Expand Down Expand Up @@ -2359,6 +2478,95 @@ describe("SessionService", () => {
expect(mockTrpcCloudTask.sendCommand.mutate).not.toHaveBeenCalled();
});

it("kicks an SSE retry when queueing on a disconnected cloud session", async () => {
const service = getSessionService();
mockSessionStoreSetters.getSessionByTaskId.mockReturnValue(
createMockSession({
isCloud: true,
cloudStatus: "in_progress",
status: "disconnected",
isPromptPending: false,
}),
);

const prompt: ContentBlock[] = [{ type: "text", text: "wake me up" }];
await service.sendPrompt("task-123", prompt);

await vi.waitFor(() => {
expect(mockTrpcCloudTask.retry.mutate).toHaveBeenCalledWith({
taskId: "task-123",
runId: "run-123",
});
});
});

it("kicks an SSE retry when queueing on an errored cloud session", async () => {
const service = getSessionService();
mockSessionStoreSetters.getSessionByTaskId.mockReturnValue(
createMockSession({
isCloud: true,
cloudStatus: "in_progress",
status: "error",
errorMessage: "Lost connection",
isPromptPending: false,
}),
);

const prompt: ContentBlock[] = [{ type: "text", text: "wake me up" }];
await service.sendPrompt("task-123", prompt);

await vi.waitFor(() => {
expect(mockTrpcCloudTask.retry.mutate).toHaveBeenCalledWith({
taskId: "task-123",
runId: "run-123",
});
});
});

it("does not kick an SSE retry when queueing on a still-connecting cloud session", async () => {
const service = getSessionService();
mockSessionStoreSetters.getSessionByTaskId.mockReturnValue(
createMockSession({
isCloud: true,
cloudStatus: "in_progress",
status: "connecting",
isPromptPending: false,
}),
);

const prompt: ContentBlock[] = [{ type: "text", text: "wake me up" }];
const result = await service.sendPrompt("task-123", prompt);

expect(result.stopReason).toBe("queued");
expect(mockTrpcCloudTask.retry.mutate).not.toHaveBeenCalled();
});

it("does not pin isPromptPending when queueing during sandbox boot", async () => {
const service = getSessionService();
mockSessionStoreSetters.getSessionByTaskId.mockReturnValue(
createMockSession({
isCloud: true,
cloudStatus: "queued",
status: "connecting",
isPromptPending: false,
}),
);

const prompt: ContentBlock[] = [{ type: "text", text: "before boot" }];
const result = await service.sendPrompt("task-123", prompt);

expect(result.stopReason).toBe("queued");
expect(mockSessionStoreSetters.enqueueMessage).toHaveBeenCalledWith(
"task-123",
"before boot",
);
const wroteIsPromptPendingTrue =
mockSessionStoreSetters.updateSession.mock.calls.some(
([, patch]) => patch?.isPromptPending === true,
);
expect(wroteIsPromptPendingTrue).toBe(false);
});

it("preserves cloud attachment prompts when queueing a follow-up", async () => {
const service = getSessionService();
mockSessionStoreSetters.getSessionByTaskId.mockReturnValue(
Expand Down
45 changes: 38 additions & 7 deletions apps/code/src/renderer/features/sessions/service/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1682,9 +1682,6 @@ export class SessionService {

if (session.cloudStatus !== "in_progress") {
sessionStoreSetters.enqueueMessage(session.taskId, transport.promptText);
sessionStoreSetters.updateSession(session.taskRunId, {
isPromptPending: true,
});
log.info("Cloud message queued (sandbox not ready)", {
taskId: session.taskId,
cloudStatus: session.cloudStatus,
Expand Down Expand Up @@ -1714,6 +1711,19 @@ export class SessionService {
sessionStatus: session.status,
queueLength: session.messageQueue.length + 1,
});
// The watcher may have exhausted its reconnect budget and been left in a
// failed state — without an SSE stream, no `turn_complete` will arrive
// to drain the queue. Kick a retry so the stream comes back online; the
// queued message dispatches naturally once `run_started`/`turn_complete`
// is observed.
if (session.status === "disconnected" || session.status === "error") {
this.retryCloudTaskWatch(session.taskId).catch((err) => {
log.warn("Auto-retry of cloud task watch from queue gate failed", {
taskId: session.taskId,
error: String(err),
});
});
}
return { stopReason: "queued" };
}

Expand Down Expand Up @@ -3250,10 +3260,31 @@ export class SessionService {
branch: update.branch,
});

// No cloudStatus="in_progress" auto-flush here. `run_started` from
// the agent-server is the canonical "agent is ready" trigger and
// handles both initial boot and post-restart recovery; firing
// earlier would race with `sendInitialTaskMessage`.
// Recovery path for missed `turn_complete` notifications. `run_started`
// is normally the canonical "agent is ready" trigger and would race with
// `sendInitialTaskMessage` — but only while `session.status` is not yet
// "connected". Once status is "connected", the agent's handshake is
// done; if the run becomes `in_progress` and we still hold queued
// messages, attempt to drain. `sendQueuedCloudMessages` itself bails
// when `isPromptPending` is true, preserving the race protection.
if (update.status === "in_progress") {
const sessionAfter = sessionStoreSetters.getSessions()[taskRunId];
if (
sessionAfter?.isCloud &&
sessionAfter.status === "connected" &&
sessionAfter.messageQueue.length > 0
) {
const taskId = sessionAfter.taskId;
setTimeout(() => {
this.sendQueuedCloudMessages(taskId).catch((err) =>
log.error("status-driven cloud queue flush failed", {
taskId,
error: err,
}),
);
}, 0);
}
}

if (isTerminalStatus(update.status)) {
// Clean up any pending resume messages that couldn't be sent
Expand Down
Loading