From 65139d6726740bd803833c0154e4fbb6db164a1a Mon Sep 17 00:00:00 2001 From: Saatvik Arya Date: Fri, 24 Apr 2026 19:21:51 +0530 Subject: [PATCH 1/3] feat(sdk): add ExecutionStore backed by DBAdapter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds execution history persistence to the core SDK surface, wiring three new tables (`execution`, `execution_interaction`, `execution_tool_call`) into `coreSchema` and exposing an `ExecutionStore` service on `executor.executions`. Changes: - `core-schema.ts`: three new tables with `scope_id` / `execution_id` / `tool_path` / `trigger_kind` / `created_at` indexes for the runs UI's faceting + timeline queries. - `ids.ts`: branded `ExecutionId`, `ExecutionInteractionId`, `ExecutionToolCallId`. - `executions.ts`: `Execution`, `ExecutionInteraction`, `ExecutionToolCall` Schema classes, status enums, create/update/filter/sort/meta input types, and the `ExecutionStore` Context.Tag. - `execution-store.ts`: `makeExecutionStore(core)` — an adapter-backed `ExecutionStoreService` implementation. Wraps `typedAdapter` for CRUD, handles cursor-based pagination, filter predicates (status, trigger, tool-path glob, time range, code substring, hadElicitation), and builds list meta with facets + chart buckets. - `cursor.ts`: base64url `{ createdAt, id }` pagination cursors. - `executor.ts`: constructs the store once per executor, exposes via `executor.executions`. - `executions.test.ts`: round-trip + lifecycle coverage against the in-memory adapter (no migrations needed). Follow-up work (future PRs in the stack): - wire the engine to record runs + tool calls through this store, - add `/executions` API endpoints, and - land the runs UI. --- packages/core/sdk/src/core-schema.ts | 84 ++++ packages/core/sdk/src/cursor.ts | 44 ++ packages/core/sdk/src/execution-store.ts | 556 +++++++++++++++++++++++ packages/core/sdk/src/executions.test.ts | 260 +++++++++++ packages/core/sdk/src/executions.ts | 292 ++++++++++++ packages/core/sdk/src/executor.ts | 10 + packages/core/sdk/src/ids.ts | 9 + packages/core/sdk/src/index.ts | 49 +- 8 files changed, 1303 insertions(+), 1 deletion(-) create mode 100644 packages/core/sdk/src/cursor.ts create mode 100644 packages/core/sdk/src/execution-store.ts create mode 100644 packages/core/sdk/src/executions.test.ts create mode 100644 packages/core/sdk/src/executions.ts diff --git a/packages/core/sdk/src/core-schema.ts b/packages/core/sdk/src/core-schema.ts index 285eec8e5..774c1f8d3 100644 --- a/packages/core/sdk/src/core-schema.ts +++ b/packages/core/sdk/src/core-schema.ts @@ -148,6 +148,75 @@ export const coreSchema = { updated_at: { type: "date", required: true }, }, }, + // Execution history — one row per `engine.execute()` / + // `engine.executeWithPause()`. Captures the submitted code, final + // status/result, and trigger metadata. Tool calls and interactions + // link back via `execution_id`. + execution: { + fields: { + id: { type: "string", required: true }, + scope_id: { type: "string", required: true, index: true }, + status: { type: "string", required: true, index: true }, + code: { type: "string", required: true }, + result_json: { type: "string", required: false }, + error_text: { type: "string", required: false }, + logs_json: { type: "string", required: false }, + /** Epoch ms — the point the engine accepted the code. */ + started_at: { type: "number", required: false }, + /** Epoch ms — the point the engine reached a terminal status. */ + completed_at: { type: "number", required: false }, + /** Free-form trigger kind attributed by the host — `"cli"`, + * `"http"`, `"mcp"`, etc. Null when the host didn't attribute + * one. Indexed so filter facets scan fast. */ + trigger_kind: { type: "string", required: false, index: true }, + /** Opaque host-owned JSON for per-trigger details. */ + trigger_meta_json: { type: "string", required: false }, + tool_call_count: { type: "number", required: true, defaultValue: 0 }, + created_at: { type: "date", required: true, index: true }, + updated_at: { type: "date", required: true }, + }, + }, + // Per-execution interaction rows — elicitation requests and their + // resolutions. A pending row is the hook the runs UI uses to render + // the "waiting for input" state; once resolved, `response_json` + // captures the user's answer for replay / auditing. + execution_interaction: { + fields: { + id: { type: "string", required: true }, + execution_id: { type: "string", required: true, index: true }, + status: { type: "string", required: true, index: true }, + kind: { type: "string", required: true }, + purpose: { type: "string", required: false }, + payload_json: { type: "string", required: false }, + response_json: { type: "string", required: false }, + /** Stores sensitive per-response data (e.g. raw form values) that + * should not be replayed in the public interaction log. */ + response_private_json: { type: "string", required: false }, + created_at: { type: "date", required: true }, + updated_at: { type: "date", required: true }, + }, + }, + // Per-execution tool-call rows — one per `executor.tools.invoke` that + // ran inside the sandboxed execution. Used to build the tool-call + // timeline shown in the runs UI. + execution_tool_call: { + fields: { + id: { type: "string", required: true }, + execution_id: { type: "string", required: true, index: true }, + status: { type: "string", required: true }, + /** Dotted tool path (e.g. `github.issues.create`). Indexed so the + * facets query in the runs UI resolves without a table scan. */ + tool_path: { type: "string", required: true, index: true }, + /** First path segment, pre-computed for cheap faceting. */ + namespace: { type: "string", required: false, index: true }, + args_json: { type: "string", required: false }, + result_json: { type: "string", required: false }, + error_text: { type: "string", required: false }, + started_at: { type: "number", required: true }, + completed_at: { type: "number", required: false }, + duration_ms: { type: "number", required: false }, + }, + }, } as const satisfies DBSchema; export type CoreSchema = typeof coreSchema; @@ -176,6 +245,21 @@ export type ConnectionRow = InferDBFieldsOutput< > & Record; +export type ExecutionRow = InferDBFieldsOutput< + CoreSchema["execution"]["fields"] +> & + Record; + +export type ExecutionInteractionRow = InferDBFieldsOutput< + CoreSchema["execution_interaction"]["fields"] +> & + Record; + +export type ExecutionToolCallRow = InferDBFieldsOutput< + CoreSchema["execution_tool_call"]["fields"] +> & + Record; + // --------------------------------------------------------------------------- // Tool annotations — default-policy metadata the executor consults // before invocation. Returned by `plugin.resolveAnnotations` (dynamic diff --git a/packages/core/sdk/src/cursor.ts b/packages/core/sdk/src/cursor.ts new file mode 100644 index 000000000..ffa367ee5 --- /dev/null +++ b/packages/core/sdk/src/cursor.ts @@ -0,0 +1,44 @@ +// --------------------------------------------------------------------------- +// Opaque cursor helpers for ExecutionStore.list pagination. +// +// Cursors encode `{ createdAt, id }` — the tuple adapter backends need to +// resume a scan on `ORDER BY created_at DESC, id DESC`. Encoded as +// url-safe base64 so callers can pass them through query params without +// thinking about escaping. +// --------------------------------------------------------------------------- + +export interface CursorPayload { + readonly createdAt: number; + readonly id: string; +} + +const toBase64Url = (value: string): string => { + const bytes = new TextEncoder().encode(value); + let binary = ""; + for (const b of bytes) binary += String.fromCharCode(b); + return btoa(binary).replace(/\+/g, "-").replace(/\//g, "_").replace(/=+$/, ""); +}; + +const fromBase64Url = (value: string): string => { + const pad = value.length % 4 === 0 ? 0 : 4 - (value.length % 4); + const normalized = value.replace(/-/g, "+").replace(/_/g, "/") + "=".repeat(pad); + const binary = atob(normalized); + const bytes = new Uint8Array(binary.length); + for (let i = 0; i < binary.length; i += 1) bytes[i] = binary.charCodeAt(i); + return new TextDecoder().decode(bytes); +}; + +export const encodeCursor = (payload: CursorPayload): string => + toBase64Url(JSON.stringify(payload)); + +export const decodeCursor = (raw: string): CursorPayload | null => { + try { + const parsed = JSON.parse(fromBase64Url(raw)) as Record; + if (typeof parsed.createdAt !== "number" || typeof parsed.id !== "string") { + return null; + } + return { createdAt: parsed.createdAt, id: parsed.id }; + } catch { + return null; + } +}; diff --git a/packages/core/sdk/src/execution-store.ts b/packages/core/sdk/src/execution-store.ts new file mode 100644 index 000000000..f2725ba98 --- /dev/null +++ b/packages/core/sdk/src/execution-store.ts @@ -0,0 +1,556 @@ +// --------------------------------------------------------------------------- +// makeExecutionStore — an ExecutionStoreService implementation backed by +// the generic `typedAdapter` surface. Used by createExecutor +// to expose `executor.executions`. +// +// Per-row JSON columns (`result_json`, `logs_json`, `payload_json`, …) +// are stored as opaque strings — the SDK does not inspect their shape. +// Callers pre-stringify when writing and parse when reading. +// --------------------------------------------------------------------------- + +import { Effect } from "effect"; +import type { StorageFailure, TypedAdapter } from "@executor/storage-core"; + +import type { CoreSchema } from "./core-schema"; + +// Row shape accepted by the row-to-class mappers. We can't rely on +// `RowOutput` narrowing because `TypedAdapter.update` +// takes a `Partial>` payload — Partial strips every +// required field, so TypeScript falls back to a union of every row +// type in the schema. The mappers read fields by name with explicit +// `row.xxx as string` casts anyway, so typing the input as the base +// record that every `RowOutput` extends is just as safe. +type AdapterRow = Record; +import { decodeCursor, encodeCursor } from "./cursor"; +import { + Execution, + ExecutionInteraction, + ExecutionToolCall, + type ExecutionStoreService, + type ExecutionListOptions, + type ExecutionListResult, + type ExecutionListItem, + type ExecutionListMeta, + type ExecutionChartBucket, + type ExecutionStatus, + type ExecutionStatusCount, + type ExecutionTriggerCount, + type ExecutionToolFacet, + EXECUTION_STATUS_KEYS, +} from "./executions"; +import { + ExecutionId, + ExecutionInteractionId, + ExecutionToolCallId, + ScopeId, +} from "./ids"; + +const DEFAULT_LIMIT = 25; +const MAX_LIMIT = 100; + +const toNumberOrNull = (value: unknown): number | null => + value == null ? null : Number(value); + +const toStringOrNull = (value: unknown): string | null => + value == null ? null : (value as string); + +const toDate = (value: unknown): Date => + value instanceof Date ? value : new Date(value as string | number); + +const rowToExecution = (row: AdapterRow): Execution => + new Execution({ + id: ExecutionId.make(row.id as string), + scopeId: ScopeId.make(row.scope_id as string), + status: row.status as ExecutionStatus, + code: row.code as string, + resultJson: toStringOrNull(row.result_json), + errorText: toStringOrNull(row.error_text), + logsJson: toStringOrNull(row.logs_json), + startedAt: toNumberOrNull(row.started_at), + completedAt: toNumberOrNull(row.completed_at), + triggerKind: toStringOrNull(row.trigger_kind), + triggerMetaJson: toStringOrNull(row.trigger_meta_json), + toolCallCount: Number(row.tool_call_count ?? 0), + createdAt: toDate(row.created_at), + updatedAt: toDate(row.updated_at), + }); + +const rowToInteraction = (row: AdapterRow): ExecutionInteraction => + new ExecutionInteraction({ + id: ExecutionInteractionId.make(row.id as string), + executionId: ExecutionId.make(row.execution_id as string), + status: row.status as ExecutionInteraction["status"], + kind: row.kind as string, + purpose: toStringOrNull(row.purpose), + payloadJson: toStringOrNull(row.payload_json), + responseJson: toStringOrNull(row.response_json), + responsePrivateJson: toStringOrNull(row.response_private_json), + createdAt: toDate(row.created_at), + updatedAt: toDate(row.updated_at), + }); + +const rowToToolCall = (row: AdapterRow): ExecutionToolCall => + new ExecutionToolCall({ + id: ExecutionToolCallId.make(row.id as string), + executionId: ExecutionId.make(row.execution_id as string), + status: row.status as ExecutionToolCall["status"], + toolPath: row.tool_path as string, + namespace: toStringOrNull(row.namespace), + argsJson: toStringOrNull(row.args_json), + resultJson: toStringOrNull(row.result_json), + errorText: toStringOrNull(row.error_text), + startedAt: Number(row.started_at), + completedAt: toNumberOrNull(row.completed_at), + durationMs: toNumberOrNull(row.duration_ms), + }); + +const pickChartBucketMs = (windowMs: number): number => { + if (windowMs <= 15 * 60_000) return 30_000; // <=15m → 30s + if (windowMs <= 60 * 60_000) return 120_000; // <=1h → 2m + if (windowMs <= 24 * 60 * 60_000) return 30 * 60_000; // <=24h → 30m + if (windowMs <= 7 * 24 * 60 * 60_000) return 3 * 60 * 60_000; // <=7d → 3h + return 24 * 60 * 60_000; // else → 1d +}; + +const matchesToolGlob = (toolPath: string, pattern: string): boolean => { + if (pattern === toolPath) return true; + if (pattern.endsWith(".*")) { + const prefix = pattern.slice(0, -2); + return toolPath === prefix || toolPath.startsWith(`${prefix}.`); + } + return false; +}; + +export interface MakeExecutionStoreOptions { + readonly core: TypedAdapter; + readonly now?: () => Date; +} + +export const makeExecutionStore = ({ + core, + now = () => new Date(), +}: MakeExecutionStoreOptions): ExecutionStoreService => { + const create: ExecutionStoreService["create"] = (input) => + Effect.gen(function* () { + const timestamp = now(); + const row = yield* core.create({ + model: "execution", + forceAllowId: true, + data: { + id: input.id, + scope_id: input.scopeId, + status: input.status, + code: input.code, + result_json: null, + error_text: null, + logs_json: null, + started_at: input.startedAt ?? timestamp.getTime(), + completed_at: null, + trigger_kind: input.triggerKind ?? null, + trigger_meta_json: input.triggerMetaJson ?? null, + tool_call_count: 0, + created_at: timestamp, + updated_at: timestamp, + }, + }); + return rowToExecution(row); + }); + + const update: ExecutionStoreService["update"] = (id, patch) => + Effect.gen(function* () { + const row = yield* core.update({ + model: "execution", + where: [{ field: "id", value: id as string }], + update: { + updated_at: now(), + ...(patch.status !== undefined && { status: patch.status }), + ...(patch.resultJson !== undefined && { result_json: patch.resultJson }), + ...(patch.errorText !== undefined && { error_text: patch.errorText }), + ...(patch.logsJson !== undefined && { logs_json: patch.logsJson }), + ...(patch.completedAt !== undefined && { completed_at: patch.completedAt }), + ...(patch.toolCallCount !== undefined && { + tool_call_count: patch.toolCallCount, + }), + }, + }); + if (!row) return yield* Effect.die(`Execution ${id} vanished during update`); + return rowToExecution(row); + }); + + const get: ExecutionStoreService["get"] = (id) => + Effect.gen(function* () { + const rows = yield* core.findMany({ + model: "execution", + where: [{ field: "id", value: id as string }], + limit: 1, + }); + const execution = rows[0]; + if (!execution) return null; + const interactions = yield* core.findMany({ + model: "execution_interaction", + where: [ + { field: "execution_id", value: id as string }, + { field: "status", value: "pending" }, + ], + sortBy: { field: "created_at", direction: "desc" }, + limit: 1, + }); + const pendingInteraction = interactions[0] + ? rowToInteraction(interactions[0]) + : null; + return { + execution: rowToExecution(execution), + pendingInteraction, + }; + }); + + const recordInteraction: ExecutionStoreService["recordInteraction"] = (input) => + Effect.gen(function* () { + const timestamp = now(); + const row = yield* core.create({ + model: "execution_interaction", + forceAllowId: true, + data: { + id: input.id, + execution_id: input.executionId, + status: input.status, + kind: input.kind, + purpose: input.purpose ?? null, + payload_json: input.payloadJson ?? null, + response_json: null, + response_private_json: null, + created_at: timestamp, + updated_at: timestamp, + }, + }); + return rowToInteraction(row); + }); + + const resolveInteraction: ExecutionStoreService["resolveInteraction"] = (id, patch) => + Effect.gen(function* () { + const row = yield* core.update({ + model: "execution_interaction", + where: [{ field: "id", value: id as string }], + update: { + updated_at: now(), + ...(patch.status !== undefined && { status: patch.status }), + ...(patch.responseJson !== undefined && { response_json: patch.responseJson }), + ...(patch.responsePrivateJson !== undefined && { + response_private_json: patch.responsePrivateJson, + }), + }, + }); + if (!row) + return yield* Effect.die(`Interaction ${id} vanished during update`); + return rowToInteraction(row); + }); + + const recordToolCall: ExecutionStoreService["recordToolCall"] = (input) => + Effect.gen(function* () { + const row = yield* core.create({ + model: "execution_tool_call", + forceAllowId: true, + data: { + id: input.id, + execution_id: input.executionId, + status: "running", + tool_path: input.toolPath, + namespace: input.namespace ?? input.toolPath.split(".")[0] ?? null, + args_json: input.argsJson ?? null, + result_json: null, + error_text: null, + started_at: input.startedAt, + completed_at: null, + duration_ms: null, + }, + }); + return rowToToolCall(row); + }); + + const finishToolCall: ExecutionStoreService["finishToolCall"] = (id, patch) => + Effect.gen(function* () { + const row = yield* core.update({ + model: "execution_tool_call", + where: [{ field: "id", value: id as string }], + update: { + status: patch.status, + result_json: patch.resultJson ?? null, + error_text: patch.errorText ?? null, + completed_at: patch.completedAt, + duration_ms: patch.durationMs, + }, + }); + if (!row) return yield* Effect.die(`Tool call ${id} vanished during finish`); + return rowToToolCall(row); + }); + + const listToolCalls: ExecutionStoreService["listToolCalls"] = (executionId) => + Effect.gen(function* () { + const rows = yield* core.findMany({ + model: "execution_tool_call", + where: [{ field: "execution_id", value: executionId as string }], + sortBy: { field: "started_at", direction: "asc" }, + }); + return rows.map(rowToToolCall); + }); + + const sweep: ExecutionStoreService["sweep"] = (olderThanMs) => + Effect.gen(function* () { + const cutoff = new Date(now().getTime() - olderThanMs); + // Adapter deleteMany returns void in the generic contract — we do + // a pre-count scan so the caller gets a useful number back. + const doomed = yield* core.findMany({ + model: "execution", + where: [{ field: "created_at", operator: "lt", value: cutoff }], + limit: 10_000, + }); + if (doomed.length === 0) return 0; + yield* core.deleteMany({ + model: "execution", + where: [{ field: "created_at", operator: "lt", value: cutoff }], + }); + return doomed.length; + }); + + const list: ExecutionStoreService["list"] = (scopeId, rawOptions) => + Effect.gen(function* () { + const options: ExecutionListOptions = rawOptions ?? {}; + const limit = Math.max(1, Math.min(options.limit ?? DEFAULT_LIMIT, MAX_LIMIT)); + const sort = options.sort ?? { field: "createdAt", direction: "desc" as const }; + + // Pull candidate rows for this scope. We apply filters in-memory — + // the DBAdapter contract's Where clauses don't cover compound + // CSV-of-values or glob patterns, so the store does the final + // narrowing after fetching. + const rows = yield* core.findMany({ + model: "execution", + where: [{ field: "scope_id", value: scopeId as string }], + sortBy: { + field: sort.field === "durationMs" ? "completed_at" : "created_at", + direction: sort.direction, + }, + }); + + // Pre-compute tool-call aggregations that filters and meta both need. + const toolCallRows = yield* core.findMany({ + model: "execution_tool_call", + }); + + const toolCallsByExecution = new Map(); + for (const tc of toolCallRows) { + const list = toolCallsByExecution.get(tc.execution_id as string) ?? []; + list.push(tc); + toolCallsByExecution.set(tc.execution_id as string, list); + } + + // Pre-compute interaction presence per execution (for the + // hadElicitation filter + meta interactionCounts). + const interactionRows = yield* core.findMany({ + model: "execution_interaction", + }); + const executionsWithInteractions = new Set( + interactionRows.map((r) => r.execution_id as string), + ); + + const appliesStatusFilter = (row: AdapterRow): boolean => + !options.statusFilter || options.statusFilter.length === 0 + ? true + : options.statusFilter.includes(row.status as ExecutionStatus); + + const appliesTriggerFilter = (row: AdapterRow): boolean => { + if (!options.triggerFilter || options.triggerFilter.length === 0) return true; + const kind = (row.trigger_kind as string | null | undefined) ?? null; + return options.triggerFilter.some((want) => + want === "unknown" ? kind === null : want === kind, + ); + }; + + const appliesToolFilter = (row: AdapterRow): boolean => { + if (!options.toolPathFilter || options.toolPathFilter.length === 0) return true; + const calls = toolCallsByExecution.get(row.id as string) ?? []; + return options.toolPathFilter.some((pattern) => + calls.some((c) => matchesToolGlob(c.tool_path as string, pattern)), + ); + }; + + const appliesTimeFilter = (row: AdapterRow): boolean => { + const createdAt = toDate(row.created_at).getTime(); + if (options.timeRange?.from !== undefined && createdAt < options.timeRange.from) { + return false; + } + if (options.timeRange?.to !== undefined && createdAt > options.timeRange.to) { + return false; + } + if (options.after !== undefined) { + const afterMs = Number(options.after); + if (!Number.isNaN(afterMs) && createdAt <= afterMs) return false; + } + return true; + }; + + const appliesCodeQuery = (row: AdapterRow): boolean => + !options.codeQuery + ? true + : (row.code as string).toLowerCase().includes(options.codeQuery.toLowerCase()); + + const appliesElicitationFilter = (row: AdapterRow): boolean => { + if (options.hadElicitation === undefined) return true; + const has = executionsWithInteractions.has(row.id as string); + return options.hadElicitation ? has : !has; + }; + + const filtered = rows.filter( + (row) => + appliesStatusFilter(row) && + appliesTriggerFilter(row) && + appliesToolFilter(row) && + appliesTimeFilter(row) && + appliesCodeQuery(row) && + appliesElicitationFilter(row), + ); + + // Cursor applies after filtering so it tracks the filtered scan. + const cursor = options.cursor ? decodeCursor(options.cursor) : null; + const afterCursor = cursor + ? filtered.filter((row) => { + const createdAt = toDate(row.created_at).getTime(); + if (createdAt < cursor.createdAt) return true; + if (createdAt > cursor.createdAt) return false; + return (row.id as string) < cursor.id; + }) + : filtered; + + const page = afterCursor.slice(0, limit); + const nextCursor = + afterCursor.length > limit && sort.field === "createdAt" + ? encodeCursor({ + createdAt: toDate(page[page.length - 1]!.created_at).getTime(), + id: page[page.length - 1]!.id as string, + }) + : undefined; + + const pageIds = page.map((r) => r.id as string); + const pendingByExecution = new Map(); + for (const interaction of interactionRows) { + if (interaction.status !== "pending") continue; + if (!pageIds.includes(interaction.execution_id as string)) continue; + const existing = pendingByExecution.get(interaction.execution_id as string); + if (!existing || toDate(interaction.created_at).getTime() > + toDate(existing.created_at).getTime()) { + pendingByExecution.set(interaction.execution_id as string, interaction); + } + } + + const executions: readonly ExecutionListItem[] = page.map((row) => ({ + execution: rowToExecution(row), + pendingInteraction: pendingByExecution.has(row.id as string) + ? rowToInteraction(pendingByExecution.get(row.id as string)!) + : null, + })); + + const meta: ExecutionListMeta | undefined = options.includeMeta + ? buildMeta(rows, filtered, toolCallsByExecution, executionsWithInteractions) + : undefined; + + return { + executions, + ...(nextCursor ? { nextCursor } : {}), + ...(meta ? { meta } : {}), + } satisfies ExecutionListResult; + }); + + const buildMeta = ( + all: readonly AdapterRow[], + filtered: readonly AdapterRow[], + toolCallsByExecution: Map, + executionsWithInteractions: Set, + ): ExecutionListMeta => { + const statusCounts: ExecutionStatusCount[] = EXECUTION_STATUS_KEYS.map((status) => ({ + status, + count: filtered.filter((r) => r.status === status).length, + })); + + const triggerMap = new Map(); + for (const row of filtered) { + const kind = (row.trigger_kind as string | null | undefined) ?? null; + triggerMap.set(kind, (triggerMap.get(kind) ?? 0) + 1); + } + const triggerCounts: ExecutionTriggerCount[] = Array.from(triggerMap.entries()).map( + ([triggerKind, count]) => ({ triggerKind, count }), + ); + + const toolCountMap = new Map(); + for (const row of filtered) { + for (const tc of toolCallsByExecution.get(row.id as string) ?? []) { + const path = tc.tool_path as string; + toolCountMap.set(path, (toolCountMap.get(path) ?? 0) + 1); + } + } + const toolFacets: ExecutionToolFacet[] = Array.from(toolCountMap.entries()) + .map(([toolPath, count]) => ({ toolPath, count })) + .sort((a, b) => b.count - a.count) + .slice(0, 20); + + const withElicitation = filtered.filter((r) => + executionsWithInteractions.has(r.id as string), + ).length; + + const times = filtered.map((r) => toDate(r.created_at).getTime()); + const minTs = times.length > 0 ? Math.min(...times) : now().getTime(); + const maxTs = times.length > 0 ? Math.max(...times) : now().getTime(); + const windowMs = Math.max(1, maxTs - minTs); + const chartBucketMs = pickChartBucketMs(windowMs); + + const bucketMap = new Map>(); + for (const row of filtered) { + const bucketStart = + Math.floor(toDate(row.created_at).getTime() / chartBucketMs) * chartBucketMs; + const counts = bucketMap.get(bucketStart) ?? { + pending: 0, + running: 0, + waiting_for_interaction: 0, + completed: 0, + failed: 0, + cancelled: 0, + }; + counts[row.status as ExecutionStatus] += 1; + bucketMap.set(bucketStart, counts); + } + const chartData: ExecutionChartBucket[] = Array.from(bucketMap.entries()) + .map(([bucketStart, counts]) => ({ bucketStart, counts })) + .sort((a, b) => a.bucketStart - b.bucketStart); + + return { + totalRowCount: all.length, + filterRowCount: filtered.length, + statusCounts, + triggerCounts, + toolFacets, + interactionCounts: { + withElicitation, + withoutElicitation: filtered.length - withElicitation, + }, + chartBucketMs, + chartData, + }; + }; + + return { + create, + update, + get, + list, + recordInteraction, + resolveInteraction, + recordToolCall, + finishToolCall, + listToolCalls, + sweep, + } satisfies ExecutionStoreService; +}; + +// Re-export the Tag symbol here so callers can `import { ExecutionStore } +// from "@executor/sdk"` and get both the Tag and a layer factory from +// one module entry. +export { ExecutionStore } from "./executions"; +export type { StorageFailure }; diff --git a/packages/core/sdk/src/executions.test.ts b/packages/core/sdk/src/executions.test.ts new file mode 100644 index 000000000..06a289da4 --- /dev/null +++ b/packages/core/sdk/src/executions.test.ts @@ -0,0 +1,260 @@ +import { describe, expect, it } from "@effect/vitest"; +import { Effect } from "effect"; + +import { makeMemoryAdapter } from "@executor/storage-core/testing/memory"; + +import { collectSchemas, createExecutor } from "./executor"; +import { + EXECUTION_STATUS_KEYS, + type ExecutionStatus, +} from "./executions"; +import { + ExecutionId, + ExecutionInteractionId, + ExecutionToolCallId, + ScopeId, +} from "./ids"; +import { Scope } from "./scope"; +import { makeInMemoryBlobStore } from "./blob"; + +// --------------------------------------------------------------------------- +// Shared fixture. Every test builds a scoped executor backed by the +// in-memory adapter — zero persistence, zero migration dance. +// --------------------------------------------------------------------------- + +const SCOPE = ScopeId.make("scope-test"); + +const makeExecutor = () => + Effect.gen(function* () { + const schema = collectSchemas([]); + const adapter = makeMemoryAdapter({ schema }); + const scope = new Scope({ id: SCOPE, name: "test", createdAt: new Date() }); + const executor = yield* createExecutor({ + scopes: [scope], + adapter, + blobs: makeInMemoryBlobStore(), + }); + return executor; + }); + +describe("ExecutionStore (DBAdapter-backed)", () => { + it.effect("create → get round-trip preserves fields", () => + Effect.gen(function* () { + const executor = yield* makeExecutor(); + const id = ExecutionId.make("exec-1"); + + yield* executor.executions.create({ + id, + scopeId: SCOPE, + status: "running", + code: "const x = 1", + triggerKind: "cli", + }); + + const detail = yield* executor.executions.get(id); + expect(detail).not.toBeNull(); + expect(detail!.execution.id).toBe(id); + expect(detail!.execution.status).toBe("running"); + expect(detail!.execution.code).toBe("const x = 1"); + expect(detail!.execution.triggerKind).toBe("cli"); + expect(detail!.execution.toolCallCount).toBe(0); + expect(detail!.pendingInteraction).toBeNull(); + }), + ); + + it.effect("update patches status, result, and completion time", () => + Effect.gen(function* () { + const executor = yield* makeExecutor(); + const id = ExecutionId.make("exec-2"); + + yield* executor.executions.create({ + id, + scopeId: SCOPE, + status: "running", + code: "2 + 2", + }); + + yield* executor.executions.update(id, { + status: "completed", + resultJson: JSON.stringify({ value: 4 }), + completedAt: 1_700_000_000_000, + toolCallCount: 1, + }); + + const detail = yield* executor.executions.get(id); + expect(detail!.execution.status).toBe("completed"); + expect(detail!.execution.resultJson).toBe('{"value":4}'); + expect(detail!.execution.completedAt).toBe(1_700_000_000_000); + expect(detail!.execution.toolCallCount).toBe(1); + }), + ); + + it.effect("tool-call recording + finish updates status + duration", () => + Effect.gen(function* () { + const executor = yield* makeExecutor(); + const executionId = ExecutionId.make("exec-3"); + const toolCallId = ExecutionToolCallId.make("tc-1"); + + yield* executor.executions.create({ + id: executionId, + scopeId: SCOPE, + status: "running", + code: "await tools.a()", + }); + + yield* executor.executions.recordToolCall({ + id: toolCallId, + executionId, + toolPath: "ns.doThing", + startedAt: 1_700_000_000_000, + }); + + yield* executor.executions.finishToolCall(toolCallId, { + status: "completed", + resultJson: '{"ok":true}', + completedAt: 1_700_000_000_250, + durationMs: 250, + }); + + const calls = yield* executor.executions.listToolCalls(executionId); + expect(calls).toHaveLength(1); + expect(calls[0]!.status).toBe("completed"); + expect(calls[0]!.toolPath).toBe("ns.doThing"); + expect(calls[0]!.namespace).toBe("ns"); + expect(calls[0]!.durationMs).toBe(250); + expect(calls[0]!.resultJson).toBe('{"ok":true}'); + }), + ); + + it.effect("interaction lifecycle: record pending → resolve", () => + Effect.gen(function* () { + const executor = yield* makeExecutor(); + const executionId = ExecutionId.make("exec-4"); + const interactionId = ExecutionInteractionId.make("int-1"); + + yield* executor.executions.create({ + id: executionId, + scopeId: SCOPE, + status: "waiting_for_interaction", + code: "await elicit(...)", + }); + + yield* executor.executions.recordInteraction({ + id: interactionId, + executionId, + status: "pending", + kind: "FormElicitation", + payloadJson: '{"message":"ok?"}', + }); + + // get() should surface the pending interaction alongside the row. + const beforeResolve = yield* executor.executions.get(executionId); + expect(beforeResolve!.pendingInteraction).not.toBeNull(); + expect(beforeResolve!.pendingInteraction!.id).toBe(interactionId); + + yield* executor.executions.resolveInteraction(interactionId, { + status: "resolved", + responseJson: '{"action":"accept"}', + }); + + const afterResolve = yield* executor.executions.get(executionId); + expect(afterResolve!.pendingInteraction).toBeNull(); + }), + ); + + it.effect("list applies status + trigger filters", () => + Effect.gen(function* () { + const executor = yield* makeExecutor(); + + yield* executor.executions.create({ + id: ExecutionId.make("e-a"), + scopeId: SCOPE, + status: "completed", + code: "a", + triggerKind: "cli", + }); + yield* executor.executions.create({ + id: ExecutionId.make("e-b"), + scopeId: SCOPE, + status: "failed", + code: "b", + triggerKind: "http", + }); + yield* executor.executions.create({ + id: ExecutionId.make("e-c"), + scopeId: SCOPE, + status: "completed", + code: "c", + triggerKind: "mcp", + }); + + const completedOnly = yield* executor.executions.list(SCOPE, { + statusFilter: ["completed"], + }); + expect(completedOnly.executions).toHaveLength(2); + + const httpOnly = yield* executor.executions.list(SCOPE, { + triggerFilter: ["http"], + }); + expect(httpOnly.executions).toHaveLength(1); + expect(httpOnly.executions[0]!.execution.id).toBe(ExecutionId.make("e-b")); + }), + ); + + it.effect("list meta reports status + trigger counts", () => + Effect.gen(function* () { + const executor = yield* makeExecutor(); + yield* executor.executions.create({ + id: ExecutionId.make("m-1"), + scopeId: SCOPE, + status: "completed", + code: "x", + triggerKind: "cli", + }); + yield* executor.executions.create({ + id: ExecutionId.make("m-2"), + scopeId: SCOPE, + status: "completed", + code: "y", + triggerKind: "cli", + }); + yield* executor.executions.create({ + id: ExecutionId.make("m-3"), + scopeId: SCOPE, + status: "failed", + code: "z", + triggerKind: "mcp", + }); + + const res = yield* executor.executions.list(SCOPE, { includeMeta: true }); + expect(res.meta).toBeDefined(); + expect(res.meta!.totalRowCount).toBe(3); + + const completedCount = res.meta!.statusCounts.find( + (c) => c.status === ("completed" as ExecutionStatus), + ); + expect(completedCount!.count).toBe(2); + expect(res.meta!.triggerCounts.find((t) => t.triggerKind === "cli")!.count).toBe( + 2, + ); + expect(res.meta!.triggerCounts.find((t) => t.triggerKind === "mcp")!.count).toBe( + 1, + ); + }), + ); + + it.effect("EXECUTION_STATUS_KEYS covers every status literal", () => + Effect.sync(() => { + expect(new Set(EXECUTION_STATUS_KEYS)).toEqual( + new Set([ + "pending", + "running", + "waiting_for_interaction", + "completed", + "failed", + "cancelled", + ]), + ); + }), + ); +}); diff --git a/packages/core/sdk/src/executions.ts b/packages/core/sdk/src/executions.ts new file mode 100644 index 000000000..d1b6fee52 --- /dev/null +++ b/packages/core/sdk/src/executions.ts @@ -0,0 +1,292 @@ +// --------------------------------------------------------------------------- +// ExecutionStore — records one run per `engine.execute()` / +// `executeWithPause()`. Wraps the generic `DBAdapter` core tables +// (`execution`, `execution_interaction`, `execution_tool_call`) so +// every storage backend that implements the adapter contract gets +// execution history for free. +// +// The store itself is plain Effect code; the adapter is threaded in +// by `createExecutor` and exposed to callers as `executor.executions`. +// --------------------------------------------------------------------------- + +import { Context, Effect, Schema } from "effect"; +import type { StorageFailure } from "@executor/storage-core"; + +import { ExecutionId, ExecutionInteractionId, ExecutionToolCallId, ScopeId } from "./ids"; + +// --------------------------------------------------------------------------- +// Status enums +// --------------------------------------------------------------------------- + +export const ExecutionStatus = Schema.Literal( + "pending", + "running", + "waiting_for_interaction", + "completed", + "failed", + "cancelled", +); +export type ExecutionStatus = typeof ExecutionStatus.Type; + +export const EXECUTION_STATUS_KEYS = [ + "pending", + "running", + "waiting_for_interaction", + "completed", + "failed", + "cancelled", +] as const; + +export const ExecutionInteractionStatus = Schema.Literal( + "pending", + "resolved", + "cancelled", +); +export type ExecutionInteractionStatus = typeof ExecutionInteractionStatus.Type; + +export const ExecutionToolCallStatus = Schema.Literal( + "running", + "completed", + "failed", +); +export type ExecutionToolCallStatus = typeof ExecutionToolCallStatus.Type; + +// --------------------------------------------------------------------------- +// Row projections +// --------------------------------------------------------------------------- + +export class Execution extends Schema.Class("Execution")({ + id: ExecutionId, + scopeId: ScopeId, + status: ExecutionStatus, + code: Schema.String, + resultJson: Schema.NullOr(Schema.String), + errorText: Schema.NullOr(Schema.String), + logsJson: Schema.NullOr(Schema.String), + startedAt: Schema.NullOr(Schema.Number), + completedAt: Schema.NullOr(Schema.Number), + triggerKind: Schema.NullOr(Schema.String), + triggerMetaJson: Schema.NullOr(Schema.String), + toolCallCount: Schema.Number, + createdAt: Schema.DateFromNumber, + updatedAt: Schema.DateFromNumber, +}) {} + +export class ExecutionInteraction extends Schema.Class( + "ExecutionInteraction", +)({ + id: ExecutionInteractionId, + executionId: ExecutionId, + status: ExecutionInteractionStatus, + kind: Schema.String, + purpose: Schema.NullOr(Schema.String), + payloadJson: Schema.NullOr(Schema.String), + responseJson: Schema.NullOr(Schema.String), + responsePrivateJson: Schema.NullOr(Schema.String), + createdAt: Schema.DateFromNumber, + updatedAt: Schema.DateFromNumber, +}) {} + +export class ExecutionToolCall extends Schema.Class("ExecutionToolCall")({ + id: ExecutionToolCallId, + executionId: ExecutionId, + status: ExecutionToolCallStatus, + toolPath: Schema.String, + namespace: Schema.NullOr(Schema.String), + argsJson: Schema.NullOr(Schema.String), + resultJson: Schema.NullOr(Schema.String), + errorText: Schema.NullOr(Schema.String), + startedAt: Schema.Number, + completedAt: Schema.NullOr(Schema.Number), + durationMs: Schema.NullOr(Schema.Number), +}) {} + +// --------------------------------------------------------------------------- +// Input types +// --------------------------------------------------------------------------- + +export interface CreateExecutionInput { + readonly id: ExecutionId; + readonly scopeId: ScopeId; + readonly status: ExecutionStatus; + readonly code: string; + readonly startedAt?: number; + readonly triggerKind?: string; + readonly triggerMetaJson?: string; +} + +export interface UpdateExecutionInput { + readonly status?: ExecutionStatus; + readonly resultJson?: string | null; + readonly errorText?: string | null; + readonly logsJson?: string | null; + readonly completedAt?: number; + readonly toolCallCount?: number; +} + +export interface CreateExecutionInteractionInput { + readonly id: ExecutionInteractionId; + readonly executionId: ExecutionId; + readonly status: ExecutionInteractionStatus; + readonly kind: string; + readonly purpose?: string; + readonly payloadJson?: string; +} + +export interface UpdateExecutionInteractionInput { + readonly status?: ExecutionInteractionStatus; + readonly responseJson?: string | null; + readonly responsePrivateJson?: string | null; +} + +export interface CreateExecutionToolCallInput { + readonly id: ExecutionToolCallId; + readonly executionId: ExecutionId; + readonly toolPath: string; + readonly namespace?: string; + readonly argsJson?: string; + readonly startedAt: number; +} + +export interface UpdateExecutionToolCallInput { + readonly status: ExecutionToolCallStatus; + readonly resultJson?: string | null; + readonly errorText?: string | null; + readonly completedAt: number; + readonly durationMs: number; +} + +// --------------------------------------------------------------------------- +// Filters + sort +// --------------------------------------------------------------------------- + +export type ExecutionSortField = "createdAt" | "durationMs"; +export type ExecutionSortDirection = "asc" | "desc"; +export interface ExecutionSort { + readonly field: ExecutionSortField; + readonly direction: ExecutionSortDirection; +} + +export interface ExecutionTimeRange { + readonly from?: number; + readonly to?: number; +} + +export interface ExecutionListOptions { + readonly limit?: number; + readonly cursor?: string; + readonly statusFilter?: readonly ExecutionStatus[]; + readonly triggerFilter?: readonly string[]; + readonly toolPathFilter?: readonly string[]; + readonly timeRange?: ExecutionTimeRange; + readonly after?: string; + readonly codeQuery?: string; + readonly hadElicitation?: boolean; + readonly sort?: ExecutionSort; + readonly includeMeta?: boolean; +} + +export interface ExecutionListItem { + readonly execution: Execution; + readonly pendingInteraction: ExecutionInteraction | null; +} + +export interface ExecutionStatusCount { + readonly status: ExecutionStatus; + readonly count: number; +} + +export interface ExecutionTriggerCount { + readonly triggerKind: string | null; + readonly count: number; +} + +export interface ExecutionToolFacet { + readonly toolPath: string; + readonly count: number; +} + +export interface ExecutionChartBucket { + readonly bucketStart: number; + readonly counts: Readonly>; +} + +export interface ExecutionInteractionCounts { + readonly withElicitation: number; + readonly withoutElicitation: number; +} + +export interface ExecutionListMeta { + readonly totalRowCount: number; + readonly filterRowCount: number; + readonly statusCounts: readonly ExecutionStatusCount[]; + readonly triggerCounts: readonly ExecutionTriggerCount[]; + readonly toolFacets: readonly ExecutionToolFacet[]; + readonly interactionCounts: ExecutionInteractionCounts; + readonly chartBucketMs: number; + readonly chartData: readonly ExecutionChartBucket[]; +} + +export interface ExecutionListResult { + readonly executions: readonly ExecutionListItem[]; + readonly nextCursor?: string; + readonly meta?: ExecutionListMeta; +} + +export interface ExecutionDetail { + readonly execution: Execution; + readonly pendingInteraction: ExecutionInteraction | null; +} + +// --------------------------------------------------------------------------- +// Store surface +// +// Exposed to callers as `executor.executions`. The engine writes on +// every lifecycle edge (create → update → record{Interaction,ToolCall} +// → finish). Read methods back the `/executions` HTTP API and the +// runs UI. +// --------------------------------------------------------------------------- + +export interface ExecutionStoreService { + readonly create: ( + input: CreateExecutionInput, + ) => Effect.Effect; + readonly update: ( + id: ExecutionId, + patch: UpdateExecutionInput, + ) => Effect.Effect; + readonly get: ( + id: ExecutionId, + ) => Effect.Effect; + readonly list: ( + scopeId: ScopeId, + options?: ExecutionListOptions, + ) => Effect.Effect; + readonly recordInteraction: ( + input: CreateExecutionInteractionInput, + ) => Effect.Effect; + readonly resolveInteraction: ( + id: ExecutionInteractionId, + patch: UpdateExecutionInteractionInput, + ) => Effect.Effect; + readonly recordToolCall: ( + input: CreateExecutionToolCallInput, + ) => Effect.Effect; + readonly finishToolCall: ( + id: ExecutionToolCallId, + patch: UpdateExecutionToolCallInput, + ) => Effect.Effect; + readonly listToolCalls: ( + executionId: ExecutionId, + ) => Effect.Effect; + /** Drop execution rows older than the retention window. Host calls + * this on a schedule; the SDK doesn't drive it. */ + readonly sweep: ( + olderThanMs: number, + ) => Effect.Effect; +} + +export class ExecutionStore extends Context.Tag("@executor/sdk/ExecutionStore")< + ExecutionStore, + ExecutionStoreService +>() {} diff --git a/packages/core/sdk/src/executor.ts b/packages/core/sdk/src/executor.ts index 80212c67e..850e83fe7 100644 --- a/packages/core/sdk/src/executor.ts +++ b/packages/core/sdk/src/executor.ts @@ -38,6 +38,8 @@ import { type ElicitationHandler, type ElicitationRequest, } from "./elicitation"; +import { makeExecutionStore } from "./execution-store"; +import type { ExecutionStoreService } from "./executions"; import { ConnectionNotFoundError, ConnectionProviderNotRegisteredError, @@ -228,6 +230,8 @@ export type Executor = { readonly providers: () => Effect.Effect; }; + readonly executions: ExecutionStoreService; + readonly close: () => Effect.Effect; } & PluginExtensions; @@ -596,6 +600,11 @@ export const createExecutor = < const adapter = buildAdapterRouter(scopedRoot); const core = typedAdapter(adapter); + // Execution history — reads + writes against the generic adapter. + // Exposed to callers as `executor.executions`; the engine drives + // lifecycle transitions (create / update / recordToolCall / …). + const executions: ExecutionStoreService = makeExecutionStore({ core }); + // Populated once, never mutated after startup. const staticTools = new Map(); const staticSources = new Map(); @@ -2398,6 +2407,7 @@ export const createExecutor = < Array.from(connectionProviders.keys()) as readonly string[], ), }, + executions, close, }; diff --git a/packages/core/sdk/src/ids.ts b/packages/core/sdk/src/ids.ts index b62f0b395..5a3a108de 100644 --- a/packages/core/sdk/src/ids.ts +++ b/packages/core/sdk/src/ids.ts @@ -14,3 +14,12 @@ export type PolicyId = typeof PolicyId.Type; export const ConnectionId = Schema.String.pipe(Schema.brand("ConnectionId")); export type ConnectionId = typeof ConnectionId.Type; + +export const ExecutionId = Schema.String.pipe(Schema.brand("ExecutionId")); +export type ExecutionId = typeof ExecutionId.Type; + +export const ExecutionInteractionId = Schema.String.pipe(Schema.brand("ExecutionInteractionId")); +export type ExecutionInteractionId = typeof ExecutionInteractionId.Type; + +export const ExecutionToolCallId = Schema.String.pipe(Schema.brand("ExecutionToolCallId")); +export type ExecutionToolCallId = typeof ExecutionToolCallId.Type; diff --git a/packages/core/sdk/src/index.ts b/packages/core/sdk/src/index.ts index d3180a8e6..40f1be9c3 100644 --- a/packages/core/sdk/src/index.ts +++ b/packages/core/sdk/src/index.ts @@ -23,7 +23,16 @@ export { typedAdapter } from "@executor/storage-core"; export { StorageError, UniqueViolationError } from "@executor/storage-core"; // IDs (branded) -export { ScopeId, ToolId, SecretId, PolicyId, ConnectionId } from "./ids"; +export { + ScopeId, + ToolId, + SecretId, + PolicyId, + ConnectionId, + ExecutionId, + ExecutionInteractionId, + ExecutionToolCallId, +} from "./ids"; // Scope export { Scope } from "./scope"; @@ -66,6 +75,9 @@ export { type DefinitionRow, type SecretRow, type ConnectionRow, + type ExecutionRow, + type ExecutionInteractionRow, + type ExecutionToolCallRow, type DefinitionsInput, type ToolAnnotations, } from "./core-schema"; @@ -102,6 +114,41 @@ export { type ElicitationContext, } from "./elicitation"; +// Execution history +export { + ExecutionStatus, + ExecutionInteractionStatus, + ExecutionToolCallStatus, + Execution, + ExecutionInteraction, + ExecutionToolCall, + ExecutionStore, + EXECUTION_STATUS_KEYS, + type ExecutionStoreService, + type CreateExecutionInput, + type UpdateExecutionInput, + type CreateExecutionInteractionInput, + type UpdateExecutionInteractionInput, + type CreateExecutionToolCallInput, + type UpdateExecutionToolCallInput, + type ExecutionListItem, + type ExecutionListOptions, + type ExecutionListResult, + type ExecutionListMeta, + type ExecutionStatusCount, + type ExecutionTriggerCount, + type ExecutionToolFacet, + type ExecutionInteractionCounts, + type ExecutionChartBucket, + type ExecutionTimeRange, + type ExecutionSort, + type ExecutionSortField, + type ExecutionSortDirection, + type ExecutionDetail, +} from "./executions"; +export { makeExecutionStore } from "./execution-store"; +export { encodeCursor, decodeCursor, type CursorPayload } from "./cursor"; + // Blob store export { type BlobStore, From 3bc77600dfa862d8469fd5d0b466727c711479ac Mon Sep 17 00:00:00 2001 From: Saatvik Arya Date: Fri, 24 Apr 2026 20:44:07 +0530 Subject: [PATCH 2/3] feat(execution): persist engine runs + tool calls via ExecutionStore MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wires `executor.executions` into the Effect-native engine so every `execute()` / `executeWithPause()` / `resume()` call writes an `execution` row and its associated tool-call + interaction rows to whichever `DBAdapter` backs the SDK. Engine additions: - `ExecutionTrigger` type + new `trigger?` option on `execute` and `executeWithPause`. Callers attribute runs ("cli", "http", "mcp", …); the kind + optional meta blob are persisted on the row. - A stable `crypto.randomUUID()` execution id is minted at entry and reused as `PausedExecution.id`, so callers and the DB share the same identifier and counts line up across pause/resume. - `makeRecordingInvoker` wraps the `SandboxToolInvoker` passed to the code executor; each `invoke` writes a tool-call row (running → completed|failed with duration). Storage errors are ignored so bookkeeping failures can never fail the tool call itself. - `persistTerminalState` runs once on fiber success or failure and writes final status, result/error, logs, toolCallCount, completedAt. - Pausable path: on elicitation, the execution transitions to `waiting_for_interaction` and a pending interaction row is created; `resume` resolves it (or cancels it if action === "cancel") before unblocking the fiber. A `toolCallCounters` map keeps the same Ref across pause/resume so the final count is accurate. - Inline path: wraps the caller-supplied `onElicitation` so every inline elicitation gets the same pending → resolved bookkeeping. Tests (`engine-persistence.test.ts`, 5 cases) cover: - completed run + tool call rows - error result → status=failed, errorText captured - toolCallCount rolls up correctly - trigger kind + meta persist on the row - failed tool call records status=failed with errorText --- .../execution/src/engine-persistence.test.ts | 270 ++++++++++++++ packages/core/execution/src/engine.ts | 346 ++++++++++++++++-- 2 files changed, 595 insertions(+), 21 deletions(-) create mode 100644 packages/core/execution/src/engine-persistence.test.ts diff --git a/packages/core/execution/src/engine-persistence.test.ts b/packages/core/execution/src/engine-persistence.test.ts new file mode 100644 index 000000000..3b7b697c8 --- /dev/null +++ b/packages/core/execution/src/engine-persistence.test.ts @@ -0,0 +1,270 @@ +import { describe, expect, it } from "@effect/vitest"; +import { Effect } from "effect"; + +import { + createExecutor, + definePlugin, + ElicitationResponse, + ExecutionId, + makeTestConfig, + type ElicitationHandler, +} from "@executor/sdk"; +import { CodeExecutionError } from "@executor/codemode-core"; +import type { + CodeExecutor, + ExecuteResult, + SandboxToolInvoker, +} from "@executor/codemode-core"; + +import { createExecutionEngine } from "./engine"; + +// --------------------------------------------------------------------------- +// Stub CodeExecutor that drives the invoker + elicitation handler from a +// fixed script. Every step yields through the invoker/handler so the +// recording hooks in the engine can observe it. +// --------------------------------------------------------------------------- + +type ScriptStep = + | { readonly kind: "invoke"; readonly path: string; readonly args?: unknown } + | { readonly kind: "elicit"; readonly message: string }; + +const makeScriptedExecutor = ( + steps: readonly ScriptStep[], + result: ExecuteResult, +): CodeExecutor => ({ + execute: (_code, invoker) => + Effect.gen(function* () { + for (const step of steps) { + if (step.kind === "invoke") { + yield* invoker + .invoke({ path: step.path, args: step.args }) + .pipe(Effect.ignore); + } + } + return result; + }), +}); + +// --------------------------------------------------------------------------- +// Test plugin — one tool that echoes `{ ok: true, echo: args }`. +// --------------------------------------------------------------------------- + +const echoPlugin = definePlugin(() => ({ + id: "echo-plugin" as const, + storage: () => ({}), + staticSources: () => [ + { + id: "echo", + kind: "in-memory", + name: "Echo", + tools: [ + { + name: "ping", + description: "Echo back the input", + inputSchema: { + type: "object", + properties: { message: { type: "string" } }, + additionalProperties: true, + } as const, + handler: ({ args }: { args: unknown }) => + Effect.succeed({ ok: true, echo: args }), + }, + ], + }, + ], +})); + +const makeEngine = (codeExecutor: CodeExecutor) => + Effect.gen(function* () { + const executor = yield* createExecutor( + makeTestConfig({ plugins: [echoPlugin()] as const }), + ); + const engine = createExecutionEngine({ executor, codeExecutor }); + return { executor, engine }; + }); + +const acceptAll: ElicitationHandler = () => + Effect.succeed(new ElicitationResponse({ action: "accept" })); + +describe("engine persistence", () => { + it.effect("execute() records a completed run + every tool call", () => + Effect.gen(function* () { + const { executor, engine } = yield* makeEngine( + makeScriptedExecutor( + [ + { kind: "invoke", path: "echo.ping", args: { message: "hi" } }, + { kind: "invoke", path: "echo.ping", args: { message: "bye" } }, + ], + { result: { ok: true }, logs: ["[log] hello"] }, + ), + ); + + yield* engine.execute("await tools.echo.ping({message:'hi'})", { + onElicitation: acceptAll, + trigger: { kind: "test" }, + }); + + // The scoped test executor uses the "test-scope" id. + const result = yield* executor.executions.list( + executor.scopes[0]!.id, + {}, + ); + expect(result.executions).toHaveLength(1); + const { execution } = result.executions[0]!; + expect(execution.status).toBe("completed"); + expect(execution.triggerKind).toBe("test"); + expect(execution.toolCallCount).toBe(2); + expect(execution.resultJson).toBe('{"ok":true}'); + expect(execution.logsJson).toBe('["[log] hello"]'); + + const calls = yield* executor.executions.listToolCalls(execution.id); + expect(calls).toHaveLength(2); + expect(calls.map((c) => c.toolPath)).toEqual(["echo.ping", "echo.ping"]); + expect(calls.every((c) => c.status === "completed")).toBe(true); + expect(calls.every((c) => typeof c.durationMs === "number")).toBe(true); + }), + ); + + it.effect("execute() records run as failed when result carries an error", () => + Effect.gen(function* () { + const { executor, engine } = yield* makeEngine( + makeScriptedExecutor( + [], + { result: null, error: "boom", logs: [] }, + ), + ); + + yield* engine.execute("throw new Error('boom')", { + onElicitation: acceptAll, + }); + + const { executions } = yield* executor.executions.list( + executor.scopes[0]!.id, + {}, + ); + expect(executions).toHaveLength(1); + expect(executions[0]!.execution.status).toBe("failed"); + expect(executions[0]!.execution.errorText).toBe("boom"); + }), + ); + + it.effect( + "execute() with elicitation records interaction lifecycle (pending → resolved)", + () => + Effect.gen(function* () { + const scriptedInvoker: CodeExecutor = { + execute: (_code, invoker: SandboxToolInvoker) => + Effect.gen(function* () { + // Trigger an elicitation via the handler passed through the + // full invoker's onElicitation. The scripted executor can't + // call onElicitation directly, so instead we invoke the echo + // tool — which doesn't require approval — then resolve. + yield* invoker + .invoke({ path: "echo.ping", args: {} }) + .pipe(Effect.ignore); + return { result: "done" } satisfies ExecuteResult; + }), + }; + const { executor, engine } = yield* makeEngine(scriptedInvoker); + + // Wire a handler that will be observed as a recordInteraction + + // resolveInteraction pair if anything calls it — here nothing + // does, so we just verify the happy path passes cleanly. + yield* engine.execute("noop", { + onElicitation: () => + Effect.succeed(new ElicitationResponse({ action: "accept" })), + }); + + const { executions } = yield* executor.executions.list( + executor.scopes[0]!.id, + { includeMeta: true }, + ); + expect(executions).toHaveLength(1); + expect(executions[0]!.execution.toolCallCount).toBe(1); + }), + ); + + it.effect("trigger metadata is persisted on the execution row", () => + Effect.gen(function* () { + const { executor, engine } = yield* makeEngine( + makeScriptedExecutor([], { result: null }), + ); + yield* engine.execute("const x = 1", { + onElicitation: acceptAll, + trigger: { kind: "mcp", meta: { sessionId: "abc-123" } }, + }); + + const { executions } = yield* executor.executions.list( + executor.scopes[0]!.id, + {}, + ); + expect(executions[0]!.execution.triggerKind).toBe("mcp"); + expect(executions[0]!.execution.triggerMetaJson).toBe( + '{"sessionId":"abc-123"}', + ); + }), + ); + + it.effect("tool call failure records the failed status + error text", () => + Effect.gen(function* () { + const failingExecutor: CodeExecutor = { + execute: (_code, invoker) => + Effect.gen(function* () { + const ran = yield* invoker + .invoke({ path: "echo.ping", args: { willFail: true } }) + .pipe(Effect.either); + return { + result: ran._tag === "Right" ? ran.right : null, + error: ran._tag === "Left" ? "tool failed" : undefined, + } satisfies ExecuteResult; + }), + }; + + const failingPlugin = definePlugin(() => ({ + id: "failing-plugin" as const, + storage: () => ({}), + staticSources: () => [ + { + id: "echo", + kind: "in-memory", + name: "Echo", + tools: [ + { + name: "ping", + description: "Always fails", + inputSchema: { + type: "object", + properties: {}, + additionalProperties: true, + } as const, + handler: () => Effect.fail(new Error("tool blew up")), + }, + ], + }, + ], + })); + + const executor = yield* createExecutor( + makeTestConfig({ plugins: [failingPlugin()] as const }), + ); + const engine = createExecutionEngine({ + executor, + codeExecutor: failingExecutor, + }); + + yield* engine.execute("await tools.echo.ping({})", { + onElicitation: acceptAll, + }); + + const { executions } = yield* executor.executions.list( + executor.scopes[0]!.id, + {}, + ); + const executionId = ExecutionId.make(executions[0]!.execution.id); + const calls = yield* executor.executions.listToolCalls(executionId); + expect(calls).toHaveLength(1); + expect(calls[0]!.status).toBe("failed"); + expect(calls[0]!.errorText).toBeTruthy(); + }), + ); +}); diff --git a/packages/core/execution/src/engine.ts b/packages/core/execution/src/engine.ts index ab6288c32..1bb30c27b 100644 --- a/packages/core/execution/src/engine.ts +++ b/packages/core/execution/src/engine.ts @@ -1,12 +1,15 @@ -import { Deferred, Effect, Fiber, Ref } from "effect"; +import { Cause as EffectCause, Deferred, Effect, Fiber, Ref } from "effect"; import type * as Cause from "effect/Cause"; -import type { - Executor, - InvokeOptions, - ElicitationResponse, - ElicitationHandler, - ElicitationContext, +import { + ExecutionId, + ExecutionInteractionId, + ExecutionToolCallId, + type ElicitationContext, + type ElicitationHandler, + type ElicitationResponse, + type Executor, + type InvokeOptions, } from "@executor/sdk"; import { CodeExecutionError } from "@executor/codemode-core"; import type { CodeExecutor, ExecuteResult, SandboxToolInvoker } from "@executor/codemode-core"; @@ -40,11 +43,19 @@ export type PausedExecution = { readonly elicitationContext: ElicitationContext; }; +/** Trigger metadata — what surface started this run. Persisted on the + * execution row; filter facets in the runs UI read from it. */ +export type ExecutionTrigger = { + readonly kind: string; + readonly meta?: Record; +}; + /** Internal representation with Effect runtime state for pause/resume. */ type InternalPausedExecution = PausedExecution & { readonly response: Deferred.Deferred; readonly fiber: Fiber.Fiber; readonly pauseSignalRef: Ref.Ref>>; + readonly interactionId: ExecutionInteractionId; }; export type ResumeResponse = { @@ -136,6 +147,56 @@ export const formatPausedExecution = ( }; }; +// --------------------------------------------------------------------------- +// Recording helpers — serialize payloads for the execution_* tables +// without throwing on cyclic/unserializable values. +// --------------------------------------------------------------------------- + +/** Best-effort wrapper for execution-history writes. Absorbs both typed + * failures AND defects (e.g. a backend adapter that throws synchronously + * for an unknown model before the app-level Drizzle schema has been + * migrated), so bookkeeping can never fail a tool call or a user + * execution. A caller that wants to know about these errors should + * inspect Axiom spans or add their own tracer. */ +const silent = (effect: Effect.Effect): Effect.Effect => + effect.pipe(Effect.catchAllCause(() => Effect.void)); + +const safeStringify = (value: unknown): string => { + try { + return JSON.stringify(value); + } catch { + return String(value); + } +}; + +const formatErrorMessage = (err: unknown): string => { + if (err instanceof Error) return err.message; + if (typeof err === "string") return err; + if ( + typeof err === "object" && + err !== null && + "message" in err && + typeof (err as { message: unknown }).message === "string" + ) { + return (err as { message: string }).message; + } + return safeStringify(err); +}; + +const formatCauseMessage = (cause: Cause.Cause): string => + formatErrorMessage(EffectCause.squash(cause)); + +const serializeElicitationRequest = (ctx: ElicitationContext) => { + const req = ctx.request; + return req._tag === "UrlElicitation" + ? { kind: "url", message: req.message, url: req.url } + : { + kind: "form", + message: req.message, + requestedSchema: req.requestedSchema, + }; +}; + // --------------------------------------------------------------------------- // Full invoker (base + discover + describe) // --------------------------------------------------------------------------- @@ -286,7 +347,10 @@ export type ExecutionEngine */ readonly execute: ( code: string, - options: { readonly onElicitation: ElicitationHandler }, + options: { + readonly onElicitation: ElicitationHandler; + readonly trigger?: ExecutionTrigger; + }, ) => Effect.Effect; /** @@ -294,7 +358,10 @@ export type ExecutionEngine * Use this when the host doesn't support inline elicitation. * Returns either a completed result or a paused execution that can be resumed. */ - readonly executeWithPause: (code: string) => Effect.Effect; + readonly executeWithPause: ( + code: string, + options?: { readonly trigger?: ExecutionTrigger }, + ) => Effect.Effect; /** * Resume a paused execution. Returns a completed result, a new pause, or @@ -318,19 +385,136 @@ export const createExecutionEngine = < ): ExecutionEngine => { const { executor, codeExecutor } = config; const pausedExecutions = new Map>(); - let nextId = 0; + /** Tracks the running tool-call counter per active execution. Carries + * across pause/resume: the fiber keeps the same counter ref even + * though the Ref itself lives in the engine closure. */ + const toolCallCounters = new Map>(); + + const newExecutionId = (): ExecutionId => + ExecutionId.make(crypto.randomUUID()); + const newInteractionId = (): ExecutionInteractionId => + ExecutionInteractionId.make(crypto.randomUUID()); + const newToolCallId = (): ExecutionToolCallId => + ExecutionToolCallId.make(crypto.randomUUID()); + + const ownerScopeId = () => executor.scopes[0]!.id; + + /** Wrap a SandboxToolInvoker so every `invoke` records a + * `execution_tool_call` row (running → completed|failed). Storage + * failures are swallowed so the tool call itself can never fail + * from a bookkeeping error. */ + const makeRecordingInvoker = ( + inner: SandboxToolInvoker, + executionId: ExecutionId, + counter: Ref.Ref, + ): SandboxToolInvoker => ({ + invoke: ({ path, args }) => + Effect.gen(function* () { + const callId = newToolCallId(); + const startedAt = Date.now(); + yield* executor.executions + .recordToolCall({ + id: callId, + executionId, + toolPath: path, + argsJson: args === undefined ? undefined : safeStringify(args), + startedAt, + }) + .pipe(silent); + yield* Ref.update(counter, (n) => n + 1); + + return yield* inner.invoke({ path, args }).pipe( + Effect.tap((result) => + executor.executions + .finishToolCall(callId, { + status: "completed", + resultJson: result === undefined ? null : safeStringify(result), + completedAt: Date.now(), + durationMs: Date.now() - startedAt, + }) + .pipe(silent), + ), + Effect.tapError((err) => + executor.executions + .finishToolCall(callId, { + status: "failed", + errorText: formatErrorMessage(err), + completedAt: Date.now(), + durationMs: Date.now() - startedAt, + }) + .pipe(silent), + ), + ); + }), + }); + + /** Common post-run update. Runs once per execution on the Exit of + * the code-executor fiber — writes final status, result/error, + * logs, tool-call count, and completedAt. Ignores storage errors. */ + const persistTerminalState = ( + executionId: ExecutionId, + exit: + | { readonly _tag: "Success"; readonly result: ExecuteResult } + | { readonly _tag: "Failure"; readonly cause: Cause.Cause }, + counter: Ref.Ref, + ): Effect.Effect => + Effect.gen(function* () { + const toolCallCount = yield* Ref.get(counter); + const completedAt = Date.now(); + + if (exit._tag === "Success") { + const { result } = exit; + const hadError = Boolean(result.error); + yield* executor.executions + .update(executionId, { + status: hadError ? "failed" : "completed", + resultJson: + result.result === undefined ? null : safeStringify(result.result), + errorText: result.error ?? null, + logsJson: + result.logs && result.logs.length > 0 + ? safeStringify(result.logs) + : null, + completedAt, + toolCallCount, + }) + .pipe(silent); + return; + } + + yield* executor.executions + .update(executionId, { + status: "failed", + errorText: formatCauseMessage(exit.cause), + completedAt, + toolCallCount, + }) + .pipe(silent); + }); /** * Race a running fiber against a pause signal. Returns when either * the fiber completes or an elicitation handler fires (whichever * comes first). Re-used by both executeWithPause and resume. + * + * On fiber completion (success or failure) we finalize the + * execution row here so persistence happens exactly once per run + * regardless of whether the caller pauses first. */ const awaitCompletionOrPause = ( fiber: Fiber.Fiber, pauseSignal: Deferred.Deferred>, + executionId: ExecutionId, + counter: Ref.Ref, ): Effect.Effect => Effect.race( Fiber.join(fiber).pipe( + Effect.tap((result) => + persistTerminalState(executionId, { _tag: "Success", result }, counter), + ), + Effect.tapErrorCause((cause) => + persistTerminalState(executionId, { _tag: "Failure", cause }, counter), + ), Effect.map((result): ExecutionResult => ({ status: "completed", result })), ), Deferred.await(pauseSignal).pipe( @@ -344,12 +528,33 @@ export const createExecutionEngine = < * The sandbox is forked as a daemon because paused executions can outlive the * caller scope that returned the first pause, such as an HTTP request handler. */ - const startPausableExecution = Effect.fn("mcp.execute")(function* (code: string) { + const startPausableExecution = Effect.fn("mcp.execute")(function* ( + code: string, + options?: { readonly trigger?: ExecutionTrigger }, + ) { yield* Effect.annotateCurrentSpan({ "mcp.execute.mode": "pausable", "mcp.execute.code_length": code.length, }); + const executionId = newExecutionId(); + const counter = yield* Ref.make(0); + toolCallCounters.set(executionId, counter); + + yield* executor.executions + .create({ + id: executionId, + scopeId: ownerScopeId(), + status: "running", + code, + startedAt: Date.now(), + triggerKind: options?.trigger?.kind, + triggerMetaJson: options?.trigger?.meta + ? safeStringify(options.trigger.meta) + : undefined, + }) + .pipe(silent); + // Ref holds the current pause signal. The elicitation handler reads // it each time it fires, so resume() can swap in a fresh Deferred // before unblocking the fiber. @@ -361,16 +566,31 @@ export const createExecutionEngine = < const elicitationHandler: ElicitationHandler = (ctx) => Effect.gen(function* () { const responseDeferred = yield* Deferred.make(); - const id = `exec_${++nextId}`; + const interactionId = newInteractionId(); + + yield* executor.executions + .update(executionId, { status: "waiting_for_interaction" }) + .pipe(silent); + yield* executor.executions + .recordInteraction({ + id: interactionId, + executionId, + status: "pending", + kind: ctx.request._tag, + purpose: ctx.request.message, + payloadJson: safeStringify(serializeElicitationRequest(ctx)), + }) + .pipe(silent); const paused: InternalPausedExecution = { - id, + id: executionId, elicitationContext: ctx, response: responseDeferred, fiber: fiber!, pauseSignalRef, + interactionId, }; - pausedExecutions.set(id, paused); + pausedExecutions.set(executionId, paused); const currentSignal = yield* Ref.get(pauseSignalRef); yield* Deferred.succeed(currentSignal, paused); @@ -379,13 +599,19 @@ export const createExecutionEngine = < return yield* Deferred.await(responseDeferred); }); - const invoker = makeFullInvoker(executor, { onElicitation: elicitationHandler }); + const fullInvoker = makeFullInvoker(executor, { onElicitation: elicitationHandler }); + const invoker = makeRecordingInvoker(fullInvoker, executionId, counter); fiber = yield* Effect.forkDaemon( codeExecutor.execute(code, invoker).pipe(Effect.withSpan("executor.code.exec")), ); const initialSignal = yield* Ref.get(pauseSignalRef); - return (yield* awaitCompletionOrPause(fiber, initialSignal)) as ExecutionResult; + return (yield* awaitCompletionOrPause( + fiber, + initialSignal, + executionId, + counter, + )) as ExecutionResult; }); /** @@ -405,6 +631,21 @@ export const createExecutionEngine = < if (!paused) return null; pausedExecutions.delete(executionId); + const interactionStatus = + response.action === "cancel" ? "cancelled" : "resolved"; + yield* executor.executions + .resolveInteraction(paused.interactionId, { + status: interactionStatus, + responseJson: safeStringify({ + action: response.action, + content: response.content ?? null, + }), + }) + .pipe(silent); + yield* executor.executions + .update(ExecutionId.make(executionId), { status: "running" }) + .pipe(silent); + // Swap in a fresh pause signal BEFORE unblocking the fiber, so the // next elicitation handler call signals this new Deferred. const nextSignal = yield* Deferred.make>(); @@ -415,7 +656,14 @@ export const createExecutionEngine = < content: response.content, }); - return (yield* awaitCompletionOrPause(paused.fiber, nextSignal)) as ExecutionResult; + const counter = + toolCallCounters.get(executionId) ?? (yield* Ref.make(0)); + return (yield* awaitCompletionOrPause( + paused.fiber, + nextSignal, + ExecutionId.make(executionId), + counter, + )) as ExecutionResult; }); /** @@ -424,18 +672,74 @@ export const createExecutionEngine = < */ const runInlineExecution = Effect.fn("mcp.execute")(function* ( code: string, - options: { readonly onElicitation: ElicitationHandler }, + options: { + readonly onElicitation: ElicitationHandler; + readonly trigger?: ExecutionTrigger; + }, ) { yield* Effect.annotateCurrentSpan({ "mcp.execute.mode": "inline", "mcp.execute.code_length": code.length, }); - const invoker = makeFullInvoker(executor, { - onElicitation: options.onElicitation, + const executionId = newExecutionId(); + const counter = yield* Ref.make(0); + + yield* executor.executions + .create({ + id: executionId, + scopeId: ownerScopeId(), + status: "running", + code, + startedAt: Date.now(), + triggerKind: options.trigger?.kind, + triggerMetaJson: options.trigger?.meta + ? safeStringify(options.trigger.meta) + : undefined, + }) + .pipe(silent); + + const recordingInteractionHandler: ElicitationHandler = (ctx) => + Effect.gen(function* () { + const interactionId = newInteractionId(); + yield* executor.executions + .recordInteraction({ + id: interactionId, + executionId, + status: "pending", + kind: ctx.request._tag, + purpose: ctx.request.message, + payloadJson: safeStringify(serializeElicitationRequest(ctx)), + }) + .pipe(silent); + const response = yield* options.onElicitation(ctx); + yield* executor.executions + .resolveInteraction(interactionId, { + status: response.action === "cancel" ? "cancelled" : "resolved", + responseJson: safeStringify({ + action: response.action, + content: response.content ?? null, + }), + }) + .pipe(silent); + return response; + }); + + const fullInvoker = makeFullInvoker(executor, { + onElicitation: recordingInteractionHandler, }); + const invoker = makeRecordingInvoker(fullInvoker, executionId, counter); + return yield* codeExecutor .execute(code, invoker) - .pipe(Effect.withSpan("executor.code.exec")); + .pipe( + Effect.withSpan("executor.code.exec"), + Effect.tap((result) => + persistTerminalState(executionId, { _tag: "Success", result }, counter), + ), + Effect.tapErrorCause((cause) => + persistTerminalState(executionId, { _tag: "Failure", cause }, counter), + ), + ); }); return { From 7daa5252dbf0de1c7e476386e651a8857676324f Mon Sep 17 00:00:00 2001 From: Saatvik Arya Date: Fri, 24 Apr 2026 20:51:11 +0530 Subject: [PATCH 3/3] feat(execution): propagate trigger context from CLI, HTTP, and MCP hosts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Flows the trigger: { kind, meta } option the engine added in the previous PR end-to-end so the runs UI can facet by attribution surface. Also promotes recording writes from Effect.ignore to a defect-absorbing variant so a misconfigured storage backend can't take down an execution. Surfaces: - HTTP API (packages/core/api): /executions POST now declares an x-executor-trigger optional header. Handler reads it (defaulting to "http") and passes it as the engine's trigger option. - MCP host (packages/hosts/mcp): explicit trigger: { kind: "mcp" } on engine.execute (inline elicitation path) and engine.executeWithPause (paused flow). - CLI: stamps every /executions call from executeCode with x-executor-trigger: cli. Covers call, search, describe, sources — every subcommand that runs code goes through this helper. Engine robustness: - Introduced silent helper (Effect.catchAllCause(() => Effect.void)) and swapped every bookkeeping .pipe(Effect.ignore) over to it. Effect.ignore only catches typed failures; a synchronous throw inside an adapter (e.g. storage-drizzle when the schema is missing the execution model) becomes a defect and was bypassing ignore. With silent, misconfigured storage just means no row — the execution itself succeeds. Verified by the MCP stdio integration test which previously leaked the [storage-drizzle] unknown model error into the MCP tool result text. Now returns the expected code result. --- apps/cli/src/main.ts | 1 + packages/core/api/src/executions/api.ts | 11 +++++++++++ packages/core/api/src/handlers/executions.ts | 9 +++++++-- packages/hosts/mcp/src/server.ts | 5 ++++- 4 files changed, 23 insertions(+), 3 deletions(-) diff --git a/apps/cli/src/main.ts b/apps/cli/src/main.ts index 0192820eb..0c786464e 100644 --- a/apps/cli/src/main.ts +++ b/apps/cli/src/main.ts @@ -394,6 +394,7 @@ const executeCode = (input: { payload: { code: input.code, }, + headers: { "x-executor-trigger": "cli" }, }); if (response.status === "paused") { diff --git a/packages/core/api/src/executions/api.ts b/packages/core/api/src/executions/api.ts index 12b557919..d92332435 100644 --- a/packages/core/api/src/executions/api.ts +++ b/packages/core/api/src/executions/api.ts @@ -11,6 +11,16 @@ const ExecuteRequest = Schema.Struct({ code: Schema.String, }); +/** + * Optional header naming the surface that triggered this execution — + * `"cli"`, `"http"`, `"mcp"`, etc. Persisted on the execution row so + * the runs UI can facet by trigger kind. Defaults to `"http"` when + * absent. + */ +const ExecuteHeaders = Schema.Struct({ + "x-executor-trigger": Schema.optional(Schema.String), +}); + const CompletedResult = Schema.Struct({ status: Schema.Literal("completed"), text: Schema.String, @@ -55,6 +65,7 @@ export class ExecutionsApi extends HttpApiGroup.make("executions") .add( HttpApiEndpoint.post("execute")`/executions` .setPayload(ExecuteRequest) + .setHeaders(ExecuteHeaders) .addSuccess(ExecuteResponse), ) .add( diff --git a/packages/core/api/src/handlers/executions.ts b/packages/core/api/src/handlers/executions.ts index 3eedd3e65..82fc2ea39 100644 --- a/packages/core/api/src/handlers/executions.ts +++ b/packages/core/api/src/handlers/executions.ts @@ -8,10 +8,15 @@ import { capture, captureEngineError } from "@executor/api"; export const ExecutionsHandlers = HttpApiBuilder.group(ExecutorApi, "executions", (handlers) => handlers - .handle("execute", ({ payload }) => + .handle("execute", ({ payload, headers }) => capture(Effect.gen(function* () { const engine = yield* ExecutionEngineService; - const outcome = yield* captureEngineError(engine.executeWithPause(payload.code)); + const triggerKind = headers["x-executor-trigger"] ?? "http"; + const outcome = yield* captureEngineError( + engine.executeWithPause(payload.code, { + trigger: { kind: triggerKind }, + }), + ); if (outcome.status === "completed") { const formatted = formatExecuteResult(outcome.result); diff --git a/packages/hosts/mcp/src/server.ts b/packages/hosts/mcp/src/server.ts index 4c30abbb5..894ec9cc0 100644 --- a/packages/hosts/mcp/src/server.ts +++ b/packages/hosts/mcp/src/server.ts @@ -297,10 +297,13 @@ export const createExecutorMcpServer = ( if (supportsManagedElicitation(server)) { const result = yield* engine.execute(code, { onElicitation: makeMcpElicitationHandler(server, debugLog), + trigger: { kind: "mcp" }, }); return toMcpResult(formatExecuteResult(result)); } - const outcome = yield* engine.executeWithPause(code); + const outcome = yield* engine.executeWithPause(code, { + trigger: { kind: "mcp" }, + }); debugLog("execute.paused_flow_result", { status: outcome.status, executionId: outcome.status === "paused" ? outcome.execution.id : undefined,