From 34f0d47ac77216f8036aec6b4f6b1ee721685e9e Mon Sep 17 00:00:00 2001
From: ethan
Date: Tue, 2 Jun 2026 16:15:41 +1000
Subject: [PATCH 1/4] =?UTF-8?q?=F0=9F=A4=96=20fix:=20treat=20AI=20SDK=20sy?=
 =?UTF-8?q?nthesized=20"other"=20finish=20as=20stream=5Ftruncated?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The OpenAI Responses adapter in @ai-sdk/openai initializes its finish
reason to { unified: "other", raw: undefined } and flushes that default
when the SSE stream closes without a terminal event. PR #3415's
truncation guard only fires when no finish part is emitted at all, so
this synthesized finish bypassed the guard and committed partial output
as a normal assistant message with no UI feedback.

Use rawFinishReason (exposed by the AI SDK on finish parts) to
discriminate the synthesized default ("other" + undefined raw) from a
legitimate unmapped "other" finish, which always carries a raw reason
string. Only the synthesized default skips receivedTerminalEvent = true,
so the existing handleTruncatedStreamCompletion path then turns it into
a retryable stream_truncated error. Normal "stop" finishes also have
rawFinishReason: undefined, but they're not "other", so they remain
unaffected.
--- src/node/services/streamManager.test.ts | 127 ++++++++++++++++++++++++ src/node/services/streamManager.ts | 20 +++- 2 files changed, 143 insertions(+), 4 deletions(-) diff --git a/src/node/services/streamManager.test.ts b/src/node/services/streamManager.test.ts index c60f3cdd9d..c82d7d66bc 100644 --- a/src/node/services/streamManager.test.ts +++ b/src/node/services/streamManager.test.ts @@ -1510,6 +1510,63 @@ describe("StreamManager - Concurrent Stream Prevention", () => { describe("StreamManager - empty stream completions", () => { const runtime = createRuntime({ type: "local", srcBaseDir: "/tmp" }); + async function processProviderStreamForCompletionTest(params: { + workspaceId: string; + messageId: string; + model?: string; + stream: AsyncGenerator; + }): Promise<{ + errorEvents: Array<{ messageId: string; error: string; errorType?: string }>; + streamEndEvents: unknown[]; + }> { + const streamManager = new StreamManager(historyService); + const errorEvents: Array<{ messageId: string; error: string; errorType?: string }> = []; + const streamEndEvents: unknown[] = []; + + streamManager.on("error", (data) => { + errorEvents.push(data as { messageId: string; error: string; errorType?: string }); + }); + streamManager.on("stream-end", (data) => { + streamEndEvents.push(data); + }); + + const replaceTokenTrackerResult = Reflect.set(streamManager, "tokenTracker", { + setModel: () => Promise.resolve(undefined), + countTokens: () => Promise.resolve(0), + }); + expect(replaceTokenTrackerResult).toBe(true); + + const historySequence = 1; + await appendPartialAssistantForTests(params.workspaceId, params.messageId, historySequence); + + const startTime = Date.now() - 250; + const model = params.model ?? 
KNOWN_MODELS.SONNET.id; + const streamInfo = createStreamInfoForTests({ + streamResult: createStreamResultForTests(params.stream, { + inputTokens: 3, + outputTokens: 2, + totalTokens: 5, + }), + messageId: params.messageId, + startTime, + lastPartTimestamp: startTime, + model, + metadataModel: model, + historySequence, + initialMetadata: { agentId: "plan" }, + runtime, + }); + + await getProcessStreamWithCleanupForTests(streamManager).call( + streamManager, + params.workspaceId, + streamInfo, + historySequence + ); + + return { errorEvents, streamEndEvents }; + } + test("retries one empty stream internally before persisting a retryable empty-output error", async () => { const streamManager = new StreamManager(historyService); const errorEvents: Array<{ messageId: string; error: string; errorType?: string }> = []; @@ -1654,6 +1711,76 @@ describe("StreamManager - empty stream completions", () => { expect(partial?.metadata?.metadataModel).toBe(KNOWN_MODELS.SONNET.id); expect(partial?.parts).toMatchObject([{ type: "text", text: "partial answer" }]); }); + + test("treats synthesized other finish without raw reason as stream_truncated", async () => { + const workspaceId = "openai-synthesized-other-workspace"; + const messageId = "openai-synthesized-other-message"; + + const { errorEvents, streamEndEvents } = await processProviderStreamForCompletionTest({ + workspaceId, + messageId, + model: "openai:gpt-4.1-mini", + stream: (async function* () { + await Promise.resolve(); + yield { type: "text-delta", text: "partial answer" }; + yield { type: "finish", finishReason: "other", rawFinishReason: undefined }; + })(), + }); + + expect(streamEndEvents).toHaveLength(0); + expect(errorEvents).toHaveLength(1); + expect(errorEvents[0]).toMatchObject({ + messageId, + errorType: "stream_truncated", + }); + expect(errorEvents[0]?.error).toContain( + "OpenAI stream closed unexpectedly before the response completed" + ); + + const partial = await historyService.readPartial(workspaceId); 
+ expect(partial?.metadata?.errorType).toBe("stream_truncated"); + expect(partial?.metadata?.error).toContain( + "OpenAI stream closed unexpectedly before the response completed" + ); + expect(partial?.parts).toMatchObject([{ type: "text", text: "partial answer" }]); + }); + + test("allows legitimate other finish when a raw reason is present", async () => { + const workspaceId = "openai-legitimate-other-workspace"; + const messageId = "openai-legitimate-other-message"; + + const { errorEvents, streamEndEvents } = await processProviderStreamForCompletionTest({ + workspaceId, + messageId, + model: "openai:gpt-4.1-mini", + stream: (async function* () { + await Promise.resolve(); + yield { type: "text-delta", text: "complete answer" }; + yield { + type: "finish", + finishReason: "other", + rawFinishReason: "safety_violation", + }; + })(), + }); + + expect(errorEvents).toHaveLength(0); + expect(streamEndEvents).toHaveLength(1); + + const partial = await historyService.readPartial(workspaceId); + expect(partial).toBeNull(); + + const historyResult = await historyService.getHistoryFromLatestBoundary(workspaceId); + expect(historyResult.success).toBe(true); + if (!historyResult.success) { + throw new Error(historyResult.error); + } + + const updatedMessage = historyResult.data.find((message) => message.id === messageId); + expect(updatedMessage).toBeDefined(); + expect(updatedMessage?.metadata?.finishReason).toBe("other"); + expect(updatedMessage?.parts).toMatchObject([{ type: "text", text: "complete answer" }]); + }); }); describe("StreamManager - TTFT metadata persistence", () => { diff --git a/src/node/services/streamManager.ts b/src/node/services/streamManager.ts index fbd499454a..3b4647dada 100644 --- a/src/node/services/streamManager.ts +++ b/src/node/services/streamManager.ts @@ -2118,10 +2118,22 @@ export class StreamManager extends EventEmitter { break; case "finish": { - const finishPart = part as { finishReason?: unknown }; - streamInfo.receivedTerminalEvent = 
true; - if (typeof finishPart.finishReason === "string") { - streamInfo.terminalFinishReason = finishPart.finishReason; + const finishPart = part as { + finishReason?: unknown; + rawFinishReason?: unknown; + }; + // The OpenAI Responses adapter in the AI SDK flushes its initial + // { unified: "other", raw: undefined } default when the SSE stream + // closes before a terminal event. Treat only that synthesized default + // as missing-terminal; a real unmapped "other" carries a raw reason. + const isSynthesizedOtherFinish = + finishPart.finishReason === "other" && finishPart.rawFinishReason === undefined; + + if (!isSynthesizedOtherFinish) { + streamInfo.receivedTerminalEvent = true; + if (typeof finishPart.finishReason === "string") { + streamInfo.terminalFinishReason = finishPart.finishReason; + } } break; } From f019a609bfca59923226227dc4e7fd45250ad5f7 Mon Sep 17 00:00:00 2001 From: ethan Date: Tue, 2 Jun 2026 16:37:38 +1000 Subject: [PATCH 2/4] =?UTF-8?q?=F0=9F=A4=96=20fix:=20scope=20synthesized-f?= =?UTF-8?q?inish=20filter=20to=20the=20OpenAI=20adapter=20boundary?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex pointed out that the previous discriminator lived in StreamManager and treated every `finish` part with `(unified: "other", raw: undefined)` as truncated. The LanguageModelV2 contract permits adapters to emit that shape as a legitimate terminal event, so the heuristic was too broad for the provider-agnostic StreamManager. Move the fix to the boundary where the synthesized default originates. The @ai-sdk/openai adapter family — Responses, Chat Completions, legacy Completions — initializes its internal finishReason to `{ unified: "other", raw: undefined }` and unconditionally emits that value from its TransformStream.flush() at end-of-stream, even when no terminal SSE event arrived. 
The SDK's mappers only return `unified: "other"` paired with a defined `raw`, so within this adapter family the `(other, undefined)` shape is unreachable except as the uninitialized default. Dropping it is safe and intentionally scoped to the OpenAI provider construction path (and the Copilot path, which reuses the same adapter). Implementation: introduce src/node/services/openAISynthesizedFinishFilter.ts which exposes `wrapOpenAIModelToFilterSynthesizedFinish(model)`. The wrapper pipes `doStream`'s output through a TransformStream that drops only the synthesized-default finish part; all other parts pass through unchanged. Apply the wrapper at the two `createOpenAI(...)` callsites in providerModelFactory.ts. With the synthesized finish dropped, the existing `!receivedTerminalEvent` branch in StreamManager handles a clean upstream EOF as `stream_truncated` exactly as PR #3415 intended. Revert the StreamManager-side heuristic and tests from the previous commit so StreamManager stays provider-agnostic. 
--- .../openAISynthesizedFinishFilter.test.ts | 225 ++++++++++++++++++ .../services/openAISynthesizedFinishFilter.ts | 111 +++++++++ src/node/services/providerModelFactory.ts | 13 +- src/node/services/streamManager.test.ts | 127 ---------- src/node/services/streamManager.ts | 20 +- 5 files changed, 351 insertions(+), 145 deletions(-) create mode 100644 src/node/services/openAISynthesizedFinishFilter.test.ts create mode 100644 src/node/services/openAISynthesizedFinishFilter.ts diff --git a/src/node/services/openAISynthesizedFinishFilter.test.ts b/src/node/services/openAISynthesizedFinishFilter.test.ts new file mode 100644 index 0000000000..6e3bd47893 --- /dev/null +++ b/src/node/services/openAISynthesizedFinishFilter.test.ts @@ -0,0 +1,225 @@ +import { describe, test, expect } from "bun:test"; + +import { + createOpenAISynthesizedFinishFilter, + isOpenAISynthesizedDefaultFinishPart, + wrapOpenAIModelToFilterSynthesizedFinish, +} from "./openAISynthesizedFinishFilter"; +// The wrapper is typed against the LanguageModel union (V2|V3) from "ai", but +// the V2/V3 stream-part and tool-result variants drift just enough to make +// constructing a fully-typed in-test fake unworkable. Tests run against a +// minimal structural shape that matches the wrapper's runtime surface, then +// cast at the boundary. 
+interface LanguageModelLike { + specificationVersion: string; + provider: string; + modelId: string; + supportedUrls: Record; + doGenerate: (...args: unknown[]) => Promise; + doStream: (options: unknown) => Promise<{ + stream: ReadableStream; + request?: unknown; + response?: unknown; + }>; +} + +function streamOf(items: T[]): ReadableStream { + return new ReadableStream({ + start(controller) { + for (const item of items) { + controller.enqueue(item); + } + controller.close(); + }, + }); +} + +async function collect(stream: ReadableStream): Promise { + const out: T[] = []; + const reader = stream.getReader(); + while (true) { + const { value, done } = await reader.read(); + if (done) break; + out.push(value); + } + return out; +} + +describe("isOpenAISynthesizedDefaultFinishPart", () => { + test("matches the OpenAI adapter's uninitialized default", () => { + const part = { type: "finish", finishReason: { unified: "other", raw: undefined } }; + expect(isOpenAISynthesizedDefaultFinishPart(part)).toBe(true); + }); + + test("rejects 'other' finishes that carry an unmapped raw reason", () => { + // The legitimate-but-unmapped "other" case (e.g. a future + // incomplete_details.reason the SDK does not know about) — these must + // continue to count as terminal events. 
+ const part = { + type: "finish", + finishReason: { unified: "other", raw: "safety_violation" }, + }; + expect(isOpenAISynthesizedDefaultFinishPart(part)).toBe(false); + }); + + test("rejects normal stop finishes", () => { + const part = { type: "finish", finishReason: { unified: "stop", raw: "stop" } }; + expect(isOpenAISynthesizedDefaultFinishPart(part)).toBe(false); + }); + + test("rejects length and content-filter finishes", () => { + expect( + isOpenAISynthesizedDefaultFinishPart({ + type: "finish", + finishReason: { unified: "length", raw: "max_output_tokens" }, + }) + ).toBe(false); + expect( + isOpenAISynthesizedDefaultFinishPart({ + type: "finish", + finishReason: { unified: "content-filter", raw: "content_filter" }, + }) + ).toBe(false); + }); + + test("rejects error finishes synthesized by the adapter", () => { + // When the adapter's transform observes a chunk-level error it sets + // finishReason to { unified: "error", raw: undefined }. We must not drop + // that — it's a real terminal signal. + expect( + isOpenAISynthesizedDefaultFinishPart({ + type: "finish", + finishReason: { unified: "error", raw: undefined }, + }) + ).toBe(false); + }); + + test("rejects non-finish parts even if shaped suspiciously", () => { + expect( + isOpenAISynthesizedDefaultFinishPart({ + type: "text-delta", + finishReason: { unified: "other", raw: undefined }, + }) + ).toBe(false); + expect(isOpenAISynthesizedDefaultFinishPart(null)).toBe(false); + expect(isOpenAISynthesizedDefaultFinishPart(undefined)).toBe(false); + expect(isOpenAISynthesizedDefaultFinishPart("finish")).toBe(false); + }); + + test("rejects finish parts shaped as a unified string (post-streamText conversion)", () => { + // Defensive: if this filter is ever fed a stream that has already been + // normalized through streamText, the finishReason will be the string + // "other" rather than { unified: "other", raw: undefined }. We must not + // accidentally drop legitimate finishes at that boundary. 
+ expect(isOpenAISynthesizedDefaultFinishPart({ type: "finish", finishReason: "other" })).toBe( + false + ); + }); +}); + +describe("createOpenAISynthesizedFinishFilter", () => { + test("drops only the synthesized default finish part", async () => { + const input = [ + { type: "stream-start", warnings: [] }, + { type: "text-delta", id: "0", delta: "partial answer" }, + { type: "finish", finishReason: { unified: "other", raw: undefined } }, + ]; + + const out = await collect(streamOf(input).pipeThrough(createOpenAISynthesizedFinishFilter())); + + expect(out).toEqual([ + { type: "stream-start", warnings: [] }, + { type: "text-delta", id: "0", delta: "partial answer" }, + ]); + }); + + test("passes through real finish parts", async () => { + const input = [ + { type: "text-delta", id: "0", delta: "done" }, + { type: "finish", finishReason: { unified: "stop", raw: "stop" } }, + ]; + + const out = await collect(streamOf(input).pipeThrough(createOpenAISynthesizedFinishFilter())); + + expect(out).toEqual(input); + }); + + test("passes through legitimate unmapped 'other' finishes (raw reason present)", async () => { + const input = [ + { type: "text-delta", id: "0", delta: "done" }, + { type: "finish", finishReason: { unified: "other", raw: "safety_violation" } }, + ]; + + const out = await collect(streamOf(input).pipeThrough(createOpenAISynthesizedFinishFilter())); + + expect(out).toEqual(input); + }); +}); + +describe("wrapOpenAIModelToFilterSynthesizedFinish", () => { + test("filters the model's doStream output without touching anything else", async () => { + const captured: unknown[] = []; + const fakeModel: LanguageModelLike = { + specificationVersion: "v3", + provider: "openai", + modelId: "gpt-fake", + supportedUrls: {}, + doGenerate: () => Promise.reject(new Error("unused")), + doStream: (options) => { + captured.push(options); + return Promise.resolve({ + stream: streamOf([ + { type: "stream-start", warnings: [] }, + { type: "text-delta", id: "0", delta: "partial" 
}, + { type: "finish", finishReason: { unified: "other", raw: undefined } }, + ]), + request: {}, + response: { headers: {} }, + }); + }, + }; + + const wrapped = wrapOpenAIModelToFilterSynthesizedFinish( + fakeModel as unknown as Parameters[0] + ) as unknown as LanguageModelLike; + expect(wrapped).toBe(fakeModel); + + const result = await wrapped.doStream({ marker: 1 }); + expect(captured).toEqual([{ marker: 1 }]); + + const parts = await collect(result.stream); + expect(parts).toEqual([ + { type: "stream-start", warnings: [] }, + { type: "text-delta", id: "0", delta: "partial" }, + ]); + }); + + test("preserves real finish parts emitted by the model", async () => { + const fakeModel: LanguageModelLike = { + specificationVersion: "v3", + provider: "openai", + modelId: "gpt-fake", + supportedUrls: {}, + doGenerate: () => Promise.reject(new Error("unused")), + doStream: () => + Promise.resolve({ + stream: streamOf([ + { type: "text-delta", id: "0", delta: "complete" }, + { type: "finish", finishReason: { unified: "stop", raw: "stop" } }, + ]), + request: {}, + response: { headers: {} }, + }), + }; + + const wrapped = wrapOpenAIModelToFilterSynthesizedFinish( + fakeModel as unknown as Parameters[0] + ) as unknown as LanguageModelLike; + const result = await wrapped.doStream({}); + const parts = await collect(result.stream); + expect(parts).toEqual([ + { type: "text-delta", id: "0", delta: "complete" }, + { type: "finish", finishReason: { unified: "stop", raw: "stop" } }, + ]); + }); +}); diff --git a/src/node/services/openAISynthesizedFinishFilter.ts b/src/node/services/openAISynthesizedFinishFilter.ts new file mode 100644 index 0000000000..e032a3fc91 --- /dev/null +++ b/src/node/services/openAISynthesizedFinishFilter.ts @@ -0,0 +1,111 @@ +import type { LanguageModel } from "ai"; + +/** + * The object form of {@link LanguageModel} (excluding the bare model-ID string + * shape). Only the object form exposes `doStream`, which is what the filter + * mutates. 
+ */ +type LanguageModelInstance = Exclude; + +/** + * Filters the `@ai-sdk/openai` adapters' synthesized-default `finish` part out of + * the LanguageModelV2 stream so {@link StreamManager}'s missing-terminal-event guard + * (introduced in PR #3415) surfaces a clean upstream EOF as a retryable + * `stream_truncated` error instead of committing partial output as a normal + * assistant message. + * + * Background: every `@ai-sdk/openai` streaming adapter (Responses, Chat + * Completions, legacy Completions) initializes its internal finish reason to + * `{ unified: "other", raw: undefined }` and unconditionally emits that value + * from its `TransformStream.flush()` at end-of-stream — even when the SSE + * upstream closed before any terminal event arrived + * (`response.completed` / `response.incomplete` / `response.failed`, or a + * Chat Completions delta carrying `finish_reason`). That synthesized finish + * silently bypasses Mux's missing-terminal-event guard, committing partial + * output to history as if the model stopped cleanly. + * + * Discriminator: in this adapter family, the SDK's + * `mapOpenAIResponseFinishReason` and `mapOpenAIFinishReason` only return + * `unified: "other"` paired with a defined `raw` (the original unmapped API + * value). Among finish parts that these adapters legitimately emit, the + * `{ unified: "other", raw: undefined }` shape is unreachable except as the + * uninitialized default. Dropping it is safe — and intentionally narrow to + * the OpenAI adapter family. We do **not** extend this heuristic to other + * providers: the public AI SDK contract permits any adapter to emit `(other, + * undefined)` as a real terminal finish, so the filter must stay scoped to + * the boundary where we know the synthesized default originates. + * + * Implementation: wrap the model's `doStream` so its output stream is piped + * through a `TransformStream` that drops only the synthesized-default finish + * part. 
Other parts (text deltas, tool calls, real finishes, errors, etc.) + * pass through unchanged. When the synthesized default is dropped, the + * downstream consumer (`streamText`, then `StreamManager`) sees a stream that + * ended without a finish part, and the existing + * `!receivedTerminalEvent` branch handles it as `stream_truncated`. + */ +export function wrapOpenAIModelToFilterSynthesizedFinish( + model: M +): M { + const originalDoStream = model.doStream.bind(model); + // The wrapper is element-shape preserving (a pure filter), but the runtime + // shape spans the union of LanguageModelV*StreamPart types — TypeScript can't + // see that the same filter satisfies every member of the union, so we erase + // the element type for the pipe and restore it on the way out. The function + // signature still ties input and output element types together for callers. + type AnyDoStream = (options: unknown) => Promise<{ stream: ReadableStream }>; + const wrappedDoStream: AnyDoStream = async (options) => { + const result = await (originalDoStream as AnyDoStream)(options); + return { + ...result, + stream: result.stream.pipeThrough(createOpenAISynthesizedFinishFilter()), + }; + }; + model.doStream = wrappedDoStream as unknown as M["doStream"]; + return model; +} + +/** + * The TransformStream used by {@link wrapOpenAIModelToFilterSynthesizedFinish}. + * Exported separately so unit tests can exercise the filter directly without + * constructing a full LanguageModel mock. + * + * Generic over the element type so `ReadableStream.pipeThrough(...)` stays + * `ReadableStream` for the caller. The runtime check is shape-based and + * does not rely on `T`. 
+ */ +export function createOpenAISynthesizedFinishFilter(): TransformStream { + return new TransformStream({ + transform(part, controller) { + if (isOpenAISynthesizedDefaultFinishPart(part)) { + return; + } + controller.enqueue(part); + }, + }); +} + +/** + * True iff `part` is the `@ai-sdk/openai` synthesized-default finish — the + * uninitialized `{ unified: "other", raw: undefined }` value that the adapter + * emits when no terminal SSE event arrived before EOF. + * + * The OpenAI adapters emit finish parts whose `finishReason` is the internal + * `{ unified, raw }` object (see `@ai-sdk/openai`'s flush handlers); the AI + * SDK's `streamText` later splits that into `finishReason` / `rawFinishReason` + * fields. We're operating at the adapter→`streamText` boundary, so we match + * against the `{ unified, raw }` shape. + */ +export function isOpenAISynthesizedDefaultFinishPart(part: unknown): boolean { + if (typeof part !== "object" || part === null) { + return false; + } + const { type, finishReason } = part as { type?: unknown; finishReason?: unknown }; + if (type !== "finish") { + return false; + } + if (typeof finishReason !== "object" || finishReason === null) { + return false; + } + const reason = finishReason as { unified?: unknown; raw?: unknown }; + return reason.unified === "other" && reason.raw === undefined; +} diff --git a/src/node/services/providerModelFactory.ts b/src/node/services/providerModelFactory.ts index 1f03b22e66..c12d70efe4 100644 --- a/src/node/services/providerModelFactory.ts +++ b/src/node/services/providerModelFactory.ts @@ -48,6 +48,7 @@ import { moveLanguageModelCleanup, } from "@/node/services/languageModelCleanup"; import { createOpenAIWebSocketTransportFetch } from "@/node/services/openAIWebSocketTransportFetch"; +import { wrapOpenAIModelToFilterSynthesizedFinish } from "@/node/services/openAISynthesizedFinishFilter"; import { log } from "@/node/services/log"; import { MUX_ANTHROPIC_EFFORT_OVERRIDE_HEADER, @@ -1434,10 +1435,14 @@ 
export class ProviderModelFactory { }); // OpenAI reasoning state is preserved via explicit history, so no extra // middleware is needed beyond the provider's standard Responses handling. - const model = + const rawModel = effectiveWireFormat === "chatCompletions" ? provider.chat(modelId) : provider.responses(modelId); + // Strip the @ai-sdk/openai adapter's synthesized-default finish part so + // StreamManager's missing-terminal-event guard can fire on a clean EOF. + // See openAISynthesizedFinishFilter.ts for the full rationale. + const model = wrapOpenAIModelToFilterSynthesizedFinish(rawModel); if (webSocketTransport.active) { attachLanguageModelCleanup(model, webSocketTransport.close); } @@ -1901,7 +1906,11 @@ export class ProviderModelFactory { apiKey: "copilot", // placeholder, actual auth via custom fetch fetch: providerFetch, }); - return Ok(provider.chat(outboundCopilotModelId)); + // Same synthesized-default-finish filter as the direct OpenAI path: + // Copilot routes Responses/Chat Completions traffic through the same + // adapter, so a clean upstream EOF without a terminal event reaches + // StreamManager's guard via openAISynthesizedFinishFilter.ts. 
+ return Ok(wrapOpenAIModelToFilterSynthesizedFinish(provider.chat(outboundCopilotModelId))); } // Generic handler for simple providers (standard API key + factory pattern) diff --git a/src/node/services/streamManager.test.ts b/src/node/services/streamManager.test.ts index c82d7d66bc..c60f3cdd9d 100644 --- a/src/node/services/streamManager.test.ts +++ b/src/node/services/streamManager.test.ts @@ -1510,63 +1510,6 @@ describe("StreamManager - Concurrent Stream Prevention", () => { describe("StreamManager - empty stream completions", () => { const runtime = createRuntime({ type: "local", srcBaseDir: "/tmp" }); - async function processProviderStreamForCompletionTest(params: { - workspaceId: string; - messageId: string; - model?: string; - stream: AsyncGenerator; - }): Promise<{ - errorEvents: Array<{ messageId: string; error: string; errorType?: string }>; - streamEndEvents: unknown[]; - }> { - const streamManager = new StreamManager(historyService); - const errorEvents: Array<{ messageId: string; error: string; errorType?: string }> = []; - const streamEndEvents: unknown[] = []; - - streamManager.on("error", (data) => { - errorEvents.push(data as { messageId: string; error: string; errorType?: string }); - }); - streamManager.on("stream-end", (data) => { - streamEndEvents.push(data); - }); - - const replaceTokenTrackerResult = Reflect.set(streamManager, "tokenTracker", { - setModel: () => Promise.resolve(undefined), - countTokens: () => Promise.resolve(0), - }); - expect(replaceTokenTrackerResult).toBe(true); - - const historySequence = 1; - await appendPartialAssistantForTests(params.workspaceId, params.messageId, historySequence); - - const startTime = Date.now() - 250; - const model = params.model ?? 
KNOWN_MODELS.SONNET.id; - const streamInfo = createStreamInfoForTests({ - streamResult: createStreamResultForTests(params.stream, { - inputTokens: 3, - outputTokens: 2, - totalTokens: 5, - }), - messageId: params.messageId, - startTime, - lastPartTimestamp: startTime, - model, - metadataModel: model, - historySequence, - initialMetadata: { agentId: "plan" }, - runtime, - }); - - await getProcessStreamWithCleanupForTests(streamManager).call( - streamManager, - params.workspaceId, - streamInfo, - historySequence - ); - - return { errorEvents, streamEndEvents }; - } - test("retries one empty stream internally before persisting a retryable empty-output error", async () => { const streamManager = new StreamManager(historyService); const errorEvents: Array<{ messageId: string; error: string; errorType?: string }> = []; @@ -1711,76 +1654,6 @@ describe("StreamManager - empty stream completions", () => { expect(partial?.metadata?.metadataModel).toBe(KNOWN_MODELS.SONNET.id); expect(partial?.parts).toMatchObject([{ type: "text", text: "partial answer" }]); }); - - test("treats synthesized other finish without raw reason as stream_truncated", async () => { - const workspaceId = "openai-synthesized-other-workspace"; - const messageId = "openai-synthesized-other-message"; - - const { errorEvents, streamEndEvents } = await processProviderStreamForCompletionTest({ - workspaceId, - messageId, - model: "openai:gpt-4.1-mini", - stream: (async function* () { - await Promise.resolve(); - yield { type: "text-delta", text: "partial answer" }; - yield { type: "finish", finishReason: "other", rawFinishReason: undefined }; - })(), - }); - - expect(streamEndEvents).toHaveLength(0); - expect(errorEvents).toHaveLength(1); - expect(errorEvents[0]).toMatchObject({ - messageId, - errorType: "stream_truncated", - }); - expect(errorEvents[0]?.error).toContain( - "OpenAI stream closed unexpectedly before the response completed" - ); - - const partial = await historyService.readPartial(workspaceId); 
- expect(partial?.metadata?.errorType).toBe("stream_truncated"); - expect(partial?.metadata?.error).toContain( - "OpenAI stream closed unexpectedly before the response completed" - ); - expect(partial?.parts).toMatchObject([{ type: "text", text: "partial answer" }]); - }); - - test("allows legitimate other finish when a raw reason is present", async () => { - const workspaceId = "openai-legitimate-other-workspace"; - const messageId = "openai-legitimate-other-message"; - - const { errorEvents, streamEndEvents } = await processProviderStreamForCompletionTest({ - workspaceId, - messageId, - model: "openai:gpt-4.1-mini", - stream: (async function* () { - await Promise.resolve(); - yield { type: "text-delta", text: "complete answer" }; - yield { - type: "finish", - finishReason: "other", - rawFinishReason: "safety_violation", - }; - })(), - }); - - expect(errorEvents).toHaveLength(0); - expect(streamEndEvents).toHaveLength(1); - - const partial = await historyService.readPartial(workspaceId); - expect(partial).toBeNull(); - - const historyResult = await historyService.getHistoryFromLatestBoundary(workspaceId); - expect(historyResult.success).toBe(true); - if (!historyResult.success) { - throw new Error(historyResult.error); - } - - const updatedMessage = historyResult.data.find((message) => message.id === messageId); - expect(updatedMessage).toBeDefined(); - expect(updatedMessage?.metadata?.finishReason).toBe("other"); - expect(updatedMessage?.parts).toMatchObject([{ type: "text", text: "complete answer" }]); - }); }); describe("StreamManager - TTFT metadata persistence", () => { diff --git a/src/node/services/streamManager.ts b/src/node/services/streamManager.ts index 3b4647dada..fbd499454a 100644 --- a/src/node/services/streamManager.ts +++ b/src/node/services/streamManager.ts @@ -2118,22 +2118,10 @@ export class StreamManager extends EventEmitter { break; case "finish": { - const finishPart = part as { - finishReason?: unknown; - rawFinishReason?: unknown; - }; - // 
The OpenAI Responses adapter in the AI SDK flushes its initial - // { unified: "other", raw: undefined } default when the SSE stream - // closes before a terminal event. Treat only that synthesized default - // as missing-terminal; a real unmapped "other" carries a raw reason. - const isSynthesizedOtherFinish = - finishPart.finishReason === "other" && finishPart.rawFinishReason === undefined; - - if (!isSynthesizedOtherFinish) { - streamInfo.receivedTerminalEvent = true; - if (typeof finishPart.finishReason === "string") { - streamInfo.terminalFinishReason = finishPart.finishReason; - } + const finishPart = part as { finishReason?: unknown }; + streamInfo.receivedTerminalEvent = true; + if (typeof finishPart.finishReason === "string") { + streamInfo.terminalFinishReason = finishPart.finishReason; } break; } From b1d060dfebb30870c3f0d81463f296b380840924 Mon Sep 17 00:00:00 2001 From: ethan Date: Tue, 2 Jun 2026 22:37:09 +1000 Subject: [PATCH 3/4] fix: filter streamText's synthesized default finish in StreamManager MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous adapter-boundary filter (commit f019a609b) targeted only the @ai-sdk/openai adapters' synthesized default finish part. While correct as far as it went, it did not actually fix the bug: ai's streamText wraps every adapter in its own runStep TransformStream which initializes stepFinishReason="other" / stepRawFinishReason=undefined and unconditionally emits those values from its own flush() at end-of-stream. So even after dropping the OpenAI adapter's synthesized finish, an identical synthesized finish reappears one layer up. The same bug affects @ai-sdk/anthropic: its TransformStream has no flush(), so a clean upstream EOF (no message_stop) emits no finish part at all from the adapter — at which point streamText's runStep flush synthesizes the same (other, undefined) part. 
Symptoms are identical to the OpenAI bug: partial output is committed silently as a normal assistant message instead of surfacing a retryable stream_truncated error. Move the filter to the streamText → StreamManager boundary, which is where the synthesized default actually originates. Drop the OpenAI adapter wrapper module entirely. Discriminator: a finish part whose normalized finishReason is "other" and rawFinishReason is undefined. Empirically, the public mapping functions in both @ai-sdk/openai (mapOpenAIResponseFinishReason / mapOpenAIFinishReason) and @ai-sdk/anthropic (mapAnthropicStopReason) only ever produce unified="other" paired with a defined raw value (e.g. Anthropic's "compaction"). The (other, undefined) shape is unreachable from the adapters' own finish-reason mappers and is therefore a reliable signal that streamText's flush synthesized the finish. --- .../openAISynthesizedFinishFilter.test.ts | 225 ------------------ .../services/openAISynthesizedFinishFilter.ts | 111 --------- src/node/services/providerModelFactory.ts | 13 +- src/node/services/streamManager.test.ts | 126 ++++++++++ src/node/services/streamManager.ts | 36 ++- 5 files changed, 163 insertions(+), 348 deletions(-) delete mode 100644 src/node/services/openAISynthesizedFinishFilter.test.ts delete mode 100644 src/node/services/openAISynthesizedFinishFilter.ts diff --git a/src/node/services/openAISynthesizedFinishFilter.test.ts b/src/node/services/openAISynthesizedFinishFilter.test.ts deleted file mode 100644 index 6e3bd47893..0000000000 --- a/src/node/services/openAISynthesizedFinishFilter.test.ts +++ /dev/null @@ -1,225 +0,0 @@ -import { describe, test, expect } from "bun:test"; - -import { - createOpenAISynthesizedFinishFilter, - isOpenAISynthesizedDefaultFinishPart, - wrapOpenAIModelToFilterSynthesizedFinish, -} from "./openAISynthesizedFinishFilter"; -// The wrapper is typed against the LanguageModel union (V2|V3) from "ai", but -// the V2/V3 stream-part and tool-result variants 
drift just enough to make -// constructing a fully-typed in-test fake unworkable. Tests run against a -// minimal structural shape that matches the wrapper's runtime surface, then -// cast at the boundary. -interface LanguageModelLike { - specificationVersion: string; - provider: string; - modelId: string; - supportedUrls: Record; - doGenerate: (...args: unknown[]) => Promise; - doStream: (options: unknown) => Promise<{ - stream: ReadableStream; - request?: unknown; - response?: unknown; - }>; -} - -function streamOf(items: T[]): ReadableStream { - return new ReadableStream({ - start(controller) { - for (const item of items) { - controller.enqueue(item); - } - controller.close(); - }, - }); -} - -async function collect(stream: ReadableStream): Promise { - const out: T[] = []; - const reader = stream.getReader(); - while (true) { - const { value, done } = await reader.read(); - if (done) break; - out.push(value); - } - return out; -} - -describe("isOpenAISynthesizedDefaultFinishPart", () => { - test("matches the OpenAI adapter's uninitialized default", () => { - const part = { type: "finish", finishReason: { unified: "other", raw: undefined } }; - expect(isOpenAISynthesizedDefaultFinishPart(part)).toBe(true); - }); - - test("rejects 'other' finishes that carry an unmapped raw reason", () => { - // The legitimate-but-unmapped "other" case (e.g. a future - // incomplete_details.reason the SDK does not know about) — these must - // continue to count as terminal events. 
- const part = { - type: "finish", - finishReason: { unified: "other", raw: "safety_violation" }, - }; - expect(isOpenAISynthesizedDefaultFinishPart(part)).toBe(false); - }); - - test("rejects normal stop finishes", () => { - const part = { type: "finish", finishReason: { unified: "stop", raw: "stop" } }; - expect(isOpenAISynthesizedDefaultFinishPart(part)).toBe(false); - }); - - test("rejects length and content-filter finishes", () => { - expect( - isOpenAISynthesizedDefaultFinishPart({ - type: "finish", - finishReason: { unified: "length", raw: "max_output_tokens" }, - }) - ).toBe(false); - expect( - isOpenAISynthesizedDefaultFinishPart({ - type: "finish", - finishReason: { unified: "content-filter", raw: "content_filter" }, - }) - ).toBe(false); - }); - - test("rejects error finishes synthesized by the adapter", () => { - // When the adapter's transform observes a chunk-level error it sets - // finishReason to { unified: "error", raw: undefined }. We must not drop - // that — it's a real terminal signal. - expect( - isOpenAISynthesizedDefaultFinishPart({ - type: "finish", - finishReason: { unified: "error", raw: undefined }, - }) - ).toBe(false); - }); - - test("rejects non-finish parts even if shaped suspiciously", () => { - expect( - isOpenAISynthesizedDefaultFinishPart({ - type: "text-delta", - finishReason: { unified: "other", raw: undefined }, - }) - ).toBe(false); - expect(isOpenAISynthesizedDefaultFinishPart(null)).toBe(false); - expect(isOpenAISynthesizedDefaultFinishPart(undefined)).toBe(false); - expect(isOpenAISynthesizedDefaultFinishPart("finish")).toBe(false); - }); - - test("rejects finish parts shaped as a unified string (post-streamText conversion)", () => { - // Defensive: if this filter is ever fed a stream that has already been - // normalized through streamText, the finishReason will be the string - // "other" rather than { unified: "other", raw: undefined }. We must not - // accidentally drop legitimate finishes at that boundary. 
- expect(isOpenAISynthesizedDefaultFinishPart({ type: "finish", finishReason: "other" })).toBe( - false - ); - }); -}); - -describe("createOpenAISynthesizedFinishFilter", () => { - test("drops only the synthesized default finish part", async () => { - const input = [ - { type: "stream-start", warnings: [] }, - { type: "text-delta", id: "0", delta: "partial answer" }, - { type: "finish", finishReason: { unified: "other", raw: undefined } }, - ]; - - const out = await collect(streamOf(input).pipeThrough(createOpenAISynthesizedFinishFilter())); - - expect(out).toEqual([ - { type: "stream-start", warnings: [] }, - { type: "text-delta", id: "0", delta: "partial answer" }, - ]); - }); - - test("passes through real finish parts", async () => { - const input = [ - { type: "text-delta", id: "0", delta: "done" }, - { type: "finish", finishReason: { unified: "stop", raw: "stop" } }, - ]; - - const out = await collect(streamOf(input).pipeThrough(createOpenAISynthesizedFinishFilter())); - - expect(out).toEqual(input); - }); - - test("passes through legitimate unmapped 'other' finishes (raw reason present)", async () => { - const input = [ - { type: "text-delta", id: "0", delta: "done" }, - { type: "finish", finishReason: { unified: "other", raw: "safety_violation" } }, - ]; - - const out = await collect(streamOf(input).pipeThrough(createOpenAISynthesizedFinishFilter())); - - expect(out).toEqual(input); - }); -}); - -describe("wrapOpenAIModelToFilterSynthesizedFinish", () => { - test("filters the model's doStream output without touching anything else", async () => { - const captured: unknown[] = []; - const fakeModel: LanguageModelLike = { - specificationVersion: "v3", - provider: "openai", - modelId: "gpt-fake", - supportedUrls: {}, - doGenerate: () => Promise.reject(new Error("unused")), - doStream: (options) => { - captured.push(options); - return Promise.resolve({ - stream: streamOf([ - { type: "stream-start", warnings: [] }, - { type: "text-delta", id: "0", delta: "partial" 
}, - { type: "finish", finishReason: { unified: "other", raw: undefined } }, - ]), - request: {}, - response: { headers: {} }, - }); - }, - }; - - const wrapped = wrapOpenAIModelToFilterSynthesizedFinish( - fakeModel as unknown as Parameters[0] - ) as unknown as LanguageModelLike; - expect(wrapped).toBe(fakeModel); - - const result = await wrapped.doStream({ marker: 1 }); - expect(captured).toEqual([{ marker: 1 }]); - - const parts = await collect(result.stream); - expect(parts).toEqual([ - { type: "stream-start", warnings: [] }, - { type: "text-delta", id: "0", delta: "partial" }, - ]); - }); - - test("preserves real finish parts emitted by the model", async () => { - const fakeModel: LanguageModelLike = { - specificationVersion: "v3", - provider: "openai", - modelId: "gpt-fake", - supportedUrls: {}, - doGenerate: () => Promise.reject(new Error("unused")), - doStream: () => - Promise.resolve({ - stream: streamOf([ - { type: "text-delta", id: "0", delta: "complete" }, - { type: "finish", finishReason: { unified: "stop", raw: "stop" } }, - ]), - request: {}, - response: { headers: {} }, - }), - }; - - const wrapped = wrapOpenAIModelToFilterSynthesizedFinish( - fakeModel as unknown as Parameters[0] - ) as unknown as LanguageModelLike; - const result = await wrapped.doStream({}); - const parts = await collect(result.stream); - expect(parts).toEqual([ - { type: "text-delta", id: "0", delta: "complete" }, - { type: "finish", finishReason: { unified: "stop", raw: "stop" } }, - ]); - }); -}); diff --git a/src/node/services/openAISynthesizedFinishFilter.ts b/src/node/services/openAISynthesizedFinishFilter.ts deleted file mode 100644 index e032a3fc91..0000000000 --- a/src/node/services/openAISynthesizedFinishFilter.ts +++ /dev/null @@ -1,111 +0,0 @@ -import type { LanguageModel } from "ai"; - -/** - * The object form of {@link LanguageModel} (excluding the bare model-ID string - * shape). Only the object form exposes `doStream`, which is what the filter - * mutates. 
- */ -type LanguageModelInstance = Exclude; - -/** - * Filters the `@ai-sdk/openai` adapters' synthesized-default `finish` part out of - * the LanguageModelV2 stream so {@link StreamManager}'s missing-terminal-event guard - * (introduced in PR #3415) surfaces a clean upstream EOF as a retryable - * `stream_truncated` error instead of committing partial output as a normal - * assistant message. - * - * Background: every `@ai-sdk/openai` streaming adapter (Responses, Chat - * Completions, legacy Completions) initializes its internal finish reason to - * `{ unified: "other", raw: undefined }` and unconditionally emits that value - * from its `TransformStream.flush()` at end-of-stream — even when the SSE - * upstream closed before any terminal event arrived - * (`response.completed` / `response.incomplete` / `response.failed`, or a - * Chat Completions delta carrying `finish_reason`). That synthesized finish - * silently bypasses Mux's missing-terminal-event guard, committing partial - * output to history as if the model stopped cleanly. - * - * Discriminator: in this adapter family, the SDK's - * `mapOpenAIResponseFinishReason` and `mapOpenAIFinishReason` only return - * `unified: "other"` paired with a defined `raw` (the original unmapped API - * value). Among finish parts that these adapters legitimately emit, the - * `{ unified: "other", raw: undefined }` shape is unreachable except as the - * uninitialized default. Dropping it is safe — and intentionally narrow to - * the OpenAI adapter family. We do **not** extend this heuristic to other - * providers: the public AI SDK contract permits any adapter to emit `(other, - * undefined)` as a real terminal finish, so the filter must stay scoped to - * the boundary where we know the synthesized default originates. - * - * Implementation: wrap the model's `doStream` so its output stream is piped - * through a `TransformStream` that drops only the synthesized-default finish - * part. 
Other parts (text deltas, tool calls, real finishes, errors, etc.) - * pass through unchanged. When the synthesized default is dropped, the - * downstream consumer (`streamText`, then `StreamManager`) sees a stream that - * ended without a finish part, and the existing - * `!receivedTerminalEvent` branch handles it as `stream_truncated`. - */ -export function wrapOpenAIModelToFilterSynthesizedFinish( - model: M -): M { - const originalDoStream = model.doStream.bind(model); - // The wrapper is element-shape preserving (a pure filter), but the runtime - // shape spans the union of LanguageModelV*StreamPart types — TypeScript can't - // see that the same filter satisfies every member of the union, so we erase - // the element type for the pipe and restore it on the way out. The function - // signature still ties input and output element types together for callers. - type AnyDoStream = (options: unknown) => Promise<{ stream: ReadableStream }>; - const wrappedDoStream: AnyDoStream = async (options) => { - const result = await (originalDoStream as AnyDoStream)(options); - return { - ...result, - stream: result.stream.pipeThrough(createOpenAISynthesizedFinishFilter()), - }; - }; - model.doStream = wrappedDoStream as unknown as M["doStream"]; - return model; -} - -/** - * The TransformStream used by {@link wrapOpenAIModelToFilterSynthesizedFinish}. - * Exported separately so unit tests can exercise the filter directly without - * constructing a full LanguageModel mock. - * - * Generic over the element type so `ReadableStream.pipeThrough(...)` stays - * `ReadableStream` for the caller. The runtime check is shape-based and - * does not rely on `T`. 
- */ -export function createOpenAISynthesizedFinishFilter(): TransformStream { - return new TransformStream({ - transform(part, controller) { - if (isOpenAISynthesizedDefaultFinishPart(part)) { - return; - } - controller.enqueue(part); - }, - }); -} - -/** - * True iff `part` is the `@ai-sdk/openai` synthesized-default finish — the - * uninitialized `{ unified: "other", raw: undefined }` value that the adapter - * emits when no terminal SSE event arrived before EOF. - * - * The OpenAI adapters emit finish parts whose `finishReason` is the internal - * `{ unified, raw }` object (see `@ai-sdk/openai`'s flush handlers); the AI - * SDK's `streamText` later splits that into `finishReason` / `rawFinishReason` - * fields. We're operating at the adapter→`streamText` boundary, so we match - * against the `{ unified, raw }` shape. - */ -export function isOpenAISynthesizedDefaultFinishPart(part: unknown): boolean { - if (typeof part !== "object" || part === null) { - return false; - } - const { type, finishReason } = part as { type?: unknown; finishReason?: unknown }; - if (type !== "finish") { - return false; - } - if (typeof finishReason !== "object" || finishReason === null) { - return false; - } - const reason = finishReason as { unified?: unknown; raw?: unknown }; - return reason.unified === "other" && reason.raw === undefined; -} diff --git a/src/node/services/providerModelFactory.ts b/src/node/services/providerModelFactory.ts index c12d70efe4..1f03b22e66 100644 --- a/src/node/services/providerModelFactory.ts +++ b/src/node/services/providerModelFactory.ts @@ -48,7 +48,6 @@ import { moveLanguageModelCleanup, } from "@/node/services/languageModelCleanup"; import { createOpenAIWebSocketTransportFetch } from "@/node/services/openAIWebSocketTransportFetch"; -import { wrapOpenAIModelToFilterSynthesizedFinish } from "@/node/services/openAISynthesizedFinishFilter"; import { log } from "@/node/services/log"; import { MUX_ANTHROPIC_EFFORT_OVERRIDE_HEADER, @@ -1435,14 +1434,10 @@ 
export class ProviderModelFactory { }); // OpenAI reasoning state is preserved via explicit history, so no extra // middleware is needed beyond the provider's standard Responses handling. - const rawModel = + const model = effectiveWireFormat === "chatCompletions" ? provider.chat(modelId) : provider.responses(modelId); - // Strip the @ai-sdk/openai adapter's synthesized-default finish part so - // StreamManager's missing-terminal-event guard can fire on a clean EOF. - // See openAISynthesizedFinishFilter.ts for the full rationale. - const model = wrapOpenAIModelToFilterSynthesizedFinish(rawModel); if (webSocketTransport.active) { attachLanguageModelCleanup(model, webSocketTransport.close); } @@ -1906,11 +1901,7 @@ export class ProviderModelFactory { apiKey: "copilot", // placeholder, actual auth via custom fetch fetch: providerFetch, }); - // Same synthesized-default-finish filter as the direct OpenAI path: - // Copilot routes Responses/Chat Completions traffic through the same - // adapter, so a clean upstream EOF without a terminal event reaches - // StreamManager's guard via openAISynthesizedFinishFilter.ts. 
- return Ok(wrapOpenAIModelToFilterSynthesizedFinish(provider.chat(outboundCopilotModelId))); + return Ok(provider.chat(outboundCopilotModelId)); } // Generic handler for simple providers (standard API key + factory pattern) diff --git a/src/node/services/streamManager.test.ts b/src/node/services/streamManager.test.ts index c60f3cdd9d..ee05e27820 100644 --- a/src/node/services/streamManager.test.ts +++ b/src/node/services/streamManager.test.ts @@ -1654,6 +1654,132 @@ describe("StreamManager - empty stream completions", () => { expect(partial?.metadata?.metadataModel).toBe(KNOWN_MODELS.SONNET.id); expect(partial?.parts).toMatchObject([{ type: "text", text: "partial answer" }]); }); + + test("treats streamText's synthesized (other, undefined) finish part as a truncated stream", async () => { + // streamText's runStep initializes stepFinishReason="other" / + // stepRawFinishReason=undefined and unconditionally emits those from its + // flush() at end-of-stream. The OpenAI Responses, Chat Completions, and + // Anthropic Messages adapters all surface this shape when the upstream + // SSE stream closed before any terminal event arrived. StreamManager + // must treat that synthesized default as a missing terminal event so the + // existing truncation guard fires a retryable `stream_truncated` error + // rather than committing the partial output as a clean assistant + // message. 
+ const streamManager = new StreamManager(historyService); + const errorEvents: Array<{ messageId: string; error: string; errorType?: string }> = []; + const streamEndEvents: unknown[] = []; + + streamManager.on("error", (data) => { + errorEvents.push(data as { messageId: string; error: string; errorType?: string }); + }); + streamManager.on("stream-end", (data) => { + streamEndEvents.push(data); + }); + + Reflect.set(streamManager, "tokenTracker", { + setModel: () => Promise.resolve(undefined), + countTokens: () => Promise.resolve(0), + }); + + const workspaceId = "synthesized-finish-workspace"; + const messageId = "synthesized-finish-message"; + const historySequence = 1; + + await appendPartialAssistantForTests(workspaceId, messageId, historySequence); + const processStreamWithCleanup = getProcessStreamWithCleanupForTests(streamManager); + const startTime = Date.now() - 250; + const streamInfo = createStreamInfoForTests({ + streamResult: createStreamResultForTests( + (async function* () { + await Promise.resolve(); + yield { type: "text-delta", text: "partial answer" }; + // The provider adapter never emitted its own finish (e.g. clean + // SSE EOF before response.completed / message_stop). 
The ai + // package's flush() synthesizes this one: + yield { type: "finish", finishReason: "other", rawFinishReason: undefined }; + })(), + { inputTokens: 3, outputTokens: 2, totalTokens: 5 } + ), + messageId, + startTime, + lastPartTimestamp: startTime, + model: KNOWN_MODELS.SONNET.id, + metadataModel: KNOWN_MODELS.SONNET.id, + historySequence, + initialMetadata: { agentId: "plan" }, + runtime, + }); + + await processStreamWithCleanup.call(streamManager, workspaceId, streamInfo, historySequence); + + expect(streamEndEvents).toHaveLength(0); + expect(errorEvents).toHaveLength(1); + expect(errorEvents[0]).toMatchObject({ + messageId, + errorType: "stream_truncated", + }); + + const partial = await historyService.readPartial(workspaceId); + expect(partial?.metadata?.errorType).toBe("stream_truncated"); + expect(partial?.parts).toMatchObject([{ type: "text", text: "partial answer" }]); + }); + + test("treats real (other, ) finish parts as a clean completion", async () => { + // The synthesized-default discriminator must NOT swallow legitimate + // `"other"` finishes. Both OpenAI and Anthropic map a few real stop + // reasons to `unified: "other"`, but always with a defined raw value + // (e.g. Anthropic's `"compaction"`). This test guards against the + // discriminator widening into a false positive that would mis-fire the + // truncation guard on a clean stream. 
+ const streamManager = new StreamManager(historyService); + const errorEvents: Array<{ messageId: string; error: string; errorType?: string }> = []; + const streamEndEvents: unknown[] = []; + + streamManager.on("error", (data) => { + errorEvents.push(data as { messageId: string; error: string; errorType?: string }); + }); + streamManager.on("stream-end", (data) => { + streamEndEvents.push(data); + }); + + Reflect.set(streamManager, "tokenTracker", { + setModel: () => Promise.resolve(undefined), + countTokens: () => Promise.resolve(0), + }); + + const workspaceId = "real-other-finish-workspace"; + const messageId = "real-other-finish-message"; + const historySequence = 1; + + await appendPartialAssistantForTests(workspaceId, messageId, historySequence); + const processStreamWithCleanup = getProcessStreamWithCleanupForTests(streamManager); + const startTime = Date.now() - 250; + const streamInfo = createStreamInfoForTests({ + streamResult: createStreamResultForTests( + (async function* () { + await Promise.resolve(); + yield { type: "text-delta", text: "complete answer" }; + // Real Anthropic compaction finish (or any other mapped-to-other + // stop reason) carries a defined raw value. 
+ yield { type: "finish", finishReason: "other", rawFinishReason: "compaction" }; + })(), + { inputTokens: 3, outputTokens: 2, totalTokens: 5 } + ), + messageId, + startTime, + lastPartTimestamp: startTime, + model: KNOWN_MODELS.SONNET.id, + metadataModel: KNOWN_MODELS.SONNET.id, + historySequence, + initialMetadata: { agentId: "plan" }, + runtime, + }); + + await processStreamWithCleanup.call(streamManager, workspaceId, streamInfo, historySequence); + + expect(errorEvents).toHaveLength(0); + expect(streamEndEvents).toHaveLength(1); + }); }); describe("StreamManager - TTFT metadata persistence", () => { diff --git a/src/node/services/streamManager.ts b/src/node/services/streamManager.ts index fbd499454a..5dcf3dcc0f 100644 --- a/src/node/services/streamManager.ts +++ b/src/node/services/streamManager.ts @@ -2118,7 +2118,41 @@ export class StreamManager extends EventEmitter { break; case "finish": { - const finishPart = part as { finishReason?: unknown }; + const finishPart = part as { + finishReason?: unknown; + rawFinishReason?: unknown; + }; + // Skip the `ai` package's synthesized-default finish part. + // + // `streamText`'s internal `runStep` initializes + // `stepFinishReason = "other"` and `stepRawFinishReason = undefined`, + // and unconditionally emits those values from its flush() at + // end-of-stream — even when the underlying SSE stream closed + // before any terminal event arrived. The OpenAI Responses, + // Chat Completions, and Anthropic Messages adapters all + // exhibit this in practice: a clean upstream EOF (no + // `response.completed`, no `message_stop`, no + // `finish_reason` delta) ends up as a synthesized + // `(other, undefined)` finish here. + // + // The discriminator is narrow on purpose. Every real OpenAI + // and Anthropic finish path that maps to `"other"` pairs it + // with a defined raw reason (e.g. Anthropic's `"compaction"`). 
+ // The `(other, undefined)` shape is unreachable from the + // adapters' own finish-reason mappers and is therefore a + // reliable signal that the finish was synthesized. + // + // Treating it as a non-event lets the existing + // `!receivedTerminalEvent` branch below fire + // `handleTruncatedStreamCompletion`, which surfaces a + // retryable `stream_truncated` error instead of silently + // committing partial output as a normal assistant message. + if ( + finishPart.finishReason === "other" && + finishPart.rawFinishReason === undefined + ) { + break; + } streamInfo.receivedTerminalEvent = true; if (typeof finishPart.finishReason === "string") { streamInfo.terminalFinishReason = finishPart.finishReason; From 89f9f460fc1e8e3b6ab19625d3c3ea6c8583ff35 Mon Sep 17 00:00:00 2001 From: ethan Date: Tue, 2 Jun 2026 23:39:09 +1000 Subject: [PATCH 4/4] fix: log when synthesized-default finish discriminator fires Adds observability for the unaudited-adapter risk: provider adapters outside @ai-sdk/openai and @ai-sdk/anthropic (xai, openai-compatible, bedrock, ollama-ai-provider, etc.) were not source-audited for their finish-reason mappers. If one legitimately emits (other, undefined) as a real terminal finish, the discriminator misfires and the user sees a spurious truncated-stream retry. A log line at the firing site makes misfires diagnosable instead of silent. 
--- src/node/services/streamManager.ts | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/node/services/streamManager.ts b/src/node/services/streamManager.ts index 5dcf3dcc0f..88c71cc683 100644 --- a/src/node/services/streamManager.ts +++ b/src/node/services/streamManager.ts @@ -2151,6 +2151,19 @@ export class StreamManager extends EventEmitter { finishPart.finishReason === "other" && finishPart.rawFinishReason === undefined ) { + // Observability for the unaudited-adapter risk: if a future + // provider adapter we haven't read source for legitimately + // emits `(other, undefined)` as a real terminal finish, the + // discriminator misfires and the user sees a spurious + // truncated-stream retry. Surface a log so that misfires + // are diagnosable instead of silent. + workspaceLog.warn( + "Treating synthesized-default (other, undefined) finish as truncated stream", + { + messageId: streamInfo.messageId, + model: streamInfo.model, + } + ); break; } streamInfo.receivedTerminalEvent = true;