diff --git a/src/node/services/streamManager.test.ts b/src/node/services/streamManager.test.ts index c60f3cdd9d..ee05e27820 100644 --- a/src/node/services/streamManager.test.ts +++ b/src/node/services/streamManager.test.ts @@ -1654,6 +1654,132 @@ describe("StreamManager - empty stream completions", () => { expect(partial?.metadata?.metadataModel).toBe(KNOWN_MODELS.SONNET.id); expect(partial?.parts).toMatchObject([{ type: "text", text: "partial answer" }]); }); + + test("treats streamText's synthesized (other, undefined) finish part as a truncated stream", async () => { + // streamText's runStep initializes stepFinishReason="other" / + // stepRawFinishReason=undefined and unconditionally emits those from its + // flush() at end-of-stream. The OpenAI Responses, Chat Completions, and + // Anthropic Messages adapters all surface this shape when the upstream + // SSE stream closed before any terminal event arrived. StreamManager + // must treat that synthesized default as a missing terminal event so the + // existing truncation guard fires a retryable `stream_truncated` error + // rather than committing the partial output as a clean assistant + // message. + const streamManager = new StreamManager(historyService); + const errorEvents: Array<{ messageId: string; error: string; errorType?: string }> = []; + const streamEndEvents: unknown[] = []; + + streamManager.on("error", (data) => { + errorEvents.push(data as { messageId: string; error: string; errorType?: string }); + }); + streamManager.on("stream-end", (data) => { + streamEndEvents.push(data); + }); + + Reflect.set(streamManager, "tokenTracker", { + setModel: () => Promise.resolve(undefined), + countTokens: () => Promise.resolve(0), + }); + + const workspaceId = "synthesized-finish-workspace"; + const messageId = "synthesized-finish-message"; + const historySequence = 1; + + await appendPartialAssistantForTests(workspaceId, messageId, historySequence); + const processStreamWithCleanup = getProcessStreamWithCleanupForTests(streamManager); + const startTime = Date.now() - 250; + const streamInfo = createStreamInfoForTests({ + streamResult: createStreamResultForTests( + (async function* () { + await Promise.resolve(); + yield { type: "text-delta", text: "partial answer" }; + // The provider adapter never emitted its own finish (e.g. clean + // SSE EOF before response.completed / message_stop). The ai + // package's flush() synthesizes this one: + yield { type: "finish", finishReason: "other", rawFinishReason: undefined }; + })(), + { inputTokens: 3, outputTokens: 2, totalTokens: 5 } + ), + messageId, + startTime, + lastPartTimestamp: startTime, + model: KNOWN_MODELS.SONNET.id, + metadataModel: KNOWN_MODELS.SONNET.id, + historySequence, + initialMetadata: { agentId: "plan" }, + runtime, + }); + + await processStreamWithCleanup.call(streamManager, workspaceId, streamInfo, historySequence); + + expect(streamEndEvents).toHaveLength(0); + expect(errorEvents).toHaveLength(1); + expect(errorEvents[0]).toMatchObject({ + messageId, + errorType: "stream_truncated", + }); + + const partial = await historyService.readPartial(workspaceId); + expect(partial?.metadata?.errorType).toBe("stream_truncated"); + expect(partial?.parts).toMatchObject([{ type: "text", text: "partial answer" }]); + }); + + test("treats real (other, ) finish parts as a clean completion", async () => { + // The synthesized-default discriminator must NOT swallow legitimate + // `"other"` finishes. Both OpenAI and Anthropic map a few real stop + // reasons to `unified: "other"`, but always with a defined raw value + // (e.g. Anthropic's `"compaction"`). This test guards against the + // discriminator widening into a false positive that would mis-fire the + // truncation guard on a clean stream. + const streamManager = new StreamManager(historyService); + const errorEvents: Array<{ messageId: string; error: string; errorType?: string }> = []; + const streamEndEvents: unknown[] = []; + + streamManager.on("error", (data) => { + errorEvents.push(data as { messageId: string; error: string; errorType?: string }); + }); + streamManager.on("stream-end", (data) => { + streamEndEvents.push(data); + }); + + Reflect.set(streamManager, "tokenTracker", { + setModel: () => Promise.resolve(undefined), + countTokens: () => Promise.resolve(0), + }); + + const workspaceId = "real-other-finish-workspace"; + const messageId = "real-other-finish-message"; + const historySequence = 1; + + await appendPartialAssistantForTests(workspaceId, messageId, historySequence); + const processStreamWithCleanup = getProcessStreamWithCleanupForTests(streamManager); + const startTime = Date.now() - 250; + const streamInfo = createStreamInfoForTests({ + streamResult: createStreamResultForTests( + (async function* () { + await Promise.resolve(); + yield { type: "text-delta", text: "complete answer" }; + // Real Anthropic compaction finish (or any other mapped-to-other + // stop reason) carries a defined raw value. + yield { type: "finish", finishReason: "other", rawFinishReason: "compaction" }; + })(), + { inputTokens: 3, outputTokens: 2, totalTokens: 5 } + ), + messageId, + startTime, + lastPartTimestamp: startTime, + model: KNOWN_MODELS.SONNET.id, + metadataModel: KNOWN_MODELS.SONNET.id, + historySequence, + initialMetadata: { agentId: "plan" }, + runtime, + }); + + await processStreamWithCleanup.call(streamManager, workspaceId, streamInfo, historySequence); + + expect(errorEvents).toHaveLength(0); + expect(streamEndEvents).toHaveLength(1); + }); }); describe("StreamManager - TTFT metadata persistence", () => { diff --git a/src/node/services/streamManager.ts b/src/node/services/streamManager.ts index fbd499454a..88c71cc683 100644 --- a/src/node/services/streamManager.ts +++ b/src/node/services/streamManager.ts @@ -2118,7 +2118,54 @@ export class StreamManager extends EventEmitter { break; case "finish": { - const finishPart = part as { finishReason?: unknown }; + const finishPart = part as { + finishReason?: unknown; + rawFinishReason?: unknown; + }; + // Skip the `ai` package's synthesized-default finish part. + // + // `streamText`'s internal `runStep` initializes + // `stepFinishReason = "other"` and `stepRawFinishReason = undefined`, + // and unconditionally emits those values from its flush() at + // end-of-stream — even when the underlying SSE stream closed + // before any terminal event arrived. The OpenAI Responses, + // Chat Completions, and Anthropic Messages adapters all + // exhibit this in practice: a clean upstream EOF (no + // `response.completed`, no `message_stop`, no + // `finish_reason` delta) ends up as a synthesized + // `(other, undefined)` finish here. + // + // The discriminator is narrow on purpose. Every real OpenAI + // and Anthropic finish path that maps to `"other"` pairs it + // with a defined raw reason (e.g. Anthropic's `"compaction"`). + // The `(other, undefined)` shape is unreachable from the + // adapters' own finish-reason mappers and is therefore a + // reliable signal that the finish was synthesized. + // + // Treating it as a non-event lets the existing + // `!receivedTerminalEvent` branch below fire + // `handleTruncatedStreamCompletion`, which surfaces a + // retryable `stream_truncated` error instead of silently + // committing partial output as a normal assistant message. + if ( + finishPart.finishReason === "other" && + finishPart.rawFinishReason === undefined + ) { + // Observability for the unaudited-adapter risk: if a future + // provider adapter we haven't read source for legitimately + // emits `(other, undefined)` as a real terminal finish, the + // discriminator misfires and the user sees a spurious + // truncated-stream retry. Surface a log so that misfires + // are diagnosable instead of silent. + workspaceLog.warn( + "Treating synthesized-default (other, undefined) finish as truncated stream", + { + messageId: streamInfo.messageId, + model: streamInfo.model, + } + ); + break; + } streamInfo.receivedTerminalEvent = true; if (typeof finishPart.finishReason === "string") { streamInfo.terminalFinishReason = finishPart.finishReason;