Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/opencode/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,10 @@ export const Info = Schema.Struct({
mcp_timeout: Schema.optional(PositiveInt).annotate({
description: "Timeout in milliseconds for model context protocol (MCP) requests",
}),
llm_stream_idle_timeout: Schema.optional(PositiveInt).annotate({
description:
"Idle timeout in milliseconds for the LLM streaming response. Fails the stream if no event arrives within this window. Set to 0 to disable. Default: 120000 (2 minutes).",
}),
}),
),
}).annotate({ identifier: "Config" })
Expand Down
52 changes: 42 additions & 10 deletions packages/opencode/src/session/llm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,27 @@ import { LLMRequestPrep } from "./llm/request"
const log = Log.create({ service: "llm" })
export const OUTPUT_TOKEN_MAX = ProviderTransform.OUTPUT_TOKEN_MAX

/** Default idle-timeout for LLM streaming responses (ms). Overridable via
* `experimental.llm_stream_idle_timeout` in opencode config; 0 disables. */
export const DEFAULT_LLM_STREAM_IDLE_TIMEOUT_MS = 120_000

export class LLMStreamIdleTimeout extends Error {
readonly _tag = "LLMStreamIdleTimeout"
constructor(public readonly idleMs: number) {
super(`LLM stream idle timeout: no events received for ${idleMs}ms`)
}
}

export function withIdleTimeout<A, E, R>(
source: Stream.Stream<A, E, R>,
idleMs: number,
): Stream.Stream<A, E | LLMStreamIdleTimeout, R> {
return Stream.timeoutOrElse(source, {
duration: `${idleMs} millis`,
orElse: () => Stream.fail(new LLMStreamIdleTimeout(idleMs)),
})
}

export type StreamInput = {
user: MessageV2.User
sessionID: string
Expand Down Expand Up @@ -349,19 +370,30 @@ const live: Layer.Layer<
(ctrl) => Effect.sync(() => ctrl.abort()),
)

const cfg = yield* config.get()
const idleMs = cfg.experimental?.llm_stream_idle_timeout ?? DEFAULT_LLM_STREAM_IDLE_TIMEOUT_MS

const result = yield* run({ ...input, abort: ctrl.signal })

if (result.type === "native") return result.stream
const base =
result.type === "native"
? result.stream
: // Adapter seam: both runtimes expose the same LLMEvent stream. Native
// already returns one; AI SDK streams are converted here.
(() => {
const state = LLMAISDK.adapterState()
return Stream.fromAsyncIterable(result.result.fullStream, (e) =>
e instanceof Error ? e : new Error(String(e)),
).pipe(
Stream.mapEffect((event) => LLMAISDK.toLLMEvents(state, event)),
Stream.flatMap((events) => Stream.fromIterable(events)),
)
})()

// Adapter seam: both runtimes expose the same LLMEvent stream. Native
// already returns one; AI SDK streams are converted here.
const state = LLMAISDK.adapterState()
return Stream.fromAsyncIterable(result.result.fullStream, (e) =>
e instanceof Error ? e : new Error(String(e)),
).pipe(
Stream.mapEffect((event) => LLMAISDK.toLLMEvents(state, event)),
Stream.flatMap((events) => Stream.fromIterable(events)),
)
// Idle-timeout guard. Without this an upstream that silently stalls
// (proxy/relay drop, provider hang, transport half-open) leaves the
// session loop blocked on the next chunk indefinitely.
return idleMs > 0 ? withIdleTimeout(base, idleMs) : base
}),
),
)
Expand Down
35 changes: 35 additions & 0 deletions packages/opencode/test/session/llm-idle-timeout.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { describe, expect, test } from "bun:test"
import { Cause, Effect, Exit, Option, Stream } from "effect"
import { LLMStreamIdleTimeout, withIdleTimeout } from "../../src/session/llm"

describe("session.llm.withIdleTimeout", () => {
test("fails with LLMStreamIdleTimeout when no event arrives within the idle window", async () => {
const stalled = Stream.never as Stream.Stream<number>
const exit = await Effect.runPromiseExit(Stream.runCollect(withIdleTimeout(stalled, 30)))
expect(Exit.isFailure(exit)).toBe(true)
if (Exit.isFailure(exit)) {
const err = Cause.findErrorOption(exit.cause)
expect(Option.isSome(err) && err.value instanceof LLMStreamIdleTimeout).toBe(true)
}
})

test("emits early events then fails when the stream stalls", async () => {
const partial = Stream.make(1, 2).pipe(Stream.concat(Stream.never as Stream.Stream<number>))
const collected: number[] = []
const exit = await Effect.runPromiseExit(
Stream.runForEach(withIdleTimeout(partial, 30), (n) =>
Effect.sync(() => {
collected.push(n)
}),
),
)
expect(collected).toEqual([1, 2])
expect(Exit.isFailure(exit)).toBe(true)
})

test("does not interfere with streams that complete before the idle window", async () => {
const ok = Stream.make("a", "b", "c")
const out = await Effect.runPromise(Stream.runCollect(withIdleTimeout(ok, 1_000)))
expect(Array.from(out)).toEqual(["a", "b", "c"])
})
})
Loading