Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions packages/core/src/command-center/commandCenterSignature.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { describe, expect, it } from "vitest";
import {
type CommandCenterSessionFields,
computeCommandCenterSessionSignature,
} from "./status";

function session(
taskId: string,
over: Partial<CommandCenterSessionFields> = {},
): CommandCenterSessionFields {
return {
taskId,
status: "connected",
cloudStatus: undefined,
pendingPermissions: { size: 0 },
isPromptPending: false,
...over,
};
}

const sig = (record: Record<string, CommandCenterSessionFields>) =>
computeCommandCenterSessionSignature(record);

describe("computeCommandCenterSessionSignature", () => {
it("is stable when only non-status fields change (e.g. streamed events)", () => {
// Two sessions identical in the 4 status-relevant fields → same signature,
// regardless of any other mutation the store makes (token appends etc.).
const a = { r1: session("t1") };
const b = { r1: session("t1") };
expect(sig(a)).toBe(sig(b));
});

it.each<{ name: string; over: Partial<CommandCenterSessionFields> }>([
{
name: "isPromptPending flips (idle ↔ running)",
over: { isPromptPending: true },
},
{
name: "a pending permission appears (running → waiting)",
over: { pendingPermissions: { size: 1 } },
},
{ name: "connection status changes", over: { status: "error" } },
{ name: "cloud status changes", over: { cloudStatus: "completed" } },
])("changes when $name", ({ over }) => {
expect(sig({ r1: session("t1") })).not.toBe(
sig({ r1: session("t1", over) }),
);
});

it("is order-independent across sessions", () => {
const s1 = session("t1");
const s2 = session("t2", { isPromptPending: true });
expect(sig({ r1: s1, r2: s2 })).toBe(sig({ r2: s2, r1: s1 }));
});

it("ignores sessions without a taskId", () => {
expect(sig({ r1: session("t1"), r2: session("", {}) })).toBe(
sig({ r1: session("t1") }),
);
});
});
25 changes: 25 additions & 0 deletions packages/core/src/command-center/status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,31 @@ export function deriveStatus(
return "idle";
}

export type CommandCenterSessionFields = SessionStatusInput & {
taskId?: string;
};

/**
* Stable string signature of the only session fields the command-center grid
* reads (the inputs to `deriveStatus`). Lets the grid subscribe to status
* changes instead of re-rendering on every streamed token — cell transcripts
* update independently through each EmbeddedSessionView's own subscription, so
* the grid data never needs to churn on token-level `events` mutations.
*/
export function computeCommandCenterSessionSignature(
sessions: Record<string, CommandCenterSessionFields>,
): string {
const parts: string[] = [];
for (const s of Object.values(sessions)) {
if (!s.taskId) continue;
parts.push(
`${s.taskId}\t${s.status}\t${s.cloudStatus ?? ""}\t${s.pendingPermissions.size}\t${s.isPromptPending ? 1 : 0}`,
);
}
parts.sort();
return parts.join("\n");
}

export function getRepoName(task: Task): string | null {
const repository = getTaskRepository(task);
if (!repository) return null;
Expand Down
165 changes: 165 additions & 0 deletions packages/core/src/sessions/ensureEventsLoaded.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import type { AgentSession } from "@posthog/shared";
import { describe, expect, it, vi } from "vitest";
import { createBaseSession } from "./sessionFactory";
import { SessionService, type SessionServiceDeps } from "./sessionService";

const TASK_ID = "task-1";
const RUN_ID = "run-1";

function ndjson(...texts: string[]): string {
return texts
.map((text) =>
JSON.stringify({
type: "notification",
timestamp: "2026-06-16T15:22:35.396Z",
notification: {
jsonrpc: "2.0",
method: "session/update",
params: { update: { sessionUpdate: "agent_message_chunk", text } },
},
}),
)
.join("\n");
}

function createHarness(
sessionOverrides: Partial<AgentSession> = {},
localLogs = ndjson("hello", "world"),
) {
const sessions: Record<string, AgentSession> = {};
const base = createBaseSession(RUN_ID, TASK_ID, "Task");
sessions[RUN_ID] = { ...base, events: [], ...sessionOverrides };

const updateSession = vi.fn(
(taskRunId: string, updates: Partial<AgentSession>) => {
const session = sessions[taskRunId];
if (session) Object.assign(session, updates);
},
);

const readLocalLogs = vi.fn().mockResolvedValue(localLogs);

const store = {
getSessions: () => sessions,
getSessionByTaskId: (taskId: string) =>
Object.values(sessions).find((s) => s.taskId === taskId),
setSession: (s: AgentSession) => {
sessions[s.taskRunId] = s;
},
updateSession,
prependEvents: (taskRunId: string, older: AgentSession["events"]) => {
const session = sessions[taskRunId];
if (session) session.events = [...older, ...session.events];
},
};

const deps = {
store,
log: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() },
trpc: {
agent: {
onSessionIdleKilled: {
subscribe: () => ({ unsubscribe: vi.fn() }),
},
},
logs: {
readLocalLogs: { query: readLocalLogs },
fetchS3Logs: { query: vi.fn().mockResolvedValue("") },
writeLocalLogs: { mutate: vi.fn().mockResolvedValue(undefined) },
},
},
} as unknown as SessionServiceDeps;

return {
service: new SessionService(deps),
sessions,
updateSession,
readLocalLogs,
};
}

