Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions apps/memos-local-plugin/core/config/defaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ export const DEFAULT_CONFIG: ResolvedConfig = {
session: {
followUpMode: "merge_follow_ups",
mergeMaxGapMs: 2 * 60 * 60 * 1000,
maxTurnsPerEpisode: 30,
classifyTimeoutMs: 5000,
bgLlmConcurrency: 2,
},
retrieval: {
tier1TopK: 3,
Expand Down
17 changes: 17 additions & 0 deletions apps/memos-local-plugin/core/config/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,23 @@ const AlgorithmSchema = Type.Object({
* `taskIdleTimeoutMs`.
*/
mergeMaxGapMs: NumberInRange(2 * 60 * 60 * 1000, 0, 24 * 60 * 60 * 1000),
/**
* Hard cap on turns in a merged episode. Once reached, the next
* turn forces a topic boundary even if relation classification says
* follow-up/revision. Keeps task-end processing bounded.
*/
maxTurnsPerEpisode: NumberInRange(30, 5, 200),
/**
* Max time to wait for relation classification before defaulting
* to a conservative new-task boundary so foreground prompt
* construction cannot stall indefinitely.
*/
classifyTimeoutMs: NumberInRange(5000, 1000, 30000),
/**
* Shared LLM concurrency budget for asynchronous background
* capture/reward/L2/L3/skill-evolution processing.
*/
bgLlmConcurrency: NumberInRange(2, 1, 8),
}, { default: {} }),
retrieval: Type.Object({
/** How many Skill snippets to inject at turn start. */
Expand Down
30 changes: 19 additions & 11 deletions apps/memos-local-plugin/core/pipeline/deps.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ import type {
PipelineSubscriptions,
} from "./types.js";
import { wrapRetrievalRepos } from "./retrieval-repos.js";
import { createSemaphore } from "../util/semaphore.js";
import { rateLimitLlmClient } from "../util/rate-limited-llm.js";

// ─── Algorithm config slice helper ────────────────────────────────────────

Expand Down Expand Up @@ -165,6 +167,9 @@ export function extractAlgorithmConfig(
session: {
followUpMode: alg.session.followUpMode,
mergeMaxGapMs: alg.session.mergeMaxGapMs,
maxTurnsPerEpisode: alg.session.maxTurnsPerEpisode,
classifyTimeoutMs: alg.session.classifyTimeoutMs,
bgLlmConcurrency: alg.session.bgLlmConcurrency,
},
};
}
Expand Down Expand Up @@ -203,14 +208,17 @@ export function buildPipelineSubscribers(
session?: PipelineSessionSet,
): PipelineSubscriberSet {
const log = deps.log ?? rootLogger.child({ channel: "core.pipeline" });
const bgLlmSemaphore = createSemaphore(algorithm.session.bgLlmConcurrency);
const bgLlm = rateLimitLlmClient(deps.llm, bgLlmSemaphore);
const bgReflectLlm = rateLimitLlmClient(deps.reflectLlm, bgLlmSemaphore);

const captureRunner = createCaptureRunner({
tracesRepo: deps.repos.traces,
embeddingRetryQueue: deps.repos.embeddingRetryQueue,
episodesRepo: adaptEpisodesRepo(deps.repos.episodes),
embedder: deps.embedder,
llm: deps.llm,
reflectLlm: deps.reflectLlm,
llm: bgLlm,
reflectLlm: bgReflectLlm,
bus: buses.capture,
cfg: algorithm.capture,
now: deps.now,
Expand All @@ -220,14 +228,14 @@ export function buildPipelineSubscribers(
tracesRepo: deps.repos.traces,
episodesRepo: deps.repos.episodes,
feedbackRepo: deps.repos.feedback,
llm: deps.llm,
llm: bgLlm,
bus: buses.reward,
cfg: algorithm.reward,
evaluator: {
reflectionProvider: deps.reflectLlm?.provider,
reflectionModel: deps.reflectLlm?.model,
scorerProvider: deps.llm?.provider,
scorerModel: deps.llm?.model,
reflectionProvider: bgReflectLlm?.provider,
reflectionModel: bgReflectLlm?.model,
scorerProvider: bgLlm?.provider,
scorerModel: bgLlm?.model,
},
now: deps.now,
// Wire the live episode snapshot so the R_human scorer sees the
Expand Down Expand Up @@ -259,7 +267,7 @@ export function buildPipelineSubscribers(
repos: deps.repos,
rewardBus: buses.reward,
l2Bus: buses.l2,
llm: deps.llm,
llm: bgLlm,
log: log.child({ channel: "core.memory.l2" }),
config: algorithm.l2Induction,
thresholds: {
Expand All @@ -273,15 +281,15 @@ export function buildPipelineSubscribers(
repos: deps.repos,
l2Bus: buses.l2,
l3Bus: buses.l3,
llm: deps.llm,
llm: bgLlm,
log: log.child({ channel: "core.memory.l3" }),
config: algorithm.l3Abstraction,
});

const skillHandle = attachSkillSubscriber({
repos: deps.repos,
embedder: deps.embedder,
llm: deps.llm,
llm: bgLlm,
bus: buses.skill,
l2Bus: buses.l2,
rewardBus: buses.reward,
Expand All @@ -291,7 +299,7 @@ export function buildPipelineSubscribers(

const feedbackHandle = attachFeedbackSubscriber({
repos: deps.repos,
llm: deps.llm,
llm: bgLlm,
embedder: deps.embedder,
bus: buses.feedback,
log: log.child({ channel: "core.feedback" }),
Expand Down
150 changes: 110 additions & 40 deletions apps/memos-local-plugin/core/pipeline/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,30 @@ import { createEmbeddingRetryWorker, systemErrorEvent } from "../embedding/index
import type { EpisodeSnapshot } from "../session/index.js";
import type { IntentDecision, RelationDecision, TurnRelation } from "../session/types.js";

function classifyWithTimeout(
classifyFn: () => Promise<RelationDecision>,
timeoutMs: number,
log: Logger,
): Promise<RelationDecision> {
return Promise.race([
classifyFn(),
new Promise<RelationDecision>((_, reject) =>
setTimeout(() => reject(new Error("classify_timeout")), timeoutMs),
),
]).catch((err) => {
log.warn("relation.classify_timeout", {
timeoutMs,
err: err instanceof Error ? err.message : String(err),
});
return {
relation: "new_task" as const,
confidence: 0,
reason: "classify_timeout",
signals: ["classify_timeout"],
};
});
}

// ─── Factory ──────────────────────────────────────────────────────────────

export function createPipeline(deps: PipelineDeps): PipelineHandle {
Expand Down Expand Up @@ -419,13 +443,17 @@ export function createPipeline(deps: PipelineDeps): PipelineHandle {
const gapMs = Math.max(0, (turnTs ?? now()) - lastTurnTs);

const relationStartedAt = Date.now();
const decision = await session.relation.classify({
prevUserText: ctx.prevUserText,
prevAssistantText: ctx.prevAssistantText,
newUserText: userText,
gapMs,
prevEpisodeId: currentEpId,
});
const decision = await classifyWithTimeout(
() => session.relation.classify({
prevUserText: ctx.prevUserText,
prevAssistantText: ctx.prevAssistantText,
newUserText: userText,
gapMs,
prevEpisodeId: currentEpId,
}),
algorithm.session.classifyTimeoutMs,
log,
);
const relationDurationMs = Math.max(0, Date.now() - relationStartedAt);

log.info("relation.classified", {
Expand Down Expand Up @@ -472,7 +500,7 @@ export function createPipeline(deps: PipelineDeps): PipelineHandle {
durationMs: relationDurationMs,
});

if (keepAppending) {
if (keepAppending && open.turns.length < algorithm.session.maxTurnsPerEpisode) {
// Same topic — just append the new user turn to the open
// episode. No finalize, no reflect; that's deferred until
// the user actually changes topic / closes the session.
Expand All @@ -489,8 +517,19 @@ export function createPipeline(deps: PipelineDeps): PipelineHandle {
return { episode: open, sessionId, relation: decision.relation };
}

if (keepAppending) {
log.info("episode.turn_limit_reached", {
sessionId,
episodeId: currentEpId,
turns: open.turns.length,
maxTurnsPerEpisode: algorithm.session.maxTurnsPerEpisode,
relation: decision.relation,
source: "open_episode",
});
}

// Topic changed (new_task) OR gap too large OR
// episode_per_turn mode — finalize the open episode, which
// episode_per_turn mode OR turn limit reached — finalize the open episode, which
// fires `episode.finalized` → captureSubscriber.runReflect →
// R_human + V backprop. Fire-and-forget; the chain runs on
// its own clock (tests can drive it via `flush()`).
Expand Down Expand Up @@ -584,13 +623,17 @@ export function createPipeline(deps: PipelineDeps): PipelineHandle {
}
} else {
const relationStartedAt = Date.now();
const decision = await session.relation.classify({
prevUserText: ctx.prevUserText,
prevAssistantText: ctx.prevAssistantText,
newUserText: userText,
gapMs,
prevEpisodeId: snapshot.id as EpisodeId,
});
const decision = await classifyWithTimeout(
() => session.relation.classify({
prevUserText: ctx.prevUserText,
prevAssistantText: ctx.prevAssistantText,
newUserText: userText,
gapMs,
prevEpisodeId: snapshot.id as EpisodeId,
}),
algorithm.session.classifyTimeoutMs,
log,
);
const relationDurationMs = Math.max(0, Date.now() - relationStartedAt);

log.info("relation.classified", {
Expand Down Expand Up @@ -635,7 +678,7 @@ export function createPipeline(deps: PipelineDeps): PipelineHandle {
durationMs: relationDurationMs,
});

if (keepAppending) {
if (keepAppending && snapshot.turns.length < algorithm.session.maxTurnsPerEpisode) {
if (snapshot.status === "closed") {
session.sessionManager.reopenEpisode(
snapshot.id as EpisodeId,
Expand All @@ -662,6 +705,17 @@ export function createPipeline(deps: PipelineDeps): PipelineHandle {
};
}

if (keepAppending) {
log.info("episode.turn_limit_reached", {
sessionId,
episodeId: snapshot.id,
turns: snapshot.turns.length,
maxTurnsPerEpisode: algorithm.session.maxTurnsPerEpisode,
relation: decision.relation,
source: "recovered_open_topic",
});
}

if (snapshot.status === "open") {
session.sessionManager.finalizeEpisode(snapshot.id as EpisodeId, {
patchMeta: {
Expand All @@ -686,13 +740,17 @@ export function createPipeline(deps: PipelineDeps): PipelineHandle {

const gapMs = Math.max(0, (turnTs ?? now()) - prev.endedAt);
const relationStartedAt = Date.now();
const decision = await session.relation.classify({
prevUserText: prev.userText,
prevAssistantText: prev.assistantText,
newUserText: userText,
gapMs,
prevEpisodeId: prev.episodeId,
});
const decision = await classifyWithTimeout(
() => session.relation.classify({
prevUserText: prev.userText,
prevAssistantText: prev.assistantText,
newUserText: userText,
gapMs,
prevEpisodeId: prev.episodeId,
}),
algorithm.session.classifyTimeoutMs,
log,
);
const relationDurationMs = Math.max(0, Date.now() - relationStartedAt);

log.info("relation.classified", {
Expand Down Expand Up @@ -738,22 +796,34 @@ export function createPipeline(deps: PipelineDeps): PipelineHandle {
});

if (shouldReopen) {
const reopenReason =
decision.relation === "revision" ? "revision" : "follow_up";
const snap = session.sessionManager.reopenEpisode(prev.episodeId, reopenReason);
session.sessionManager.addTurn(prev.episodeId, {
role: "user",
content: userText,
ts: turnTs,
meta: {
source: reopenReason,
classifiedRelation: decision.relation,
...meta,
},
});
openEpisodeBySession.set(sessionId, prev.episodeId);
lastEpisodeBySession.delete(sessionId);
return { episode: snap, sessionId, relation: decision.relation };
const prevSnap = session.sessionManager.getEpisode(prev.episodeId);
if (prevSnap && prevSnap.turns.length >= algorithm.session.maxTurnsPerEpisode) {
log.info("episode.turn_limit_reached", {
sessionId,
episodeId: prev.episodeId,
turns: prevSnap.turns.length,
maxTurnsPerEpisode: algorithm.session.maxTurnsPerEpisode,
relation: decision.relation,
source: "closed_episode",
});
} else {
const reopenReason =
decision.relation === "revision" ? "revision" : "follow_up";
const snap = session.sessionManager.reopenEpisode(prev.episodeId, reopenReason);
session.sessionManager.addTurn(prev.episodeId, {
role: "user",
content: userText,
ts: turnTs,
meta: {
source: reopenReason,
classifiedRelation: decision.relation,
...meta,
},
});
openEpisodeBySession.set(sessionId, prev.episodeId);
lastEpisodeBySession.delete(sessionId);
return { episode: snap, sessionId, relation: decision.relation };
}
}

if (decision.relation === "new_task") {
Expand Down
12 changes: 12 additions & 0 deletions apps/memos-local-plugin/core/pipeline/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,18 @@ export interface SessionRoutingConfig {
* new-episode boundary even for revision/follow_up verdicts.
*/
mergeMaxGapMs: number;
/**
* Hard cap on turns in a merged episode. Once reached, the next turn
* forces a topic boundary before appending to keep topic-end work bounded.
*/
maxTurnsPerEpisode: number;
/**
* Max time to wait for relation classification before defaulting to
* a conservative new-task boundary.
*/
classifyTimeoutMs: number;
/** Shared LLM concurrency budget for async background subscribers. */
bgLlmConcurrency: number;
}

// ─── Dependency graph ─────────────────────────────────────────────────────
Expand Down
Loading