From 65139d6726740bd803833c0154e4fbb6db164a1a Mon Sep 17 00:00:00 2001
From: Saatvik Arya
Date: Fri, 24 Apr 2026 19:21:51 +0530
Subject: [PATCH] feat(sdk): add ExecutionStore backed by DBAdapter
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Adds execution history persistence to the core SDK surface, wiring three
new tables (`execution`, `execution_interaction`, `execution_tool_call`)
into `coreSchema` and exposing an `ExecutionStore` service on
`executor.executions`.

Changes:
- `core-schema.ts`: three new tables with `scope_id` / `execution_id` /
  `tool_path` / `trigger_kind` / `created_at` indexes for the runs UI's
  faceting + timeline queries.
- `ids.ts`: branded `ExecutionId`, `ExecutionInteractionId`,
  `ExecutionToolCallId`.
- `executions.ts`: `Execution`, `ExecutionInteraction`, `ExecutionToolCall`
  Schema classes, status enums, create/update/filter/sort/meta input
  types, and the `ExecutionStore` Context.Tag.
- `execution-store.ts`: `makeExecutionStore(core)` — an adapter-backed
  `ExecutionStoreService` implementation. Wraps `typedAdapter` for CRUD,
  handles cursor-based pagination, filter predicates (status, trigger,
  tool-path glob, time range, code substring, hadElicitation), and builds
  list meta with facets + chart buckets.
- `cursor.ts`: base64url `{ createdAt, id }` pagination cursors.
- `executor.ts`: constructs the store once per executor, exposes via
  `executor.executions`.
- `executions.test.ts`: round-trip + lifecycle coverage against the
  in-memory adapter (no migrations needed).

