From 2e8c3ddf0e8e1b8ccb75d0a91904088d92754577 Mon Sep 17 00:00:00 2001 From: Liao Shiwu Date: Sun, 31 May 2026 22:52:57 +0800 Subject: [PATCH] fix: guard against episode storm stalling foreground sessions (#1755) --- .../core/config/defaults.ts | 3 + apps/memos-local-plugin/core/config/schema.ts | 17 ++ apps/memos-local-plugin/core/pipeline/deps.ts | 30 ++-- .../core/pipeline/orchestrator.ts | 150 +++++++++++++----- .../memos-local-plugin/core/pipeline/types.ts | 12 ++ .../core/util/rate-limited-llm.ts | 88 ++++++++++ .../memos-local-plugin/core/util/semaphore.ts | 30 ++++ 7 files changed, 279 insertions(+), 51 deletions(-) create mode 100644 apps/memos-local-plugin/core/util/rate-limited-llm.ts create mode 100644 apps/memos-local-plugin/core/util/semaphore.ts diff --git a/apps/memos-local-plugin/core/config/defaults.ts b/apps/memos-local-plugin/core/config/defaults.ts index 680957819..76be825d0 100644 --- a/apps/memos-local-plugin/core/config/defaults.ts +++ b/apps/memos-local-plugin/core/config/defaults.ts @@ -193,6 +193,9 @@ export const DEFAULT_CONFIG: ResolvedConfig = { session: { followUpMode: "merge_follow_ups", mergeMaxGapMs: 2 * 60 * 60 * 1000, + maxTurnsPerEpisode: 30, + classifyTimeoutMs: 5000, + bgLlmConcurrency: 2, }, retrieval: { tier1TopK: 3, diff --git a/apps/memos-local-plugin/core/config/schema.ts b/apps/memos-local-plugin/core/config/schema.ts index 675cade70..54b396105 100644 --- a/apps/memos-local-plugin/core/config/schema.ts +++ b/apps/memos-local-plugin/core/config/schema.ts @@ -293,6 +293,23 @@ const AlgorithmSchema = Type.Object({ * `taskIdleTimeoutMs`. */ mergeMaxGapMs: NumberInRange(2 * 60 * 60 * 1000, 0, 24 * 60 * 60 * 1000), + /** + * Hard cap on turns in a merged episode. Once reached, the next + * turn forces a topic boundary even if relation classification says + * follow-up/revision. Keeps task-end processing bounded. 
+ */ + maxTurnsPerEpisode: NumberInRange(30, 5, 200), + /** + * Max time to wait for relation classification before defaulting + * to a conservative new-task boundary so foreground prompt + * construction cannot stall indefinitely. + */ + classifyTimeoutMs: NumberInRange(5000, 1000, 30000), + /** + * Shared LLM concurrency budget for asynchronous background + * capture/reward/L2/L3/skill-evolution processing. + */ + bgLlmConcurrency: NumberInRange(2, 1, 8), }, { default: {} }), retrieval: Type.Object({ /** How many Skill snippets to inject at turn start. */ diff --git a/apps/memos-local-plugin/core/pipeline/deps.ts b/apps/memos-local-plugin/core/pipeline/deps.ts index 9d699ab43..ef4ac196c 100644 --- a/apps/memos-local-plugin/core/pipeline/deps.ts +++ b/apps/memos-local-plugin/core/pipeline/deps.ts @@ -99,6 +99,8 @@ import type { PipelineSubscriptions, } from "./types.js"; import { wrapRetrievalRepos } from "./retrieval-repos.js"; +import { createSemaphore } from "../util/semaphore.js"; +import { rateLimitLlmClient } from "../util/rate-limited-llm.js"; // ─── Algorithm config slice helper ──────────────────────────────────────── @@ -160,6 +162,9 @@ export function extractAlgorithmConfig( session: { followUpMode: alg.session.followUpMode, mergeMaxGapMs: alg.session.mergeMaxGapMs, + maxTurnsPerEpisode: alg.session.maxTurnsPerEpisode, + classifyTimeoutMs: alg.session.classifyTimeoutMs, + bgLlmConcurrency: alg.session.bgLlmConcurrency, }, }; } @@ -198,14 +203,17 @@ export function buildPipelineSubscribers( session?: PipelineSessionSet, ): PipelineSubscriberSet { const log = deps.log ?? 
rootLogger.child({ channel: "core.pipeline" }); + const bgLlmSemaphore = createSemaphore(algorithm.session.bgLlmConcurrency); + const bgLlm = rateLimitLlmClient(deps.llm, bgLlmSemaphore); + const bgReflectLlm = rateLimitLlmClient(deps.reflectLlm, bgLlmSemaphore); const captureRunner = createCaptureRunner({ tracesRepo: deps.repos.traces, embeddingRetryQueue: deps.repos.embeddingRetryQueue, episodesRepo: adaptEpisodesRepo(deps.repos.episodes), embedder: deps.embedder, - llm: deps.llm, - reflectLlm: deps.reflectLlm, + llm: bgLlm, + reflectLlm: bgReflectLlm, bus: buses.capture, cfg: algorithm.capture, now: deps.now, @@ -215,14 +223,14 @@ export function buildPipelineSubscribers( tracesRepo: deps.repos.traces, episodesRepo: deps.repos.episodes, feedbackRepo: deps.repos.feedback, - llm: deps.llm, + llm: bgLlm, bus: buses.reward, cfg: algorithm.reward, evaluator: { - reflectionProvider: deps.reflectLlm?.provider, - reflectionModel: deps.reflectLlm?.model, - scorerProvider: deps.llm?.provider, - scorerModel: deps.llm?.model, + reflectionProvider: bgReflectLlm?.provider, + reflectionModel: bgReflectLlm?.model, + scorerProvider: bgLlm?.provider, + scorerModel: bgLlm?.model, }, now: deps.now, // Wire the live episode snapshot so the R_human scorer sees the @@ -254,7 +262,7 @@ export function buildPipelineSubscribers( repos: deps.repos, rewardBus: buses.reward, l2Bus: buses.l2, - llm: deps.llm, + llm: bgLlm, log: log.child({ channel: "core.memory.l2" }), config: algorithm.l2Induction, thresholds: { @@ -268,7 +276,7 @@ export function buildPipelineSubscribers( repos: deps.repos, l2Bus: buses.l2, l3Bus: buses.l3, - llm: deps.llm, + llm: bgLlm, log: log.child({ channel: "core.memory.l3" }), config: algorithm.l3Abstraction, }); @@ -276,7 +284,7 @@ export function buildPipelineSubscribers( const skillHandle = attachSkillSubscriber({ repos: deps.repos, embedder: deps.embedder, - llm: deps.llm, + llm: bgLlm, bus: buses.skill, l2Bus: buses.l2, rewardBus: buses.reward, @@ -286,7 +294,7 
@@ export function buildPipelineSubscribers( const feedbackHandle = attachFeedbackSubscriber({ repos: deps.repos, - llm: deps.llm, + llm: bgLlm, embedder: deps.embedder, bus: buses.feedback, log: log.child({ channel: "core.feedback" }), diff --git a/apps/memos-local-plugin/core/pipeline/orchestrator.ts b/apps/memos-local-plugin/core/pipeline/orchestrator.ts index ad5490e30..666fd5a3c 100644 --- a/apps/memos-local-plugin/core/pipeline/orchestrator.ts +++ b/apps/memos-local-plugin/core/pipeline/orchestrator.ts @@ -82,6 +82,42 @@ import { createEmbeddingRetryWorker, systemErrorEvent } from "../embedding/index import type { EpisodeSnapshot } from "../session/index.js"; import type { RelationDecision } from "../session/types.js"; +/** + * Run relation classification with an upper bound on wall-clock time. + * + * Resolves with the classifier's decision. When `classifyFn` throws, rejects, + * or has not settled after `timeoutMs`, a warning is logged and a conservative + * `new_task` decision is returned instead, so classifier failures never propagate. + * The timer is cleared on every path so no timeout stays pending afterwards. + */ +async function classifyWithTimeout( + classifyFn: () => Promise<RelationDecision>, + timeoutMs: number, + log: Logger, +): Promise<RelationDecision> { + let timer: ReturnType<typeof setTimeout> | undefined; + const timeout = new Promise<never>((_, reject) => { + timer = setTimeout(() => reject(new Error("classify_timeout")), timeoutMs); + }); + try { + return await Promise.race([classifyFn(), timeout]); + } catch (err) { + log.warn("relation.classify_timeout", { + timeoutMs, + err: err instanceof Error ? err.message : String(err), + }); + return { + relation: "new_task" as const, + confidence: 0, + reason: "classify_timeout", + signals: ["classify_timeout"], + }; + } finally { + /* Also runs when classification wins the race, cancelling the timer. */ + clearTimeout(timer); + } +} + // ─── Factory ────────────────────────────────────────────────────────────── export function createPipeline(deps: PipelineDeps): PipelineHandle { @@ -365,13 +389,17 @@ export function createPipeline(deps: PipelineDeps): PipelineHandle { const gapMs = Math.max(0, (turnTs ??
now()) - lastTurnTs); const relationStartedAt = Date.now(); - const decision = await session.relation.classify({ - prevUserText: ctx.prevUserText, - prevAssistantText: ctx.prevAssistantText, - newUserText: userText, - gapMs, - prevEpisodeId: currentEpId, - }); + const decision = await classifyWithTimeout( + () => session.relation.classify({ + prevUserText: ctx.prevUserText, + prevAssistantText: ctx.prevAssistantText, + newUserText: userText, + gapMs, + prevEpisodeId: currentEpId, + }), + algorithm.session.classifyTimeoutMs, + log, + ); const relationDurationMs = Math.max(0, Date.now() - relationStartedAt); log.info("relation.classified", { @@ -418,7 +446,7 @@ export function createPipeline(deps: PipelineDeps): PipelineHandle { durationMs: relationDurationMs, }); - if (keepAppending) { + if (keepAppending && open.turns.length < algorithm.session.maxTurnsPerEpisode) { // Same topic — just append the new user turn to the open // episode. No finalize, no reflect; that's deferred until // the user actually changes topic / closes the session. @@ -435,8 +463,19 @@ export function createPipeline(deps: PipelineDeps): PipelineHandle { return { episode: open, sessionId, relation: decision.relation }; } + if (keepAppending) { + log.info("episode.turn_limit_reached", { + sessionId, + episodeId: currentEpId, + turns: open.turns.length, + maxTurnsPerEpisode: algorithm.session.maxTurnsPerEpisode, + relation: decision.relation, + source: "open_episode", + }); + } + // Topic changed (new_task) OR gap too large OR - // episode_per_turn mode — finalize the open episode, which + // episode_per_turn mode OR turn limit reached — finalize the open episode, which // fires `episode.finalized` → captureSubscriber.runReflect → // R_human + V backprop. Fire-and-forget; the chain runs on // its own clock (tests can drive it via `flush()`). 
@@ -530,13 +569,17 @@ export function createPipeline(deps: PipelineDeps): PipelineHandle { } } else { const relationStartedAt = Date.now(); - const decision = await session.relation.classify({ - prevUserText: ctx.prevUserText, - prevAssistantText: ctx.prevAssistantText, - newUserText: userText, - gapMs, - prevEpisodeId: snapshot.id as EpisodeId, - }); + const decision = await classifyWithTimeout( + () => session.relation.classify({ + prevUserText: ctx.prevUserText, + prevAssistantText: ctx.prevAssistantText, + newUserText: userText, + gapMs, + prevEpisodeId: snapshot.id as EpisodeId, + }), + algorithm.session.classifyTimeoutMs, + log, + ); const relationDurationMs = Math.max(0, Date.now() - relationStartedAt); log.info("relation.classified", { @@ -581,7 +624,7 @@ export function createPipeline(deps: PipelineDeps): PipelineHandle { durationMs: relationDurationMs, }); - if (keepAppending) { + if (keepAppending && snapshot.turns.length < algorithm.session.maxTurnsPerEpisode) { if (snapshot.status === "closed") { session.sessionManager.reopenEpisode( snapshot.id as EpisodeId, @@ -608,6 +651,17 @@ export function createPipeline(deps: PipelineDeps): PipelineHandle { }; } + if (keepAppending) { + log.info("episode.turn_limit_reached", { + sessionId, + episodeId: snapshot.id, + turns: snapshot.turns.length, + maxTurnsPerEpisode: algorithm.session.maxTurnsPerEpisode, + relation: decision.relation, + source: "recovered_open_topic", + }); + } + if (snapshot.status === "open") { session.sessionManager.finalizeEpisode(snapshot.id as EpisodeId, { patchMeta: { @@ -632,13 +686,17 @@ export function createPipeline(deps: PipelineDeps): PipelineHandle { const gapMs = Math.max(0, (turnTs ?? 
now()) - prev.endedAt); const relationStartedAt = Date.now(); - const decision = await session.relation.classify({ - prevUserText: prev.userText, - prevAssistantText: prev.assistantText, - newUserText: userText, - gapMs, - prevEpisodeId: prev.episodeId, - }); + const decision = await classifyWithTimeout( + () => session.relation.classify({ + prevUserText: prev.userText, + prevAssistantText: prev.assistantText, + newUserText: userText, + gapMs, + prevEpisodeId: prev.episodeId, + }), + algorithm.session.classifyTimeoutMs, + log, + ); const relationDurationMs = Math.max(0, Date.now() - relationStartedAt); log.info("relation.classified", { @@ -684,22 +742,34 @@ export function createPipeline(deps: PipelineDeps): PipelineHandle { }); if (shouldReopen) { - const reopenReason = - decision.relation === "revision" ? "revision" : "follow_up"; - const snap = session.sessionManager.reopenEpisode(prev.episodeId, reopenReason); - session.sessionManager.addTurn(prev.episodeId, { - role: "user", - content: userText, - ts: turnTs, - meta: { - source: reopenReason, - classifiedRelation: decision.relation, - ...meta, - }, - }); - openEpisodeBySession.set(sessionId, prev.episodeId); - lastEpisodeBySession.delete(sessionId); - return { episode: snap, sessionId, relation: decision.relation }; + const prevSnap = session.sessionManager.getEpisode(prev.episodeId); + if (prevSnap && prevSnap.turns.length >= algorithm.session.maxTurnsPerEpisode) { + log.info("episode.turn_limit_reached", { + sessionId, + episodeId: prev.episodeId, + turns: prevSnap.turns.length, + maxTurnsPerEpisode: algorithm.session.maxTurnsPerEpisode, + relation: decision.relation, + source: "closed_episode", + }); + } else { + const reopenReason = + decision.relation === "revision" ? 
"revision" : "follow_up"; + const snap = session.sessionManager.reopenEpisode(prev.episodeId, reopenReason); + session.sessionManager.addTurn(prev.episodeId, { + role: "user", + content: userText, + ts: turnTs, + meta: { + source: reopenReason, + classifiedRelation: decision.relation, + ...meta, + }, + }); + openEpisodeBySession.set(sessionId, prev.episodeId); + lastEpisodeBySession.delete(sessionId); + return { episode: snap, sessionId, relation: decision.relation }; + } } if (decision.relation === "new_task") { diff --git a/apps/memos-local-plugin/core/pipeline/types.ts b/apps/memos-local-plugin/core/pipeline/types.ts index 495dddf3d..4dd79a3cb 100644 --- a/apps/memos-local-plugin/core/pipeline/types.ts +++ b/apps/memos-local-plugin/core/pipeline/types.ts @@ -110,6 +110,18 @@ export interface SessionRoutingConfig { * new-episode boundary even for revision/follow_up verdicts. */ mergeMaxGapMs: number; + /** + * Hard cap on turns in a merged episode. Once reached, the next turn + * forces a topic boundary before appending to keep topic-end work bounded. + */ + maxTurnsPerEpisode: number; + /** + * Max time to wait for relation classification before defaulting to + * a conservative new-task boundary. + */ + classifyTimeoutMs: number; + /** Shared LLM concurrency budget for async background subscribers. 
*/ + bgLlmConcurrency: number; } // ─── Dependency graph ───────────────────────────────────────────────────── diff --git a/apps/memos-local-plugin/core/util/rate-limited-llm.ts b/apps/memos-local-plugin/core/util/rate-limited-llm.ts new file mode 100644 index 000000000..4d229b4c9 --- /dev/null +++ b/apps/memos-local-plugin/core/util/rate-limited-llm.ts @@ -0,0 +1,95 @@ +import type { + LlmCallOptions, + LlmClient, + LlmClientStats, + LlmCompleteJsonOptions, + LlmCompletion, + LlmJsonCompletion, + LlmMessage, + LlmProviderName, + LlmStreamChunk, +} from "../llm/types.js"; +import type { Semaphore } from "./semaphore.js"; + +/** + * Wrap an LLM client so expensive background subscribers share the + * concurrency budget of `semaphore` without changing call-site semantics. + * Returns `null` when `client` is `null`. + */ +export function rateLimitLlmClient(client: LlmClient | null, semaphore: Semaphore): LlmClient | null { + if (!client) return null; + return new RateLimitedLlmClient(client, semaphore); +} + +/** + * `LlmClient` decorator: each request method (`complete`, `completeJson`, + * `stream`) acquires a permit before calling the wrapped client and releases + * it in `finally`; getters, stats and `close()` are forwarded untouched. + */ +class RateLimitedLlmClient implements LlmClient { + constructor( + private readonly inner: LlmClient, + private readonly semaphore: Semaphore, + ) {} + + get provider(): LlmProviderName { + return this.inner.provider; + } + + get model(): string { + return this.inner.model; + } + + get canStream(): boolean { + return this.inner.canStream; + } + + async complete( + messages: LlmMessage[] | string, + opts?: LlmCallOptions, + ): Promise<LlmCompletion> { + const release = await this.semaphore.acquire(); + try { + return await this.inner.complete(messages, opts); + } finally { + release(); + } + } + + async completeJson<T>( + messages: LlmMessage[] | string, + opts?: LlmCompleteJsonOptions, + ): Promise<LlmJsonCompletion<T>> { + const release = await this.semaphore.acquire(); + try { + return await this.inner.completeJson(messages, opts); + } finally { + release(); + } + } + + async *stream( + messages: LlmMessage[] | string, + opts?: LlmCallOptions, + ): AsyncIterable<LlmStreamChunk> { + /* The permit is held for the whole iteration and released in `finally`. */ + const release = await this.semaphore.acquire(); + try { + yield*
this.inner.stream(messages, opts); + } finally { + release(); + } + } + + stats(): LlmClientStats { + return this.inner.stats(); + } + + resetStats(): void { + this.inner.resetStats(); + } + + close(): Promise<void> { + return this.inner.close(); + } +} diff --git a/apps/memos-local-plugin/core/util/semaphore.ts b/apps/memos-local-plugin/core/util/semaphore.ts new file mode 100644 index 000000000..8dd9b75fc --- /dev/null +++ b/apps/memos-local-plugin/core/util/semaphore.ts @@ -0,0 +1,49 @@ +/** + * Counting semaphore handle: `acquire()` resolves with a release callback + * once a permit is free. Call that callback when the guarded work is done. + */ +export interface Semaphore { + acquire(): Promise<() => void>; +} + +/** + * Create a FIFO counting semaphore with `max` permits. + * + * `max` is floored and clamped to at least 1; `NaN` also falls back to 1, + * because a `NaN` limit would make every `acquire()` wait forever. + */ +export function createSemaphore(max: number): Semaphore { + const limit = Number.isNaN(max) ? 1 : Math.max(1, Math.floor(max)); + let current = 0; + const waiters: Array<() => void> = []; + + return { + async acquire() { + if (current < limit) { + current++; + return makeRelease(); + } + return new Promise<() => void>((resolve) => { + waiters.push(() => { + current++; + resolve(makeRelease()); + }); + }); + }, + }; + + /** + * Build a one-shot release callback: calling it twice must not free a + * permit that another caller still holds. + */ + function makeRelease(): () => void { + let released = false; + return () => { + if (released) return; + released = true; + current = Math.max(0, current - 1); + const next = waiters.shift(); + if (next) next(); + }; + } +}