describe("SessionService.ensureEventsLoaded", () => {
it("rehydrates events from disk for an evicted (empty) session", async () => {
const h = createHarness();

await h.service.ensureEventsLoaded(TASK_ID);

expect(h.readLocalLogs).toHaveBeenCalledWith({ taskRunId: RUN_ID });
expect(h.sessions[RUN_ID].events.length).toBeGreaterThan(0);
});

it("no-ops when events are already resident (no disk read)", async () => {
const h = createHarness({
events: [{ message: {} }] as unknown as AgentSession["events"],
});

await h.service.ensureEventsLoaded(TASK_ID);

expect(h.readLocalLogs).not.toHaveBeenCalled();
expect(h.updateSession).not.toHaveBeenCalled();
});

it("no-ops for an unknown task", async () => {
const h = createHarness();
await h.service.ensureEventsLoaded("nope");
expect(h.readLocalLogs).not.toHaveBeenCalled();
});

it("sets processedLineCount for cloud sessions, leaves it undefined for local", async () => {
const cloud = createHarness({ isCloud: true });
await cloud.service.ensureEventsLoaded(TASK_ID);
expect(cloud.sessions[RUN_ID].processedLineCount).toBe(2);

const local = createHarness({ isCloud: false });
await local.service.ensureEventsLoaded(TASK_ID);
expect(local.sessions[RUN_ID].processedLineCount).toBeUndefined();
});

it("does not clobber events when a turn starts streaming during the load", async () => {
const h = createHarness();
// Simulate a turn starting (isPromptPending) + a streamed event landing
// while the disk read is in flight — the live transcript is authoritative.
h.readLocalLogs.mockImplementationOnce(async () => {
h.sessions[RUN_ID].isPromptPending = true;
h.sessions[RUN_ID].events = [
{ message: {} },
] as unknown as AgentSession["events"];
return ndjson("stale");
});

await h.service.ensureEventsLoaded(TASK_ID);

// The streamed event survives; the historical load is discarded.
expect(h.sessions[RUN_ID].events).toHaveLength(1);
expect(h.updateSession).not.toHaveBeenCalled();
});

it("opens a tail window for a large local log, flagging older events", async () => {
// 4000 lines > the 1000-line tail window → only the tail renders, the rest
// stays loadable via scrollback.
const bigLog = ndjson(...Array.from({ length: 4000 }, (_, i) => `m${i}`));
const h = createHarness({}, bigLog);

await h.service.ensureEventsLoaded(TASK_ID);

expect(h.sessions[RUN_ID].events.length).toBe(1000); // tail window only
expect(h.sessions[RUN_ID].hasOlderEvents).toBe(true);
});

it("loadOlderEvents prepends the next older chunk", async () => {
const bigLog = ndjson(...Array.from({ length: 4000 }, (_, i) => `m${i}`));
const h = createHarness({}, bigLog);
await h.service.ensureEventsLoaded(TASK_ID);
expect(h.sessions[RUN_ID].events.length).toBe(1000);

await h.service.loadOlderEvents(TASK_ID);

expect(h.sessions[RUN_ID].events.length).toBe(2000); // +1000 older, prepended
expect(h.sessions[RUN_ID].hasOlderEvents).toBe(true);
// The newly prepended chunk sits before the tail (older first).
await h.service.loadOlderEvents(TASK_ID);
await h.service.loadOlderEvents(TASK_ID);
expect(h.sessions[RUN_ID].events.length).toBe(4000); // all loaded
expect(h.sessions[RUN_ID].hasOlderEvents).toBe(false);
});
});
17 changes: 14 additions & 3 deletions packages/core/src/sessions/sessionEvents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,25 @@ export function convertStoredEntriesToEvents(
const startTs = entries[0]?.timestamp
? new Date(entries[0].timestamp).getTime() - 1
: Date.now();
events.push(createUserMessageEvent(taskDescription, startTs));
events.push(
Object.freeze(createUserMessageEvent(taskDescription, startTs)),
);
}

for (const entry of entries) {
events.push(storedEntryToAcpMessage(entry));
// Freeze each event as it's born. Events are immutable log data, and the
// store is immer-backed: when the array is committed, immer's auto-freeze
// would otherwise deep-freeze every element — measured at ~240ms for a
// 48k-event session (immer's per-element isDraftable/handleValue machinery
// is ~50x slower than a plain Object.freeze loop). Pre-freezing makes immer
// short-circuit on Object.isFrozen, turning that ~240ms into ~5ms.
events.push(Object.freeze(storedEntryToAcpMessage(entry)));
}

return events;
// Freezing the array too lets immer skip the whole array on commit (it bails
// on Object.isFrozen before recursing), so a full-transcript load no longer
// stalls the renderer.
return Object.freeze(events) as AcpMessage[];
}

/**
Expand Down
Loading