diff --git a/packages/opencode/src/config/config.ts b/packages/opencode/src/config/config.ts index 456d6c3ee317..3ce3dfdb1e31 100644 --- a/packages/opencode/src/config/config.ts +++ b/packages/opencode/src/config/config.ts @@ -301,6 +301,10 @@ export const Info = Schema.Struct({ mcp_timeout: Schema.optional(PositiveInt).annotate({ description: "Timeout in milliseconds for model context protocol (MCP) requests", }), + llm_stream_idle_timeout: Schema.optional(PositiveInt).annotate({ + description: + "Idle timeout in milliseconds for the LLM streaming response. Fails the stream if no event arrives within this window. Set to 0 to disable. Default: 120000 (2 minutes).", + }), }), ), }).annotate({ identifier: "Config" }) diff --git a/packages/opencode/src/session/llm.ts b/packages/opencode/src/session/llm.ts index ea2efc99d007..dbde10b4d965 100644 --- a/packages/opencode/src/session/llm.ts +++ b/packages/opencode/src/session/llm.ts @@ -30,6 +30,27 @@ import { LLMRequestPrep } from "./llm/request" const log = Log.create({ service: "llm" }) export const OUTPUT_TOKEN_MAX = ProviderTransform.OUTPUT_TOKEN_MAX +/** Default idle-timeout for LLM streaming responses (ms). Overridable via + * `experimental.llm_stream_idle_timeout` in opencode config; 0 disables. */ +export const DEFAULT_LLM_STREAM_IDLE_TIMEOUT_MS = 120_000 + +export class LLMStreamIdleTimeout extends Error { + readonly _tag = "LLMStreamIdleTimeout" + constructor(public readonly idleMs: number) { + super(`LLM stream idle timeout: no events received for ${idleMs}ms`) + } +} + +export function withIdleTimeout( + source: Stream.Stream, + idleMs: number, +): Stream.Stream { + return Stream.timeoutOrElse(source, { + duration: `${idleMs} millis`, + orElse: () => Stream.fail(new LLMStreamIdleTimeout(idleMs)), + }) +} + export type StreamInput = { user: MessageV2.User sessionID: string @@ -349,19 +370,30 @@ const live: Layer.Layer< (ctrl) => Effect.sync(() => ctrl.abort()), ) + const cfg = yield* config.get() + const idleMs = cfg.experimental?.llm_stream_idle_timeout ?? DEFAULT_LLM_STREAM_IDLE_TIMEOUT_MS + const result = yield* run({ ...input, abort: ctrl.signal }) - if (result.type === "native") return result.stream + const base = + result.type === "native" + ? result.stream + : // Adapter seam: both runtimes expose the same LLMEvent stream. Native + // already returns one; AI SDK streams are converted here. + (() => { + const state = LLMAISDK.adapterState() + return Stream.fromAsyncIterable(result.result.fullStream, (e) => + e instanceof Error ? e : new Error(String(e)), + ).pipe( + Stream.mapEffect((event) => LLMAISDK.toLLMEvents(state, event)), + Stream.flatMap((events) => Stream.fromIterable(events)), + ) + })() - // Adapter seam: both runtimes expose the same LLMEvent stream. Native - // already returns one; AI SDK streams are converted here. - const state = LLMAISDK.adapterState() - return Stream.fromAsyncIterable(result.result.fullStream, (e) => - e instanceof Error ? e : new Error(String(e)), - ).pipe( - Stream.mapEffect((event) => LLMAISDK.toLLMEvents(state, event)), - Stream.flatMap((events) => Stream.fromIterable(events)), - ) + // Idle-timeout guard. Without this an upstream that silently stalls + // (proxy/relay drop, provider hang, transport half-open) leaves the + // session loop blocked on the next chunk indefinitely. + return idleMs > 0 ? withIdleTimeout(base, idleMs) : base }), ), ) diff --git a/packages/opencode/test/session/llm-idle-timeout.test.ts b/packages/opencode/test/session/llm-idle-timeout.test.ts new file mode 100644 index 000000000000..f249a3b8846d --- /dev/null +++ b/packages/opencode/test/session/llm-idle-timeout.test.ts @@ -0,0 +1,35 @@ +import { describe, expect, test } from "bun:test" +import { Cause, Effect, Exit, Option, Stream } from "effect" +import { LLMStreamIdleTimeout, withIdleTimeout } from "../../src/session/llm" + +describe("session.llm.withIdleTimeout", () => { + test("fails with LLMStreamIdleTimeout when no event arrives within the idle window", async () => { + const stalled = Stream.never as Stream.Stream + const exit = await Effect.runPromiseExit(Stream.runCollect(withIdleTimeout(stalled, 30))) + expect(Exit.isFailure(exit)).toBe(true) + if (Exit.isFailure(exit)) { + const err = Cause.findErrorOption(exit.cause) + expect(Option.isSome(err) && err.value instanceof LLMStreamIdleTimeout).toBe(true) + } + }) + + test("emits early events then fails when the stream stalls", async () => { + const partial = Stream.make(1, 2).pipe(Stream.concat(Stream.never as Stream.Stream)) + const collected: number[] = [] + const exit = await Effect.runPromiseExit( + Stream.runForEach(withIdleTimeout(partial, 30), (n) => + Effect.sync(() => { + collected.push(n) + }), + ), + ) + expect(collected).toEqual([1, 2]) + expect(Exit.isFailure(exit)).toBe(true) + }) + + test("does not interfere with streams that complete before the idle window", async () => { + const ok = Stream.make("a", "b", "c") + const out = await Effect.runPromise(Stream.runCollect(withIdleTimeout(ok, 1_000))) + expect(Array.from(out)).toEqual(["a", "b", "c"]) + }) +})