Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 126 additions & 0 deletions src/node/services/streamManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1654,6 +1654,132 @@ describe("StreamManager - empty stream completions", () => {
expect(partial?.metadata?.metadataModel).toBe(KNOWN_MODELS.SONNET.id);
expect(partial?.parts).toMatchObject([{ type: "text", text: "partial answer" }]);
});

test("treats streamText's synthesized (other, undefined) finish part as a truncated stream", async () => {
// streamText's runStep initializes stepFinishReason="other" /
// stepRawFinishReason=undefined and unconditionally emits those from its
// flush() at end-of-stream. The OpenAI Responses, Chat Completions, and
// Anthropic Messages adapters all surface this shape when the upstream
// SSE stream closed before any terminal event arrived. StreamManager
// must treat that synthesized default as a missing terminal event so the
// existing truncation guard fires a retryable `stream_truncated` error
// rather than committing the partial output as a clean assistant
// message.
const streamManager = new StreamManager(historyService);
const errorEvents: Array<{ messageId: string; error: string; errorType?: string }> = [];
const streamEndEvents: unknown[] = [];

streamManager.on("error", (data) => {
errorEvents.push(data as { messageId: string; error: string; errorType?: string });
});
streamManager.on("stream-end", (data) => {
streamEndEvents.push(data);
});

Reflect.set(streamManager, "tokenTracker", {
setModel: () => Promise.resolve(undefined),
countTokens: () => Promise.resolve(0),
});

const workspaceId = "synthesized-finish-workspace";
const messageId = "synthesized-finish-message";
const historySequence = 1;

await appendPartialAssistantForTests(workspaceId, messageId, historySequence);
const processStreamWithCleanup = getProcessStreamWithCleanupForTests(streamManager);
const startTime = Date.now() - 250;
const streamInfo = createStreamInfoForTests({
streamResult: createStreamResultForTests(
(async function* () {
await Promise.resolve();
yield { type: "text-delta", text: "partial answer" };
// The provider adapter never emitted its own finish (e.g. clean
// SSE EOF before response.completed / message_stop). The ai
// package's flush() synthesizes this one:
yield { type: "finish", finishReason: "other", rawFinishReason: undefined };
})(),
{ inputTokens: 3, outputTokens: 2, totalTokens: 5 }
),
messageId,
startTime,
lastPartTimestamp: startTime,
model: KNOWN_MODELS.SONNET.id,
metadataModel: KNOWN_MODELS.SONNET.id,
historySequence,
initialMetadata: { agentId: "plan" },
runtime,
});

await processStreamWithCleanup.call(streamManager, workspaceId, streamInfo, historySequence);

expect(streamEndEvents).toHaveLength(0);
expect(errorEvents).toHaveLength(1);
expect(errorEvents[0]).toMatchObject({
messageId,
errorType: "stream_truncated",
});

const partial = await historyService.readPartial(workspaceId);
expect(partial?.metadata?.errorType).toBe("stream_truncated");
expect(partial?.parts).toMatchObject([{ type: "text", text: "partial answer" }]);
});

test("treats real (other, <raw>) finish parts as a clean completion", async () => {
// The synthesized-default discriminator must NOT swallow legitimate
// `"other"` finishes. Both OpenAI and Anthropic map a few real stop
// reasons to `unified: "other"`, but always with a defined raw value
// (e.g. Anthropic's `"compaction"`). This test guards against the
// discriminator widening into a false positive that would mis-fire the
// truncation guard on a clean stream.
const streamManager = new StreamManager(historyService);
const errorEvents: Array<{ messageId: string; error: string; errorType?: string }> = [];
const streamEndEvents: unknown[] = [];

streamManager.on("error", (data) => {
errorEvents.push(data as { messageId: string; error: string; errorType?: string });
});
streamManager.on("stream-end", (data) => {
streamEndEvents.push(data);
});

Reflect.set(streamManager, "tokenTracker", {
setModel: () => Promise.resolve(undefined),
countTokens: () => Promise.resolve(0),
});

const workspaceId = "real-other-finish-workspace";
const messageId = "real-other-finish-message";
const historySequence = 1;

await appendPartialAssistantForTests(workspaceId, messageId, historySequence);
const processStreamWithCleanup = getProcessStreamWithCleanupForTests(streamManager);
const startTime = Date.now() - 250;
const streamInfo = createStreamInfoForTests({
streamResult: createStreamResultForTests(
(async function* () {
await Promise.resolve();
yield { type: "text-delta", text: "complete answer" };
// Real Anthropic compaction finish (or any other mapped-to-other
// stop reason) carries a defined raw value.
yield { type: "finish", finishReason: "other", rawFinishReason: "compaction" };
})(),
{ inputTokens: 3, outputTokens: 2, totalTokens: 5 }
),
messageId,
startTime,
lastPartTimestamp: startTime,
model: KNOWN_MODELS.SONNET.id,
metadataModel: KNOWN_MODELS.SONNET.id,
historySequence,
initialMetadata: { agentId: "plan" },
runtime,
});

await processStreamWithCleanup.call(streamManager, workspaceId, streamInfo, historySequence);

expect(errorEvents).toHaveLength(0);
expect(streamEndEvents).toHaveLength(1);
});
});

describe("StreamManager - TTFT metadata persistence", () => {
Expand Down
49 changes: 48 additions & 1 deletion src/node/services/streamManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2118,7 +2118,54 @@ export class StreamManager extends EventEmitter {
break;

case "finish": {
const finishPart = part as { finishReason?: unknown };
const finishPart = part as {
finishReason?: unknown;
rawFinishReason?: unknown;
};
// Skip the `ai` package's synthesized-default finish part.
//
// `streamText`'s internal `runStep` initializes
// `stepFinishReason = "other"` and `stepRawFinishReason = undefined`,
// and unconditionally emits those values from its flush() at
// end-of-stream β€” even when the underlying SSE stream closed
// before any terminal event arrived. The OpenAI Responses,
// Chat Completions, and Anthropic Messages adapters all
// exhibit this in practice: a clean upstream EOF (no
// `response.completed`, no `message_stop`, no
// `finish_reason` delta) ends up as a synthesized
// `(other, undefined)` finish here.
//
// The discriminator is narrow on purpose. Every real OpenAI
// and Anthropic finish path that maps to `"other"` pairs it
// with a defined raw reason (e.g. Anthropic's `"compaction"`).
// The `(other, undefined)` shape is unreachable from the
// adapters' own finish-reason mappers and is therefore a
// reliable signal that the finish was synthesized.
//
// Treating it as a non-event lets the existing
// `!receivedTerminalEvent` branch below fire
// `handleTruncatedStreamCompletion`, which surfaces a
// retryable `stream_truncated` error instead of silently
// committing partial output as a normal assistant message.
if (
finishPart.finishReason === "other" &&
finishPart.rawFinishReason === undefined
) {
// Observability for the unaudited-adapter risk: if a future
// provider adapter we haven't read source for legitimately
// emits `(other, undefined)` as a real terminal finish, the
// discriminator misfires and the user sees a spurious
// truncated-stream retry. Surface a log so that misfires
// are diagnosable instead of silent.
workspaceLog.warn(
"Treating synthesized-default (other, undefined) finish as truncated stream",
{
messageId: streamInfo.messageId,
model: streamInfo.model,
}
);
break;
}
streamInfo.receivedTerminalEvent = true;
if (typeof finishPart.finishReason === "string") {
streamInfo.terminalFinishReason = finishPart.finishReason;
Expand Down
Loading