From 36e9a41c1840e12b557105d723845149881829fa Mon Sep 17 00:00:00 2001
From: WonderJL
Date: Tue, 26 May 2026 15:44:36 +0800
Subject: [PATCH] fix(session): add idle timeout to LLM stream

If the model provider's SSE stream stalls without sending [DONE] or
closing the socket (proxy drop, transport half-open, upstream hang), the
session loop blocks on the next chunk forever and the session appears
stuck with no error.

Wrap the LLM event stream in Stream.timeoutOrElse so that an idle window
with no events fails the stream with a tagged LLMStreamIdleTimeout error,
allowing the session loop to surface the failure to the user. Applies to
both the native runtime and ai-sdk runtime paths.

Configurable via experimental.llm_stream_idle_timeout (ms); defaults to
120000ms. Set to 0 to disable.
---
Review notes (ignored by `git am`, not part of the commit message):
- `llm_stream_idle_timeout` now admits 0 next to PositiveInt so the documented
  "set to 0 to disable" switch can pass config validation. This assumes
  PositiveInt rejects 0 -- confirm against its definition.
- `withIdleTimeout` carries explicit type parameters and returns the source
  unchanged for a non-positive or non-finite window.
- Leading whitespace of context lines and the `index` hashes were carried over
  from a whitespace-collapsed copy of this patch -- verify against the tree, or
  apply with `--ignore-whitespace`.

 packages/opencode/src/config/config.ts        |  5 ++
 packages/opencode/src/session/llm.ts          | 59 +++++++++++++++----
 .../test/session/llm-idle-timeout.test.ts     | 44 ++++++++++++++
 3 files changed, 98 insertions(+), 10 deletions(-)
 create mode 100644 packages/opencode/test/session/llm-idle-timeout.test.ts

diff --git a/packages/opencode/src/config/config.ts b/packages/opencode/src/config/config.ts
index 456d6c3ee317..3ce3dfdb1e31 100644
--- a/packages/opencode/src/config/config.ts
+++ b/packages/opencode/src/config/config.ts
@@ -301,6 +301,11 @@ export const Info = Schema.Struct({
       mcp_timeout: Schema.optional(PositiveInt).annotate({
         description: "Timeout in milliseconds for model context protocol (MCP) requests",
       }),
+      // 0 is admitted alongside PositiveInt so the documented "0 disables" switch validates.
+      llm_stream_idle_timeout: Schema.optional(Schema.Union([Schema.Literal(0), PositiveInt])).annotate({
+        description:
+          "Idle timeout in milliseconds for the LLM streaming response. Fails the stream if no event arrives within this window. Set to 0 to disable. Default: 120000 (2 minutes).",
+      }),
     }),
   ),
 }).annotate({ identifier: "Config" })
diff --git a/packages/opencode/src/session/llm.ts b/packages/opencode/src/session/llm.ts
index ea2efc99d007..dbde10b4d965 100644
--- a/packages/opencode/src/session/llm.ts
+++ b/packages/opencode/src/session/llm.ts
@@ -30,6 +30,34 @@ import { LLMRequestPrep } from "./llm/request"
 const log = Log.create({ service: "llm" })
 export const OUTPUT_TOKEN_MAX = ProviderTransform.OUTPUT_TOKEN_MAX
 
+/** Default idle-timeout for LLM streaming responses (ms). Overridable via
+ *  `experimental.llm_stream_idle_timeout` in opencode config; 0 disables. */
+export const DEFAULT_LLM_STREAM_IDLE_TIMEOUT_MS = 120_000
+
+/** Error used to fail the LLM event stream when the idle timeout fires; `idleMs` is the configured window. */
+export class LLMStreamIdleTimeout extends Error {
+  readonly _tag = "LLMStreamIdleTimeout"
+  override readonly name = "LLMStreamIdleTimeout"
+  constructor(public readonly idleMs: number) {
+    super(`LLM stream idle timeout: no events received for ${idleMs}ms`)
+  }
+}
+
+/**
+ * Wraps `source` in `Stream.timeoutOrElse` so that a timeout fails the stream with `LLMStreamIdleTimeout`.
+ * A non-positive or non-finite `idleMs` disables the guard and returns `source` unchanged.
+ */
+export function withIdleTimeout<A, E, R>(
+  source: Stream.Stream<A, E, R>,
+  idleMs: number,
+): Stream.Stream<A, E | LLMStreamIdleTimeout, R> {
+  if (!Number.isFinite(idleMs) || idleMs <= 0) return source
+  return Stream.timeoutOrElse(source, {
+    duration: `${idleMs} millis`,
+    orElse: () => Stream.fail(new LLMStreamIdleTimeout(idleMs)),
+  })
+}
+
 export type StreamInput = {
   user: MessageV2.User
   sessionID: string
@@ -349,19 +377,30 @@ const live: Layer.Layer<
               (ctrl) => Effect.sync(() => ctrl.abort()),
             )
 
