diff --git a/packages/core/src/command-center/commandCenterSignature.test.ts b/packages/core/src/command-center/commandCenterSignature.test.ts new file mode 100644 index 0000000000..1989b8bf69 --- /dev/null +++ b/packages/core/src/command-center/commandCenterSignature.test.ts @@ -0,0 +1,61 @@ +import { describe, expect, it } from "vitest"; +import { + type CommandCenterSessionFields, + computeCommandCenterSessionSignature, +} from "./status"; + +function session( + taskId: string, + over: Partial = {}, +): CommandCenterSessionFields { + return { + taskId, + status: "connected", + cloudStatus: undefined, + pendingPermissions: { size: 0 }, + isPromptPending: false, + ...over, + }; +} + +const sig = (record: Record) => + computeCommandCenterSessionSignature(record); + +describe("computeCommandCenterSessionSignature", () => { + it("is stable when only non-status fields change (e.g. streamed events)", () => { + // Two sessions identical in the 4 status-relevant fields → same signature, + // regardless of any other mutation the store makes (token appends etc.). + const a = { r1: session("t1") }; + const b = { r1: session("t1") }; + expect(sig(a)).toBe(sig(b)); + }); + + it.each<{ name: string; over: Partial }>([ + { + name: "isPromptPending flips (idle ↔ running)", + over: { isPromptPending: true }, + }, + { + name: "a pending permission appears (running → waiting)", + over: { pendingPermissions: { size: 1 } }, + }, + { name: "connection status changes", over: { status: "error" } }, + { name: "cloud status changes", over: { cloudStatus: "completed" } }, + ])("changes when $name", ({ over }) => { + expect(sig({ r1: session("t1") })).not.toBe( + sig({ r1: session("t1", over) }), + ); + }); + + it("is order-independent across sessions", () => { + const s1 = session("t1"); + const s2 = session("t2", { isPromptPending: true }); + expect(sig({ r1: s1, r2: s2 })).toBe(sig({ r2: s2, r1: s1 })); + }); + + it("ignores sessions without a taskId", () => { + expect(sig({ r1: session("t1"), r2: session("", {}) })).toBe( + sig({ r1: session("t1") }), + ); + }); +}); diff --git a/packages/core/src/command-center/status.ts b/packages/core/src/command-center/status.ts index 4d33b32ac8..e4663aca33 100644 --- a/packages/core/src/command-center/status.ts +++ b/packages/core/src/command-center/status.ts @@ -28,6 +28,31 @@ export function deriveStatus( return "idle"; } +export type CommandCenterSessionFields = SessionStatusInput & { + taskId?: string; +}; + +/** + * Stable string signature of the only session fields the command-center grid + * reads (the inputs to `deriveStatus`). Lets the grid subscribe to status + * changes instead of re-rendering on every streamed token — cell transcripts + * update independently through each EmbeddedSessionView's own subscription, so + * the grid data never needs to churn on token-level `events` mutations. + */ +export function computeCommandCenterSessionSignature( + sessions: Record, +): string { + const parts: string[] = []; + for (const s of Object.values(sessions)) { + if (!s.taskId) continue; + parts.push( + `${s.taskId}\t${s.status}\t${s.cloudStatus ?? ""}\t${s.pendingPermissions.size}\t${s.isPromptPending ? 1 : 0}`, + ); + } + parts.sort(); + return parts.join("\n"); +} + export function getRepoName(task: Task): string | null { const repository = getTaskRepository(task); if (!repository) return null; diff --git a/packages/core/src/sessions/ensureEventsLoaded.test.ts b/packages/core/src/sessions/ensureEventsLoaded.test.ts new file mode 100644 index 0000000000..c93f6b6abe --- /dev/null +++ b/packages/core/src/sessions/ensureEventsLoaded.test.ts @@ -0,0 +1,165 @@ +import type { AgentSession } from "@posthog/shared"; +import { describe, expect, it, vi } from "vitest"; +import { createBaseSession } from "./sessionFactory"; +import { SessionService, type SessionServiceDeps } from "./sessionService"; + +const TASK_ID = "task-1"; +const RUN_ID = "run-1"; + +function ndjson(...texts: string[]): string { + return texts + .map((text) => + JSON.stringify({ + type: "notification", + timestamp: "2026-06-16T15:22:35.396Z", + notification: { + jsonrpc: "2.0", + method: "session/update", + params: { update: { sessionUpdate: "agent_message_chunk", text } }, + }, + }), + ) + .join("\n"); +} + +function createHarness( + sessionOverrides: Partial = {}, + localLogs = ndjson("hello", "world"), +) { + const sessions: Record = {}; + const base = createBaseSession(RUN_ID, TASK_ID, "Task"); + sessions[RUN_ID] = { ...base, events: [], ...sessionOverrides }; + + const updateSession = vi.fn( + (taskRunId: string, updates: Partial) => { + const session = sessions[taskRunId]; + if (session) Object.assign(session, updates); + }, + ); + + const readLocalLogs = vi.fn().mockResolvedValue(localLogs); + + const store = { + getSessions: () => sessions, + getSessionByTaskId: (taskId: string) => + Object.values(sessions).find((s) => s.taskId === taskId), + setSession: (s: AgentSession) => { + sessions[s.taskRunId] = s; + }, + updateSession, + prependEvents: (taskRunId: string, older: AgentSession["events"]) => { + const session = sessions[taskRunId]; + if (session) session.events = [...older, ...session.events]; + }, + }; + + const deps = { + store, + log: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() }, + trpc: { + agent: { + onSessionIdleKilled: { + subscribe: () => ({ unsubscribe: vi.fn() }), + }, + }, + logs: { + readLocalLogs: { query: readLocalLogs }, + fetchS3Logs: { query: vi.fn().mockResolvedValue("") }, + writeLocalLogs: { mutate: vi.fn().mockResolvedValue(undefined) }, + }, + }, + } as unknown as SessionServiceDeps; + + return { + service: new SessionService(deps), + sessions, + updateSession, + readLocalLogs, + }; +} + +describe("SessionService.ensureEventsLoaded", () => { + it("rehydrates events from disk for an evicted (empty) session", async () => { + const h = createHarness(); + + await h.service.ensureEventsLoaded(TASK_ID); + + expect(h.readLocalLogs).toHaveBeenCalledWith({ taskRunId: RUN_ID }); + expect(h.sessions[RUN_ID].events.length).toBeGreaterThan(0); + }); + + it("no-ops when events are already resident (no disk read)", async () => { + const h = createHarness({ + events: [{ message: {} }] as unknown as AgentSession["events"], + }); + + await h.service.ensureEventsLoaded(TASK_ID); + + expect(h.readLocalLogs).not.toHaveBeenCalled(); + expect(h.updateSession).not.toHaveBeenCalled(); + }); + + it("no-ops for an unknown task", async () => { + const h = createHarness(); + await h.service.ensureEventsLoaded("nope"); + expect(h.readLocalLogs).not.toHaveBeenCalled(); + }); + + it("sets processedLineCount for cloud sessions, leaves it undefined for local", async () => { + const cloud = createHarness({ isCloud: true }); + await cloud.service.ensureEventsLoaded(TASK_ID); + expect(cloud.sessions[RUN_ID].processedLineCount).toBe(2); + + const local = createHarness({ isCloud: false }); + await local.service.ensureEventsLoaded(TASK_ID); + expect(local.sessions[RUN_ID].processedLineCount).toBeUndefined(); + }); + + it("does not clobber events when a turn starts streaming during the load", async () => { + const h = createHarness(); + // Simulate a turn starting (isPromptPending) + a streamed event landing + // while the disk read is in flight — the live transcript is authoritative. + h.readLocalLogs.mockImplementationOnce(async () => { + h.sessions[RUN_ID].isPromptPending = true; + h.sessions[RUN_ID].events = [ + { message: {} }, + ] as unknown as AgentSession["events"]; + return ndjson("stale"); + }); + + await h.service.ensureEventsLoaded(TASK_ID); + + // The streamed event survives; the historical load is discarded. + expect(h.sessions[RUN_ID].events).toHaveLength(1); + expect(h.updateSession).not.toHaveBeenCalled(); + }); + + it("opens a tail window for a large local log, flagging older events", async () => { + // 4000 lines > the 1000-line tail window → only the tail renders, the rest + // stays loadable via scrollback. + const bigLog = ndjson(...Array.from({ length: 4000 }, (_, i) => `m${i}`)); + const h = createHarness({}, bigLog); + + await h.service.ensureEventsLoaded(TASK_ID); + + expect(h.sessions[RUN_ID].events.length).toBe(1000); // tail window only + expect(h.sessions[RUN_ID].hasOlderEvents).toBe(true); + }); + + it("loadOlderEvents prepends the next older chunk", async () => { + const bigLog = ndjson(...Array.from({ length: 4000 }, (_, i) => `m${i}`)); + const h = createHarness({}, bigLog); + await h.service.ensureEventsLoaded(TASK_ID); + expect(h.sessions[RUN_ID].events.length).toBe(1000); + + await h.service.loadOlderEvents(TASK_ID); + + expect(h.sessions[RUN_ID].events.length).toBe(2000); // +1000 older, prepended + expect(h.sessions[RUN_ID].hasOlderEvents).toBe(true); + // The newly prepended chunk sits before the tail (older first). + await h.service.loadOlderEvents(TASK_ID); + await h.service.loadOlderEvents(TASK_ID); + expect(h.sessions[RUN_ID].events.length).toBe(4000); // all loaded + expect(h.sessions[RUN_ID].hasOlderEvents).toBe(false); + }); +}); diff --git a/packages/core/src/sessions/sessionEvents.ts b/packages/core/src/sessions/sessionEvents.ts index 48371e7072..3113ee0dd1 100644 --- a/packages/core/src/sessions/sessionEvents.ts +++ b/packages/core/src/sessions/sessionEvents.ts @@ -178,14 +178,25 @@ export function convertStoredEntriesToEvents( const startTs = entries[0]?.timestamp ? new Date(entries[0].timestamp).getTime() - 1 : Date.now(); - events.push(createUserMessageEvent(taskDescription, startTs)); + events.push( + Object.freeze(createUserMessageEvent(taskDescription, startTs)), + ); } for (const entry of entries) { - events.push(storedEntryToAcpMessage(entry)); + // Freeze each event as it's born. Events are immutable log data, and the + // store is immer-backed: when the array is committed, immer's auto-freeze + // would otherwise deep-freeze every element — measured at ~240ms for a + // 48k-event session (immer's per-element isDraftable/handleValue machinery + // is ~50x slower than a plain Object.freeze loop). Pre-freezing makes immer + // short-circuit on Object.isFrozen, turning that ~240ms into ~5ms. + events.push(Object.freeze(storedEntryToAcpMessage(entry))); } - return events; + // Freezing the array too lets immer skip the whole array on commit (it bails + // on Object.isFrozen before recursing), so a full-transcript load no longer + // stalls the renderer. + return Object.freeze(events) as AcpMessage[]; } /** diff --git a/packages/core/src/sessions/sessionService.ts b/packages/core/src/sessions/sessionService.ts index 341b429f32..c8621ab278 100644 --- a/packages/core/src/sessions/sessionService.ts +++ b/packages/core/src/sessions/sessionService.ts @@ -80,6 +80,11 @@ import { parseSessionLogContent, planSkippedPromptFilter, } from "./sessionLogs"; +import { + clearTranscriptWindow, + openTranscriptWindow, + takeOlderEntries, +} from "./transcriptWindows"; const LOCAL_SESSION_RECONNECT_ATTEMPTS = 3; const LOCAL_SESSION_RECONNECT_BACKOFF = { @@ -161,6 +166,7 @@ export interface ISessionStore { events: AcpMessage[], newLineCount?: number, ): void; + prependEvents(taskRunId: string, olderEvents: AcpMessage[]): void; updateCloudStatus( taskRunId: string, fields: { @@ -669,9 +675,9 @@ export class SessionService { return; } - const [workspaceResult, logResult] = await Promise.all([ + const [workspaceResult, content] = await Promise.all([ this.d.trpc.workspace.verify.query({ taskId }), - this.fetchSessionLogs(logUrl, existingRunId), + this.fetchSessionLogContent(logUrl, existingRunId), ]); if (!workspaceResult.exists) { @@ -679,7 +685,9 @@ export class SessionService { taskId, missingPath: workspaceResult.missingPath, }); - const events = convertStoredEntriesToEvents(logResult.rawEntries); + const events = convertStoredEntriesToEvents( + content ? parseSessionLogContent(content).rawEntries : [], + ); const session = createBaseSession(existingRunId, taskId, taskTitle); session.events = events; session.logUrl = logUrl; @@ -698,7 +706,7 @@ export class SessionService { logUrl, repoPath, auth, - logResult, + { content: content ?? undefined }, ); } else { if (!this.d.getIsOnline()) { @@ -820,6 +828,55 @@ export class SessionService { } } + /** + * Resolve the events to seed a reconnecting session. When given raw `content`, + * opens a tail window (only the latest events render; the rest stay loadable + * via scrollback) and reports `hasOlder`. `sessionId`/`adapter` come from the + * head (the sdk_session notification sits at the very start of the log). + */ + private async resolveReconnectEvents( + logUrl: string | undefined, + taskRunId: string, + prefetched?: { + content?: string; + rawEntries?: StoredLogEntry[]; + sessionId?: string; + adapter?: Adapter; + }, + ): Promise<{ + events: AcpMessage[]; + sessionId?: string; + adapter?: Adapter; + hasOlder: boolean; + }> { + const content = prefetched?.content; + if (content?.trim()) { + const head = parseSessionLogContent(content.slice(0, 128 * 1024)); + const { tail, hasOlder } = openTranscriptWindow(taskRunId, content); + return { + events: convertStoredEntriesToEvents(tail), + sessionId: head.sessionId, + adapter: head.adapter, + hasOlder, + }; + } + if (prefetched?.rawEntries) { + return { + events: convertStoredEntriesToEvents(prefetched.rawEntries), + sessionId: prefetched.sessionId, + adapter: prefetched.adapter, + hasOlder: false, + }; + } + const parsed = await this.fetchSessionLogs(logUrl, taskRunId); + return { + events: convertStoredEntriesToEvents(parsed.rawEntries), + sessionId: parsed.sessionId, + adapter: parsed.adapter, + hasOlder: false, + }; + } + private async reconnectToLocalSession( taskId: string, taskRunId: string, @@ -827,15 +884,15 @@ export class SessionService { logUrl: string | undefined, repoPath: string, auth: AuthCredentials, - prefetchedLogs?: { - rawEntries: StoredLogEntry[]; + prefetched?: { + content?: string; + rawEntries?: StoredLogEntry[]; sessionId?: string; adapter?: Adapter; }, ): Promise { - const { rawEntries, sessionId, adapter } = - prefetchedLogs ?? (await this.fetchSessionLogs(logUrl, taskRunId)); - const events = convertStoredEntriesToEvents(rawEntries); + const { events, sessionId, adapter, hasOlder } = + await this.resolveReconnectEvents(logUrl, taskRunId, prefetched); const storedAdapter = this.d.adapterStore.getAdapter(taskRunId); const resolvedAdapter = adapter ?? storedAdapter; @@ -845,6 +902,7 @@ export class SessionService { const session = createBaseSession(taskRunId, taskId, taskTitle); session.events = events; + session.hasOlderEvents = hasOlder; if (logUrl) { session.logUrl = logUrl; } @@ -993,6 +1051,7 @@ export class SessionService { private async teardownSession(taskRunId: string): Promise { const session = this.getSessionByRunId(taskRunId); + clearTranscriptWindow(taskRunId); try { await this.d.trpc.agent.cancel.mutate({ sessionId: taskRunId }); @@ -1318,6 +1377,135 @@ export class SessionService { await this.teardownSession(session.taskRunId); } + /** + * Rehydrate the transcript of a session whose `events` were evicted (see + * `sessionStoreSetters.evictEvents`) while it stayed warm/connected. + * + * Unlike `loadLogsOnly`, this does NOT recreate the session — it re-reads the + * ndjson from disk (S3 fallback) and patches `events` back in via + * `updateSession`, leaving the live subscription, status and config intact. + * Safe to call unconditionally on focus: it no-ops when events are already + * resident, and bails if a live stream re-populates events during the fetch + * so it never clobbers fresher data. + */ + async ensureEventsLoaded(taskId: string): Promise { + const session = this.d.store.getSessionByTaskId(taskId); + if (!session) return; // not connected — the normal connect path hydrates + if (session.events.length > 0) return; // already resident + const { taskRunId, logUrl } = session; + + // Prefer the raw local cache so we can render the tail instantly and parse + // the rest in the background. Re-reading a 178MB ndjson and JSON.parsing + // ~100k lines synchronously is the ~500ms+ freeze we're avoiding here. + let content: string | null = null; + try { + content = await this.d.trpc.logs.readLocalLogs.query({ taskRunId }); + } catch { + content = null; + } + + if (!content || !content.trim()) { + // No local cache (e.g. cloud-only run): fall back to the one-shot fetch. + let parsed: ParsedSessionLogs; + try { + parsed = await this.fetchSessionLogs(logUrl, taskRunId); + } catch (err) { + this.d.log.warn("ensureEventsLoaded: failed to fetch logs", { + taskId, + error: err, + }); + return; + } + if (parsed.rawEntries.length === 0) return; + this.commitWindowedEvents( + taskId, + taskRunId, + convertStoredEntriesToEvents(parsed.rawEntries), + false, + parsed.totalLineCount, + ); + return; + } + + // Cloud runs track `processedLineCount` for their append/dedup loop, so they + // load the full transcript (windowing only fits local ndjson logs). + if (session.isCloud) { + const parsed = parseSessionLogContent(content); + this.commitWindowedEvents( + taskId, + taskRunId, + convertStoredEntriesToEvents(parsed.rawEntries), + false, + parsed.totalLineCount, + ); + return; + } + + // Open a tail window: render only the latest ~1000 events (instant) and keep + // the rest of the log as raw text for on-demand scrollback. This is what + // keeps opening a 100k-event session O(visible) instead of O(history). + const { tail, hasOlder } = openTranscriptWindow(taskRunId, content); + this.commitWindowedEvents( + taskId, + taskRunId, + convertStoredEntriesToEvents(tail), + hasOlder, + ); + } + + /** Seed the transcript window, unless a live stream now owns the events. */ + private commitWindowedEvents( + taskId: string, + taskRunId: string, + events: AcpMessage[], + hasOlder: boolean, + totalLineCount?: number, + ): void { + const current = this.d.store.getSessionByTaskId(taskId); + if (!current || current.taskRunId !== taskRunId) return; + // A turn started or events arrived while we were reading — don't clobber. + if (current.isPromptPending || current.events.length > 0) return; + this.d.store.updateSession(taskRunId, { + events, + hasOlderEvents: hasOlder, + processedLineCount: current.isCloud ? totalLineCount : undefined, + }); + } + + /** + * Scrollback: parse and prepend the next older chunk of the transcript. Called + * by the UI when the user scrolls toward the top of a windowed transcript. + */ + async loadOlderEvents(taskId: string): Promise { + const session = this.d.store.getSessionByTaskId(taskId); + if (!session?.hasOlderEvents) return; + const { taskRunId } = session; + // Re-read the ndjson from disk (OS page cache → ~ms) instead of holding the + // whole log in memory for the open transcript's lifetime. See the note in + // `transcriptWindows.ts`: scroll-up is rare and user-initiated, so paying a + // little latency here buys a large, always-on renderer-memory win. + let content: string | null = null; + try { + content = await this.d.trpc.logs.readLocalLogs.query({ taskRunId }); + } catch { + content = null; + } + if (!content || !content.trim()) { + this.d.store.updateSession(taskRunId, { hasOlderEvents: false }); + return; + } + const result = takeOlderEntries(taskRunId, content); + if (!result) { + this.d.store.updateSession(taskRunId, { hasOlderEvents: false }); + return; + } + this.d.store.prependEvents( + taskRunId, + convertStoredEntriesToEvents(result.older), + ); + this.d.store.updateSession(taskRunId, { hasOlderEvents: result.hasOlder }); + } + // --- Subscription Management --- private subscribeToChannel(taskRunId: string): void { @@ -4532,6 +4720,40 @@ export class SessionService { } } + /** + * Raw log content (local cache first, then S3), WITHOUT parsing — lets the + * caller parse the tail synchronously for an instant render and defer the + * full parse. Returns null when nothing is available. + */ + private async fetchSessionLogContent( + logUrl: string | undefined, + taskRunId?: string, + ): Promise { + if (taskRunId) { + try { + const local = await this.d.trpc.logs.readLocalLogs.query({ taskRunId }); + if (local?.trim()) return local; + } catch { + this.d.log.warn("Failed to read local logs, falling back to S3", { + taskRunId, + }); + } + } + if (!logUrl) return null; + try { + const content = await this.d.trpc.logs.fetchS3Logs.query({ logUrl }); + if (!content?.trim()) return null; + if (taskRunId) { + this.d.trpc.logs.writeLocalLogs + .mutate({ taskRunId, content }) + .catch(() => {}); + } + return content; + } catch { + return null; + } + } + private commitReconciledCloudEvents( taskRunId: string, rawEntries: StoredLogEntry[], diff --git a/packages/core/src/sessions/sessionStore.test.ts b/packages/core/src/sessions/sessionStore.test.ts new file mode 100644 index 0000000000..f4d2233f45 --- /dev/null +++ b/packages/core/src/sessions/sessionStore.test.ts @@ -0,0 +1,67 @@ +import type { AcpMessage, AgentSession } from "@posthog/shared"; +import { beforeEach, describe, expect, it } from "vitest"; +import { createBaseSession } from "./sessionFactory"; +import { sessionStore, sessionStoreSetters } from "./sessionStore"; + +function event(text: string): AcpMessage { + return { + message: { + jsonrpc: "2.0", + method: "session/update", + params: { update: { sessionUpdate: "agent_message_chunk", text } }, + }, + } as unknown as AcpMessage; +} + +function seed(taskRunId: string, taskId: string): AgentSession { + const session = createBaseSession(taskRunId, taskId, "Task"); + sessionStoreSetters.setSession(session); + return session; +} + +describe("sessionStoreSetters.evictEvents", () => { + beforeEach(() => { + sessionStoreSetters.clearAll(); + }); + + it("empties the events array and resets processedLineCount", () => { + seed("run-1", "task-1"); + sessionStoreSetters.appendEvents("run-1", [event("a"), event("b")], 2); + expect(sessionStore.getState().sessions["run-1"].events).toHaveLength(2); + expect(sessionStore.getState().sessions["run-1"].processedLineCount).toBe( + 2, + ); + + sessionStoreSetters.evictEvents("run-1"); + + const session = sessionStore.getState().sessions["run-1"]; + expect(session.events).toHaveLength(0); + expect(session.processedLineCount).toBeUndefined(); + }); + + it("keeps the session and its identity index intact (non-destructive)", () => { + seed("run-1", "task-1"); + sessionStoreSetters.appendEvents("run-1", [event("a")]); + + sessionStoreSetters.evictEvents("run-1"); + + expect(sessionStore.getState().sessions["run-1"]).toBeDefined(); + expect(sessionStoreSetters.getSessionByTaskId("task-1")?.taskRunId).toBe( + "run-1", + ); + }); + + it("is a no-op for an unknown taskRunId", () => { + expect(() => sessionStoreSetters.evictEvents("missing")).not.toThrow(); + }); + + it("lets events be appended again after eviction (rehydration path)", () => { + seed("run-1", "task-1"); + sessionStoreSetters.appendEvents("run-1", [event("a"), event("b")]); + sessionStoreSetters.evictEvents("run-1"); + + sessionStoreSetters.appendEvents("run-1", [event("c")]); + + expect(sessionStore.getState().sessions["run-1"].events).toHaveLength(1); + }); +}); diff --git a/packages/core/src/sessions/sessionStore.ts b/packages/core/src/sessions/sessionStore.ts index 2ea1cbad09..a16d6f826a 100644 --- a/packages/core/src/sessions/sessionStore.ts +++ b/packages/core/src/sessions/sessionStore.ts @@ -9,6 +9,7 @@ import type { } from "@posthog/shared"; import { immer } from "zustand/middleware/immer"; import { createStore } from "zustand/vanilla"; +import { clearTranscriptWindow } from "./transcriptWindows"; export interface SessionState { /** Sessions indexed by taskRunId */ @@ -72,6 +73,47 @@ export const sessionStoreSetters = { }); }, + /** + * Prepend older events to the front of the transcript — the scrollback path. + * The window opens with only the tail (instant render); this loads the chunk + * before the current window when the user scrolls toward the top. The result + * array is frozen so immer skips its O(n) finalize walk on commit. + */ + prependEvents: (taskRunId: string, olderEvents: AcpMessage[]) => { + if (olderEvents.length === 0) return; + // Read current events from the committed (frozen) state BEFORE entering the + // immer draft. Spreading `state.sessions[].events` inside the producer would + // capture draft proxies that immer revokes on exit — accessing them later + // (during render) throws "proxy has been revoked". + const current = sessionStore.getState().sessions[taskRunId]?.events; + if (!current) return; + const next = Object.freeze([...olderEvents, ...current]) as AcpMessage[]; + sessionStore.setState((state) => { + const session = state.sessions[taskRunId]; + if (session) session.events = next; + }); + }, + + /** + * Drop the in-memory transcript of an inactive session to reclaim RAM. + * `events` is an append-only mirror of the on-disk ndjson log, so emptying + * it here is non-destructive — `SessionService.ensureEventsLoaded` rehydrates + * it from disk when the session is focused again. `processedLineCount` is + * reset so the cloud append/dedup path re-syncs from a clean baseline. + * Never call this on a streaming session (isPromptPending / isCompacting). + */ + evictEvents: (taskRunId: string) => { + clearTranscriptWindow(taskRunId); + sessionStore.setState((state) => { + const session = state.sessions[taskRunId]; + if (session) { + session.events = []; + session.processedLineCount = undefined; + session.hasOlderEvents = undefined; + } + }); + }, + updateCloudStatus: ( taskRunId: string, fields: { diff --git a/packages/core/src/sessions/transcriptWindows.ts b/packages/core/src/sessions/transcriptWindows.ts new file mode 100644 index 0000000000..6c25e1c1d8 --- /dev/null +++ b/packages/core/src/sessions/transcriptWindows.ts @@ -0,0 +1,88 @@ +import type { StoredLogEntry } from "@posthog/shared"; +import { parseSessionLogContent } from "./sessionLogs"; + +/** + * Scrollback windows for open transcripts. A huge session opens with only its + * tail parsed + rendered (instant); the rest of the ndjson loads on demand when + * the user scrolls toward the top. This keeps open cost O(visible), not + * O(history). + * + * We deliberately retain only the window *cursor* here — not the log text. An + * earlier version cached every ndjson line as raw strings; a production memory + * benchmark showed that pinned ~110MB+ per open transcript (the renderer heap of + * one 48k-event session was ~270MB active vs ~50MB with the cursor-only window), + * and it compounded across tasks in Command Center / "ADHD mode". Re-reading the + * log from disk on a scroll-up is cheap — the OS page cache serves it in ~ms, and + * scroll-up is user-initiated and infrequent — so we trade a little latency on a + * rare action for a large, always-on memory win. The fetch itself lives in the + * caller (`SessionService.loadOlderEvents`), which already knows how to read the + * ndjson (`logs.readLocalLogs`); this module just slices and parses it. + */ +interface TranscriptWindow { + /** Index of the first line currently materialised into `session.events`. */ + windowStart: number; +} + +const windows = new Map(); + +/** Lines materialised on first open / refocus. Tuned to fill a viewport + overscan. */ +export const TAIL_WINDOW_LINES = 1000; +/** Lines pulled in per scroll-toward-top load. */ +export const OLDER_CHUNK_LINES = 1000; + +/** + * Open a window over `content`, returning the tail's raw entries plus whether + * older lines remain. Stores only the window cursor — `content` is not retained; + * older chunks are sliced from a fresh re-read via `takeOlderEntries`. + */ +export function openTranscriptWindow( + taskRunId: string, + content: string, +): { tail: StoredLogEntry[]; hasOlder: boolean } { + const lines = content.trim().split("\n"); + const windowStart = Math.max(0, lines.length - TAIL_WINDOW_LINES); + windows.set(taskRunId, { windowStart }); + const tail = parseSessionLogContent( + lines.slice(windowStart).join("\n"), + ).rawEntries; + return { tail, hasOlder: windowStart > 0 }; +} + +/** + * Parse and return the next older chunk, advancing the window. The caller passes + * freshly re-read log `content` (older lines are start-indexed and append-stable, + * so a grown log still slices correctly). Returns null when there's no window or + * nothing older remains. + */ +export function takeOlderEntries( + taskRunId: string, + content: string, + count = OLDER_CHUNK_LINES, +): { older: StoredLogEntry[]; hasOlder: boolean } | null { + const window = windows.get(taskRunId); + if (!window || window.windowStart === 0) return null; + const lines = content.trim().split("\n"); + // Pull older chunks until we have at least one renderable entry or reach the + // top. A chunk can parse to zero entries (e.g. a run of non-event lines); if + // we returned an empty slice while still reporting `hasOlder`, the caller + // would prepend nothing and the UI's scrollback gate could never advance. + let newStart = window.windowStart; + let older: StoredLogEntry[] = []; + while (newStart > 0 && older.length === 0) { + const from = Math.max(0, newStart - count); + older = parseSessionLogContent( + lines.slice(from, newStart).join("\n"), + ).rawEntries; + newStart = from; + } + window.windowStart = newStart; + return { older, hasOlder: newStart > 0 }; +} + +export function hasTranscriptWindow(taskRunId: string): boolean { + return windows.has(taskRunId); +} + +export function clearTranscriptWindow(taskRunId: string): void { + windows.delete(taskRunId); +} diff --git a/packages/core/src/sidebar/buildSidebarData.ts b/packages/core/src/sidebar/buildSidebarData.ts index 5b41353260..9a9793bde8 100644 --- a/packages/core/src/sidebar/buildSidebarData.ts +++ b/packages/core/src/sidebar/buildSidebarData.ts @@ -1,3 +1,4 @@ +import type { AgentSession } from "@posthog/shared"; import type { Task, TaskRunStatus } from "@posthog/shared/domain-types"; import { getRepositoryInfo } from "./groupTasks"; import type { TaskData } from "./sidebarData.types"; @@ -214,3 +215,31 @@ export function sliceChronological( hasMore: sortedUnpinnedTasks.length > historyVisibleCount, }; } + +/** Only the session fields the sidebar actually reads (see deriveTaskData). */ +type SidebarSessionFields = Pick< + AgentSession, + "taskId" | "isPromptPending" | "pendingPermissions" | "cloudStatus" +> & { cloudOutput?: { pr_url?: unknown } | null }; + +/** + * A compact, primitive signature of just the session fields the sidebar reads. + * The sidebar subscribes to this string instead of the whole sessions record, + * so streaming token appends — which only mutate `session.events` — don't + * re-render the sidebar (and, since it's mounted at the root, the whole tree). + */ +export function computeSidebarSessionSignature( + sessions: Record, +): string { + const parts: string[] = []; + for (const s of Object.values(sessions)) { + if (!s.taskId) continue; + const prUrl = + typeof s.cloudOutput?.pr_url === "string" ? s.cloudOutput.pr_url : ""; + parts.push( + `${s.taskId}\t${s.isPromptPending ? 1 : 0}\t${s.pendingPermissions.size}\t${s.cloudStatus ?? ""}\t${prUrl}`, + ); + } + parts.sort(); + return parts.join("\n"); +} diff --git a/packages/core/src/sidebar/computeSidebarSessionSignature.test.ts b/packages/core/src/sidebar/computeSidebarSessionSignature.test.ts new file mode 100644 index 0000000000..09ab89feed --- /dev/null +++ b/packages/core/src/sidebar/computeSidebarSessionSignature.test.ts @@ -0,0 +1,52 @@ +import { describe, expect, it } from "vitest"; +import { computeSidebarSessionSignature } from "./buildSidebarData"; + +// Minimal session shape the signature reads. `events` is included to prove it +// is ignored (the streaming hot path only mutates `events`). +function session(over: Record = {}) { + return { + taskId: "t1", + isPromptPending: false, + pendingPermissions: new Map(), + cloudStatus: undefined, + cloudOutput: null, + events: [], + ...over, + } as never; +} + +describe("computeSidebarSessionSignature", () => { + it("ignores events, so streaming tokens don't change it", () => { + const few = computeSidebarSessionSignature({ + r1: session({ events: [1, 2] }), + }); + const many = computeSidebarSessionSignature({ + r1: session({ events: [1, 2, 3, 4, 5, 6, 7] }), + }); + expect(many).toBe(few); + }); + + it.each([ + { label: "isPromptPending", over: { isPromptPending: true } }, + { label: "cloudStatus", over: { cloudStatus: "in_progress" } }, + { + label: "pendingPermissions size", + over: { pendingPermissions: new Map([["p", {}]]) }, + }, + { + label: "cloudOutput.pr_url", + over: { cloudOutput: { pr_url: "https://x/pr/1" } }, + }, + ])("changes when $label changes", ({ over }) => { + const before = computeSidebarSessionSignature({ r1: session() }); + expect(computeSidebarSessionSignature({ r1: session(over) })).not.toBe( + before, + ); + }); + + it("skips sessions without a taskId", () => { + expect( + computeSidebarSessionSignature({ r1: session({ taskId: "" }) }), + ).toBe(""); + }); +}); diff --git a/packages/shared/src/sessions.ts b/packages/shared/src/sessions.ts index 1c7663ba6b..21eb80e682 100644 --- a/packages/shared/src/sessions.ts +++ b/packages/shared/src/sessions.ts @@ -51,6 +51,12 @@ export interface AgentSession { taskTitle: string; channel: string; events: AcpMessage[]; + /** + * True when the transcript is a tail window and older events remain on disk, + * loadable via `SessionService.loadOlderEvents` (scrollback). Undefined for a + * fully-loaded transcript. + */ + hasOlderEvents?: boolean; startedAt: number; status: SessionStatus; errorTitle?: string; diff --git a/packages/ui/src/features/command-center/hooks/useCommandCenterData.ts b/packages/ui/src/features/command-center/hooks/useCommandCenterData.ts index 372053b391..aaca6977f3 100644 --- a/packages/ui/src/features/command-center/hooks/useCommandCenterData.ts +++ b/packages/ui/src/features/command-center/hooks/useCommandCenterData.ts @@ -9,8 +9,7 @@ import { } from "@posthog/core/command-center/status"; import type { Task } from "@posthog/shared/domain-types"; import { useMemo } from "react"; -import type { AgentSession } from "../../sessions/sessionStore"; -import { useSessions } from "../../sessions/useSession"; +import { useCommandCenterSessionMap } from "../../sessions/useSession"; import { useTasks } from "../../tasks/useTasks"; import { useWorkspaces } from "../../workspace/useWorkspace"; import { useCommandCenterStore } from "../commandCenterStore"; @@ -24,7 +23,9 @@ export function useCommandCenterData(): { } { const storeCells = useCommandCenterStore((s) => s.cells); const { data: tasks = [] } = useTasks(); - const sessions = useSessions(); + // Keyed by taskId, rebuilt only when a status-relevant field changes — not on + // every streamed token (transcripts update via each cell's own subscription). + const sessionByTaskId = useCommandCenterSessionMap(); const { data: workspaces } = useWorkspaces(); const taskById = useMemo(() => { @@ -35,16 +36,6 @@ export function useCommandCenterData(): { return map; }, [tasks]); - const sessionByTaskId = useMemo(() => { - const map = new Map(); - for (const session of Object.values(sessions)) { - if (session.taskId) { - map.set(session.taskId, session); - } - } - return map; - }, [sessions]); - const cells = useMemo( () => buildCommandCenterCells(storeCells, { diff --git a/packages/ui/src/features/sessions/components/ConversationView.tsx b/packages/ui/src/features/sessions/components/ConversationView.tsx index d15543515e..ac81bad2bc 100644 --- a/packages/ui/src/features/sessions/components/ConversationView.tsx +++ b/packages/ui/src/features/sessions/components/ConversationView.tsx @@ -1,5 +1,9 @@ import { ArrowDown, XCircle } from "@phosphor-icons/react"; import { WorkerPoolContextProvider } from "@pierre/diffs/react"; +import { + SESSION_SERVICE, + type SessionService, +} from "@posthog/core/sessions/sessionService"; import { useService } from "@posthog/di/react"; import { Button, @@ -163,6 +167,13 @@ export function ConversationView({ const isCloud = session?.isCloud ?? false; + // Scrollback: the transcript opens as a tail window; pull the next older chunk + // when the user scrolls toward the top. + const sessionService = useService(SESSION_SERVICE); + const handleReachTop = useCallback(() => { + if (taskId) void sessionService.loadOlderEvents(taskId); + }, [sessionService, taskId]); + const items = useMemo( () => mergeConversationItems({ @@ -383,6 +394,8 @@ export function ConversationView({ getItemKey={getRowKey} renderItem={renderRow} onScrollStateChange={handleScrollStateChange} + onReachTop={handleReachTop} + hasMoreAbove={session?.hasOlderEvents ?? false} keepMounted={rowKeepMounted} className="absolute inset-0 bg-background" itemClassName="mx-auto px-2 py-1.5" diff --git a/packages/ui/src/features/sessions/components/VirtualizedList.tsx b/packages/ui/src/features/sessions/components/VirtualizedList.tsx index dab37a4e4f..5e3f9805a1 100644 --- a/packages/ui/src/features/sessions/components/VirtualizedList.tsx +++ b/packages/ui/src/features/sessions/components/VirtualizedList.tsx @@ -22,6 +22,13 @@ interface VirtualizedListProps { itemStyle?: CSSProperties; footer?: ReactNode; onScrollStateChange?: (isAtBottom: boolean) => void; + /** + * Called when the user scrolls near the top and `hasMoreAbove` is set, so the + * caller can load older items (scrollback). New items prepended after this + * fires are anchored: the viewport keeps the same content, no jump. + */ + onReachTop?: () => void; + hasMoreAbove?: boolean; keepMounted?: readonly number[]; /** * Allow horizontal scrolling of the list viewport. Defaults to true. Narrow @@ -43,6 +50,9 @@ const OVERSCAN = 6; // A real upward drift, not a 1-frame measure transient: the DOM bottom sits // this far below the viewport. Well above any single append's measure gap. const FAR_DRIFT_THRESHOLD = 400; +// Start loading older items this far before the very top, so scrollback feels +// seamless rather than snagging at the edge. +const LOAD_OLDER_THRESHOLD = 600; function VirtualizedListInner( { @@ -54,6 +64,8 @@ function VirtualizedListInner( itemStyle, footer, onScrollStateChange, + onReachTop, + hasMoreAbove, keepMounted, scrollX = true, }: VirtualizedListProps, @@ -68,6 +80,15 @@ function VirtualizedListInner( const settleRafRef = useRef(null); const onScrollStateChangeRef = useRef(onScrollStateChange); onScrollStateChangeRef.current = onScrollStateChange; + // Scrollback: keep the latest callback/flag in refs so handleScroll stays + // stable. `loadingOlder` gates re-entrancy; `scrollHeightBeforeLoad` anchors + // the viewport when the prepended items land. + const onReachTopRef = useRef(onReachTop); + onReachTopRef.current = onReachTop; + const hasMoreAboveRef = useRef(hasMoreAbove); + hasMoreAboveRef.current = hasMoreAbove; + const loadingOlderRef = useRef(false); + const scrollHeightBeforeLoadRef = useRef(0); const hasFooter = footer != null; @@ -196,6 +217,27 @@ function VirtualizedListInner( virtualizer.scrollToEnd(); }, [totalSize, virtualizer]); + // Scrollback anchor: when a load prepends older items, the virtual height + // grows above the viewport. Shift scrollTop by that growth so the content the + // user was reading stays exactly where it was — no jump. Synchronous, + // pre-paint. Gated on loadingOlderRef so streaming appends never trip it. + // biome-ignore lint/correctness/useExhaustiveDependencies: items.length is the trigger + useLayoutEffect(() => { + if (!loadingOlderRef.current) return; + const el = parentRef.current; + if (!el) return; + const delta = el.scrollHeight - scrollHeightBeforeLoadRef.current; + if (delta > 0) { + el.scrollTop += delta; + isAtBottomRef.current = false; + } + // Release the gate whenever this fires after a load — not only when the + // height grew. Gating the release on `delta > 0` could leave it stuck (and + // permanently block further scrollback) if the virtualizer hadn't measured + // the new height yet this pass; the next scroll-up simply retries. + loadingOlderRef.current = false; + }, [items.length]); + const handleScroll = useCallback(() => { const el = parentRef.current; const scrollTop = el?.scrollTop ?? 0; @@ -203,6 +245,23 @@ function VirtualizedListInner( const scrolledUp = scrollTop < lastScrollTopRef.current - 1; lastScrollTopRef.current = scrollTop; + // Scrollback: as the user nears the top, ask the caller for older items. + // Record the pre-load scroll height so the anchor effect can keep the + // viewport on the same content once they prepend. Gate on `initialized` so + // the initial scroll-to-end settle (which can leave scrollTop near 0 for a + // short tail) doesn't spuriously pull the whole history in. + if ( + el && + initializedRef.current && + hasMoreAboveRef.current && + !loadingOlderRef.current && + scrollTop < LOAD_OLDER_THRESHOLD + ) { + loadingOlderRef.current = true; + scrollHeightBeforeLoadRef.current = el.scrollHeight; + onReachTopRef.current?.(); + } + const atEnd = virtualizer.isAtEnd(AT_BOTTOM_THRESHOLD); // Genuine far drift (not a 1-frame measure transient): the DOM bottom sits // well below the viewport. diff --git a/packages/ui/src/features/sessions/hooks/useSessionEventsResidency.test.tsx b/packages/ui/src/features/sessions/hooks/useSessionEventsResidency.test.tsx new file mode 100644 index 0000000000..6a4f7ecf60 --- /dev/null +++ b/packages/ui/src/features/sessions/hooks/useSessionEventsResidency.test.tsx @@ -0,0 +1,103 @@ +import { renderHook } from "@testing-library/react"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +const ensureEventsLoaded = vi.hoisted(() => vi.fn()); +const getSessionByTaskId = vi.hoisted(() => vi.fn()); +const evictEvents = vi.hoisted(() => vi.fn()); + +vi.mock("@posthog/di/react", () => ({ + useService: () => ({ ensureEventsLoaded }), +})); +vi.mock("@posthog/ui/features/sessions/sessionStore", () => ({ + sessionStoreSetters: { getSessionByTaskId, evictEvents }, +})); + +import { useSessionEventsResidency } from "./useSessionEventsResidency"; + +const GRACE_MS = 20_000; + +function idleSession(overrides: Record = {}) { + return { + taskRunId: "run-1", + events: [{ message: {} }], + isPromptPending: false, + isCompacting: false, + isCloud: false, + cloudStatus: undefined, + messageQueue: [], + ...overrides, + }; +} + +describe("useSessionEventsResidency", () => { + beforeEach(() => { + vi.useFakeTimers(); + ensureEventsLoaded.mockReset(); + getSessionByTaskId.mockReset().mockReturnValue(idleSession()); + evictEvents.mockReset(); + }); + afterEach(() => { + vi.useRealTimers(); + }); + + it("rehydrates events on mount", () => { + renderHook(() => useSessionEventsResidency("task-1")); + expect(ensureEventsLoaded).toHaveBeenCalledWith("task-1"); + }); + + it("evicts an idle session after the grace window on unmount", () => { + const { unmount } = renderHook(() => useSessionEventsResidency("task-1")); + unmount(); + expect(evictEvents).not.toHaveBeenCalled(); // still within grace + vi.advanceTimersByTime(GRACE_MS); + expect(evictEvents).toHaveBeenCalledWith("run-1"); + }); + + it("never evicts a streaming session", () => { + getSessionByTaskId.mockReturnValue(idleSession({ isPromptPending: true })); + const { unmount } = renderHook(() => useSessionEventsResidency("task-1")); + unmount(); + vi.advanceTimersByTime(GRACE_MS); + expect(evictEvents).not.toHaveBeenCalled(); + }); + + it("never evicts a live (non-terminal) cloud run", () => { + getSessionByTaskId.mockReturnValue( + idleSession({ isCloud: true, cloudStatus: "in_progress" }), + ); + const { unmount } = renderHook(() => useSessionEventsResidency("task-1")); + unmount(); + vi.advanceTimersByTime(GRACE_MS); + expect(evictEvents).not.toHaveBeenCalled(); + }); + + it("never evicts a session with queued messages pending dispatch", () => { + getSessionByTaskId.mockReturnValue( + idleSession({ messageQueue: [{ id: "q1" }] }), + ); + const { unmount } = renderHook(() => useSessionEventsResidency("task-1")); + unmount(); + vi.advanceTimersByTime(GRACE_MS); + expect(evictEvents).not.toHaveBeenCalled(); + }); + + it("evicts a finished (terminal) cloud run", () => { + getSessionByTaskId.mockReturnValue( + idleSession({ isCloud: true, cloudStatus: "completed" }), + ); + const { unmount } = renderHook(() => useSessionEventsResidency("task-1")); + unmount(); + vi.advanceTimersByTime(GRACE_MS); + expect(evictEvents).toHaveBeenCalledWith("run-1"); + }); + + it("cancels a pending eviction when the task is refocused within the grace window", () => { + const { unmount } = renderHook(() => useSessionEventsResidency("task-1")); + unmount(); + vi.advanceTimersByTime(GRACE_MS / 2); + // Refocus before the grace window elapses. + renderHook(() => useSessionEventsResidency("task-1")); + vi.advanceTimersByTime(GRACE_MS); + expect(evictEvents).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/ui/src/features/sessions/hooks/useSessionEventsResidency.ts b/packages/ui/src/features/sessions/hooks/useSessionEventsResidency.ts new file mode 100644 index 0000000000..ebce838b02 --- /dev/null +++ b/packages/ui/src/features/sessions/hooks/useSessionEventsResidency.ts @@ -0,0 +1,68 @@ +import { + SESSION_SERVICE, + type SessionService, +} from "@posthog/core/sessions/sessionService"; +import { useService } from "@posthog/di/react"; +import { isTerminalStatus } from "@posthog/shared/domain-types"; +import { sessionStoreSetters } from "@posthog/ui/features/sessions/sessionStore"; +import { useEffect } from "react"; + +/** + * How long a transcript can stay unfocused before its in-memory `events` are + * evicted. The grace window keeps quick back-and-forth navigation cheap (no + * reload churn) while still bounding resident memory for genuinely idle tasks. + */ +const EVICT_GRACE_MS = 20_000; + +/** Pending eviction timers keyed by taskId, shared across hook instances. */ +const evictTimers = new Map>(); + +function cancelPendingEvict(taskId: string): void { + const timer = evictTimers.get(taskId); + if (timer) { + clearTimeout(timer); + evictTimers.delete(taskId); + } +} + +/** + * Bounds the memory held by `session.events` (an append-only mirror of the + * on-disk ndjson) across many open tasks. + * + * While a task's transcript is mounted it stays fully resident. On unmount the + * events are evicted after a grace window — but never for a session that is + * still streaming (`isPromptPending` / `isCompacting`) or a live cloud run, so + * background work is never disturbed. On (re)mount the transcript is rehydrated + * from disk via `ensureEventsLoaded`, which keeps the session warm (subscription + * and status intact) and no-ops when events are already present. + * + * Mechanism + policy validated empirically in `membench/` (~−55% steady-state + * renderer RAM on a realistic multi-session workload). + */ +export function useSessionEventsResidency(taskId: string): void { + const sessionService = useService(SESSION_SERVICE); + + useEffect(() => { + // Focused: cancel any scheduled eviction and restore the transcript. + cancelPendingEvict(taskId); + void sessionService.ensureEventsLoaded(taskId); + + return () => { + // Blurred: schedule eviction after the grace window. + cancelPendingEvict(taskId); + const timer = setTimeout(() => { + evictTimers.delete(taskId); + const session = sessionStoreSetters.getSessionByTaskId(taskId); + if (!session || session.events.length === 0) return; + // Never disturb an in-flight turn, a queued turn about to dispatch, or a + // live cloud run — any of these can append events while unfocused, which + // would truncate the transcript (rehydration bails when events exist). + if (session.isPromptPending || session.isCompacting) return; + if (session.messageQueue.length > 0) return; + if (session.isCloud && !isTerminalStatus(session.cloudStatus)) return; + sessionStoreSetters.evictEvents(session.taskRunId); + }, EVICT_GRACE_MS); + evictTimers.set(taskId, timer); + }; + }, [taskId, sessionService]); +} diff --git a/packages/ui/src/features/sessions/sessionStore.ts b/packages/ui/src/features/sessions/sessionStore.ts index b9432901f0..21727310b6 100644 --- a/packages/ui/src/features/sessions/sessionStore.ts +++ b/packages/ui/src/features/sessions/sessionStore.ts @@ -87,5 +87,6 @@ export { useQueuedMessagesForTask, useSessionForTask, useSessions, + useSidebarSessionMap, useThoughtLevelConfigOptionForTask, } from "./useSession"; diff --git a/packages/ui/src/features/sessions/useSession.ts b/packages/ui/src/features/sessions/useSession.ts index 0352ee1095..87901458db 100644 --- a/packages/ui/src/features/sessions/useSession.ts +++ b/packages/ui/src/features/sessions/useSession.ts @@ -2,11 +2,14 @@ import type { AvailableCommand, SessionConfigOption, } from "@agentclientprotocol/sdk"; +import { computeCommandCenterSessionSignature } from "@posthog/core/command-center/status"; import { extractAvailableCommandsFromEvents, extractUserPromptsFromEvents, } from "@posthog/core/sessions/sessionEvents"; +import { computeSidebarSessionSignature } from "@posthog/core/sidebar/buildSidebarData"; import type { PermissionRequest } from "@posthog/ui/features/sessions/sessionLogTypes"; +import { useMemo } from "react"; import { shallow } from "zustand/shallow"; import { type Adapter, @@ -19,6 +22,50 @@ import { export const useSessions = () => useSessionStore((s) => s.sessions); +/** + * The sidebar's view of sessions, keyed by taskId. Subscribes only to a + * signature of the fields the sidebar reads (see computeSidebarSessionSignature), + * so streaming token appends — which only mutate `events` — don't re-render the + * sidebar (which is mounted at the root). The map is rebuilt from the live + * snapshot only when a sidebar-relevant field actually changes. + */ +export const useSidebarSessionMap = (): Map => { + const signature = useSessionStore((s) => + computeSidebarSessionSignature(s.sessions), + ); + // `signature` is the trigger, not read inside: rebuild the map from the live + // snapshot only when a sidebar-relevant field changes (not on every token). + // biome-ignore lint/correctness/useExhaustiveDependencies: keyed by signature on purpose + return useMemo(() => { + const map = new Map(); + for (const session of Object.values(useSessionStore.getState().sessions)) { + if (session.taskId) map.set(session.taskId, session); + } + return map; + }, [signature]); +}; + +/** + * The command-center grid's view of sessions, keyed by taskId. Same trick as + * useSidebarSessionMap: subscribe only to a signature of the fields the grid's + * `deriveStatus` reads, so streaming token appends don't re-render the grid. + * Cell transcripts update independently via each EmbeddedSessionView's own + * subscription, so the grid map never needs to churn on `events` mutations. + */ +export const useCommandCenterSessionMap = (): Map => { + const signature = useSessionStore((s) => + computeCommandCenterSessionSignature(s.sessions), + ); + // biome-ignore lint/correctness/useExhaustiveDependencies: keyed by signature on purpose + return useMemo(() => { + const map = new Map(); + for (const session of Object.values(useSessionStore.getState().sessions)) { + if (session.taskId) map.set(session.taskId, session); + } + return map; + }, [signature]); +}; + /** O(1) lookup using taskIdIndex */ export const useSessionForTask = ( taskId: string | undefined, diff --git a/packages/ui/src/features/sessions/useSidebarSessionMap.test.tsx b/packages/ui/src/features/sessions/useSidebarSessionMap.test.tsx new file mode 100644 index 0000000000..e2bd300b82 --- /dev/null +++ b/packages/ui/src/features/sessions/useSidebarSessionMap.test.tsx @@ -0,0 +1,62 @@ +import type { AcpMessage, AgentSession } from "@posthog/shared"; +import { act, renderHook } from "@testing-library/react"; +import { beforeEach, describe, expect, it } from "vitest"; +import { sessionStoreSetters, useSessionStore } from "./sessionStore"; +import { useSessions, useSidebarSessionMap } from "./useSession"; + +function makeSession(taskId: string, taskRunId: string): AgentSession { + return { + taskId, + taskRunId, + events: [], + isPromptPending: false, + pendingPermissions: new Map(), + } as unknown as AgentSession; +} + +const TOKEN = {} as AcpMessage; + +/** Mount a hook and count how many times it (re)renders. */ +function countRenders(hook: () => T): () => number { + let n = 0; + renderHook(() => { + n++; + return hook(); + }); + return () => n; +} + +describe("sidebar session subscription — re-render cost during streaming", () => { + beforeEach(() => { + useSessionStore.setState({ sessions: {}, taskIdIndex: {} }); + sessionStoreSetters.setSession(makeSession("t1", "r1")); + }); + + it("baseline: useSessions() re-renders on every streamed token", () => { + const renders = countRenders(() => useSessions()); + const before = renders(); + for (let i = 0; i < 20; i++) { + act(() => sessionStoreSetters.appendEvents("r1", [TOKEN])); + } + // Old behaviour: one re-render per token (and the sidebar is at the root). + expect(renders() - before).toBe(20); + }); + + it("fixed: useSidebarSessionMap() ignores streamed tokens (0 re-renders)", () => { + const renders = countRenders(() => useSidebarSessionMap()); + const before = renders(); + for (let i = 0; i < 20; i++) { + act(() => sessionStoreSetters.appendEvents("r1", [TOKEN])); + } + expect(renders() - before).toBe(0); + }); + + it("fixed: still re-renders when a sidebar-relevant field changes", () => { + const renders = countRenders(() => useSidebarSessionMap()); + const before = renders(); + act(() => + sessionStoreSetters.updateSession("r1", { isPromptPending: true }), + ); + expect(renders() - before).toBe(1); + }); +}); diff --git a/packages/ui/src/features/sidebar/useSidebarData.ts b/packages/ui/src/features/sidebar/useSidebarData.ts index a040204883..f6c01e0784 100644 --- a/packages/ui/src/features/sidebar/useSidebarData.ts +++ b/packages/ui/src/features/sidebar/useSidebarData.ts @@ -18,7 +18,7 @@ import type { AppView } from "@posthog/ui/router/useAppView"; import { useEffect, useMemo, useRef } from "react"; import { useArchivedTaskIds } from "../archive/useArchivedTaskIds"; import { useProvisioningStore } from "../provisioning/store"; -import { useSessions } from "../sessions/sessionStore"; +import { useSidebarSessionMap } from "../sessions/sessionStore"; import { useSuspendedTaskIds } from "../suspension/useSuspendedTaskIds"; import { useSlackTasks, useTaskSummaries, useTasks } from "../tasks/useTasks"; import { useWorkspaces } from "../workspace/useWorkspace"; @@ -41,7 +41,6 @@ export function useSidebarData({ const archivedTaskIds = useArchivedTaskIds(); const suspendedTaskIds = useSuspendedTaskIds(); const provisioningTaskIds = useProvisioningStore((s) => s.activeTasks); - const sessions = useSessions(); const { timestamps } = useTaskViewed(); const historyVisibleCount = useSidebarStore( (state) => state.historyVisibleCount, @@ -140,15 +139,7 @@ export function useSidebarData({ const activeTaskId = activeView.type === "task-detail" ? (activeView.taskId ?? null) : null; - const sessionByTaskId = useMemo(() => { - const map = new Map(); - for (const session of Object.values(sessions)) { - if (session.taskId) { - map.set(session.taskId, session); - } - } - return map; - }, [sessions]); + const sessionByTaskId = useSidebarSessionMap(); const taskData = useMemo( () => diff --git a/packages/ui/src/features/task-detail/components/TaskLogsPanel.tsx b/packages/ui/src/features/task-detail/components/TaskLogsPanel.tsx index c7d32e7f9a..9bab117f1a 100644 --- a/packages/ui/src/features/task-detail/components/TaskLogsPanel.tsx +++ b/packages/ui/src/features/task-detail/components/TaskLogsPanel.tsx @@ -11,6 +11,7 @@ import { useProvisioningStore } from "../../provisioning/store"; import { SessionView } from "../../sessions/components/SessionView"; import { useSessionCallbacks } from "../../sessions/hooks/useSessionCallbacks"; import { useSessionConnection } from "../../sessions/hooks/useSessionConnection"; +import { useSessionEventsResidency } from "../../sessions/hooks/useSessionEventsResidency"; import { useSessionViewState } from "../../sessions/hooks/useSessionViewState"; import { useRestoreTask } from "../../suspension/useRestoreTask"; import { useSuspendedTaskIds } from "../../suspension/useSuspendedTaskIds"; @@ -70,6 +71,10 @@ export function TaskLogsPanel({ taskId, task, hideInput }: TaskLogsPanelProps) { isSuspended, }); + // Evict this transcript's events while it is unfocused (rehydrated on focus) + // to bound renderer RAM across many open tasks. + useSessionEventsResidency(taskId); + const { handleSendPrompt, handleCancelPrompt,