Follow-up work (future PRs in the stack):
- wire the engine to record runs + tool calls through this store,
- add `/executions` API endpoints, and
- land the runs UI.
--- packages/core/sdk/src/core-schema.ts | 84 ++++ packages/core/sdk/src/cursor.ts | 44 ++ packages/core/sdk/src/execution-store.ts | 556 +++++++++++++++++++++++ packages/core/sdk/src/executions.test.ts | 260 +++++++++++ packages/core/sdk/src/executions.ts | 292 ++++++++++++ packages/core/sdk/src/executor.ts | 10 + packages/core/sdk/src/ids.ts | 9 + packages/core/sdk/src/index.ts | 49 +- 8 files changed, 1303 insertions(+), 1 deletion(-) create mode 100644 packages/core/sdk/src/cursor.ts create mode 100644 packages/core/sdk/src/execution-store.ts create mode 100644 packages/core/sdk/src/executions.test.ts create mode 100644 packages/core/sdk/src/executions.ts diff --git a/packages/core/sdk/src/core-schema.ts b/packages/core/sdk/src/core-schema.ts index 285eec8e5..774c1f8d3 100644 --- a/packages/core/sdk/src/core-schema.ts +++ b/packages/core/sdk/src/core-schema.ts @@ -148,6 +148,75 @@ export const coreSchema = { updated_at: { type: "date", required: true }, }, }, + // Execution history — one row per `engine.execute()` / + // `engine.executeWithPause()`. Captures the submitted code, final + // status/result, and trigger metadata. Tool calls and interactions + // link back via `execution_id`. + execution: { + fields: { + id: { type: "string", required: true }, + scope_id: { type: "string", required: true, index: true }, + status: { type: "string", required: true, index: true }, + code: { type: "string", required: true }, + result_json: { type: "string", required: false }, + error_text: { type: "string", required: false }, + logs_json: { type: "string", required: false }, + /** Epoch ms — the point the engine accepted the code. */ + started_at: { type: "number", required: false }, + /** Epoch ms — the point the engine reached a terminal status. */ + completed_at: { type: "number", required: false }, + /** Free-form trigger kind attributed by the host — `"cli"`, + * `"http"`, `"mcp"`, etc. Null when the host didn't attribute + * one. Indexed so filter facets scan fast. 
*/ + trigger_kind: { type: "string", required: false, index: true }, + /** Opaque host-owned JSON for per-trigger details. */ + trigger_meta_json: { type: "string", required: false }, + tool_call_count: { type: "number", required: true, defaultValue: 0 }, + created_at: { type: "date", required: true, index: true }, + updated_at: { type: "date", required: true }, + }, + }, + // Per-execution interaction rows — elicitation requests and their + // resolutions. A pending row is the hook the runs UI uses to render + // the "waiting for input" state; once resolved, `response_json` + // captures the user's answer for replay / auditing. + execution_interaction: { + fields: { + id: { type: "string", required: true }, + execution_id: { type: "string", required: true, index: true }, + status: { type: "string", required: true, index: true }, + kind: { type: "string", required: true }, + purpose: { type: "string", required: false }, + payload_json: { type: "string", required: false }, + response_json: { type: "string", required: false }, + /** Stores sensitive per-response data (e.g. raw form values) that + * should not be replayed in the public interaction log. */ + response_private_json: { type: "string", required: false }, + created_at: { type: "date", required: true }, + updated_at: { type: "date", required: true }, + }, + }, + // Per-execution tool-call rows — one per `executor.tools.invoke` that + // ran inside the sandboxed execution. Used to build the tool-call + // timeline shown in the runs UI. + execution_tool_call: { + fields: { + id: { type: "string", required: true }, + execution_id: { type: "string", required: true, index: true }, + status: { type: "string", required: true }, + /** Dotted tool path (e.g. `github.issues.create`). Indexed so the + * facets query in the runs UI resolves without a table scan. */ + tool_path: { type: "string", required: true, index: true }, + /** First path segment, pre-computed for cheap faceting. 
*/ + namespace: { type: "string", required: false, index: true }, + args_json: { type: "string", required: false }, + result_json: { type: "string", required: false }, + error_text: { type: "string", required: false }, + started_at: { type: "number", required: true }, + completed_at: { type: "number", required: false }, + duration_ms: { type: "number", required: false }, + }, + }, } as const satisfies DBSchema; export type CoreSchema = typeof coreSchema; @@ -176,6 +245,21 @@ export type ConnectionRow = InferDBFieldsOutput< > & Record; +export type ExecutionRow = InferDBFieldsOutput< + CoreSchema["execution"]["fields"] +> & + Record; + +export type ExecutionInteractionRow = InferDBFieldsOutput< + CoreSchema["execution_interaction"]["fields"] +> & + Record; + +export type ExecutionToolCallRow = InferDBFieldsOutput< + CoreSchema["execution_tool_call"]["fields"] +> & + Record; + // --------------------------------------------------------------------------- // Tool annotations — default-policy metadata the executor consults // before invocation. Returned by `plugin.resolveAnnotations` (dynamic diff --git a/packages/core/sdk/src/cursor.ts b/packages/core/sdk/src/cursor.ts new file mode 100644 index 000000000..ffa367ee5 --- /dev/null +++ b/packages/core/sdk/src/cursor.ts @@ -0,0 +1,44 @@ +// --------------------------------------------------------------------------- +// Opaque cursor helpers for ExecutionStore.list pagination. +// +// Cursors encode `{ createdAt, id }` — the tuple adapter backends need to +// resume a scan on `ORDER BY created_at DESC, id DESC`. Encoded as +// url-safe base64 so callers can pass them through query params without +// thinking about escaping. 
+// ---------------------------------------------------------------------------
+
+/** Decoded form of a pagination cursor: the sort key of the last row served. */
+export interface CursorPayload {
+  /** Epoch milliseconds of the last row's `created_at`. */
+  readonly createdAt: number;
+  /** Id of the last row; breaks ties between rows sharing `createdAt`. */
+  readonly id: string;
+}
+
+/**
+ * Encodes a string as unpadded url-safe base64 (`-` / `_` instead of
+ * `+` / `/`). The text is UTF-8 encoded first so non-Latin-1 characters
+ * survive `btoa`, which only accepts code units 0–255.
+ */
+const toBase64Url = (value: string): string => {
+  const bytes = new TextEncoder().encode(value);
+  let binary = "";
+  for (const b of bytes) binary += String.fromCharCode(b);
+  return btoa(binary).replace(/\+/g, "-").replace(/\//g, "_").replace(/=+$/, "");
+};
+
+/**
+ * Inverse of `toBase64Url`. Re-adds the stripped `=` padding, maps the
+ * url-safe alphabet back, and decodes the bytes as UTF-8. Throws (via
+ * `atob`) when the input is not valid base64.
+ */
+const fromBase64Url = (value: string): string => {
+  const pad = value.length % 4 === 0 ? 0 : 4 - (value.length % 4);
+  const normalized = value.replace(/-/g, "+").replace(/_/g, "/") + "=".repeat(pad);
+  const binary = atob(normalized);
+  const bytes = new Uint8Array(binary.length);
+  for (let i = 0; i < binary.length; i += 1) bytes[i] = binary.charCodeAt(i);
+  return new TextDecoder().decode(bytes);
+};
+
+/** Serialises a cursor payload into an opaque, url-safe token. */
+export const encodeCursor = (payload: CursorPayload): string =>
+  toBase64Url(JSON.stringify(payload));
+
+/**
+ * Parses a token produced by `encodeCursor`.
+ *
+ * Cursors arrive from callers (query params), so the decoded JSON is
+ * treated as untrusted: anything that is not an object carrying a finite
+ * numeric `createdAt` and a string `id` is rejected.
+ *
+ * @returns the payload, or `null` when the token is malformed.
+ */
+export const decodeCursor = (raw: string): CursorPayload | null => {
+  try {
+    const parsed: unknown = JSON.parse(fromBase64Url(raw));
+    // `JSON.parse` may yield null / primitives; narrow before reading fields.
+    if (typeof parsed !== "object" || parsed === null) return null;
+    const { createdAt, id } = parsed as Record<string, unknown>;
+    // `1e999` is valid JSON and parses to Infinity — not a usable timestamp.
+    if (typeof createdAt !== "number" || !Number.isFinite(createdAt)) return null;
+    if (typeof id !== "string") return null;
+    return { createdAt, id };
+  } catch {
+    // Invalid base64 or invalid JSON: report "no cursor" instead of throwing.
+    return null;
+  }
+};
diff --git a/packages/core/sdk/src/execution-store.ts b/packages/core/sdk/src/execution-store.ts
new file mode 100644
index 000000000..f2725ba98
--- /dev/null
+++ b/packages/core/sdk/src/execution-store.ts
@@ -0,0 +1,556 @@
+// ---------------------------------------------------------------------------
+// makeExecutionStore — an ExecutionStoreService implementation backed by
+// the generic `typedAdapter` surface. Used by createExecutor
+// to expose `executor.executions`.
+//
+// Per-row JSON columns (`result_json`, `logs_json`, `payload_json`, …)
+// are stored as opaque strings — the SDK does not inspect their shape.
+// Callers pre-stringify when writing and parse when reading.
+// ---------------------------------------------------------------------------
+
+import { Effect } from "effect";
+import type { StorageFailure, TypedAdapter } from "@executor/storage-core";
+
+import type { CoreSchema } from "./core-schema";
+import { decodeCursor, encodeCursor } from "./cursor";
+import {
+  Execution,
+  ExecutionInteraction,
+  ExecutionToolCall,
+  type ExecutionStoreService,
+  type ExecutionListOptions,
+  type ExecutionListResult,
+  type ExecutionListItem,
+  type ExecutionListMeta,
+  type ExecutionChartBucket,
+  type ExecutionStatus,
+  type ExecutionStatusCount,
+  type ExecutionTriggerCount,
+  type ExecutionToolFacet,
+  EXECUTION_STATUS_KEYS,
+} from "./executions";
+import {
+  ExecutionId,
+  ExecutionInteractionId,
+  ExecutionToolCallId,
+  ScopeId,
+} from "./ids";
+
+// Row shape accepted by the row-to-class mappers. We can't rely on
+// `RowOutput<…>` narrowing because `TypedAdapter.update` takes a
+// `Partial<RowOutput<…>>` payload — Partial strips every required field,
+// so TypeScript falls back to a union of every row type in the schema.
+// The mappers read fields by name with explicit `row.xxx as string`
+// casts anyway, so typing the input as the base record that every
+// `RowOutput` extends is just as safe.
+type AdapterRow = Record<string, unknown>;
+
+/** Page size used by `list` when the caller does not pass `limit`. */
+const DEFAULT_LIMIT = 25;
+/** Upper bound applied to any caller-supplied `limit`. */
+const MAX_LIMIT = 100;
+/** Number of stale executions `sweep` loads and deletes per batch. */
+const SWEEP_BATCH_SIZE = 10_000;
+
+/** Coerces a nullable column to `number`, mapping null/undefined to `null`. */
+const toNumberOrNull = (value: unknown): number | null =>
+  value == null ? null : Number(value);
+
+/** Passes a nullable string column through, mapping null/undefined to `null`. */
+const toStringOrNull = (value: unknown): string | null =>
+  value == null ? null : (value as string);
+
+/** Normalises a date column that may arrive as a `Date`, string, or number. */
+const toDate = (value: unknown): Date =>
+  value instanceof Date ? value : new Date(value as string | number);
+
+/** Maps a snake_case `execution` row onto the `Execution` schema class. */
+const rowToExecution = (row: AdapterRow): Execution =>
+  new Execution({
+    id: ExecutionId.make(row.id as string),
+    scopeId: ScopeId.make(row.scope_id as string),
+    status: row.status as ExecutionStatus,
+    code: row.code as string,
+    resultJson: toStringOrNull(row.result_json),
+    errorText: toStringOrNull(row.error_text),
+    logsJson: toStringOrNull(row.logs_json),
+    startedAt: toNumberOrNull(row.started_at),
+    completedAt: toNumberOrNull(row.completed_at),
+    triggerKind: toStringOrNull(row.trigger_kind),
+    triggerMetaJson: toStringOrNull(row.trigger_meta_json),
+    toolCallCount: Number(row.tool_call_count ?? 0),
+    createdAt: toDate(row.created_at),
+    updatedAt: toDate(row.updated_at),
+  });
+
+/** Maps an `execution_interaction` row onto `ExecutionInteraction`. */
+const rowToInteraction = (row: AdapterRow): ExecutionInteraction =>
+  new ExecutionInteraction({
+    id: ExecutionInteractionId.make(row.id as string),
+    executionId: ExecutionId.make(row.execution_id as string),
+    status: row.status as ExecutionInteraction["status"],
+    kind: row.kind as string,
+    purpose: toStringOrNull(row.purpose),
+    payloadJson: toStringOrNull(row.payload_json),
+    responseJson: toStringOrNull(row.response_json),
+    responsePrivateJson: toStringOrNull(row.response_private_json),
+    createdAt: toDate(row.created_at),
+    updatedAt: toDate(row.updated_at),
+  });
+
+/** Maps an `execution_tool_call` row onto `ExecutionToolCall`. */
+const rowToToolCall = (row: AdapterRow): ExecutionToolCall =>
+  new ExecutionToolCall({
+    id: ExecutionToolCallId.make(row.id as string),
+    executionId: ExecutionId.make(row.execution_id as string),
+    status: row.status as ExecutionToolCall["status"],
+    toolPath: row.tool_path as string,
+    namespace: toStringOrNull(row.namespace),
+    argsJson: toStringOrNull(row.args_json),
+    resultJson: toStringOrNull(row.result_json),
+    errorText: toStringOrNull(row.error_text),
+    startedAt: Number(row.started_at),
+    completedAt: toNumberOrNull(row.completed_at),
+    durationMs: toNumberOrNull(row.duration_ms),
+  });
+
+/** Picks the chart bucket width (ms) for a result set spanning `windowMs`. */
+const pickChartBucketMs = (windowMs: number): number => {
+  if (windowMs <= 15 * 60_000) return 30_000; // <=15m → 30s
+  if (windowMs <= 60 * 60_000) return 120_000; // <=1h → 2m
+  if (windowMs <= 24 * 60 * 60_000) return 30 * 60_000; // <=24h → 30m
+  if (windowMs <= 7 * 24 * 60 * 60_000) return 3 * 60 * 60_000; // <=7d → 3h
+  return 24 * 60 * 60_000; // else → 1d
+};
+
+/**
+ * Matches a dotted tool path against a filter pattern. A pattern is either
+ * an exact path or `prefix.*`, which matches `prefix` itself and anything
+ * below it (`prefix.a`, `prefix.a.b`, …).
+ */
+const matchesToolGlob = (toolPath: string, pattern: string): boolean => {
+  if (pattern === toolPath) return true;
+  if (pattern.endsWith(".*")) {
+    const prefix = pattern.slice(0, -2);
+    return toolPath === prefix || toolPath.startsWith(`${prefix}.`);
+  }
+  return false;
+};
+
+/** Fresh zeroed per-status tally for one chart bucket. */
+const emptyStatusCounts = (): Record<ExecutionStatus, number> => ({
+  pending: 0,
+  running: 0,
+  waiting_for_interaction: 0,
+  completed: 0,
+  failed: 0,
+  cancelled: 0,
+});
+
+export interface MakeExecutionStoreOptions {
+  /** Typed adapter over the core schema; every read/write goes through it. */
+  readonly core: TypedAdapter<CoreSchema>;
+  /** Clock override, mainly for tests. Defaults to `() => new Date()`. */
+  readonly now?: () => Date;
+}
+
+/**
+ * Builds an `ExecutionStoreService` on top of a `TypedAdapter`.
+ *
+ * Filtering, cursor pagination and facet aggregation in `list` happen in
+ * memory after fetching the scope's rows.
+ */
+export const makeExecutionStore = ({
+  core,
+  now = () => new Date(),
+}: MakeExecutionStoreOptions): ExecutionStoreService => {
+  /** Inserts a new execution row; `startedAt` defaults to the current time. */
+  const create: ExecutionStoreService["create"] = (input) =>
+    Effect.gen(function* () {
+      const timestamp = now();
+      const row = yield* core.create({
+        model: "execution",
+        forceAllowId: true,
+        data: {
+          id: input.id,
+          scope_id: input.scopeId,
+          status: input.status,
+          code: input.code,
+          result_json: null,
+          error_text: null,
+          logs_json: null,
+          started_at: input.startedAt ?? timestamp.getTime(),
+          completed_at: null,
+          trigger_kind: input.triggerKind ?? null,
+          trigger_meta_json: input.triggerMetaJson ?? null,
+          tool_call_count: 0,
+          created_at: timestamp,
+          updated_at: timestamp,
+        },
+      });
+      return rowToExecution(row);
+    });
+
+  /**
+   * Patches an execution. Only keys present in `patch` are written;
+   * `updated_at` is always bumped. Dies if the row no longer exists.
+   */
+  const update: ExecutionStoreService["update"] = (id, patch) =>
+    Effect.gen(function* () {
+      const row = yield* core.update({
+        model: "execution",
+        where: [{ field: "id", value: id as string }],
+        update: {
+          updated_at: now(),
+          ...(patch.status !== undefined && { status: patch.status }),
+          ...(patch.resultJson !== undefined && { result_json: patch.resultJson }),
+          ...(patch.errorText !== undefined && { error_text: patch.errorText }),
+          ...(patch.logsJson !== undefined && { logs_json: patch.logsJson }),
+          ...(patch.completedAt !== undefined && { completed_at: patch.completedAt }),
+          ...(patch.toolCallCount !== undefined && {
+            tool_call_count: patch.toolCallCount,
+          }),
+        },
+      });
+      if (!row) return yield* Effect.die(`Execution ${id} vanished during update`);
+      return rowToExecution(row);
+    });
+
+  /**
+   * Loads one execution plus its most recent pending interaction (if any).
+   * Returns `null` when no execution has that id.
+   */
+  const get: ExecutionStoreService["get"] = (id) =>
+    Effect.gen(function* () {
+      const rows = yield* core.findMany({
+        model: "execution",
+        where: [{ field: "id", value: id as string }],
+        limit: 1,
+      });
+      const execution = rows[0];
+      if (!execution) return null;
+      const interactions = yield* core.findMany({
+        model: "execution_interaction",
+        where: [
+          { field: "execution_id", value: id as string },
+          { field: "status", value: "pending" },
+        ],
+        sortBy: { field: "created_at", direction: "desc" },
+        limit: 1,
+      });
+      const pendingInteraction = interactions[0]
+        ? rowToInteraction(interactions[0])
+        : null;
+      return {
+        execution: rowToExecution(execution),
+        pendingInteraction,
+      };
+    });
+
+  /** Inserts an interaction row with empty response columns. */
+  const recordInteraction: ExecutionStoreService["recordInteraction"] = (input) =>
+    Effect.gen(function* () {
+      const timestamp = now();
+      const row = yield* core.create({
+        model: "execution_interaction",
+        forceAllowId: true,
+        data: {
+          id: input.id,
+          execution_id: input.executionId,
+          status: input.status,
+          kind: input.kind,
+          purpose: input.purpose ?? null,
+          payload_json: input.payloadJson ?? null,
+          response_json: null,
+          response_private_json: null,
+          created_at: timestamp,
+          updated_at: timestamp,
+        },
+      });
+      return rowToInteraction(row);
+    });
+
+  /** Patches an interaction's status/response. Dies if the row is gone. */
+  const resolveInteraction: ExecutionStoreService["resolveInteraction"] = (id, patch) =>
+    Effect.gen(function* () {
+      const row = yield* core.update({
+        model: "execution_interaction",
+        where: [{ field: "id", value: id as string }],
+        update: {
+          updated_at: now(),
+          ...(patch.status !== undefined && { status: patch.status }),
+          ...(patch.responseJson !== undefined && { response_json: patch.responseJson }),
+          ...(patch.responsePrivateJson !== undefined && {
+            response_private_json: patch.responsePrivateJson,
+          }),
+        },
+      });
+      if (!row)
+        return yield* Effect.die(`Interaction ${id} vanished during update`);
+      return rowToInteraction(row);
+    });
+
+  /**
+   * Inserts a tool-call row in the `"running"` state. `namespace` defaults
+   * to the first dotted segment of `toolPath`.
+   */
+  const recordToolCall: ExecutionStoreService["recordToolCall"] = (input) =>
+    Effect.gen(function* () {
+      const row = yield* core.create({
+        model: "execution_tool_call",
+        forceAllowId: true,
+        data: {
+          id: input.id,
+          execution_id: input.executionId,
+          status: "running",
+          tool_path: input.toolPath,
+          namespace: input.namespace ?? input.toolPath.split(".")[0] ?? null,
+          args_json: input.argsJson ?? null,
+          result_json: null,
+          error_text: null,
+          started_at: input.startedAt,
+          completed_at: null,
+          duration_ms: null,
+        },
+      });
+      return rowToToolCall(row);
+    });
+
+  /** Writes the terminal state of a tool call. Dies if the row is gone. */
+  const finishToolCall: ExecutionStoreService["finishToolCall"] = (id, patch) =>
+    Effect.gen(function* () {
+      const row = yield* core.update({
+        model: "execution_tool_call",
+        where: [{ field: "id", value: id as string }],
+        update: {
+          status: patch.status,
+          result_json: patch.resultJson ?? null,
+          error_text: patch.errorText ?? null,
+          completed_at: patch.completedAt,
+          duration_ms: patch.durationMs,
+        },
+      });
+      if (!row) return yield* Effect.die(`Tool call ${id} vanished during finish`);
+      return rowToToolCall(row);
+    });
+
+  /** Lists an execution's tool calls, oldest `started_at` first. */
+  const listToolCalls: ExecutionStoreService["listToolCalls"] = (executionId) =>
+    Effect.gen(function* () {
+      const rows = yield* core.findMany({
+        model: "execution_tool_call",
+        where: [{ field: "execution_id", value: executionId as string }],
+        sortBy: { field: "started_at", direction: "asc" },
+      });
+      return rows.map(rowToToolCall);
+    });
+
+  /**
+   * Deletes every execution created more than `olderThanMs` ago, together
+   * with its tool-call and interaction rows, and returns the number of
+   * executions removed.
+   *
+   * Adapter `deleteMany` returns void in the generic contract, so rows are
+   * fetched in batches and deleted by id. That keeps the returned count
+   * exact and prevents child rows from being orphaned.
+   */
+  const sweep: ExecutionStoreService["sweep"] = (olderThanMs) =>
+    Effect.gen(function* () {
+      const cutoff = new Date(now().getTime() - olderThanMs);
+      let removed = 0;
+      while (true) {
+        const doomed = yield* core.findMany({
+          model: "execution",
+          where: [{ field: "created_at", operator: "lt", value: cutoff }],
+          limit: SWEEP_BATCH_SIZE,
+        });
+        for (const row of doomed) {
+          const executionId = row.id as string;
+          // Children first, so a failure midway never leaves orphans behind.
+          yield* core.deleteMany({
+            model: "execution_tool_call",
+            where: [{ field: "execution_id", value: executionId }],
+          });
+          yield* core.deleteMany({
+            model: "execution_interaction",
+            where: [{ field: "execution_id", value: executionId }],
+          });
+          yield* core.deleteMany({
+            model: "execution",
+            where: [{ field: "id", value: executionId }],
+          });
+        }
+        removed += doomed.length;
+        // A short batch means nothing older than the cutoff remains.
+        if (doomed.length < SWEEP_BATCH_SIZE) return removed;
+      }
+    });
+
+  /**
+   * Aggregates facet counts and chart buckets for a `list` response.
+   *
+   * @param all every row in the scope (drives `totalRowCount`)
+   * @param filtered rows that passed the filters (drives everything else)
+   */
+  const buildMeta = (
+    all: readonly AdapterRow[],
+    filtered: readonly AdapterRow[],
+    toolCallsByExecution: Map<string, AdapterRow[]>,
+    executionsWithInteractions: Set<string>,
+  ): ExecutionListMeta => {
+    const statusTally = new Map<unknown, number>();
+    const triggerMap = new Map<string | null, number>();
+    const toolCountMap = new Map<string, number>();
+    let withElicitation = 0;
+    // Tracked in the loop rather than `Math.min(...times)`: spreading a
+    // large array as call arguments can throw RangeError.
+    let minTs = Number.POSITIVE_INFINITY;
+    let maxTs = Number.NEGATIVE_INFINITY;
+
+    for (const row of filtered) {
+      statusTally.set(row.status, (statusTally.get(row.status) ?? 0) + 1);
+
+      const kind = (row.trigger_kind as string | null | undefined) ?? null;
+      triggerMap.set(kind, (triggerMap.get(kind) ?? 0) + 1);
+
+      for (const tc of toolCallsByExecution.get(row.id as string) ?? []) {
+        const path = tc.tool_path as string;
+        toolCountMap.set(path, (toolCountMap.get(path) ?? 0) + 1);
+      }
+
+      if (executionsWithInteractions.has(row.id as string)) withElicitation += 1;
+
+      const createdAt = toDate(row.created_at).getTime();
+      if (createdAt < minTs) minTs = createdAt;
+      if (createdAt > maxTs) maxTs = createdAt;
+    }
+
+    const statusCounts: ExecutionStatusCount[] = EXECUTION_STATUS_KEYS.map((status) => ({
+      status,
+      count: statusTally.get(status) ?? 0,
+    }));
+
+    const triggerCounts: ExecutionTriggerCount[] = Array.from(triggerMap.entries()).map(
+      ([triggerKind, count]) => ({ triggerKind, count }),
+    );
+
+    // Top 20 tool paths by call count.
+    const toolFacets: ExecutionToolFacet[] = Array.from(toolCountMap.entries())
+      .map(([toolPath, count]) => ({ toolPath, count }))
+      .sort((a, b) => b.count - a.count)
+      .slice(0, 20);
+
+    // An empty result set has a zero-width window, which maps to the
+    // smallest bucket size.
+    const windowMs = filtered.length > 0 ? Math.max(1, maxTs - minTs) : 1;
+    const chartBucketMs = pickChartBucketMs(windowMs);
+
+    const bucketMap = new Map<number, Record<ExecutionStatus, number>>();
+    for (const row of filtered) {
+      const bucketStart =
+        Math.floor(toDate(row.created_at).getTime() / chartBucketMs) * chartBucketMs;
+      const counts = bucketMap.get(bucketStart) ?? emptyStatusCounts();
+      const status = row.status as ExecutionStatus;
+      // Skip statuses outside the known set instead of writing NaN.
+      if (status in counts) counts[status] += 1;
+      bucketMap.set(bucketStart, counts);
+    }
+    const chartData: ExecutionChartBucket[] = Array.from(bucketMap.entries())
+      .map(([bucketStart, counts]) => ({ bucketStart, counts }))
+      .sort((a, b) => a.bucketStart - b.bucketStart);
+
+    return {
+      totalRowCount: all.length,
+      filterRowCount: filtered.length,
+      statusCounts,
+      triggerCounts,
+      toolFacets,
+      interactionCounts: {
+        withElicitation,
+        withoutElicitation: filtered.length - withElicitation,
+      },
+      chartBucketMs,
+      chartData,
+    };
+  };
+
+  /**
+   * Lists a scope's executions with filtering, keyset pagination and
+   * optional facet metadata.
+   *
+   * `nextCursor` is only issued when sorting by `createdAt`; the cursor
+   * predicate follows `sort.direction`, and rows sharing a `created_at`
+   * are tie-broken by `id` so pages neither skip nor repeat rows.
+   */
+  const list: ExecutionStoreService["list"] = (scopeId, rawOptions) =>
+    Effect.gen(function* () {
+      const options: ExecutionListOptions = rawOptions ?? {};
+      const requestedLimit = options.limit ?? DEFAULT_LIMIT;
+      // A NaN/Infinity limit would otherwise yield an empty page.
+      const limit = Number.isFinite(requestedLimit)
+        ? Math.max(1, Math.min(Math.floor(requestedLimit), MAX_LIMIT))
+        : DEFAULT_LIMIT;
+      const sort = options.sort ?? { field: "createdAt", direction: "desc" as const };
+      const ascending = sort.direction === "asc";
+
+      // Pull candidate rows for this scope. We apply filters in-memory —
+      // the DBAdapter contract's Where clauses don't cover compound
+      // CSV-of-values or glob patterns, so the store does the final
+      // narrowing after fetching.
+      // NOTE(review): the `durationMs` sort orders by `completed_at`, not by
+      // an actual duration — confirm that proxy is intended.
+      const rows = yield* core.findMany({
+        model: "execution",
+        where: [{ field: "scope_id", value: scopeId as string }],
+        sortBy: {
+          field: sort.field === "durationMs" ? "completed_at" : "created_at",
+          direction: sort.direction,
+        },
+      });
+      const scopeExecutionIds = new Set(rows.map((r) => r.id as string));
+
+      // Tool calls are only consulted by the tool-path filter and the meta
+      // facets, so skip the fetch when neither is requested.
+      // NOTE(review): this and the interaction query below read whole
+      // tables across every scope; narrowing them needs an adapter-level
+      // filter on `execution_id`.
+      const needsToolCalls =
+        Boolean(options.includeMeta) || (options.toolPathFilter?.length ?? 0) > 0;
+      const toolCallRows: readonly AdapterRow[] = needsToolCalls
+        ? yield* core.findMany({ model: "execution_tool_call" })
+        : [];
+
+      const toolCallsByExecution = new Map<string, AdapterRow[]>();
+      for (const tc of toolCallRows) {
+        const executionId = tc.execution_id as string;
+        if (!scopeExecutionIds.has(executionId)) continue;
+        const calls = toolCallsByExecution.get(executionId);
+        if (calls) calls.push(tc);
+        else toolCallsByExecution.set(executionId, [tc]);
+      }
+
+      // Pre-compute interaction presence per execution (for the
+      // hadElicitation filter + meta interactionCounts).
+      const interactionRows = yield* core.findMany({
+        model: "execution_interaction",
+      });
+      const executionsWithInteractions = new Set(
+        interactionRows.map((r) => r.execution_id as string),
+      );
+
+      const createdAtMs = (row: AdapterRow): number =>
+        toDate(row.created_at).getTime();
+
+      const appliesStatusFilter = (row: AdapterRow): boolean =>
+        !options.statusFilter || options.statusFilter.length === 0
+          ? true
+          : options.statusFilter.includes(row.status as ExecutionStatus);
+
+      // The pseudo-kind "unknown" selects rows with no trigger attribution.
+      const appliesTriggerFilter = (row: AdapterRow): boolean => {
+        if (!options.triggerFilter || options.triggerFilter.length === 0) return true;
+        const kind = (row.trigger_kind as string | null | undefined) ?? null;
+        return options.triggerFilter.some((want) =>
+          want === "unknown" ? kind === null : want === kind,
+        );
+      };
+
+      const appliesToolFilter = (row: AdapterRow): boolean => {
+        if (!options.toolPathFilter || options.toolPathFilter.length === 0) return true;
+        const calls = toolCallsByExecution.get(row.id as string) ?? [];
+        return options.toolPathFilter.some((pattern) =>
+          calls.some((c) => matchesToolGlob(c.tool_path as string, pattern)),
+        );
+      };
+
+      const appliesTimeFilter = (row: AdapterRow): boolean => {
+        const createdAt = createdAtMs(row);
+        if (options.timeRange?.from !== undefined && createdAt < options.timeRange.from) {
+          return false;
+        }
+        if (options.timeRange?.to !== undefined && createdAt > options.timeRange.to) {
+          return false;
+        }
+        if (options.after !== undefined) {
+          const afterMs = Number(options.after);
+          if (!Number.isNaN(afterMs) && createdAt <= afterMs) return false;
+        }
+        return true;
+      };
+
+      // Lower-case the needle once rather than once per row.
+      const codeNeedle = options.codeQuery ? options.codeQuery.toLowerCase() : null;
+      const appliesCodeQuery = (row: AdapterRow): boolean =>
+        codeNeedle === null
+          ? true
+          : (row.code as string).toLowerCase().includes(codeNeedle);
+
+      const appliesElicitationFilter = (row: AdapterRow): boolean => {
+        if (options.hadElicitation === undefined) return true;
+        const has = executionsWithInteractions.has(row.id as string);
+        return options.hadElicitation ? has : !has;
+      };
+
+      const filtered = rows.filter(
+        (row) =>
+          appliesStatusFilter(row) &&
+          appliesTriggerFilter(row) &&
+          appliesToolFilter(row) &&
+          appliesTimeFilter(row) &&
+          appliesCodeQuery(row) &&
+          appliesElicitationFilter(row),
+      );
+
+      // The adapter orders by `created_at` only. Keyset pagination needs a
+      // total order, so ties are broken by id in the same direction.
+      const ordered =
+        sort.field === "createdAt"
+          ? [...filtered].sort((a, b) => {
+              const delta = createdAtMs(a) - createdAtMs(b);
+              if (delta !== 0) return ascending ? delta : -delta;
+              const aId = a.id as string;
+              const bId = b.id as string;
+              if (aId === bId) return 0;
+              return (aId < bId) === ascending ? -1 : 1;
+            })
+          : filtered;
+
+      // Cursor applies after filtering so it tracks the filtered scan.
+      // "After the cursor" means larger keys when ascending and smaller
+      // keys when descending.
+      const cursor = options.cursor ? decodeCursor(options.cursor) : null;
+      const afterCursor = cursor
+        ? ordered.filter((row) => {
+            const createdAt = createdAtMs(row);
+            if (createdAt !== cursor.createdAt) {
+              return ascending
+                ? createdAt > cursor.createdAt
+                : createdAt < cursor.createdAt;
+            }
+            const id = row.id as string;
+            return ascending ? id > cursor.id : id < cursor.id;
+          })
+        : ordered;
+
+      const page = afterCursor.slice(0, limit);
+      const lastRow = page[page.length - 1];
+      const nextCursor =
+        lastRow !== undefined && afterCursor.length > limit && sort.field === "createdAt"
+          ? encodeCursor({
+              createdAt: createdAtMs(lastRow),
+              id: lastRow.id as string,
+            })
+          : undefined;
+
+      // Latest pending interaction per execution on this page.
+      const pageIds = new Set(page.map((r) => r.id as string));
+      const pendingByExecution = new Map<string, AdapterRow>();
+      for (const interaction of interactionRows) {
+        if (interaction.status !== "pending") continue;
+        const executionId = interaction.execution_id as string;
+        if (!pageIds.has(executionId)) continue;
+        const existing = pendingByExecution.get(executionId);
+        if (
+          !existing ||
+          toDate(interaction.created_at).getTime() >
+            toDate(existing.created_at).getTime()
+        ) {
+          pendingByExecution.set(executionId, interaction);
+        }
+      }
+
+      const executions: readonly ExecutionListItem[] = page.map((row) => {
+        const pending = pendingByExecution.get(row.id as string);
+        return {
+          execution: rowToExecution(row),
+          pendingInteraction: pending ? rowToInteraction(pending) : null,
+        };
+      });
+
+      const meta: ExecutionListMeta | undefined = options.includeMeta
+        ? buildMeta(rows, filtered, toolCallsByExecution, executionsWithInteractions)
+        : undefined;
+
+      return {
+        executions,
+        ...(nextCursor ? { nextCursor } : {}),
+        ...(meta ? { meta } : {}),
+      } satisfies ExecutionListResult;
+    });
+
+  return {
+    create,
+    update,
+    get,
+    list,
+    recordInteraction,
+    resolveInteraction,
+    recordToolCall,
+    finishToolCall,
+    listToolCalls,
+    sweep,
+  } satisfies ExecutionStoreService;
+};
+
+// Re-export the Tag symbol here so callers can `import { ExecutionStore }
+// from "@executor/sdk"` and get both the Tag and a layer factory from
+// one module entry.
+export { ExecutionStore } from "./executions";
+export type { StorageFailure };
diff --git a/packages/core/sdk/src/executions.test.ts b/packages/core/sdk/src/executions.test.ts
new file mode 100644
index 000000000..06a289da4
--- /dev/null
+++ b/packages/core/sdk/src/executions.test.ts
@@ -0,0 +1,260 @@
+import { describe, expect, it } from "@effect/vitest";
+import { Effect } from "effect";
+
+import { makeMemoryAdapter } from "@executor/storage-core/testing/memory";
+
+import { collectSchemas, createExecutor } from "./executor";
+import {
+  EXECUTION_STATUS_KEYS,
+  type ExecutionStatus,
+} from "./executions";
+import {
+  ExecutionId,
+  ExecutionInteractionId,
+  ExecutionToolCallId,
+  ScopeId,
+} from "./ids";
+import { Scope } from "./scope";
+import { makeInMemoryBlobStore } from "./blob";
+
+// ---------------------------------------------------------------------------
+// Shared fixture. 
Every test builds a scoped executor backed by the
+// in-memory adapter — zero persistence, zero migration dance.
+// ---------------------------------------------------------------------------
+
+// Scope id shared by every test in this file.
+const SCOPE = ScopeId.make("scope-test");
+
+// Builds a fresh executor over a new in-memory adapter and blob store, so
+// each test that calls this starts from empty tables.
+const makeExecutor = () =>
+  Effect.gen(function* () {
+    const schema = collectSchemas([]);
+    const adapter = makeMemoryAdapter({ schema });
+    const scope = new Scope({ id: SCOPE, name: "test", createdAt: new Date() });
+    const executor = yield* createExecutor({
+      scopes: [scope],
+      adapter,
+      blobs: makeInMemoryBlobStore(),
+    });
+    return executor;
+  });
+
+describe("ExecutionStore (DBAdapter-backed)", () => {
+  // A created execution reads back with the fields it was given, a zero
+  // tool-call count, and no pending interaction.
+  it.effect("create → get round-trip preserves fields", () =>
+    Effect.gen(function* () {
+      const executor = yield* makeExecutor();
+      const id = ExecutionId.make("exec-1");
+
+      yield* executor.executions.create({
+        id,
+        scopeId: SCOPE,
+        status: "running",
+        code: "const x = 1",
+        triggerKind: "cli",
+      });
+
+      const detail = yield* executor.executions.get(id);
+      expect(detail).not.toBeNull();
+      expect(detail!.execution.id).toBe(id);
+      expect(detail!.execution.status).toBe("running");
+      expect(detail!.execution.code).toBe("const x = 1");
+      expect(detail!.execution.triggerKind).toBe("cli");
+      expect(detail!.execution.toolCallCount).toBe(0);
+      expect(detail!.pendingInteraction).toBeNull();
+    }),
+  );
+
+  // update() persists every patched column; verified by re-reading via get().
+  it.effect("update patches status, result, and completion time", () =>
+    Effect.gen(function* () {
+      const executor = yield* makeExecutor();
+      const id = ExecutionId.make("exec-2");
+
+      yield* executor.executions.create({
+        id,
+        scopeId: SCOPE,
+        status: "running",
+        code: "2 + 2",
+      });
+
+      yield* executor.executions.update(id, {
+        status: "completed",
+        resultJson: JSON.stringify({ value: 4 }),
+        completedAt: 1_700_000_000_000,
+        toolCallCount: 1,
+      });
+
+      const detail = yield* executor.executions.get(id);
+      expect(detail!.execution.status).toBe("completed");
+      expect(detail!.execution.resultJson).toBe('{"value":4}');
+      expect(detail!.execution.completedAt).toBe(1_700_000_000_000);
+      expect(detail!.execution.toolCallCount).toBe(1);
+    }),
+  );
+
+  // recordToolCall followed by finishToolCall leaves one completed row whose
+  // namespace ("ns") was derived from the first segment of the tool path.
+  it.effect("tool-call recording + finish updates status + duration", () =>
+    Effect.gen(function* () {
+      const executor = yield* makeExecutor();
+      const executionId = ExecutionId.make("exec-3");
+      const toolCallId = ExecutionToolCallId.make("tc-1");
+
+      yield* executor.executions.create({
+        id: executionId,
+        scopeId: SCOPE,
+        status: "running",
+        code: "await tools.a()",
+      });
+
+      yield* executor.executions.recordToolCall({
+        id: toolCallId,
+        executionId,
+        toolPath: "ns.doThing",
+        startedAt: 1_700_000_000_000,
+      });
+
+      yield* executor.executions.finishToolCall(toolCallId, {
+        status: "completed",
+        resultJson: '{"ok":true}',
+        completedAt: 1_700_000_000_250,
+        durationMs: 250,
+      });
+
+      const calls = yield* executor.executions.listToolCalls(executionId);
+      expect(calls).toHaveLength(1);
+      expect(calls[0]!.status).toBe("completed");
+      expect(calls[0]!.toolPath).toBe("ns.doThing");
+      expect(calls[0]!.namespace).toBe("ns");
+      expect(calls[0]!.durationMs).toBe(250);
+      expect(calls[0]!.resultJson).toBe('{"ok":true}');
+    }),
+  );
+
+  // A pending interaction is surfaced by get() until it is resolved, after
+  // which get() reports no pending interaction.
+  it.effect("interaction lifecycle: record pending → resolve", () =>
+    Effect.gen(function* () {
+      const executor = yield* makeExecutor();
+      const executionId = ExecutionId.make("exec-4");
+      const interactionId = ExecutionInteractionId.make("int-1");
+
+      yield* executor.executions.create({
+        id: executionId,
+        scopeId: SCOPE,
+        status: "waiting_for_interaction",
+        code: "await elicit(...)",
+      });
+
+      yield* executor.executions.recordInteraction({
+        id: interactionId,
+        executionId,
+        status: "pending",
+        kind: "FormElicitation",
+        payloadJson: '{"message":"ok?"}',
+      });
+
+      // get() should surface the pending interaction alongside the row.
+      const beforeResolve = yield* executor.executions.get(executionId);
+      expect(beforeResolve!.pendingInteraction).not.toBeNull();
+      expect(beforeResolve!.pendingInteraction!.id).toBe(interactionId);
+
+      yield* executor.executions.resolveInteraction(interactionId, {
+        status: "resolved",
+        responseJson: '{"action":"accept"}',
+      });
+
+      const afterResolve = yield* executor.executions.get(executionId);
+      expect(afterResolve!.pendingInteraction).toBeNull();
+    }),
+  );
+
+  // Three executions with mixed status/trigger: the status filter keeps the
+  // two completed rows, the trigger filter keeps only the "http" row.
+  it.effect("list applies status + trigger filters", () =>
+    Effect.gen(function* () {
+      const executor = yield* makeExecutor();
+
+      yield* executor.executions.create({
+        id: ExecutionId.make("e-a"),
+        scopeId: SCOPE,
+        status: "completed",
+        code: "a",
+        triggerKind: "cli",
+      });
+      yield* executor.executions.create({
+        id: ExecutionId.make("e-b"),
+        scopeId: SCOPE,
+        status: "failed",
+        code: "b",
+        triggerKind: "http",
+      });
+      yield* executor.executions.create({
+        id: ExecutionId.make("e-c"),
+        scopeId: SCOPE,
+        status: "completed",
+        code: "c",
+        triggerKind: "mcp",
+      });
+
+      const completedOnly = yield* executor.executions.list(SCOPE, {
+        statusFilter: ["completed"],
+      });
+      expect(completedOnly.executions).toHaveLength(2);
+
+      const httpOnly = yield* executor.executions.list(SCOPE, {
+        triggerFilter: ["http"],
+      });
+      expect(httpOnly.executions).toHaveLength(1);
+      expect(httpOnly.executions[0]!.execution.id).toBe(ExecutionId.make("e-b"));
+    }),
+  );
+
+  // With includeMeta the response carries the total row count plus
+  // per-status and per-trigger counts for the three seeded executions.
+  it.effect("list meta reports status + trigger counts", () =>
+    Effect.gen(function* () {
+      const executor = yield* makeExecutor();
+      yield* executor.executions.create({
+        id: ExecutionId.make("m-1"),
+        scopeId: SCOPE,
+        status: "completed",
+        code: "x",
+        triggerKind: "cli",
+      });
+      yield* executor.executions.create({
+        id: ExecutionId.make("m-2"),
+        scopeId: SCOPE,
+        status: "completed",
+        code: "y",
+        triggerKind: "cli",
+      });
+      yield* executor.executions.create({
+        id: ExecutionId.make("m-3"),
+        scopeId: SCOPE,
+        status: "failed",
+        code: "z",
+        triggerKind: "mcp",
+      });
+
+      const res = yield* executor.executions.list(SCOPE, { includeMeta: true });
+      expect(res.meta).toBeDefined();
+      expect(res.meta!.totalRowCount).toBe(3);
+
+      const completedCount = res.meta!.statusCounts.find(
+        (c) => c.status === ("completed" as ExecutionStatus),
+      );
+      expect(completedCount!.count).toBe(2);
+      expect(res.meta!.triggerCounts.find((t) => t.triggerKind === "cli")!.count).toBe(
+        2,
+      );
+      expect(res.meta!.triggerCounts.find((t) => t.triggerKind === "mcp")!.count).toBe(
+        1,
+      );
+    }),
+  );
+
+  // Pins the exported status key list to the six literals spelled out here.
+  it.effect("EXECUTION_STATUS_KEYS covers every status literal", () =>
+    Effect.sync(() => {
+      expect(new Set(EXECUTION_STATUS_KEYS)).toEqual(
+        new Set([
+          "pending",
+          "running",
+          "waiting_for_interaction",
+          "completed",
+          "failed",
+          "cancelled",
+        ]),
+      );
+    }),
+  );
+});
diff --git a/packages/core/sdk/src/executions.ts b/packages/core/sdk/src/executions.ts
new file mode 100644
index 000000000..d1b6fee52
--- /dev/null
+++ b/packages/core/sdk/src/executions.ts
@@ -0,0 +1,292 @@
+// ---------------------------------------------------------------------------
+// ExecutionStore — records one run per `engine.execute()` /
+// `executeWithPause()`. Wraps the generic `DBAdapter` core tables
+// (`execution`, `execution_interaction`, `execution_tool_call`) so
+// every storage backend that implements the adapter contract gets
+// execution history for free.
+//
+// The store itself is plain Effect code; the adapter is threaded in
+// by `createExecutor` and exposed to callers as `executor.executions`.
+// --------------------------------------------------------------------------- + +import { Context, Effect, Schema } from "effect"; +import type { StorageFailure } from "@executor/storage-core"; + +import { ExecutionId, ExecutionInteractionId, ExecutionToolCallId, ScopeId } from "./ids"; + +// --------------------------------------------------------------------------- +// Status enums +// --------------------------------------------------------------------------- + +export const ExecutionStatus = Schema.Literal( /* the statuses an execution row can carry */ + "pending", + "running", + "waiting_for_interaction", + "completed", + "failed", + "cancelled", +); +export type ExecutionStatus = typeof ExecutionStatus.Type; + +export const EXECUTION_STATUS_KEYS = [ /* hand-maintained array mirror of the ExecutionStatus literals above; keep the two lists identical */ + "pending", + "running", + "waiting_for_interaction", + "completed", + "failed", + "cancelled", +] as const; + +export const ExecutionInteractionStatus = Schema.Literal( + "pending", + "resolved", + "cancelled", +); +export type ExecutionInteractionStatus = typeof ExecutionInteractionStatus.Type; + +export const ExecutionToolCallStatus = Schema.Literal( + "running", + "completed", + "failed", +); +export type ExecutionToolCallStatus = typeof ExecutionToolCallStatus.Type; + +// --------------------------------------------------------------------------- +// Row projections +// --------------------------------------------------------------------------- + +export class Execution extends Schema.Class<Execution>("Execution")({ /* Schema.Class takes the class itself as its type argument; nullable fields use Schema.NullOr */ + id: ExecutionId, + scopeId: ScopeId, + status: ExecutionStatus, + code: Schema.String, + resultJson: Schema.NullOr(Schema.String), + errorText: Schema.NullOr(Schema.String), + logsJson: Schema.NullOr(Schema.String), + startedAt: Schema.NullOr(Schema.Number), + completedAt: Schema.NullOr(Schema.Number), + triggerKind: Schema.NullOr(Schema.String), + triggerMetaJson: Schema.NullOr(Schema.String), + toolCallCount: Schema.Number, + createdAt: Schema.DateFromNumber, + updatedAt: Schema.DateFromNumber, +}) {} + +export class ExecutionInteraction extends
Schema.Class<ExecutionInteraction>( + "ExecutionInteraction", +)({ + id: ExecutionInteractionId, + executionId: ExecutionId, /* links the interaction to its parent Execution */ + status: ExecutionInteractionStatus, + kind: Schema.String, + purpose: Schema.NullOr(Schema.String), + payloadJson: Schema.NullOr(Schema.String), + responseJson: Schema.NullOr(Schema.String), + responsePrivateJson: Schema.NullOr(Schema.String), + createdAt: Schema.DateFromNumber, + updatedAt: Schema.DateFromNumber, +}) {} + +export class ExecutionToolCall extends Schema.Class<ExecutionToolCall>("ExecutionToolCall")({ /* unlike the two classes above, this one declares no createdAt or updatedAt fields */ + id: ExecutionToolCallId, + executionId: ExecutionId, + status: ExecutionToolCallStatus, + toolPath: Schema.String, + namespace: Schema.NullOr(Schema.String), + argsJson: Schema.NullOr(Schema.String), + resultJson: Schema.NullOr(Schema.String), + errorText: Schema.NullOr(Schema.String), + startedAt: Schema.Number, + completedAt: Schema.NullOr(Schema.Number), + durationMs: Schema.NullOr(Schema.Number), +}) {} + +// --------------------------------------------------------------------------- +// Input types +// --------------------------------------------------------------------------- + +export interface CreateExecutionInput { + readonly id: ExecutionId; + readonly scopeId: ScopeId; + readonly status: ExecutionStatus; + readonly code: string; + readonly startedAt?: number; + readonly triggerKind?: string; + readonly triggerMetaJson?: string; +} + +export interface UpdateExecutionInput { /* every field is optional; this is the type of the `patch` argument of the store's update() */ + readonly status?: ExecutionStatus; + readonly resultJson?: string | null; + readonly errorText?: string | null; + readonly logsJson?: string | null; + readonly completedAt?: number; + readonly toolCallCount?: number; +} + +export interface CreateExecutionInteractionInput { + readonly id: ExecutionInteractionId; + readonly executionId: ExecutionId; + readonly status: ExecutionInteractionStatus; + readonly kind: string; + readonly purpose?: string; + readonly payloadJson?: string; +} + +export interface UpdateExecutionInteractionInput { + readonly status?: ExecutionInteractionStatus; +
readonly responseJson?: string | null; + readonly responsePrivateJson?: string | null; +} + +export interface CreateExecutionToolCallInput { + readonly id: ExecutionToolCallId; + readonly executionId: ExecutionId; + readonly toolPath: string; + readonly namespace?: string; + readonly argsJson?: string; + readonly startedAt: number; +} + +export interface UpdateExecutionToolCallInput { /* status, completedAt and durationMs are required here, unlike the other Update* inputs */ + readonly status: ExecutionToolCallStatus; + readonly resultJson?: string | null; + readonly errorText?: string | null; + readonly completedAt: number; + readonly durationMs: number; +} + +// --------------------------------------------------------------------------- +// Filters + sort +// --------------------------------------------------------------------------- + +export type ExecutionSortField = "createdAt" | "durationMs"; +export type ExecutionSortDirection = "asc" | "desc"; +export interface ExecutionSort { + readonly field: ExecutionSortField; + readonly direction: ExecutionSortDirection; +} + +export interface ExecutionTimeRange { + readonly from?: number; /* presumably an epoch-ms bound like the other number-typed timestamps in this file — TODO confirm */ + readonly to?: number; +} + +export interface ExecutionListOptions { + readonly limit?: number; + readonly cursor?: string; + readonly statusFilter?: readonly ExecutionStatus[]; + readonly triggerFilter?: readonly string[]; + readonly toolPathFilter?: readonly string[]; + readonly timeRange?: ExecutionTimeRange; + readonly after?: string; /* NOTE(review): how this differs from `cursor` is not evident from this file — confirm against execution-store.ts */ + readonly codeQuery?: string; + readonly hadElicitation?: boolean; + readonly sort?: ExecutionSort; + readonly includeMeta?: boolean; +} + +export interface ExecutionListItem { + readonly execution: Execution; + readonly pendingInteraction: ExecutionInteraction | null; +} + +export interface ExecutionStatusCount { + readonly status: ExecutionStatus; + readonly count: number; +} + +export interface ExecutionTriggerCount { + readonly triggerKind: string | null; + readonly count: number; +} + +export interface ExecutionToolFacet { + readonly toolPath: string; + readonly count: number; +} +
+export interface ExecutionChartBucket { + readonly bucketStart: number; /* presumably the epoch-ms start of the bucket — TODO confirm */ + readonly counts: Readonly<Record<ExecutionStatus, number>>; /* tallies for this bucket; the Record key type is reconstructed as ExecutionStatus — confirm against the chart builder in execution-store.ts */ +} + +export interface ExecutionInteractionCounts { + readonly withElicitation: number; + readonly withoutElicitation: number; +} + +export interface ExecutionListMeta { + readonly totalRowCount: number; + readonly filterRowCount: number; + readonly statusCounts: readonly ExecutionStatusCount[]; + readonly triggerCounts: readonly ExecutionTriggerCount[]; + readonly toolFacets: readonly ExecutionToolFacet[]; + readonly interactionCounts: ExecutionInteractionCounts; + readonly chartBucketMs: number; /* presumably the width in ms of each chartData bucket — TODO confirm */ + readonly chartData: readonly ExecutionChartBucket[]; +} + +export interface ExecutionListResult { + readonly executions: readonly ExecutionListItem[]; + readonly nextCursor?: string; + readonly meta?: ExecutionListMeta; /* optional; ExecutionListOptions has an includeMeta flag for requesting it */ +} + +export interface ExecutionDetail { /* declares the same two fields as ExecutionListItem */ + readonly execution: Execution; + readonly pendingInteraction: ExecutionInteraction | null; +} + +// --------------------------------------------------------------------------- +// Store surface +// +// Exposed to callers as `executor.executions`. The engine writes on +// every lifecycle edge (create → update → record{Interaction,ToolCall} +// → finish). Read methods back the `/executions` HTTP API and the +// runs UI.
+// --------------------------------------------------------------------------- + +export interface ExecutionStoreService { /* NOTE(review): every return type below reads as a bare `Effect.Effect;` — the type arguments appear to have been lost from this copy of the patch and must be restored from the original commit. Presumably the error channel is the StorageFailure type imported at the top of executions.ts — TODO confirm */ + readonly create: ( + input: CreateExecutionInput, + ) => Effect.Effect; + readonly update: ( + id: ExecutionId, + patch: UpdateExecutionInput, + ) => Effect.Effect; + readonly get: ( /* executions.test.ts asserts the result with not.toBeNull() and reads .execution and .pendingInteraction from it */ + id: ExecutionId, + ) => Effect.Effect; + readonly list: ( /* executions.test.ts reads .executions and .meta from the result */ + scopeId: ScopeId, + options?: ExecutionListOptions, + ) => Effect.Effect; + readonly recordInteraction: ( + input: CreateExecutionInteractionInput, + ) => Effect.Effect; + readonly resolveInteraction: ( + id: ExecutionInteractionId, + patch: UpdateExecutionInteractionInput, + ) => Effect.Effect; + readonly recordToolCall: ( + input: CreateExecutionToolCallInput, + ) => Effect.Effect; + readonly finishToolCall: ( + id: ExecutionToolCallId, + patch: UpdateExecutionToolCallInput, + ) => Effect.Effect; + readonly listToolCalls: ( /* executions.test.ts indexes the result like an array of tool-call rows */ + executionId: ExecutionId, + ) => Effect.Effect; + /** Drop execution rows older than the retention window. Host calls + * this on a schedule; the SDK doesn't drive it.
*/ + readonly sweep: ( /* NOTE(review): the signature does not say whether olderThanMs is an age in ms or an epoch-ms cutoff — confirm in execution-store.ts */ + olderThanMs: number, + ) => Effect.Effect; +} + +export class ExecutionStore extends Context.Tag("@executor/sdk/ExecutionStore")< /* Context tag whose service type is ExecutionStoreService */ + ExecutionStore, + ExecutionStoreService +>() {} diff --git a/packages/core/sdk/src/executor.ts b/packages/core/sdk/src/executor.ts index 80212c67e..850e83fe7 100644 --- a/packages/core/sdk/src/executor.ts +++ b/packages/core/sdk/src/executor.ts @@ -38,6 +38,8 @@ import { type ElicitationHandler, type ElicitationRequest, } from "./elicitation"; +import { makeExecutionStore } from "./execution-store"; +import type { ExecutionStoreService } from "./executions"; import { ConnectionNotFoundError, ConnectionProviderNotRegisteredError, @@ -228,6 +230,8 @@ export type Executor = { readonly providers: () => Effect.Effect; }; + readonly executions: ExecutionStoreService; + readonly close: () => Effect.Effect; } & PluginExtensions; @@ -596,6 +600,11 @@ export const createExecutor = < const adapter = buildAdapterRouter(scopedRoot); const core = typedAdapter(adapter); + // Execution history — reads + writes against the generic adapter. + // Exposed to callers as `executor.executions`; the engine drives + // lifecycle transitions (create / update / recordToolCall / …). + const executions: ExecutionStoreService = makeExecutionStore({ core }); /* built from the typed adapter `core` created just above */ + // Populated once, never mutated after startup.
const staticTools = new Map(); const staticSources = new Map(); @@ -2398,6 +2407,7 @@ export const createExecutor = < Array.from(connectionProviders.keys()) as readonly string[], ), }, + executions, close, }; diff --git a/packages/core/sdk/src/ids.ts b/packages/core/sdk/src/ids.ts index b62f0b395..5a3a108de 100644 --- a/packages/core/sdk/src/ids.ts +++ b/packages/core/sdk/src/ids.ts @@ -14,3 +14,12 @@ export type PolicyId = typeof PolicyId.Type; export const ConnectionId = Schema.String.pipe(Schema.brand("ConnectionId")); export type ConnectionId = typeof ConnectionId.Type; + +export const ExecutionId = Schema.String.pipe(Schema.brand("ExecutionId")); /* string schema branded "ExecutionId", built the same way as ConnectionId above */ +export type ExecutionId = typeof ExecutionId.Type; + +export const ExecutionInteractionId = Schema.String.pipe(Schema.brand("ExecutionInteractionId")); /* string schema branded "ExecutionInteractionId" */ +export type ExecutionInteractionId = typeof ExecutionInteractionId.Type; + +export const ExecutionToolCallId = Schema.String.pipe(Schema.brand("ExecutionToolCallId")); /* string schema branded "ExecutionToolCallId" */ +export type ExecutionToolCallId = typeof ExecutionToolCallId.Type; diff --git a/packages/core/sdk/src/index.ts b/packages/core/sdk/src/index.ts index d3180a8e6..40f1be9c3 100644 --- a/packages/core/sdk/src/index.ts +++ b/packages/core/sdk/src/index.ts @@ -23,7 +23,16 @@ export { typedAdapter } from "@executor/storage-core"; export { StorageError, UniqueViolationError } from "@executor/storage-core"; // IDs (branded) -export { ScopeId, ToolId, SecretId, PolicyId, ConnectionId } from "./ids"; +export { + ScopeId, + ToolId, + SecretId, + PolicyId, + ConnectionId, + ExecutionId, + ExecutionInteractionId, + ExecutionToolCallId, +} from "./ids"; // Scope export { Scope } from "./scope"; @@ -66,6 +75,9 @@ export { type DefinitionRow, type SecretRow, type ConnectionRow, + type ExecutionRow, + type ExecutionInteractionRow, + type ExecutionToolCallRow, type DefinitionsInput, type ToolAnnotations, } from "./core-schema"; @@ -102,6 +114,41 @@ export { type ElicitationContext, } from "./elicitation"; +// Execution
history +export { + ExecutionStatus, + ExecutionInteractionStatus, + ExecutionToolCallStatus, + Execution, + ExecutionInteraction, + ExecutionToolCall, + ExecutionStore, + EXECUTION_STATUS_KEYS, + type ExecutionStoreService, + type CreateExecutionInput, + type UpdateExecutionInput, + type CreateExecutionInteractionInput, + type UpdateExecutionInteractionInput, + type CreateExecutionToolCallInput, + type UpdateExecutionToolCallInput, + type ExecutionListItem, + type ExecutionListOptions, + type ExecutionListResult, + type ExecutionListMeta, + type ExecutionStatusCount, + type ExecutionTriggerCount, + type ExecutionToolFacet, + type ExecutionInteractionCounts, + type ExecutionChartBucket, + type ExecutionTimeRange, + type ExecutionSort, + type ExecutionSortField, + type ExecutionSortDirection, + type ExecutionDetail, +} from "./executions"; +export { makeExecutionStore } from "./execution-store"; +export { encodeCursor, decodeCursor, type CursorPayload } from "./cursor"; + // Blob store export { type BlobStore,