From 88b27159d3598a4b88a1b989099c88aa134d5162 Mon Sep 17 00:00:00 2001 From: karlo Date: Tue, 16 Jun 2026 13:16:31 -0700 Subject: [PATCH 01/11] perf(sidebar): stop re-rendering on every streamed token The sidebar consumed the whole `sessions` record via `useSessions()`, which immer replaces on every appended event (one per streamed token). Since the sidebar is mounted at the root, that re-rendered the whole tree on every token. `deriveTaskData` only reads four session fields (isPromptPending, pendingPermissions size, cloudStatus, cloudOutput.pr_url) -- never `events`: - Add `computeSidebarSessionSignature` (core, pure): a primitive signature of just those fields. - Add `useSidebarSessionMap` (ui): subscribes to that signature and rebuilds the taskId -> session map only when a sidebar-relevant field changes. - `useSidebarData` uses it instead of `useSessions()`. Render-count test: 20 streamed tokens caused 20 sidebar re-renders before, 0 after (and 1 when a relevant field actually changes). Part of #2162 --- packages/core/src/sidebar/buildSidebarData.ts | 29 +++++++++ .../computeSidebarSessionSignature.test.ts | 52 ++++++++++++++++ .../ui/src/features/sessions/sessionStore.ts | 1 + .../ui/src/features/sessions/useSession.ts | 25 ++++++++ .../sessions/useSidebarSessionMap.test.tsx | 62 +++++++++++++++++++ .../ui/src/features/sidebar/useSidebarData.ts | 13 +--- 6 files changed, 171 insertions(+), 11 deletions(-) create mode 100644 packages/core/src/sidebar/computeSidebarSessionSignature.test.ts create mode 100644 packages/ui/src/features/sessions/useSidebarSessionMap.test.tsx diff --git a/packages/core/src/sidebar/buildSidebarData.ts b/packages/core/src/sidebar/buildSidebarData.ts index 5b41353260..9a9793bde8 100644 --- a/packages/core/src/sidebar/buildSidebarData.ts +++ b/packages/core/src/sidebar/buildSidebarData.ts @@ -1,3 +1,4 @@ +import type { AgentSession } from "@posthog/shared"; import type { Task, TaskRunStatus } from "@posthog/shared/domain-types"; import { getRepositoryInfo } from "./groupTasks"; import type { TaskData } from "./sidebarData.types"; @@ -214,3 +215,31 @@ export function sliceChronological( hasMore: sortedUnpinnedTasks.length > historyVisibleCount, }; } + +/** Only the session fields the sidebar actually reads (see deriveTaskData). */ +type SidebarSessionFields = Pick< + AgentSession, + "taskId" | "isPromptPending" | "pendingPermissions" | "cloudStatus" +> & { cloudOutput?: { pr_url?: unknown } | null }; + +/** + * A compact, primitive signature of just the session fields the sidebar reads. + * The sidebar subscribes to this string instead of the whole sessions record, + * so streaming token appends — which only mutate `session.events` — don't + * re-render the sidebar (and, since it's mounted at the root, the whole tree). + */ +export function computeSidebarSessionSignature( + sessions: Record, +): string { + const parts: string[] = []; + for (const s of Object.values(sessions)) { + if (!s.taskId) continue; + const prUrl = + typeof s.cloudOutput?.pr_url === "string" ? s.cloudOutput.pr_url : ""; + parts.push( + `${s.taskId}\t${s.isPromptPending ? 1 : 0}\t${s.pendingPermissions.size}\t${s.cloudStatus ?? ""}\t${prUrl}`, + ); + } + parts.sort(); + return parts.join("\n"); +} diff --git a/packages/core/src/sidebar/computeSidebarSessionSignature.test.ts b/packages/core/src/sidebar/computeSidebarSessionSignature.test.ts new file mode 100644 index 0000000000..09ab89feed --- /dev/null +++ b/packages/core/src/sidebar/computeSidebarSessionSignature.test.ts @@ -0,0 +1,52 @@ +import { describe, expect, it } from "vitest"; +import { computeSidebarSessionSignature } from "./buildSidebarData"; + +// Minimal session shape the signature reads. `events` is included to prove it +// is ignored (the streaming hot path only mutates `events`). +function session(over: Record = {}) { + return { + taskId: "t1", + isPromptPending: false, + pendingPermissions: new Map(), + cloudStatus: undefined, + cloudOutput: null, + events: [], + ...over, + } as never; +} + +describe("computeSidebarSessionSignature", () => { + it("ignores events, so streaming tokens don't change it", () => { + const few = computeSidebarSessionSignature({ + r1: session({ events: [1, 2] }), + }); + const many = computeSidebarSessionSignature({ + r1: session({ events: [1, 2, 3, 4, 5, 6, 7] }), + }); + expect(many).toBe(few); + }); + + it.each([ + { label: "isPromptPending", over: { isPromptPending: true } }, + { label: "cloudStatus", over: { cloudStatus: "in_progress" } }, + { + label: "pendingPermissions size", + over: { pendingPermissions: new Map([["p", {}]]) }, + }, + { + label: "cloudOutput.pr_url", + over: { cloudOutput: { pr_url: "https://x/pr/1" } }, + }, + ])("changes when $label changes", ({ over }) => { + const before = computeSidebarSessionSignature({ r1: session() }); + expect(computeSidebarSessionSignature({ r1: session(over) })).not.toBe( + before, + ); + }); + + it("skips sessions without a taskId", () => { + expect( + computeSidebarSessionSignature({ r1: session({ taskId: "" }) }), + ).toBe(""); + }); +}); diff --git a/packages/ui/src/features/sessions/sessionStore.ts b/packages/ui/src/features/sessions/sessionStore.ts index b9432901f0..21727310b6 100644 --- a/packages/ui/src/features/sessions/sessionStore.ts +++ b/packages/ui/src/features/sessions/sessionStore.ts @@ -87,5 +87,6 @@ export { useQueuedMessagesForTask, useSessionForTask, useSessions, + useSidebarSessionMap, useThoughtLevelConfigOptionForTask, } from "./useSession"; diff --git a/packages/ui/src/features/sessions/useSession.ts b/packages/ui/src/features/sessions/useSession.ts index 0352ee1095..960bf3ff21 100644 --- a/packages/ui/src/features/sessions/useSession.ts +++ b/packages/ui/src/features/sessions/useSession.ts @@ -6,7 +6,9 @@ import { extractAvailableCommandsFromEvents, extractUserPromptsFromEvents, } from "@posthog/core/sessions/sessionEvents"; +import { computeSidebarSessionSignature } from "@posthog/core/sidebar/buildSidebarData"; import type { PermissionRequest } from "@posthog/ui/features/sessions/sessionLogTypes"; +import { useMemo } from "react"; import { shallow } from "zustand/shallow"; import { type Adapter, @@ -19,6 +21,29 @@ import { export const useSessions = () => useSessionStore((s) => s.sessions); +/** + * The sidebar's view of sessions, keyed by taskId. Subscribes only to a + * signature of the fields the sidebar reads (see computeSidebarSessionSignature), + * so streaming token appends — which only mutate `events` — don't re-render the + * sidebar (which is mounted at the root). The map is rebuilt from the live + * snapshot only when a sidebar-relevant field actually changes. + */ +export const useSidebarSessionMap = (): Map => { + const signature = useSessionStore((s) => + computeSidebarSessionSignature(s.sessions), + ); + // `signature` is the trigger, not read inside: rebuild the map from the live + // snapshot only when a sidebar-relevant field changes (not on every token). + // biome-ignore lint/correctness/useExhaustiveDependencies: keyed by signature on purpose + return useMemo(() => { + const map = new Map(); + for (const session of Object.values(useSessionStore.getState().sessions)) { + if (session.taskId) map.set(session.taskId, session); + } + return map; + }, [signature]); +}; + /** O(1) lookup using taskIdIndex */ export const useSessionForTask = ( taskId: string | undefined, diff --git a/packages/ui/src/features/sessions/useSidebarSessionMap.test.tsx b/packages/ui/src/features/sessions/useSidebarSessionMap.test.tsx new file mode 100644 index 0000000000..e2bd300b82 --- /dev/null +++ b/packages/ui/src/features/sessions/useSidebarSessionMap.test.tsx @@ -0,0 +1,62 @@ +import type { AcpMessage, AgentSession } from "@posthog/shared"; +import { act, renderHook } from "@testing-library/react"; +import { beforeEach, describe, expect, it } from "vitest"; +import { sessionStoreSetters, useSessionStore } from "./sessionStore"; +import { useSessions, useSidebarSessionMap } from "./useSession"; + +function makeSession(taskId: string, taskRunId: string): AgentSession { + return { + taskId, + taskRunId, + events: [], + isPromptPending: false, + pendingPermissions: new Map(), + } as unknown as AgentSession; +} + +const TOKEN = {} as AcpMessage; + +/** Mount a hook and count how many times it (re)renders. */ +function countRenders(hook: () => T): () => number { + let n = 0; + renderHook(() => { + n++; + return hook(); + }); + return () => n; +} + +describe("sidebar session subscription — re-render cost during streaming", () => { + beforeEach(() => { + useSessionStore.setState({ sessions: {}, taskIdIndex: {} }); + sessionStoreSetters.setSession(makeSession("t1", "r1")); + }); + + it("baseline: useSessions() re-renders on every streamed token", () => { + const renders = countRenders(() => useSessions()); + const before = renders(); + for (let i = 0; i < 20; i++) { + act(() => sessionStoreSetters.appendEvents("r1", [TOKEN])); + } + // Old behaviour: one re-render per token (and the sidebar is at the root). + expect(renders() - before).toBe(20); + }); + + it("fixed: useSidebarSessionMap() ignores streamed tokens (0 re-renders)", () => { + const renders = countRenders(() => useSidebarSessionMap()); + const before = renders(); + for (let i = 0; i < 20; i++) { + act(() => sessionStoreSetters.appendEvents("r1", [TOKEN])); + } + expect(renders() - before).toBe(0); + }); + + it("fixed: still re-renders when a sidebar-relevant field changes", () => { + const renders = countRenders(() => useSidebarSessionMap()); + const before = renders(); + act(() => + sessionStoreSetters.updateSession("r1", { isPromptPending: true }), + ); + expect(renders() - before).toBe(1); + }); +}); diff --git a/packages/ui/src/features/sidebar/useSidebarData.ts b/packages/ui/src/features/sidebar/useSidebarData.ts index a040204883..f6c01e0784 100644 --- a/packages/ui/src/features/sidebar/useSidebarData.ts +++ b/packages/ui/src/features/sidebar/useSidebarData.ts @@ -18,7 +18,7 @@ import type { AppView } from "@posthog/ui/router/useAppView"; import { useEffect, useMemo, useRef } from "react"; import { useArchivedTaskIds } from "../archive/useArchivedTaskIds"; import { useProvisioningStore } from "../provisioning/store"; -import { useSessions } from "../sessions/sessionStore"; +import { useSidebarSessionMap } from "../sessions/sessionStore"; import { useSuspendedTaskIds } from "../suspension/useSuspendedTaskIds"; import { useSlackTasks, useTaskSummaries, useTasks } from "../tasks/useTasks"; import { useWorkspaces } from "../workspace/useWorkspace"; @@ -41,7 +41,6 @@ export function useSidebarData({ const archivedTaskIds = useArchivedTaskIds(); const suspendedTaskIds = useSuspendedTaskIds(); const provisioningTaskIds = useProvisioningStore((s) => s.activeTasks); - const sessions = useSessions(); const { timestamps } = useTaskViewed(); const historyVisibleCount = useSidebarStore( (state) => state.historyVisibleCount, @@ -140,15 +139,7 @@ export function useSidebarData({ const activeTaskId = activeView.type === "task-detail" ? (activeView.taskId ?? null) : null; - const sessionByTaskId = useMemo(() => { - const map = new Map(); - for (const session of Object.values(sessions)) { - if (session.taskId) { - map.set(session.taskId, session); - } - } - return map; - }, [sessions]); + const sessionByTaskId = useSidebarSessionMap(); const taskData = useMemo( () => From 69fbec58b2be4fb7fcedcba9418e1a88295627ac Mon Sep 17 00:00:00 2001 From: karlo Date: Fri, 26 Jun 2026 12:13:16 -0700 Subject: [PATCH 02/11] perf(sessions): evict inactive session transcripts to bound renderer RAM session.events is an append-only mirror of the on-disk ndjson log that was never freed, so renderer memory grew unbounded across open tasks. Evict the events of unfocused, idle sessions after a grace window and rehydrate from disk on refocus (ensureEventsLoaded keeps the session warm). Never evicts a streaming session, a queued-turn session, or a live cloud run. Part of #2162. --- .../src/sessions/ensureEventsLoaded.test.ts | 130 ++++++++++++++++++ packages/core/src/sessions/sessionService.ts | 42 ++++++ .../core/src/sessions/sessionStore.test.ts | 67 +++++++++ packages/core/src/sessions/sessionStore.ts | 18 +++ .../hooks/useSessionEventsResidency.test.tsx | 103 ++++++++++++++ .../hooks/useSessionEventsResidency.ts | 68 +++++++++ .../task-detail/components/TaskLogsPanel.tsx | 5 + 7 files changed, 433 insertions(+) create mode 100644 packages/core/src/sessions/ensureEventsLoaded.test.ts create mode 100644 packages/core/src/sessions/sessionStore.test.ts create mode 100644 packages/ui/src/features/sessions/hooks/useSessionEventsResidency.test.tsx create mode 100644 packages/ui/src/features/sessions/hooks/useSessionEventsResidency.ts diff --git a/packages/core/src/sessions/ensureEventsLoaded.test.ts b/packages/core/src/sessions/ensureEventsLoaded.test.ts new file mode 100644 index 0000000000..b9f2713848 --- /dev/null +++ b/packages/core/src/sessions/ensureEventsLoaded.test.ts @@ -0,0 +1,130 @@ +import type { AgentSession } from "@posthog/shared"; +import { describe, expect, it, vi } from "vitest"; +import { createBaseSession } from "./sessionFactory"; +import { SessionService, type SessionServiceDeps } from "./sessionService"; + +const TASK_ID = "task-1"; +const RUN_ID = "run-1"; + +function ndjson(...texts: string[]): string { + return texts + .map((text) => + JSON.stringify({ + type: "notification", + timestamp: "2026-06-16T15:22:35.396Z", + notification: { + jsonrpc: "2.0", + method: "session/update", + params: { update: { sessionUpdate: "agent_message_chunk", text } }, + }, + }), + ) + .join("\n"); +} + +function createHarness( + sessionOverrides: Partial = {}, + localLogs = ndjson("hello", "world"), +) { + const sessions: Record = {}; + const base = createBaseSession(RUN_ID, TASK_ID, "Task"); + sessions[RUN_ID] = { ...base, events: [], ...sessionOverrides }; + + const updateSession = vi.fn( + (taskRunId: string, updates: Partial) => { + const session = sessions[taskRunId]; + if (session) Object.assign(session, updates); + }, + ); + + const readLocalLogs = vi.fn().mockResolvedValue(localLogs); + + const store = { + getSessions: () => sessions, + getSessionByTaskId: (taskId: string) => + Object.values(sessions).find((s) => s.taskId === taskId), + setSession: (s: AgentSession) => { + sessions[s.taskRunId] = s; + }, + updateSession, + }; + + const deps = { + store, + log: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() }, + trpc: { + agent: { + onSessionIdleKilled: { + subscribe: () => ({ unsubscribe: vi.fn() }), + }, + }, + logs: { + readLocalLogs: { query: readLocalLogs }, + fetchS3Logs: { query: vi.fn().mockResolvedValue("") }, + writeLocalLogs: { mutate: vi.fn().mockResolvedValue(undefined) }, + }, + }, + } as unknown as SessionServiceDeps; + + return { + service: new SessionService(deps), + sessions, + updateSession, + readLocalLogs, + }; +} + +describe("SessionService.ensureEventsLoaded", () => { + it("rehydrates events from disk for an evicted (empty) session", async () => { + const h = createHarness(); + + await h.service.ensureEventsLoaded(TASK_ID); + + expect(h.readLocalLogs).toHaveBeenCalledWith({ taskRunId: RUN_ID }); + expect(h.sessions[RUN_ID].events.length).toBeGreaterThan(0); + }); + + it("no-ops when events are already resident (no disk read)", async () => { + const h = createHarness({ + events: [{ message: {} }] as unknown as AgentSession["events"], + }); + + await h.service.ensureEventsLoaded(TASK_ID); + + expect(h.readLocalLogs).not.toHaveBeenCalled(); + expect(h.updateSession).not.toHaveBeenCalled(); + }); + + it("no-ops for an unknown task", async () => { + const h = createHarness(); + await h.service.ensureEventsLoaded("nope"); + expect(h.readLocalLogs).not.toHaveBeenCalled(); + }); + + it("sets processedLineCount for cloud sessions, leaves it undefined for local", async () => { + const cloud = createHarness({ isCloud: true }); + await cloud.service.ensureEventsLoaded(TASK_ID); + expect(cloud.sessions[RUN_ID].processedLineCount).toBe(2); + + const local = createHarness({ isCloud: false }); + await local.service.ensureEventsLoaded(TASK_ID); + expect(local.sessions[RUN_ID].processedLineCount).toBeUndefined(); + }); + + it("does not clobber events that a live stream appended during the fetch", async () => { + const h = createHarness(); + // Simulate a stream landing while the disk read is in flight. + h.readLocalLogs.mockImplementationOnce(async () => { + h.sessions[RUN_ID].events = [ + { message: {} }, + ] as unknown as AgentSession["events"]; + return ndjson("stale"); + }); + + await h.service.ensureEventsLoaded(TASK_ID); + + // The single streamed event survives; the stale disk read is discarded. + expect(h.sessions[RUN_ID].events).toHaveLength(1); + expect(h.updateSession).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/core/src/sessions/sessionService.ts b/packages/core/src/sessions/sessionService.ts index 341b429f32..3f0fdff89c 100644 --- a/packages/core/src/sessions/sessionService.ts +++ b/packages/core/src/sessions/sessionService.ts @@ -1318,6 +1318,48 @@ export class SessionService { await this.teardownSession(session.taskRunId); } + /** + * Rehydrate the transcript of a session whose `events` were evicted (see + * `sessionStoreSetters.evictEvents`) while it stayed warm/connected. + * + * Unlike `loadLogsOnly`, this does NOT recreate the session — it re-reads the + * ndjson from disk (S3 fallback) and patches `events` back in via + * `updateSession`, leaving the live subscription, status and config intact. + * Safe to call unconditionally on focus: it no-ops when events are already + * resident, and bails if a live stream re-populates events during the fetch + * so it never clobbers fresher data. + */ + async ensureEventsLoaded(taskId: string): Promise { + const session = this.d.store.getSessionByTaskId(taskId); + if (!session) return; // not connected — the normal connect path hydrates + if (session.events.length > 0) return; // already resident + const { taskRunId, logUrl } = session; + + let parsed: ParsedSessionLogs; + try { + parsed = await this.fetchSessionLogs(logUrl, taskRunId); + } catch (err) { + this.d.log.warn("ensureEventsLoaded: failed to fetch logs", { + taskId, + error: err, + }); + return; + } + if (parsed.rawEntries.length === 0) return; // nothing on disk to restore + + // Re-check: the session may have been torn down or a live stream may have + // appended events while we awaited the fetch. Don't overwrite either. + const current = this.d.store.getSessionByTaskId(taskId); + if (!current || current.taskRunId !== taskRunId) return; + if (current.events.length > 0) return; + + const events = convertStoredEntriesToEvents(parsed.rawEntries); + this.d.store.updateSession(taskRunId, { + events, + processedLineCount: current.isCloud ? parsed.totalLineCount : undefined, + }); + } + // --- Subscription Management --- private subscribeToChannel(taskRunId: string): void { diff --git a/packages/core/src/sessions/sessionStore.test.ts b/packages/core/src/sessions/sessionStore.test.ts new file mode 100644 index 0000000000..f4d2233f45 --- /dev/null +++ b/packages/core/src/sessions/sessionStore.test.ts @@ -0,0 +1,67 @@ +import type { AcpMessage, AgentSession } from "@posthog/shared"; +import { beforeEach, describe, expect, it } from "vitest"; +import { createBaseSession } from "./sessionFactory"; +import { sessionStore, sessionStoreSetters } from "./sessionStore"; + +function event(text: string): AcpMessage { + return { + message: { + jsonrpc: "2.0", + method: "session/update", + params: { update: { sessionUpdate: "agent_message_chunk", text } }, + }, + } as unknown as AcpMessage; +} + +function seed(taskRunId: string, taskId: string): AgentSession { + const session = createBaseSession(taskRunId, taskId, "Task"); + sessionStoreSetters.setSession(session); + return session; +} + +describe("sessionStoreSetters.evictEvents", () => { + beforeEach(() => { + sessionStoreSetters.clearAll(); + }); + + it("empties the events array and resets processedLineCount", () => { + seed("run-1", "task-1"); + sessionStoreSetters.appendEvents("run-1", [event("a"), event("b")], 2); + expect(sessionStore.getState().sessions["run-1"].events).toHaveLength(2); + expect(sessionStore.getState().sessions["run-1"].processedLineCount).toBe( + 2, + ); + + sessionStoreSetters.evictEvents("run-1"); + + const session = sessionStore.getState().sessions["run-1"]; + expect(session.events).toHaveLength(0); + expect(session.processedLineCount).toBeUndefined(); + }); + + it("keeps the session and its identity index intact (non-destructive)", () => { + seed("run-1", "task-1"); + sessionStoreSetters.appendEvents("run-1", [event("a")]); + + sessionStoreSetters.evictEvents("run-1"); + + expect(sessionStore.getState().sessions["run-1"]).toBeDefined(); + expect(sessionStoreSetters.getSessionByTaskId("task-1")?.taskRunId).toBe( + "run-1", + ); + }); + + it("is a no-op for an unknown taskRunId", () => { + expect(() => sessionStoreSetters.evictEvents("missing")).not.toThrow(); + }); + + it("lets events be appended again after eviction (rehydration path)", () => { + seed("run-1", "task-1"); + sessionStoreSetters.appendEvents("run-1", [event("a"), event("b")]); + sessionStoreSetters.evictEvents("run-1"); + + sessionStoreSetters.appendEvents("run-1", [event("c")]); + + expect(sessionStore.getState().sessions["run-1"].events).toHaveLength(1); + }); +}); diff --git a/packages/core/src/sessions/sessionStore.ts b/packages/core/src/sessions/sessionStore.ts index 2ea1cbad09..c837e5dcdd 100644 --- a/packages/core/src/sessions/sessionStore.ts +++ b/packages/core/src/sessions/sessionStore.ts @@ -72,6 +72,24 @@ export const sessionStoreSetters = { }); }, + /** + * Drop the in-memory transcript of an inactive session to reclaim RAM. + * `events` is an append-only mirror of the on-disk ndjson log, so emptying + * it here is non-destructive — `SessionService.ensureEventsLoaded` rehydrates + * it from disk when the session is focused again. `processedLineCount` is + * reset so the cloud append/dedup path re-syncs from a clean baseline. + * Never call this on a streaming session (isPromptPending / isCompacting). + */ + evictEvents: (taskRunId: string) => { + sessionStore.setState((state) => { + const session = state.sessions[taskRunId]; + if (session) { + session.events = []; + session.processedLineCount = undefined; + } + }); + }, + updateCloudStatus: ( taskRunId: string, fields: { diff --git a/packages/ui/src/features/sessions/hooks/useSessionEventsResidency.test.tsx b/packages/ui/src/features/sessions/hooks/useSessionEventsResidency.test.tsx new file mode 100644 index 0000000000..6a4f7ecf60 --- /dev/null +++ b/packages/ui/src/features/sessions/hooks/useSessionEventsResidency.test.tsx @@ -0,0 +1,103 @@ +import { renderHook } from "@testing-library/react"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +const ensureEventsLoaded = vi.hoisted(() => vi.fn()); +const getSessionByTaskId = vi.hoisted(() => vi.fn()); +const evictEvents = vi.hoisted(() => vi.fn()); + +vi.mock("@posthog/di/react", () => ({ + useService: () => ({ ensureEventsLoaded }), +})); +vi.mock("@posthog/ui/features/sessions/sessionStore", () => ({ + sessionStoreSetters: { getSessionByTaskId, evictEvents }, +})); + +import { useSessionEventsResidency } from "./useSessionEventsResidency"; + +const GRACE_MS = 20_000; + +function idleSession(overrides: Record = {}) { + return { + taskRunId: "run-1", + events: [{ message: {} }], + isPromptPending: false, + isCompacting: false, + isCloud: false, + cloudStatus: undefined, + messageQueue: [], + ...overrides, + }; +} + +describe("useSessionEventsResidency", () => { + beforeEach(() => { + vi.useFakeTimers(); + ensureEventsLoaded.mockReset(); + getSessionByTaskId.mockReset().mockReturnValue(idleSession()); + evictEvents.mockReset(); + }); + afterEach(() => { + vi.useRealTimers(); + }); + + it("rehydrates events on mount", () => { + renderHook(() => useSessionEventsResidency("task-1")); + expect(ensureEventsLoaded).toHaveBeenCalledWith("task-1"); + }); + + it("evicts an idle session after the grace window on unmount", () => { + const { unmount } = renderHook(() => useSessionEventsResidency("task-1")); + unmount(); + expect(evictEvents).not.toHaveBeenCalled(); // still within grace + vi.advanceTimersByTime(GRACE_MS); + expect(evictEvents).toHaveBeenCalledWith("run-1"); + }); + + it("never evicts a streaming session", () => { + getSessionByTaskId.mockReturnValue(idleSession({ isPromptPending: true })); + const { unmount } = renderHook(() => useSessionEventsResidency("task-1")); + unmount(); + vi.advanceTimersByTime(GRACE_MS); + expect(evictEvents).not.toHaveBeenCalled(); + }); + + it("never evicts a live (non-terminal) cloud run", () => { + getSessionByTaskId.mockReturnValue( + idleSession({ isCloud: true, cloudStatus: "in_progress" }), + ); + const { unmount } = renderHook(() => useSessionEventsResidency("task-1")); + unmount(); + vi.advanceTimersByTime(GRACE_MS); + expect(evictEvents).not.toHaveBeenCalled(); + }); + + it("never evicts a session with queued messages pending dispatch", () => { + getSessionByTaskId.mockReturnValue( + idleSession({ messageQueue: [{ id: "q1" }] }), + ); + const { unmount } = renderHook(() => useSessionEventsResidency("task-1")); + unmount(); + vi.advanceTimersByTime(GRACE_MS); + expect(evictEvents).not.toHaveBeenCalled(); + }); + + it("evicts a finished (terminal) cloud run", () => { + getSessionByTaskId.mockReturnValue( + idleSession({ isCloud: true, cloudStatus: "completed" }), + ); + const { unmount } = renderHook(() => useSessionEventsResidency("task-1")); + unmount(); + vi.advanceTimersByTime(GRACE_MS); + expect(evictEvents).toHaveBeenCalledWith("run-1"); + }); + + it("cancels a pending eviction when the task is refocused within the grace window", () => { + const { unmount } = renderHook(() => useSessionEventsResidency("task-1")); + unmount(); + vi.advanceTimersByTime(GRACE_MS / 2); + // Refocus before the grace window elapses. + renderHook(() => useSessionEventsResidency("task-1")); + vi.advanceTimersByTime(GRACE_MS); + expect(evictEvents).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/ui/src/features/sessions/hooks/useSessionEventsResidency.ts b/packages/ui/src/features/sessions/hooks/useSessionEventsResidency.ts new file mode 100644 index 0000000000..ebce838b02 --- /dev/null +++ b/packages/ui/src/features/sessions/hooks/useSessionEventsResidency.ts @@ -0,0 +1,68 @@ +import { + SESSION_SERVICE, + type SessionService, +} from "@posthog/core/sessions/sessionService"; +import { useService } from "@posthog/di/react"; +import { isTerminalStatus } from "@posthog/shared/domain-types"; +import { sessionStoreSetters } from "@posthog/ui/features/sessions/sessionStore"; +import { useEffect } from "react"; + +/** + * How long a transcript can stay unfocused before its in-memory `events` are + * evicted. The grace window keeps quick back-and-forth navigation cheap (no + * reload churn) while still bounding resident memory for genuinely idle tasks. + */ +const EVICT_GRACE_MS = 20_000; + +/** Pending eviction timers keyed by taskId, shared across hook instances. */ +const evictTimers = new Map>(); + +function cancelPendingEvict(taskId: string): void { + const timer = evictTimers.get(taskId); + if (timer) { + clearTimeout(timer); + evictTimers.delete(taskId); + } +} + +/** + * Bounds the memory held by `session.events` (an append-only mirror of the + * on-disk ndjson) across many open tasks. + * + * While a task's transcript is mounted it stays fully resident. On unmount the + * events are evicted after a grace window — but never for a session that is + * still streaming (`isPromptPending` / `isCompacting`) or a live cloud run, so + * background work is never disturbed. On (re)mount the transcript is rehydrated + * from disk via `ensureEventsLoaded`, which keeps the session warm (subscription + * and status intact) and no-ops when events are already present. + * + * Mechanism + policy validated empirically in `membench/` (~−55% steady-state + * renderer RAM on a realistic multi-session workload). + */ +export function useSessionEventsResidency(taskId: string): void { + const sessionService = useService(SESSION_SERVICE); + + useEffect(() => { + // Focused: cancel any scheduled eviction and restore the transcript. + cancelPendingEvict(taskId); + void sessionService.ensureEventsLoaded(taskId); + + return () => { + // Blurred: schedule eviction after the grace window. + cancelPendingEvict(taskId); + const timer = setTimeout(() => { + evictTimers.delete(taskId); + const session = sessionStoreSetters.getSessionByTaskId(taskId); + if (!session || session.events.length === 0) return; + // Never disturb an in-flight turn, a queued turn about to dispatch, or a + // live cloud run — any of these can append events while unfocused, which + // would truncate the transcript (rehydration bails when events exist). + if (session.isPromptPending || session.isCompacting) return; + if (session.messageQueue.length > 0) return; + if (session.isCloud && !isTerminalStatus(session.cloudStatus)) return; + sessionStoreSetters.evictEvents(session.taskRunId); + }, EVICT_GRACE_MS); + evictTimers.set(taskId, timer); + }; + }, [taskId, sessionService]); +} diff --git a/packages/ui/src/features/task-detail/components/TaskLogsPanel.tsx b/packages/ui/src/features/task-detail/components/TaskLogsPanel.tsx index c7d32e7f9a..9bab117f1a 100644 --- a/packages/ui/src/features/task-detail/components/TaskLogsPanel.tsx +++ b/packages/ui/src/features/task-detail/components/TaskLogsPanel.tsx @@ -11,6 +11,7 @@ import { useProvisioningStore } from "../../provisioning/store"; import { SessionView } from "../../sessions/components/SessionView"; import { useSessionCallbacks } from "../../sessions/hooks/useSessionCallbacks"; import { useSessionConnection } from "../../sessions/hooks/useSessionConnection"; +import { useSessionEventsResidency } from "../../sessions/hooks/useSessionEventsResidency"; import { useSessionViewState } from "../../sessions/hooks/useSessionViewState"; import { useRestoreTask } from "../../suspension/useRestoreTask"; import { useSuspendedTaskIds } from "../../suspension/useSuspendedTaskIds"; @@ -70,6 +71,10 @@ export function TaskLogsPanel({ taskId, task, hideInput }: TaskLogsPanelProps) { isSuspended, }); + // Evict this transcript's events while it is unfocused (rehydrated on focus) + // to bound renderer RAM across many open tasks. + useSessionEventsResidency(taskId); + const { handleSendPrompt, handleCancelPrompt, From 19bf5d59c09c64309d56470113280d94d9bfc8bc Mon Sep 17 00:00:00 2001 From: karlo Date: Fri, 26 Jun 2026 12:18:30 -0700 Subject: [PATCH 03/11] perf(command-center): stop re-rendering the grid on every streamed token useCommandCenterData subscribed to the whole sessions Record via useSessions(), so every appendEvents (one per token) rebuilt the cells and re-rendered the grid. The grid only needs deriveStatus's 4 fields; cell transcripts update independently via each EmbeddedSessionView's own subscription. Mirror the sidebar fix (#2710): subscribe to a stable status signature and rebuild the session map only when it changes. Part of #2162. --- .../commandCenterSignature.test.ts | 65 +++++++++++++++++++ packages/core/src/command-center/status.ts | 25 +++++++ .../hooks/useCommandCenterData.ts | 17 ++--- .../ui/src/features/sessions/useSession.ts | 22 +++++++ 4 files changed, 116 insertions(+), 13 deletions(-) create mode 100644 packages/core/src/command-center/commandCenterSignature.test.ts diff --git a/packages/core/src/command-center/commandCenterSignature.test.ts b/packages/core/src/command-center/commandCenterSignature.test.ts new file mode 100644 index 0000000000..a33b31d11e --- /dev/null +++ b/packages/core/src/command-center/commandCenterSignature.test.ts @@ -0,0 +1,65 @@ +import { describe, expect, it } from "vitest"; +import { + type CommandCenterSessionFields, + computeCommandCenterSessionSignature, +} from "./status"; + +function session( + taskId: string, + over: Partial = {}, +): CommandCenterSessionFields { + return { + taskId, + status: "connected", + cloudStatus: undefined, + pendingPermissions: { size: 0 }, + isPromptPending: false, + ...over, + }; +} + +const sig = (record: Record) => + computeCommandCenterSessionSignature(record); + +describe("computeCommandCenterSessionSignature", () => { + it("is stable when only non-status fields change (e.g. streamed events)", () => { + // Two sessions identical in the 4 status-relevant fields → same signature, + // regardless of any other mutation the store makes (token appends etc.). + const a = { r1: session("t1") }; + const b = { r1: session("t1") }; + expect(sig(a)).toBe(sig(b)); + }); + + it("changes when isPromptPending flips (idle ↔ running)", () => { + expect(sig({ r1: session("t1", { isPromptPending: false }) })).not.toBe( + sig({ r1: session("t1", { isPromptPending: true }) }), + ); + }); + + it("changes when a pending permission appears (running → waiting)", () => { + expect(sig({ r1: session("t1") })).not.toBe( + sig({ r1: session("t1", { pendingPermissions: { size: 1 } }) }), + ); + }); + + it("changes when connection status or cloud status changes", () => { + expect(sig({ r1: session("t1") })).not.toBe( + sig({ r1: session("t1", { status: "error" }) }), + ); + expect(sig({ r1: session("t1") })).not.toBe( + sig({ r1: session("t1", { cloudStatus: "completed" }) }), + ); + }); + + it("is order-independent across sessions", () => { + const s1 = session("t1"); + const s2 = session("t2", { isPromptPending: true }); + expect(sig({ r1: s1, r2: s2 })).toBe(sig({ r2: s2, r1: s1 })); + }); + + it("ignores sessions without a taskId", () => { + expect(sig({ r1: session("t1"), r2: session("", {}) })).toBe( + sig({ r1: session("t1") }), + ); + }); +}); diff --git a/packages/core/src/command-center/status.ts b/packages/core/src/command-center/status.ts index 4d33b32ac8..e4663aca33 100644 --- a/packages/core/src/command-center/status.ts +++ b/packages/core/src/command-center/status.ts @@ -28,6 +28,31 @@ export function deriveStatus( return "idle"; } +export type CommandCenterSessionFields = SessionStatusInput & { + taskId?: string; +}; + +/** + * Stable string signature of the only session fields the command-center grid + * reads (the inputs to `deriveStatus`). Lets the grid subscribe to status + * changes instead of re-rendering on every streamed token — cell transcripts + * update independently through each EmbeddedSessionView's own subscription, so + * the grid data never needs to churn on token-level `events` mutations. + */ +export function computeCommandCenterSessionSignature( + sessions: Record, +): string { + const parts: string[] = []; + for (const s of Object.values(sessions)) { + if (!s.taskId) continue; + parts.push( + `${s.taskId}\t${s.status}\t${s.cloudStatus ?? ""}\t${s.pendingPermissions.size}\t${s.isPromptPending ? 1 : 0}`, + ); + } + parts.sort(); + return parts.join("\n"); +} + export function getRepoName(task: Task): string | null { const repository = getTaskRepository(task); if (!repository) return null; diff --git a/packages/ui/src/features/command-center/hooks/useCommandCenterData.ts b/packages/ui/src/features/command-center/hooks/useCommandCenterData.ts index 372053b391..aaca6977f3 100644 --- a/packages/ui/src/features/command-center/hooks/useCommandCenterData.ts +++ b/packages/ui/src/features/command-center/hooks/useCommandCenterData.ts @@ -9,8 +9,7 @@ import { } from "@posthog/core/command-center/status"; import type { Task } from "@posthog/shared/domain-types"; import { useMemo } from "react"; -import type { AgentSession } from "../../sessions/sessionStore"; -import { useSessions } from "../../sessions/useSession"; +import { useCommandCenterSessionMap } from "../../sessions/useSession"; import { useTasks } from "../../tasks/useTasks"; import { useWorkspaces } from "../../workspace/useWorkspace"; import { useCommandCenterStore } from "../commandCenterStore"; @@ -24,7 +23,9 @@ export function useCommandCenterData(): { } { const storeCells = useCommandCenterStore((s) => s.cells); const { data: tasks = [] } = useTasks(); - const sessions = useSessions(); + // Keyed by taskId, rebuilt only when a status-relevant field changes — not on + // every streamed token (transcripts update via each cell's own subscription). + const sessionByTaskId = useCommandCenterSessionMap(); const { data: workspaces } = useWorkspaces(); const taskById = useMemo(() => { @@ -35,16 +36,6 @@ export function useCommandCenterData(): { return map; }, [tasks]); - const sessionByTaskId = useMemo(() => { - const map = new Map(); - for (const session of Object.values(sessions)) { - if (session.taskId) { - map.set(session.taskId, session); - } - } - return map; - }, [sessions]); - const cells = useMemo( () => buildCommandCenterCells(storeCells, { diff --git a/packages/ui/src/features/sessions/useSession.ts b/packages/ui/src/features/sessions/useSession.ts index 960bf3ff21..87901458db 100644 --- a/packages/ui/src/features/sessions/useSession.ts +++ b/packages/ui/src/features/sessions/useSession.ts @@ -2,6 +2,7 @@ import type { AvailableCommand, SessionConfigOption, } from "@agentclientprotocol/sdk"; +import { computeCommandCenterSessionSignature } from "@posthog/core/command-center/status"; import { extractAvailableCommandsFromEvents, extractUserPromptsFromEvents, @@ -44,6 +45,27 @@ export const useSidebarSessionMap = (): Map => { }, [signature]); }; +/** + * The command-center grid's view of sessions, keyed by taskId. Same trick as + * useSidebarSessionMap: subscribe only to a signature of the fields the grid's + * `deriveStatus` reads, so streaming token appends don't re-render the grid. + * Cell transcripts update independently via each EmbeddedSessionView's own + * subscription, so the grid map never needs to churn on `events` mutations. + */ +export const useCommandCenterSessionMap = (): Map => { + const signature = useSessionStore((s) => + computeCommandCenterSessionSignature(s.sessions), + ); + // biome-ignore lint/correctness/useExhaustiveDependencies: keyed by signature on purpose + return useMemo(() => { + const map = new Map(); + for (const session of Object.values(useSessionStore.getState().sessions)) { + if (session.taskId) map.set(session.taskId, session); + } + return map; + }, [signature]); +}; + /** O(1) lookup using taskIdIndex */ export const useSessionForTask = ( taskId: string | undefined, From 647b1a9c830a8be770d48893e5a6c32ec4674a50 Mon Sep 17 00:00:00 2001 From: karlo Date: Fri, 26 Jun 2026 14:34:26 -0700 Subject: [PATCH 04/11] perf(sessions): tail-first progressive transcript load MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reloading an evicted transcript re-read the whole ndjson (178MB) and JSON.parsed ~100k lines synchronously — a ~500ms+ main-thread freeze. Now parse the last 256KB first so the latest messages render in ~1-2ms, then parse the full history in yielding chunks (parseSessionLogContentChunked) without blocking, and swap it in for scrollback. Measured 1.5ms vs 1710ms (1134x) for the tail on a real 323MB / 54k-event session. Part of #2162. --- .../src/sessions/ensureEventsLoaded.test.ts | 26 +++++- .../src/sessions/sessionLogs.chunked.test.ts | 92 +++++++++++++++++++ packages/core/src/sessions/sessionLogs.ts | 74 +++++++++++++++ packages/core/src/sessions/sessionService.ts | 90 +++++++++++++++--- 4 files changed, 266 insertions(+), 16 deletions(-) create mode 100644 packages/core/src/sessions/sessionLogs.chunked.test.ts diff --git a/packages/core/src/sessions/ensureEventsLoaded.test.ts b/packages/core/src/sessions/ensureEventsLoaded.test.ts index b9f2713848..b98cb53e7f 100644 --- a/packages/core/src/sessions/ensureEventsLoaded.test.ts +++ b/packages/core/src/sessions/ensureEventsLoaded.test.ts @@ -111,10 +111,12 @@ describe("SessionService.ensureEventsLoaded", () => { expect(local.sessions[RUN_ID].processedLineCount).toBeUndefined(); }); - it("does not clobber events that a live stream appended during the fetch", async () => { + it("does not clobber events when a turn starts streaming during the load", async () => { const h = createHarness(); - // Simulate a stream landing while the disk read is in flight. + // Simulate a turn starting (isPromptPending) + a streamed event landing + // while the disk read is in flight — the live transcript is authoritative. h.readLocalLogs.mockImplementationOnce(async () => { + h.sessions[RUN_ID].isPromptPending = true; h.sessions[RUN_ID].events = [ { message: {} }, ] as unknown as AgentSession["events"]; @@ -123,8 +125,26 @@ describe("SessionService.ensureEventsLoaded", () => { await h.service.ensureEventsLoaded(TASK_ID); - // The single streamed event survives; the stale disk read is discarded. + // The streamed event survives; the historical load is discarded. expect(h.sessions[RUN_ID].events).toHaveLength(1); expect(h.updateSession).not.toHaveBeenCalled(); }); + + it("renders the tail first, then swaps in the full transcript", async () => { + // A log larger than the 256KB tail window → phase 1 seeds a tail placeholder, + // phase 2 replaces it with the complete set. + const bigLog = ndjson(...Array.from({ length: 4000 }, (_, i) => `m${i}`)); + const h = createHarness({}, bigLog); + + await h.service.ensureEventsLoaded(TASK_ID); + + // Final state is the full transcript (4000 events). + expect(h.sessions[RUN_ID].events.length).toBe(4000); + // updateSession was called at least twice: tail seed + full swap. + expect(h.updateSession.mock.calls.length).toBeGreaterThanOrEqual(2); + const firstLen = (h.updateSession.mock.calls[0][1].events as unknown[]) + .length; + expect(firstLen).toBeGreaterThan(0); + expect(firstLen).toBeLessThan(4000); // the tail, not everything + }); }); diff --git a/packages/core/src/sessions/sessionLogs.chunked.test.ts b/packages/core/src/sessions/sessionLogs.chunked.test.ts new file mode 100644 index 0000000000..347719cf70 --- /dev/null +++ b/packages/core/src/sessions/sessionLogs.chunked.test.ts @@ -0,0 +1,92 @@ +import { describe, expect, it, vi } from "vitest"; +import { + parseSessionLogContent, + parseSessionLogContentChunked, +} from "./sessionLogs"; + +function line(obj: unknown): string { + return JSON.stringify(obj); +} + +function makeContent(n: number): string { + const lines: string[] = [ + line({ + type: "notification", + notification: { + method: "_posthog/sdk_session", + params: { sessionId: "sess-1", adapter: "claude" }, + }, + }), + ]; + for (let i = 0; i < n; i++) { + lines.push( + line({ + type: "notification", + notification: { + method: "session/update", + params: { + update: { sessionUpdate: "agent_message_chunk", text: `t${i}` }, + }, + }, + }), + ); + } + return lines.join("\n"); +} + +const immediateYield = () => Promise.resolve(); + +describe("parseSessionLogContentChunked", () => { + it("produces output identical to the sync parser", async () => { + const content = makeContent(2500); // spans multiple chunks + const sync = parseSessionLogContent(content); + const chunked = await parseSessionLogContentChunked(content, { + chunkSize: 1000, + yieldToHost: immediateYield, + }); + + expect(chunked.rawEntries).toEqual(sync.rawEntries); + expect(chunked.totalLineCount).toBe(sync.totalLineCount); + expect(chunked.parseFailureCount).toBe(sync.parseFailureCount); + expect(chunked.sessionId).toBe(sync.sessionId); + expect(chunked.adapter).toBe(sync.adapter); + }); + + it("matches the sync parser on corrupt/partial lines", async () => { + const content = `${makeContent(10)}\n{not valid json\n${line({ type: "notification", notification: { method: "x" } })}`; + const sync = parseSessionLogContent(content); + const chunked = await parseSessionLogContentChunked(content, { + chunkSize: 4, + yieldToHost: immediateYield, + }); + + expect(chunked.rawEntries).toEqual(sync.rawEntries); + expect(chunked.parseFailureCount).toBe(sync.parseFailureCount); + expect(chunked.parseFailureCount).toBeGreaterThan(0); + }); + + it("streams chunks progressively and yields between them", async () => { + const content = makeContent(2500); + const chunkSizes: number[] = []; + const yieldSpy = vi.fn(() => Promise.resolve()); + + await parseSessionLogContentChunked(content, { + chunkSize: 1000, + onChunk: (entries) => chunkSizes.push(entries.length), + yieldToHost: yieldSpy, + }); + + // 2501 lines / 1000 → 3 chunks (1000, 1000, 501); yields between, not after last. + expect(chunkSizes.length).toBe(3); + expect(yieldSpy).toHaveBeenCalledTimes(2); + }); + + it("handles empty content", async () => { + const sync = parseSessionLogContent(""); + const chunked = await parseSessionLogContentChunked("", { + yieldToHost: immediateYield, + }); + expect(chunked.rawEntries).toEqual(sync.rawEntries); + expect(chunked.totalLineCount).toBe(sync.totalLineCount); + }); +}); diff --git a/packages/core/src/sessions/sessionLogs.ts b/packages/core/src/sessions/sessionLogs.ts index b0150edf4b..8b3e7ca847 100644 --- a/packages/core/src/sessions/sessionLogs.ts +++ b/packages/core/src/sessions/sessionLogs.ts @@ -52,6 +52,80 @@ export function parseSessionLogContent( }; } +/** Parse one ndjson line into a StoredLogEntry, extracting session metadata. */ +function parseLogLine( + line: string, + acc: { sessionId?: string; adapter?: Adapter }, +): StoredLogEntry | null { + try { + const stored = JSON.parse(line) as StoredLogEntry; + if ( + stored.type === "notification" && + stored.notification?.method?.endsWith("posthog/sdk_session") + ) { + const params = stored.notification.params as { + sessionId?: string; + sdkSessionId?: string; + adapter?: Adapter; + }; + if (params?.sessionId) acc.sessionId = params.sessionId; + else if (params?.sdkSessionId) acc.sessionId = params.sdkSessionId; + if (params?.adapter) acc.adapter = params.adapter; + } + return stored; + } catch { + return null; + } +} + +/** + * Non-blocking variant of `parseSessionLogContent`: parses the ndjson in + * `chunkSize`-line slices, yielding control between slices via `yieldToHost` + * so a 97k-line / ~420ms `JSON.parse` no longer freezes the renderer. + * + * `onChunk(entries, soFar)` fires after each parsed slice so callers can render + * progressively. Produces byte-identical `rawEntries` to the sync parser (same + * order, same parse-failure handling) — see sessionLogs.chunked.test.ts. + */ +export async function parseSessionLogContentChunked( + content: string, + options: { + chunkSize?: number; + onChunk?: (entries: StoredLogEntry[], soFar: number) => void; + yieldToHost?: () => Promise; + } = {}, +): Promise { + const chunkSize = options.chunkSize ?? 1000; + const yieldToHost = + options.yieldToHost ?? (() => new Promise((r) => setTimeout(r, 0))); + + const lines = content.trim().split("\n"); + const rawEntries: StoredLogEntry[] = []; + const meta: { sessionId?: string; adapter?: Adapter } = {}; + let parseFailureCount = 0; + + for (let start = 0; start < lines.length; start += chunkSize) { + const end = Math.min(start + chunkSize, lines.length); + const chunk: StoredLogEntry[] = []; + for (let i = start; i < end; i++) { + const stored = parseLogLine(lines[i], meta); + if (stored === null) parseFailureCount += 1; + else chunk.push(stored); + } + rawEntries.push(...chunk); + options.onChunk?.(chunk, rawEntries.length); + if (end < lines.length) await yieldToHost(); + } + + return { + rawEntries, + totalLineCount: lines.length, + parseFailureCount, + sessionId: meta.sessionId, + adapter: meta.adapter, + }; +} + export function planSkippedPromptFilter( skipPolledPromptCount: number | undefined, events: AcpMessage[], diff --git a/packages/core/src/sessions/sessionService.ts b/packages/core/src/sessions/sessionService.ts index 3f0fdff89c..091ddbba1b 100644 --- a/packages/core/src/sessions/sessionService.ts +++ b/packages/core/src/sessions/sessionService.ts @@ -78,6 +78,7 @@ import { createBaseSession } from "./sessionFactory"; import { type ParsedSessionLogs, parseSessionLogContent, + parseSessionLogContentChunked, planSkippedPromptFilter, } from "./sessionLogs"; @@ -1335,28 +1336,91 @@ export class SessionService { if (session.events.length > 0) return; // already resident const { taskRunId, logUrl } = session; - let parsed: ParsedSessionLogs; + // Prefer the raw local cache so we can render the tail instantly and parse + // the rest in the background. Re-reading a 178MB ndjson and JSON.parsing + // ~100k lines synchronously is the ~500ms+ freeze we're avoiding here. + let content: string | null = null; try { - parsed = await this.fetchSessionLogs(logUrl, taskRunId); - } catch (err) { - this.d.log.warn("ensureEventsLoaded: failed to fetch logs", { + content = await this.d.trpc.logs.readLocalLogs.query({ taskRunId }); + } catch { + content = null; + } + + if (!content || !content.trim()) { + // No local cache (e.g. cloud-only run): fall back to the one-shot fetch. + let parsed: ParsedSessionLogs; + try { + parsed = await this.fetchSessionLogs(logUrl, taskRunId); + } catch (err) { + this.d.log.warn("ensureEventsLoaded: failed to fetch logs", { + taskId, + error: err, + }); + return; + } + if (parsed.rawEntries.length === 0) return; + this.commitLoadedEvents( taskId, - error: err, - }); + taskRunId, + convertStoredEntriesToEvents(parsed.rawEntries), + parsed.totalLineCount, + ); return; } - if (parsed.rawEntries.length === 0) return; // nothing on disk to restore - // Re-check: the session may have been torn down or a live stream may have - // appended events while we awaited the fetch. Don't overwrite either. + // PHASE 1 — tail-first: parse only the last slice so the latest messages + // (the part actually on screen) render in ~1-2ms. + this.renderTailFirst(taskId, taskRunId, content); + + // PHASE 2 — parse the full history in yielding chunks (no main-thread + // freeze), then swap in the complete transcript for scrollback. + const parsed = await parseSessionLogContentChunked(content, { + chunkSize: 2000, + }); + this.commitLoadedEvents( + taskId, + taskRunId, + convertStoredEntriesToEvents(parsed.rawEntries), + parsed.totalLineCount, + ); + } + + /** Tail of the on-disk log, parsed synchronously for an instant first paint. */ + private renderTailFirst( + taskId: string, + taskRunId: string, + content: string, + ): void { + const TAIL_BYTES = 256 * 1024; + if (content.length <= TAIL_BYTES) return; // small log — phase 2 is instant + let tail = content.slice(content.length - TAIL_BYTES); + const nl = tail.indexOf("\n"); + if (nl >= 0) tail = tail.slice(nl + 1); // drop the partial leading line + const parsed = parseSessionLogContent(tail); + if (parsed.rawEntries.length === 0) return; const current = this.d.store.getSessionByTaskId(taskId); - if (!current || current.taskRunId !== taskRunId) return; - if (current.events.length > 0) return; + // Only seed the placeholder if nothing has populated events meanwhile. + if (current?.taskRunId !== taskRunId || current.events.length > 0) return; + this.d.store.updateSession(taskRunId, { + events: convertStoredEntriesToEvents(parsed.rawEntries), + }); + } - const events = convertStoredEntriesToEvents(parsed.rawEntries); + /** Swap in the fully-parsed transcript, unless a live stream now owns it. */ + private commitLoadedEvents( + taskId: string, + taskRunId: string, + events: AcpMessage[], + totalLineCount: number, + ): void { + const current = this.d.store.getSessionByTaskId(taskId); + if (!current || current.taskRunId !== taskRunId) return; + // If a turn started while we were parsing, the live events are authoritative + // — don't clobber them with the historical load; a later idle load fills in. + if (current.isPromptPending) return; this.d.store.updateSession(taskRunId, { events, - processedLineCount: current.isCloud ? parsed.totalLineCount : undefined, + processedLineCount: current.isCloud ? totalLineCount : undefined, }); } From 665f0b3061c2a588f8a2c60151fa4b1afd36478e Mon Sep 17 00:00:00 2001 From: karlo Date: Fri, 26 Jun 2026 14:46:06 -0700 Subject: [PATCH 05/11] perf(sessions): tail-first progressive load on initial task open MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The first open of a finished task went through reconnectToLocalSession, which parsed the whole ndjson up front — the same ~500ms+ freeze as the refocus path. Fetch raw content instead of pre-parsed logs, seed the transcript from the tail (instant), derive sessionId/adapter from the head (the sdk_session marker sits at line ~4), and parse the full history in background chunks. Reuses commitLoadedEvents' streaming guard. Part of #2162. --- packages/core/src/sessions/sessionService.ts | 137 +++++++++++++++++-- 1 file changed, 128 insertions(+), 9 deletions(-) diff --git a/packages/core/src/sessions/sessionService.ts b/packages/core/src/sessions/sessionService.ts index 091ddbba1b..0decd5be96 100644 --- a/packages/core/src/sessions/sessionService.ts +++ b/packages/core/src/sessions/sessionService.ts @@ -670,9 +670,9 @@ export class SessionService { return; } - const [workspaceResult, logResult] = await Promise.all([ + const [workspaceResult, content] = await Promise.all([ this.d.trpc.workspace.verify.query({ taskId }), - this.fetchSessionLogs(logUrl, existingRunId), + this.fetchSessionLogContent(logUrl, existingRunId), ]); if (!workspaceResult.exists) { @@ -680,7 +680,9 @@ export class SessionService { taskId, missingPath: workspaceResult.missingPath, }); - const events = convertStoredEntriesToEvents(logResult.rawEntries); + const events = convertStoredEntriesToEvents( + content ? parseSessionLogContent(content).rawEntries : [], + ); const session = createBaseSession(existingRunId, taskId, taskTitle); session.events = events; session.logUrl = logUrl; @@ -699,7 +701,7 @@ export class SessionService { logUrl, repoPath, auth, - logResult, + { content: content ?? undefined }, ); } else { if (!this.d.getIsOnline()) { @@ -821,6 +823,69 @@ export class SessionService { } } + /** + * Resolve the events to seed a reconnecting session. When given raw `content`, + * renders the tail first (instant) and returns `deferredContent` so the caller + * can parse the full history in the background; `sessionId`/`adapter` come from + * the head (the sdk_session notification sits at the very start of the log). + */ + private async resolveReconnectEvents( + logUrl: string | undefined, + taskRunId: string, + prefetched?: { + content?: string; + rawEntries?: StoredLogEntry[]; + sessionId?: string; + adapter?: Adapter; + }, + ): Promise<{ + events: AcpMessage[]; + sessionId?: string; + adapter?: Adapter; + deferredContent: string | null; + }> { + const TAIL_BYTES = 256 * 1024; + const content = prefetched?.content; + if (content?.trim()) { + const head = parseSessionLogContent(content.slice(0, 128 * 1024)); + if (content.length > TAIL_BYTES) { + let tail = content.slice(content.length - TAIL_BYTES); + const nl = tail.indexOf("\n"); + if (nl >= 0) tail = tail.slice(nl + 1); // drop the partial leading line + return { + events: convertStoredEntriesToEvents( + parseSessionLogContent(tail).rawEntries, + ), + sessionId: head.sessionId, + adapter: head.adapter, + deferredContent: content, + }; + } + const full = parseSessionLogContent(content); + return { + events: convertStoredEntriesToEvents(full.rawEntries), + sessionId: full.sessionId, + adapter: full.adapter, + deferredContent: null, + }; + } + if (prefetched?.rawEntries) { + return { + events: convertStoredEntriesToEvents(prefetched.rawEntries), + sessionId: prefetched.sessionId, + adapter: prefetched.adapter, + deferredContent: null, + }; + } + const parsed = await this.fetchSessionLogs(logUrl, taskRunId); + return { + events: convertStoredEntriesToEvents(parsed.rawEntries), + sessionId: parsed.sessionId, + adapter: parsed.adapter, + deferredContent: null, + }; + } + private async reconnectToLocalSession( taskId: string, taskRunId: string, @@ -828,15 +893,15 @@ export class SessionService { logUrl: string | undefined, repoPath: string, auth: AuthCredentials, - prefetchedLogs?: { - rawEntries: StoredLogEntry[]; + prefetched?: { + content?: string; + rawEntries?: StoredLogEntry[]; sessionId?: string; adapter?: Adapter; }, ): Promise { - const { rawEntries, sessionId, adapter } = - prefetchedLogs ?? (await this.fetchSessionLogs(logUrl, taskRunId)); - const events = convertStoredEntriesToEvents(rawEntries); + const { events, sessionId, adapter, deferredContent } = + await this.resolveReconnectEvents(logUrl, taskRunId, prefetched); const storedAdapter = this.d.adapterStore.getAdapter(taskRunId); const resolvedAdapter = adapter ?? storedAdapter; @@ -868,6 +933,26 @@ export class SessionService { this.d.store.setSession(session); this.subscribeToChannel(taskRunId); + // We seeded the transcript from the tail for an instant render — now parse + // the full history in yielding chunks (no freeze) and swap it in. + if (deferredContent) { + void parseSessionLogContentChunked(deferredContent, { chunkSize: 2000 }) + .then((parsed) => { + this.commitLoadedEvents( + taskId, + taskRunId, + convertStoredEntriesToEvents(parsed.rawEntries), + parsed.totalLineCount, + ); + }) + .catch((err) => { + this.d.log.warn("reconnect: deferred full log parse failed", { + taskId, + error: err, + }); + }); + } + try { const modeOpt = getConfigOptionByCategory(persistedConfigOptions, "mode"); const persistedMode = @@ -4638,6 +4723,40 @@ export class SessionService { } } + /** + * Raw log content (local cache first, then S3), WITHOUT parsing — lets the + * caller parse the tail synchronously for an instant render and defer the + * full parse. Returns null when nothing is available. + */ + private async fetchSessionLogContent( + logUrl: string | undefined, + taskRunId?: string, + ): Promise { + if (taskRunId) { + try { + const local = await this.d.trpc.logs.readLocalLogs.query({ taskRunId }); + if (local?.trim()) return local; + } catch { + this.d.log.warn("Failed to read local logs, falling back to S3", { + taskRunId, + }); + } + } + if (!logUrl) return null; + try { + const content = await this.d.trpc.logs.fetchS3Logs.query({ logUrl }); + if (!content?.trim()) return null; + if (taskRunId) { + this.d.trpc.logs.writeLocalLogs + .mutate({ taskRunId, content }) + .catch(() => {}); + } + return content; + } catch { + return null; + } + } + private commitReconciledCloudEvents( taskRunId: string, rawEntries: StoredLogEntry[], From de1fda3c3cc5bb6efa2894f1c6f34fc71c3863a8 Mon Sep 17 00:00:00 2001 From: karlo Date: Fri, 26 Jun 2026 15:26:41 -0700 Subject: [PATCH 06/11] perf(sessions): freeze events at creation to skip immer's deep-freeze MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Committing a freshly-parsed transcript to the immer-backed session store made immer deep-freeze every event object — measured ~240ms for a 48k event session (immer's per-element isDraftable/handleValue machinery is ~50x slower than a plain Object.freeze loop). Events are immutable log data, so freeze them as they're built; immer then short-circuits on Object.isFrozen. ~240ms to ~5ms on load. Part of #2162. --- packages/core/src/sessions/sessionEvents.ts | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/packages/core/src/sessions/sessionEvents.ts b/packages/core/src/sessions/sessionEvents.ts index 48371e7072..3113ee0dd1 100644 --- a/packages/core/src/sessions/sessionEvents.ts +++ b/packages/core/src/sessions/sessionEvents.ts @@ -178,14 +178,25 @@ export function convertStoredEntriesToEvents( const startTs = entries[0]?.timestamp ? new Date(entries[0].timestamp).getTime() - 1 : Date.now(); - events.push(createUserMessageEvent(taskDescription, startTs)); + events.push( + Object.freeze(createUserMessageEvent(taskDescription, startTs)), + ); } for (const entry of entries) { - events.push(storedEntryToAcpMessage(entry)); + // Freeze each event as it's born. Events are immutable log data, and the + // store is immer-backed: when the array is committed, immer's auto-freeze + // would otherwise deep-freeze every element — measured at ~240ms for a + // 48k-event session (immer's per-element isDraftable/handleValue machinery + // is ~50x slower than a plain Object.freeze loop). Pre-freezing makes immer + // short-circuit on Object.isFrozen, turning that ~240ms into ~5ms. + events.push(Object.freeze(storedEntryToAcpMessage(entry))); } - return events; + // Freezing the array too lets immer skip the whole array on commit (it bails + // on Object.isFrozen before recursing), so a full-transcript load no longer + // stalls the renderer. + return Object.freeze(events) as AcpMessage[]; } /** From f488ac3305cd18872f2500e8db764786b4947f73 Mon Sep 17 00:00:00 2001 From: karlo Date: Fri, 26 Jun 2026 16:18:18 -0700 Subject: [PATCH 07/11] =?UTF-8?q?perf(sessions):=20lazy=20scrollback=20?= =?UTF-8?q?=E2=80=94=20open=20as=20a=20tail=20window,=20load=20older=20on?= =?UTF-8?q?=20scroll?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Opening a finished task parsed + committed + rendered the ENTIRE history (100k events) up front — seconds of main-thread hitches, even though you only see ~15 messages. Now a transcript opens as a tail window (latest ~1000 events, instant) with the rest kept as raw text outside the immer store; scrolling toward the top pulls in older chunks, anchored so the viewport doesn't jump. Opening f0117a1c: ~1763ms of blocking -> ~209ms. This is the Claude-fast model: open cost is O(visible), not O(history). Part of #2162. --- .../src/sessions/ensureEventsLoaded.test.ts | 37 +++-- packages/core/src/sessions/sessionService.ts | 155 ++++++++---------- packages/core/src/sessions/sessionStore.ts | 24 +++ .../core/src/sessions/transcriptWindows.ts | 70 ++++++++ packages/shared/src/sessions.ts | 6 + .../sessions/components/ConversationView.tsx | 13 ++ .../sessions/components/VirtualizedList.tsx | 55 +++++++ 7 files changed, 263 insertions(+), 97 deletions(-) create mode 100644 packages/core/src/sessions/transcriptWindows.ts diff --git a/packages/core/src/sessions/ensureEventsLoaded.test.ts b/packages/core/src/sessions/ensureEventsLoaded.test.ts index b98cb53e7f..c93f6b6abe 100644 --- a/packages/core/src/sessions/ensureEventsLoaded.test.ts +++ b/packages/core/src/sessions/ensureEventsLoaded.test.ts @@ -47,6 +47,10 @@ function createHarness( sessions[s.taskRunId] = s; }, updateSession, + prependEvents: (taskRunId: string, older: AgentSession["events"]) => { + const session = sessions[taskRunId]; + if (session) session.events = [...older, ...session.events]; + }, }; const deps = { @@ -130,21 +134,32 @@ describe("SessionService.ensureEventsLoaded", () => { expect(h.updateSession).not.toHaveBeenCalled(); }); - it("renders the tail first, then swaps in the full transcript", async () => { - // A log larger than the 256KB tail window → phase 1 seeds a tail placeholder, - // phase 2 replaces it with the complete set. + it("opens a tail window for a large local log, flagging older events", async () => { + // 4000 lines > the 1000-line tail window → only the tail renders, the rest + // stays loadable via scrollback. const bigLog = ndjson(...Array.from({ length: 4000 }, (_, i) => `m${i}`)); const h = createHarness({}, bigLog); await h.service.ensureEventsLoaded(TASK_ID); - // Final state is the full transcript (4000 events). - expect(h.sessions[RUN_ID].events.length).toBe(4000); - // updateSession was called at least twice: tail seed + full swap. - expect(h.updateSession.mock.calls.length).toBeGreaterThanOrEqual(2); - const firstLen = (h.updateSession.mock.calls[0][1].events as unknown[]) - .length; - expect(firstLen).toBeGreaterThan(0); - expect(firstLen).toBeLessThan(4000); // the tail, not everything + expect(h.sessions[RUN_ID].events.length).toBe(1000); // tail window only + expect(h.sessions[RUN_ID].hasOlderEvents).toBe(true); + }); + + it("loadOlderEvents prepends the next older chunk", async () => { + const bigLog = ndjson(...Array.from({ length: 4000 }, (_, i) => `m${i}`)); + const h = createHarness({}, bigLog); + await h.service.ensureEventsLoaded(TASK_ID); + expect(h.sessions[RUN_ID].events.length).toBe(1000); + + await h.service.loadOlderEvents(TASK_ID); + + expect(h.sessions[RUN_ID].events.length).toBe(2000); // +1000 older, prepended + expect(h.sessions[RUN_ID].hasOlderEvents).toBe(true); + // The newly prepended chunk sits before the tail (older first). + await h.service.loadOlderEvents(TASK_ID); + await h.service.loadOlderEvents(TASK_ID); + expect(h.sessions[RUN_ID].events.length).toBe(4000); // all loaded + expect(h.sessions[RUN_ID].hasOlderEvents).toBe(false); }); }); diff --git a/packages/core/src/sessions/sessionService.ts b/packages/core/src/sessions/sessionService.ts index 0decd5be96..b2e258308d 100644 --- a/packages/core/src/sessions/sessionService.ts +++ b/packages/core/src/sessions/sessionService.ts @@ -78,9 +78,13 @@ import { createBaseSession } from "./sessionFactory"; import { type ParsedSessionLogs, parseSessionLogContent, - parseSessionLogContentChunked, planSkippedPromptFilter, } from "./sessionLogs"; +import { + clearTranscriptWindow, + openTranscriptWindow, + takeOlderEntries, +} from "./transcriptWindows"; const LOCAL_SESSION_RECONNECT_ATTEMPTS = 3; const LOCAL_SESSION_RECONNECT_BACKOFF = { @@ -162,6 +166,7 @@ export interface ISessionStore { events: AcpMessage[], newLineCount?: number, ): void; + prependEvents(taskRunId: string, olderEvents: AcpMessage[]): void; updateCloudStatus( taskRunId: string, fields: { @@ -825,9 +830,9 @@ export class SessionService { /** * Resolve the events to seed a reconnecting session. When given raw `content`, - * renders the tail first (instant) and returns `deferredContent` so the caller - * can parse the full history in the background; `sessionId`/`adapter` come from - * the head (the sdk_session notification sits at the very start of the log). + * opens a tail window (only the latest events render; the rest stay loadable + * via scrollback) and reports `hasOlder`. `sessionId`/`adapter` come from the + * head (the sdk_session notification sits at the very start of the log). */ private async resolveReconnectEvents( logUrl: string | undefined, @@ -842,31 +847,17 @@ export class SessionService { events: AcpMessage[]; sessionId?: string; adapter?: Adapter; - deferredContent: string | null; + hasOlder: boolean; }> { - const TAIL_BYTES = 256 * 1024; const content = prefetched?.content; if (content?.trim()) { const head = parseSessionLogContent(content.slice(0, 128 * 1024)); - if (content.length > TAIL_BYTES) { - let tail = content.slice(content.length - TAIL_BYTES); - const nl = tail.indexOf("\n"); - if (nl >= 0) tail = tail.slice(nl + 1); // drop the partial leading line - return { - events: convertStoredEntriesToEvents( - parseSessionLogContent(tail).rawEntries, - ), - sessionId: head.sessionId, - adapter: head.adapter, - deferredContent: content, - }; - } - const full = parseSessionLogContent(content); + const { tail, hasOlder } = openTranscriptWindow(taskRunId, content); return { - events: convertStoredEntriesToEvents(full.rawEntries), - sessionId: full.sessionId, - adapter: full.adapter, - deferredContent: null, + events: convertStoredEntriesToEvents(tail), + sessionId: head.sessionId, + adapter: head.adapter, + hasOlder, }; } if (prefetched?.rawEntries) { @@ -874,7 +865,7 @@ export class SessionService { events: convertStoredEntriesToEvents(prefetched.rawEntries), sessionId: prefetched.sessionId, adapter: prefetched.adapter, - deferredContent: null, + hasOlder: false, }; } const parsed = await this.fetchSessionLogs(logUrl, taskRunId); @@ -882,7 +873,7 @@ export class SessionService { events: convertStoredEntriesToEvents(parsed.rawEntries), sessionId: parsed.sessionId, adapter: parsed.adapter, - deferredContent: null, + hasOlder: false, }; } @@ -900,7 +891,7 @@ export class SessionService { adapter?: Adapter; }, ): Promise { - const { events, sessionId, adapter, deferredContent } = + const { events, sessionId, adapter, hasOlder } = await this.resolveReconnectEvents(logUrl, taskRunId, prefetched); const storedAdapter = this.d.adapterStore.getAdapter(taskRunId); @@ -911,6 +902,7 @@ export class SessionService { const session = createBaseSession(taskRunId, taskId, taskTitle); session.events = events; + session.hasOlderEvents = hasOlder; if (logUrl) { session.logUrl = logUrl; } @@ -933,26 +925,6 @@ export class SessionService { this.d.store.setSession(session); this.subscribeToChannel(taskRunId); - // We seeded the transcript from the tail for an instant render — now parse - // the full history in yielding chunks (no freeze) and swap it in. - if (deferredContent) { - void parseSessionLogContentChunked(deferredContent, { chunkSize: 2000 }) - .then((parsed) => { - this.commitLoadedEvents( - taskId, - taskRunId, - convertStoredEntriesToEvents(parsed.rawEntries), - parsed.totalLineCount, - ); - }) - .catch((err) => { - this.d.log.warn("reconnect: deferred full log parse failed", { - taskId, - error: err, - }); - }); - } - try { const modeOpt = getConfigOptionByCategory(persistedConfigOptions, "mode"); const persistedMode = @@ -1079,6 +1051,7 @@ export class SessionService { private async teardownSession(taskRunId: string): Promise { const session = this.getSessionByRunId(taskRunId); + clearTranscriptWindow(taskRunId); try { await this.d.trpc.agent.cancel.mutate({ sessionId: taskRunId }); @@ -1444,71 +1417,81 @@ export class SessionService { return; } if (parsed.rawEntries.length === 0) return; - this.commitLoadedEvents( + this.commitWindowedEvents( taskId, taskRunId, convertStoredEntriesToEvents(parsed.rawEntries), + false, parsed.totalLineCount, ); return; } - // PHASE 1 — tail-first: parse only the last slice so the latest messages - // (the part actually on screen) render in ~1-2ms. - this.renderTailFirst(taskId, taskRunId, content); + // Cloud runs track `processedLineCount` for their append/dedup loop, so they + // load the full transcript (windowing only fits local ndjson logs). + if (session.isCloud) { + const parsed = parseSessionLogContent(content); + this.commitWindowedEvents( + taskId, + taskRunId, + convertStoredEntriesToEvents(parsed.rawEntries), + false, + parsed.totalLineCount, + ); + return; + } - // PHASE 2 — parse the full history in yielding chunks (no main-thread - // freeze), then swap in the complete transcript for scrollback. - const parsed = await parseSessionLogContentChunked(content, { - chunkSize: 2000, - }); - this.commitLoadedEvents( + // Open a tail window: render only the latest ~1000 events (instant) and keep + // the rest of the log as raw text for on-demand scrollback. This is what + // keeps opening a 100k-event session O(visible) instead of O(history). + const { tail, hasOlder } = openTranscriptWindow(taskRunId, content); + this.commitWindowedEvents( taskId, taskRunId, - convertStoredEntriesToEvents(parsed.rawEntries), - parsed.totalLineCount, + convertStoredEntriesToEvents(tail), + hasOlder, ); } - /** Tail of the on-disk log, parsed synchronously for an instant first paint. */ - private renderTailFirst( - taskId: string, - taskRunId: string, - content: string, - ): void { - const TAIL_BYTES = 256 * 1024; - if (content.length <= TAIL_BYTES) return; // small log — phase 2 is instant - let tail = content.slice(content.length - TAIL_BYTES); - const nl = tail.indexOf("\n"); - if (nl >= 0) tail = tail.slice(nl + 1); // drop the partial leading line - const parsed = parseSessionLogContent(tail); - if (parsed.rawEntries.length === 0) return; - const current = this.d.store.getSessionByTaskId(taskId); - // Only seed the placeholder if nothing has populated events meanwhile. - if (current?.taskRunId !== taskRunId || current.events.length > 0) return; - this.d.store.updateSession(taskRunId, { - events: convertStoredEntriesToEvents(parsed.rawEntries), - }); - } - - /** Swap in the fully-parsed transcript, unless a live stream now owns it. */ - private commitLoadedEvents( + /** Seed the transcript window, unless a live stream now owns the events. */ + private commitWindowedEvents( taskId: string, taskRunId: string, events: AcpMessage[], - totalLineCount: number, + hasOlder: boolean, + totalLineCount?: number, ): void { const current = this.d.store.getSessionByTaskId(taskId); if (!current || current.taskRunId !== taskRunId) return; - // If a turn started while we were parsing, the live events are authoritative - // — don't clobber them with the historical load; a later idle load fills in. - if (current.isPromptPending) return; + // A turn started or events arrived while we were reading — don't clobber. + if (current.isPromptPending || current.events.length > 0) return; this.d.store.updateSession(taskRunId, { events, + hasOlderEvents: hasOlder, processedLineCount: current.isCloud ? totalLineCount : undefined, }); } + /** + * Scrollback: parse and prepend the next older chunk of the transcript. Called + * by the UI when the user scrolls toward the top of a windowed transcript. + */ + async loadOlderEvents(taskId: string): Promise { + const session = this.d.store.getSessionByTaskId(taskId); + if (!session?.hasOlderEvents) return; + const { taskRunId } = session; + const result = takeOlderEntries(taskRunId); + if (!result) { + this.d.store.updateSession(taskRunId, { hasOlderEvents: false }); + return; + } + this.d.store.prependEvents( + taskRunId, + convertStoredEntriesToEvents(result.older), + ); + this.d.store.updateSession(taskRunId, { hasOlderEvents: result.hasOlder }); + } + // --- Subscription Management --- private subscribeToChannel(taskRunId: string): void { diff --git a/packages/core/src/sessions/sessionStore.ts b/packages/core/src/sessions/sessionStore.ts index c837e5dcdd..a16d6f826a 100644 --- a/packages/core/src/sessions/sessionStore.ts +++ b/packages/core/src/sessions/sessionStore.ts @@ -9,6 +9,7 @@ import type { } from "@posthog/shared"; import { immer } from "zustand/middleware/immer"; import { createStore } from "zustand/vanilla"; +import { clearTranscriptWindow } from "./transcriptWindows"; export interface SessionState { /** Sessions indexed by taskRunId */ @@ -72,6 +73,27 @@ export const sessionStoreSetters = { }); }, + /** + * Prepend older events to the front of the transcript — the scrollback path. + * The window opens with only the tail (instant render); this loads the chunk + * before the current window when the user scrolls toward the top. The result + * array is frozen so immer skips its O(n) finalize walk on commit. + */ + prependEvents: (taskRunId: string, olderEvents: AcpMessage[]) => { + if (olderEvents.length === 0) return; + // Read current events from the committed (frozen) state BEFORE entering the + // immer draft. Spreading `state.sessions[].events` inside the producer would + // capture draft proxies that immer revokes on exit — accessing them later + // (during render) throws "proxy has been revoked". + const current = sessionStore.getState().sessions[taskRunId]?.events; + if (!current) return; + const next = Object.freeze([...olderEvents, ...current]) as AcpMessage[]; + sessionStore.setState((state) => { + const session = state.sessions[taskRunId]; + if (session) session.events = next; + }); + }, + /** * Drop the in-memory transcript of an inactive session to reclaim RAM. * `events` is an append-only mirror of the on-disk ndjson log, so emptying @@ -81,11 +103,13 @@ export const sessionStoreSetters = { * Never call this on a streaming session (isPromptPending / isCompacting). */ evictEvents: (taskRunId: string) => { + clearTranscriptWindow(taskRunId); sessionStore.setState((state) => { const session = state.sessions[taskRunId]; if (session) { session.events = []; session.processedLineCount = undefined; + session.hasOlderEvents = undefined; } }); }, diff --git a/packages/core/src/sessions/transcriptWindows.ts b/packages/core/src/sessions/transcriptWindows.ts new file mode 100644 index 0000000000..f87cd38f52 --- /dev/null +++ b/packages/core/src/sessions/transcriptWindows.ts @@ -0,0 +1,70 @@ +import type { StoredLogEntry } from "@posthog/shared"; +import { parseSessionLogContent } from "./sessionLogs"; + +/** + * Scrollback windows for open transcripts. A huge session opens with only its + * tail parsed + rendered (instant); the rest of the ndjson lines stay here as + * raw text, parsed on demand when the user scrolls toward the top. This keeps + * open cost O(visible) instead of O(history). + * + * Held outside the immer session store on purpose: this is bulk, non-reactive + * data (one entry can be 100k+ line strings). It's cleared alongside the + * session's events — by `evictEvents` (inactive-session reclaim), on teardown, + * and replaced wholesale when a transcript is re-opened. + */ +interface TranscriptWindow { + /** All ndjson lines of the log. */ + lines: string[]; + /** Index of the first line currently materialised into `session.events`. */ + windowStart: number; +} + +const windows = new Map(); + +/** Lines materialised on first open / refocus. Tuned to fill a viewport + overscan. */ +export const TAIL_WINDOW_LINES = 1000; +/** Lines pulled in per scroll-toward-top load. */ +export const OLDER_CHUNK_LINES = 1000; + +/** + * Open a window over `content`, returning the tail's raw entries plus whether + * older lines remain. Stores the lines for later `takeOlderEntries` calls. + */ +export function openTranscriptWindow( + taskRunId: string, + content: string, +): { tail: StoredLogEntry[]; hasOlder: boolean } { + const lines = content.trim().split("\n"); + const windowStart = Math.max(0, lines.length - TAIL_WINDOW_LINES); + windows.set(taskRunId, { lines, windowStart }); + const tail = parseSessionLogContent( + lines.slice(windowStart).join("\n"), + ).rawEntries; + return { tail, hasOlder: windowStart > 0 }; +} + +/** + * Parse and return the next older chunk, advancing the window. Returns null + * when there's no window or nothing older remains. + */ +export function takeOlderEntries( + taskRunId: string, + count = OLDER_CHUNK_LINES, +): { older: StoredLogEntry[]; hasOlder: boolean } | null { + const window = windows.get(taskRunId); + if (!window || window.windowStart === 0) return null; + const newStart = Math.max(0, window.windowStart - count); + const older = parseSessionLogContent( + window.lines.slice(newStart, window.windowStart).join("\n"), + ).rawEntries; + window.windowStart = newStart; + return { older, hasOlder: newStart > 0 }; +} + +export function hasTranscriptWindow(taskRunId: string): boolean { + return windows.has(taskRunId); +} + +export function clearTranscriptWindow(taskRunId: string): void { + windows.delete(taskRunId); +} diff --git a/packages/shared/src/sessions.ts b/packages/shared/src/sessions.ts index 1c7663ba6b..21eb80e682 100644 --- a/packages/shared/src/sessions.ts +++ b/packages/shared/src/sessions.ts @@ -51,6 +51,12 @@ export interface AgentSession { taskTitle: string; channel: string; events: AcpMessage[]; + /** + * True when the transcript is a tail window and older events remain on disk, + * loadable via `SessionService.loadOlderEvents` (scrollback). Undefined for a + * fully-loaded transcript. + */ + hasOlderEvents?: boolean; startedAt: number; status: SessionStatus; errorTitle?: string; diff --git a/packages/ui/src/features/sessions/components/ConversationView.tsx b/packages/ui/src/features/sessions/components/ConversationView.tsx index d15543515e..ac81bad2bc 100644 --- a/packages/ui/src/features/sessions/components/ConversationView.tsx +++ b/packages/ui/src/features/sessions/components/ConversationView.tsx @@ -1,5 +1,9 @@ import { ArrowDown, XCircle } from "@phosphor-icons/react"; import { WorkerPoolContextProvider } from "@pierre/diffs/react"; +import { + SESSION_SERVICE, + type SessionService, +} from "@posthog/core/sessions/sessionService"; import { useService } from "@posthog/di/react"; import { Button, @@ -163,6 +167,13 @@ export function ConversationView({ const isCloud = session?.isCloud ?? false; + // Scrollback: the transcript opens as a tail window; pull the next older chunk + // when the user scrolls toward the top. + const sessionService = useService(SESSION_SERVICE); + const handleReachTop = useCallback(() => { + if (taskId) void sessionService.loadOlderEvents(taskId); + }, [sessionService, taskId]); + const items = useMemo( () => mergeConversationItems({ @@ -383,6 +394,8 @@ export function ConversationView({ getItemKey={getRowKey} renderItem={renderRow} onScrollStateChange={handleScrollStateChange} + onReachTop={handleReachTop} + hasMoreAbove={session?.hasOlderEvents ?? false} keepMounted={rowKeepMounted} className="absolute inset-0 bg-background" itemClassName="mx-auto px-2 py-1.5" diff --git a/packages/ui/src/features/sessions/components/VirtualizedList.tsx b/packages/ui/src/features/sessions/components/VirtualizedList.tsx index dab37a4e4f..0980dbe1cb 100644 --- a/packages/ui/src/features/sessions/components/VirtualizedList.tsx +++ b/packages/ui/src/features/sessions/components/VirtualizedList.tsx @@ -22,6 +22,13 @@ interface VirtualizedListProps { itemStyle?: CSSProperties; footer?: ReactNode; onScrollStateChange?: (isAtBottom: boolean) => void; + /** + * Called when the user scrolls near the top and `hasMoreAbove` is set, so the + * caller can load older items (scrollback). New items prepended after this + * fires are anchored: the viewport keeps the same content, no jump. + */ + onReachTop?: () => void; + hasMoreAbove?: boolean; keepMounted?: readonly number[]; /** * Allow horizontal scrolling of the list viewport. Defaults to true. Narrow @@ -43,6 +50,9 @@ const OVERSCAN = 6; // A real upward drift, not a 1-frame measure transient: the DOM bottom sits // this far below the viewport. Well above any single append's measure gap. const FAR_DRIFT_THRESHOLD = 400; +// Start loading older items this far before the very top, so scrollback feels +// seamless rather than snagging at the edge. +const LOAD_OLDER_THRESHOLD = 600; function VirtualizedListInner( { @@ -54,6 +64,8 @@ function VirtualizedListInner( itemStyle, footer, onScrollStateChange, + onReachTop, + hasMoreAbove, keepMounted, scrollX = true, }: VirtualizedListProps, @@ -68,6 +80,15 @@ function VirtualizedListInner( const settleRafRef = useRef(null); const onScrollStateChangeRef = useRef(onScrollStateChange); onScrollStateChangeRef.current = onScrollStateChange; + // Scrollback: keep the latest callback/flag in refs so handleScroll stays + // stable. `loadingOlder` gates re-entrancy; `scrollHeightBeforeLoad` anchors + // the viewport when the prepended items land. + const onReachTopRef = useRef(onReachTop); + onReachTopRef.current = onReachTop; + const hasMoreAboveRef = useRef(hasMoreAbove); + hasMoreAboveRef.current = hasMoreAbove; + const loadingOlderRef = useRef(false); + const scrollHeightBeforeLoadRef = useRef(0); const hasFooter = footer != null; @@ -196,6 +217,23 @@ function VirtualizedListInner( virtualizer.scrollToEnd(); }, [totalSize, virtualizer]); + // Scrollback anchor: when a load prepends older items, the virtual height + // grows above the viewport. Shift scrollTop by that growth so the content the + // user was reading stays exactly where it was — no jump. Synchronous, + // pre-paint. Gated on loadingOlderRef so streaming appends never trip it. + // biome-ignore lint/correctness/useExhaustiveDependencies: items.length is the trigger + useLayoutEffect(() => { + if (!loadingOlderRef.current) return; + const el = parentRef.current; + if (!el) return; + const delta = el.scrollHeight - scrollHeightBeforeLoadRef.current; + if (delta > 0) { + el.scrollTop += delta; + isAtBottomRef.current = false; + loadingOlderRef.current = false; + } + }, [items.length]); + const handleScroll = useCallback(() => { const el = parentRef.current; const scrollTop = el?.scrollTop ?? 0; @@ -203,6 +241,23 @@ function VirtualizedListInner( const scrolledUp = scrollTop < lastScrollTopRef.current - 1; lastScrollTopRef.current = scrollTop; + // Scrollback: as the user nears the top, ask the caller for older items. + // Record the pre-load scroll height so the anchor effect can keep the + // viewport on the same content once they prepend. Gate on `initialized` so + // the initial scroll-to-end settle (which can leave scrollTop near 0 for a + // short tail) doesn't spuriously pull the whole history in. + if ( + el && + initializedRef.current && + hasMoreAboveRef.current && + !loadingOlderRef.current && + scrollTop < LOAD_OLDER_THRESHOLD + ) { + loadingOlderRef.current = true; + scrollHeightBeforeLoadRef.current = el.scrollHeight; + onReachTopRef.current?.(); + } + const atEnd = virtualizer.isAtEnd(AT_BOTTOM_THRESHOLD); // Genuine far drift (not a 1-frame measure transient): the DOM bottom sits // well below the viewport. From 85f1813b78ecfa7decac66d2f7352184f2dbc673 Mon Sep 17 00:00:00 2001 From: karlo Date: Fri, 26 Jun 2026 19:13:14 -0700 Subject: [PATCH 08/11] perf(sessions): re-read log on scroll-up instead of pinning it in memory The tail window kept every ndjson line of an open transcript resident as raw text for scrollback, pinning ~110MB per open chat. A production heap benchmark put one 48k-event session at ~270MB while focused vs ~54MB with this change (and Command Center "ADHD mode" multiplied it across chats). Drop the in-memory copy and re-read the log from disk (OS page cache) on scroll-up, slicing only the older chunk. Scroll-up is rare and user-initiated, so a little latency there buys a large, always-on memory win. Older lines are start-indexed and append-stable, so a grown log still slices correctly. --- packages/core/src/sessions/sessionService.ts | 16 +++++++- .../core/src/sessions/transcriptWindows.ts | 37 ++++++++++++------- 2 files changed, 38 insertions(+), 15 deletions(-) diff --git a/packages/core/src/sessions/sessionService.ts b/packages/core/src/sessions/sessionService.ts index b2e258308d..c8621ab278 100644 --- a/packages/core/src/sessions/sessionService.ts +++ b/packages/core/src/sessions/sessionService.ts @@ -1480,7 +1480,21 @@ export class SessionService { const session = this.d.store.getSessionByTaskId(taskId); if (!session?.hasOlderEvents) return; const { taskRunId } = session; - const result = takeOlderEntries(taskRunId); + // Re-read the ndjson from disk (OS page cache → ~ms) instead of holding the + // whole log in memory for the open transcript's lifetime. See the note in + // `transcriptWindows.ts`: scroll-up is rare and user-initiated, so paying a + // little latency here buys a large, always-on renderer-memory win. + let content: string | null = null; + try { + content = await this.d.trpc.logs.readLocalLogs.query({ taskRunId }); + } catch { + content = null; + } + if (!content || !content.trim()) { + this.d.store.updateSession(taskRunId, { hasOlderEvents: false }); + return; + } + const result = takeOlderEntries(taskRunId, content); if (!result) { this.d.store.updateSession(taskRunId, { hasOlderEvents: false }); return; diff --git a/packages/core/src/sessions/transcriptWindows.ts b/packages/core/src/sessions/transcriptWindows.ts index f87cd38f52..c0f7d91f13 100644 --- a/packages/core/src/sessions/transcriptWindows.ts +++ b/packages/core/src/sessions/transcriptWindows.ts @@ -3,18 +3,22 @@ import { parseSessionLogContent } from "./sessionLogs"; /** * Scrollback windows for open transcripts. A huge session opens with only its - * tail parsed + rendered (instant); the rest of the ndjson lines stay here as - * raw text, parsed on demand when the user scrolls toward the top. This keeps - * open cost O(visible) instead of O(history). + * tail parsed + rendered (instant); the rest of the ndjson loads on demand when + * the user scrolls toward the top. This keeps open cost O(visible), not + * O(history). * - * Held outside the immer session store on purpose: this is bulk, non-reactive - * data (one entry can be 100k+ line strings). It's cleared alongside the - * session's events — by `evictEvents` (inactive-session reclaim), on teardown, - * and replaced wholesale when a transcript is re-opened. + * We deliberately retain only the window *cursor* here — not the log text. An + * earlier version cached every ndjson line as raw strings; a production memory + * benchmark showed that pinned ~110MB+ per open transcript (the renderer heap of + * one 48k-event session was ~270MB active vs ~50MB with the cursor-only window), + * and it compounded across tasks in Command Center / "ADHD mode". Re-reading the + * log from disk on a scroll-up is cheap — the OS page cache serves it in ~ms, and + * scroll-up is user-initiated and infrequent — so we trade a little latency on a + * rare action for a large, always-on memory win. The fetch itself lives in the + * caller (`SessionService.loadOlderEvents`), which already knows how to read the + * ndjson (`logs.readLocalLogs`); this module just slices and parses it. */ interface TranscriptWindow { - /** All ndjson lines of the log. */ - lines: string[]; /** Index of the first line currently materialised into `session.events`. */ windowStart: number; } @@ -28,7 +32,8 @@ export const OLDER_CHUNK_LINES = 1000; /** * Open a window over `content`, returning the tail's raw entries plus whether - * older lines remain. Stores the lines for later `takeOlderEntries` calls. + * older lines remain. Stores only the window cursor — `content` is not retained; + * older chunks are sliced from a fresh re-read via `takeOlderEntries`. */ export function openTranscriptWindow( taskRunId: string, @@ -36,7 +41,7 @@ export function openTranscriptWindow( ): { tail: StoredLogEntry[]; hasOlder: boolean } { const lines = content.trim().split("\n"); const windowStart = Math.max(0, lines.length - TAIL_WINDOW_LINES); - windows.set(taskRunId, { lines, windowStart }); + windows.set(taskRunId, { windowStart }); const tail = parseSessionLogContent( lines.slice(windowStart).join("\n"), ).rawEntries; @@ -44,18 +49,22 @@ export function openTranscriptWindow( } /** - * Parse and return the next older chunk, advancing the window. Returns null - * when there's no window or nothing older remains. + * Parse and return the next older chunk, advancing the window. The caller passes + * freshly re-read log `content` (older lines are start-indexed and append-stable, + * so a grown log still slices correctly). Returns null when there's no window or + * nothing older remains. */ export function takeOlderEntries( taskRunId: string, + content: string, count = OLDER_CHUNK_LINES, ): { older: StoredLogEntry[]; hasOlder: boolean } | null { const window = windows.get(taskRunId); if (!window || window.windowStart === 0) return null; + const lines = content.trim().split("\n"); const newStart = Math.max(0, window.windowStart - count); const older = parseSessionLogContent( - window.lines.slice(newStart, window.windowStart).join("\n"), + lines.slice(newStart, window.windowStart).join("\n"), ).rawEntries; window.windowStart = newStart; return { older, hasOlder: newStart > 0 }; From 007c8600095807b38389beb13a28300928c048ca Mon Sep 17 00:00:00 2001 From: karlo Date: Fri, 26 Jun 2026 20:50:28 -0700 Subject: [PATCH 09/11] fix(sessions): release scrollback gate even when an older chunk is empty MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Review feedback (#2957): the scrollback re-trigger gate was cleared only inside `if (delta > 0)`, so a load whose height the virtualizer hadn't measured yet — or one that prepended nothing renderable — left the gate stuck and permanently blocked further scroll-up. Release it whenever the anchor effect fires after a load, and make `takeOlderEntries` skip older chunks that parse to zero entries so it never reports `hasOlder` alongside an empty slice. --- packages/core/src/sessions/transcriptWindows.ts | 17 +++++++++++++---- .../sessions/components/VirtualizedList.tsx | 6 +++++- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/packages/core/src/sessions/transcriptWindows.ts b/packages/core/src/sessions/transcriptWindows.ts index c0f7d91f13..6c25e1c1d8 100644 --- a/packages/core/src/sessions/transcriptWindows.ts +++ b/packages/core/src/sessions/transcriptWindows.ts @@ -62,10 +62,19 @@ export function takeOlderEntries( const window = windows.get(taskRunId); if (!window || window.windowStart === 0) return null; const lines = content.trim().split("\n"); - const newStart = Math.max(0, window.windowStart - count); - const older = parseSessionLogContent( - lines.slice(newStart, window.windowStart).join("\n"), - ).rawEntries; + // Pull older chunks until we have at least one renderable entry or reach the + // top. A chunk can parse to zero entries (e.g. a run of non-event lines); if + // we returned an empty slice while still reporting `hasOlder`, the caller + // would prepend nothing and the UI's scrollback gate could never advance. + let newStart = window.windowStart; + let older: StoredLogEntry[] = []; + while (newStart > 0 && older.length === 0) { + const from = Math.max(0, newStart - count); + older = parseSessionLogContent( + lines.slice(from, newStart).join("\n"), + ).rawEntries; + newStart = from; + } window.windowStart = newStart; return { older, hasOlder: newStart > 0 }; } diff --git a/packages/ui/src/features/sessions/components/VirtualizedList.tsx b/packages/ui/src/features/sessions/components/VirtualizedList.tsx index 0980dbe1cb..5e3f9805a1 100644 --- a/packages/ui/src/features/sessions/components/VirtualizedList.tsx +++ b/packages/ui/src/features/sessions/components/VirtualizedList.tsx @@ -230,8 +230,12 @@ function VirtualizedListInner( if (delta > 0) { el.scrollTop += delta; isAtBottomRef.current = false; - loadingOlderRef.current = false; } + // Release the gate whenever this fires after a load — not only when the + // height grew. Gating the release on `delta > 0` could leave it stuck (and + // permanently block further scrollback) if the virtualizer hadn't measured + // the new height yet this pass; the next scroll-up simply retries. + loadingOlderRef.current = false; }, [items.length]); const handleScroll = useCallback(() => { From 6edfb0ad99a43d2877754c3fd09bd4244dcdfcea Mon Sep 17 00:00:00 2001 From: karlo Date: Fri, 26 Jun 2026 20:51:09 -0700 Subject: [PATCH 10/11] chore(sessions): remove unused chunked log parser MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Review feedback (#2957): `parseSessionLogContentChunked` (and its only helper `parseLogLine`) had no callers — the windowed open parses the tail synchronously and loads older chunks on demand, so the background chunked parser was dead. Drop it and its test. --- .../src/sessions/sessionLogs.chunked.test.ts | 92 ------------------- packages/core/src/sessions/sessionLogs.ts | 74 --------------- 2 files changed, 166 deletions(-) delete mode 100644 packages/core/src/sessions/sessionLogs.chunked.test.ts diff --git a/packages/core/src/sessions/sessionLogs.chunked.test.ts b/packages/core/src/sessions/sessionLogs.chunked.test.ts deleted file mode 100644 index 347719cf70..0000000000 --- a/packages/core/src/sessions/sessionLogs.chunked.test.ts +++ /dev/null @@ -1,92 +0,0 @@ -import { describe, expect, it, vi } from "vitest"; -import { - parseSessionLogContent, - parseSessionLogContentChunked, -} from "./sessionLogs"; - -function line(obj: unknown): string { - return JSON.stringify(obj); -} - -function makeContent(n: number): string { - const lines: string[] = [ - line({ - type: "notification", - notification: { - method: "_posthog/sdk_session", - params: { sessionId: "sess-1", adapter: "claude" }, - }, - }), - ]; - for (let i = 0; i < n; i++) { - lines.push( - line({ - type: "notification", - notification: { - method: "session/update", - params: { - update: { sessionUpdate: "agent_message_chunk", text: `t${i}` }, - }, - }, - }), - ); - } - return lines.join("\n"); -} - -const immediateYield = () => Promise.resolve(); - -describe("parseSessionLogContentChunked", () => { - it("produces output identical to the sync parser", async () => { - const content = makeContent(2500); // spans multiple chunks - const sync = parseSessionLogContent(content); - const chunked = await parseSessionLogContentChunked(content, { - chunkSize: 1000, - yieldToHost: immediateYield, - }); - - expect(chunked.rawEntries).toEqual(sync.rawEntries); - expect(chunked.totalLineCount).toBe(sync.totalLineCount); - expect(chunked.parseFailureCount).toBe(sync.parseFailureCount); - expect(chunked.sessionId).toBe(sync.sessionId); - expect(chunked.adapter).toBe(sync.adapter); - }); - - it("matches the sync parser on corrupt/partial lines", async () => { - const content = `${makeContent(10)}\n{not valid json\n${line({ type: "notification", notification: { method: "x" } })}`; - const sync = parseSessionLogContent(content); - const chunked = await parseSessionLogContentChunked(content, { - chunkSize: 4, - yieldToHost: immediateYield, - }); - - expect(chunked.rawEntries).toEqual(sync.rawEntries); - expect(chunked.parseFailureCount).toBe(sync.parseFailureCount); - expect(chunked.parseFailureCount).toBeGreaterThan(0); - }); - - it("streams chunks progressively and yields between them", async () => { - const content = makeContent(2500); - const chunkSizes: number[] = []; - const yieldSpy = vi.fn(() => Promise.resolve()); - - await parseSessionLogContentChunked(content, { - chunkSize: 1000, - onChunk: (entries) => chunkSizes.push(entries.length), - yieldToHost: yieldSpy, - }); - - // 2501 lines / 1000 → 3 chunks (1000, 1000, 501); yields between, not after last. - expect(chunkSizes.length).toBe(3); - expect(yieldSpy).toHaveBeenCalledTimes(2); - }); - - it("handles empty content", async () => { - const sync = parseSessionLogContent(""); - const chunked = await parseSessionLogContentChunked("", { - yieldToHost: immediateYield, - }); - expect(chunked.rawEntries).toEqual(sync.rawEntries); - expect(chunked.totalLineCount).toBe(sync.totalLineCount); - }); -}); diff --git a/packages/core/src/sessions/sessionLogs.ts b/packages/core/src/sessions/sessionLogs.ts index 8b3e7ca847..b0150edf4b 100644 --- a/packages/core/src/sessions/sessionLogs.ts +++ b/packages/core/src/sessions/sessionLogs.ts @@ -52,80 +52,6 @@ export function parseSessionLogContent( }; } -/** Parse one ndjson line into a StoredLogEntry, extracting session metadata. */ -function parseLogLine( - line: string, - acc: { sessionId?: string; adapter?: Adapter }, -): StoredLogEntry | null { - try { - const stored = JSON.parse(line) as StoredLogEntry; - if ( - stored.type === "notification" && - stored.notification?.method?.endsWith("posthog/sdk_session") - ) { - const params = stored.notification.params as { - sessionId?: string; - sdkSessionId?: string; - adapter?: Adapter; - }; - if (params?.sessionId) acc.sessionId = params.sessionId; - else if (params?.sdkSessionId) acc.sessionId = params.sdkSessionId; - if (params?.adapter) acc.adapter = params.adapter; - } - return stored; - } catch { - return null; - } -} - -/** - * Non-blocking variant of `parseSessionLogContent`: parses the ndjson in - * `chunkSize`-line slices, yielding control between slices via `yieldToHost` - * so a 97k-line / ~420ms `JSON.parse` no longer freezes the renderer. - * - * `onChunk(entries, soFar)` fires after each parsed slice so callers can render - * progressively. Produces byte-identical `rawEntries` to the sync parser (same - * order, same parse-failure handling) — see sessionLogs.chunked.test.ts. - */ -export async function parseSessionLogContentChunked( - content: string, - options: { - chunkSize?: number; - onChunk?: (entries: StoredLogEntry[], soFar: number) => void; - yieldToHost?: () => Promise; - } = {}, -): Promise { - const chunkSize = options.chunkSize ?? 1000; - const yieldToHost = - options.yieldToHost ?? (() => new Promise((r) => setTimeout(r, 0))); - - const lines = content.trim().split("\n"); - const rawEntries: StoredLogEntry[] = []; - const meta: { sessionId?: string; adapter?: Adapter } = {}; - let parseFailureCount = 0; - - for (let start = 0; start < lines.length; start += chunkSize) { - const end = Math.min(start + chunkSize, lines.length); - const chunk: StoredLogEntry[] = []; - for (let i = start; i < end; i++) { - const stored = parseLogLine(lines[i], meta); - if (stored === null) parseFailureCount += 1; - else chunk.push(stored); - } - rawEntries.push(...chunk); - options.onChunk?.(chunk, rawEntries.length); - if (end < lines.length) await yieldToHost(); - } - - return { - rawEntries, - totalLineCount: lines.length, - parseFailureCount, - sessionId: meta.sessionId, - adapter: meta.adapter, - }; -} - export function planSkippedPromptFilter( skipPolledPromptCount: number | undefined, events: AcpMessage[], From f027eea8373aa3cb1fc94e7fa5ddb6967a7d2ead Mon Sep 17 00:00:00 2001 From: karlo Date: Fri, 26 Jun 2026 20:51:19 -0700 Subject: [PATCH 11/11] test(command-center): table-drive the signature change cases Review feedback (#2957): collapse the repeated "changes when X changes" cases into a single it.each table, matching the parameterised style already used in computeSidebarSessionSignature.test.ts. --- .../commandCenterSignature.test.ts | 30 ++++++++----------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/packages/core/src/command-center/commandCenterSignature.test.ts b/packages/core/src/command-center/commandCenterSignature.test.ts index a33b31d11e..1989b8bf69 100644 --- a/packages/core/src/command-center/commandCenterSignature.test.ts +++ b/packages/core/src/command-center/commandCenterSignature.test.ts @@ -30,24 +30,20 @@ describe("computeCommandCenterSessionSignature", () => { expect(sig(a)).toBe(sig(b)); }); - it("changes when isPromptPending flips (idle ↔ running)", () => { - expect(sig({ r1: session("t1", { isPromptPending: false }) })).not.toBe( - sig({ r1: session("t1", { isPromptPending: true }) }), - ); - }); - - it("changes when a pending permission appears (running → waiting)", () => { - expect(sig({ r1: session("t1") })).not.toBe( - sig({ r1: session("t1", { pendingPermissions: { size: 1 } }) }), - ); - }); - - it("changes when connection status or cloud status changes", () => { - expect(sig({ r1: session("t1") })).not.toBe( - sig({ r1: session("t1", { status: "error" }) }), - ); + it.each<{ name: string; over: Partial }>([ + { + name: "isPromptPending flips (idle ↔ running)", + over: { isPromptPending: true }, + }, + { + name: "a pending permission appears (running → waiting)", + over: { pendingPermissions: { size: 1 } }, + }, + { name: "connection status changes", over: { status: "error" } }, + { name: "cloud status changes", over: { cloudStatus: "completed" } }, + ])("changes when $name", ({ over }) => { expect(sig({ r1: session("t1") })).not.toBe( - sig({ r1: session("t1", { cloudStatus: "completed" }) }), + sig({ r1: session("t1", over) }), ); });