+            const cfg = yield* config.get()
+            const idleMs = cfg.experimental?.llm_stream_idle_timeout ?? DEFAULT_LLM_STREAM_IDLE_TIMEOUT_MS
+
             const result = yield* run({ ...input, abort: ctrl.signal })
 
-            if (result.type === "native") return result.stream
+            const base =
+              result.type === "native"
+                ? result.stream
+                : // Adapter seam: both runtimes expose the same LLMEvent stream. Native
+                  // already returns one; AI SDK streams are converted here.
+                  (() => {
+                    const state = LLMAISDK.adapterState()
+                    return Stream.fromAsyncIterable(result.result.fullStream, (e) =>
+                      e instanceof Error ? e : new Error(String(e)),
+                    ).pipe(
+                      Stream.mapEffect((event) => LLMAISDK.toLLMEvents(state, event)),
+                      Stream.flatMap((events) => Stream.fromIterable(events)),
+                    )
+                  })()
 
-            // Adapter seam: both runtimes expose the same LLMEvent stream. Native
-            // already returns one; AI SDK streams are converted here.
-            const state = LLMAISDK.adapterState()
-            return Stream.fromAsyncIterable(result.result.fullStream, (e) =>
-              e instanceof Error ? e : new Error(String(e)),
-            ).pipe(
-              Stream.mapEffect((event) => LLMAISDK.toLLMEvents(state, event)),
-              Stream.flatMap((events) => Stream.fromIterable(events)),
-            )
+            // Idle-timeout guard. Without this an upstream that silently stalls
+            // (proxy/relay drop, provider hang, transport half-open) leaves the
+            // session loop blocked on the next chunk indefinitely.
+            return idleMs > 0 ? withIdleTimeout(base, idleMs) : base
           }),
         ),
       )
diff --git a/packages/opencode/test/session/llm-idle-timeout.test.ts b/packages/opencode/test/session/llm-idle-timeout.test.ts
new file mode 100644
index 000000000000..f249a3b8846d
--- /dev/null
+++ b/packages/opencode/test/session/llm-idle-timeout.test.ts
@@ -0,0 +1,44 @@
+import { describe, expect, test } from "bun:test"
+import { Cause, Effect, Exit, Option, Stream } from "effect"
+import { LLMStreamIdleTimeout, withIdleTimeout } from "../../src/session/llm"
+
+describe("session.llm.withIdleTimeout", () => {
+  test("fails with LLMStreamIdleTimeout when no event arrives within the idle window", async () => {
+    const stalled = Stream.never as Stream.Stream<number>
+    const exit = await Effect.runPromiseExit(Stream.runCollect(withIdleTimeout(stalled, 30)))
+    expect(Exit.isFailure(exit)).toBe(true)
+    if (Exit.isFailure(exit)) {
+      const err = Cause.findErrorOption(exit.cause)
+      expect(Option.isSome(err) && err.value instanceof LLMStreamIdleTimeout).toBe(true)
+    }
+  })
+
+  test("emits early events then fails when the stream stalls", async () => {
+    const partial = Stream.make(1, 2).pipe(Stream.concat(Stream.never as Stream.Stream<number>))
+    const collected: number[] = []
+    const exit = await Effect.runPromiseExit(
+      Stream.runForEach(withIdleTimeout(partial, 30), (n) =>
+        Effect.sync(() => {
+          collected.push(n)
+        }),
+      ),
+    )
+    expect(collected).toEqual([1, 2])
+    expect(Exit.isFailure(exit)).toBe(true)
+    if (Exit.isFailure(exit)) {
+      const err = Cause.findErrorOption(exit.cause)
+      expect(Option.isSome(err) && err.value instanceof LLMStreamIdleTimeout).toBe(true)
+    }
+  })
+
+  test("does not interfere with streams that complete before the idle window", async () => {
+    const ok = Stream.make("a", "b", "c")
+    const out = await Effect.runPromise(Stream.runCollect(withIdleTimeout(ok, 1_000)))
+    expect(Array.from(out)).toEqual(["a", "b", "c"])
+  })
+
+  test("returns the source untouched when the idle window is disabled", () => {
+    const ok = Stream.make(1, 2, 3)
+    expect(withIdleTimeout(ok, 0)).toBe(ok)
+  })
+})