diff --git a/apps/cloud/src/auth/workos.ts b/apps/cloud/src/auth/workos.ts index e42d87001..25587214c 100644 --- a/apps/cloud/src/auth/workos.ts +++ b/apps/cloud/src/auth/workos.ts @@ -3,7 +3,7 @@ // --------------------------------------------------------------------------- import { env } from "cloudflare:workers"; -import { Context, Effect, Layer } from "effect"; +import { Cache, Context, Duration, Effect, Layer } from "effect"; import { WorkOS } from "@workos-inc/node/worker"; import { WorkOSError, tryPromiseService, withServiceLogging } from "./errors"; @@ -31,7 +31,7 @@ const make = Effect.gen(function* () { tryPromiseService(() => fn(workos)), ); - const authenticateSealedSession = (sessionData: string) => + const authenticateSealedSessionUncached = (sessionData: string) => Effect.gen(function* () { if (!sessionData) return null; @@ -77,6 +77,23 @@ const make = Effect.gen(function* () { }; }); + // `authenticateSealedSession` is called multiple times per request + // (once in `lookupOrgForRequest` to pick the org, once in the + // `OrgAuth` middleware to populate AuthContext). Each uncached call + // re-runs the WorkOS refresh POST when the access token is expired — + // two round-trips for identical input. A short-TTL keyed cache + // collapses those into one lookup per (sessionData, ~10s window). + // The cache is concurrent-safe: simultaneous `get`s for the same key + // share a single in-flight effect. + const sessionCache = yield* Cache.make({ + capacity: 512, + timeToLive: Duration.seconds(10), + lookup: authenticateSealedSessionUncached, + }); + + const authenticateSealedSession = (sessionData: string) => + sessionCache.get(sessionData); + return { getAuthorizationUrl: (redirectUri: string) => workos.userManagement.getAuthorizationUrl({ diff --git a/apps/cloud/src/services/execution-stack.ts b/apps/cloud/src/services/execution-stack.ts index 196e86162..1ededf9b8 100644 --- a/apps/cloud/src/services/execution-stack.ts +++ b/apps/cloud/src/services/execution-stack.ts @@ -9,22 +9,53 @@ import { Effect } from "effect"; import { createExecutionEngine } from "@executor/execution"; import { makeDynamicWorkerExecutor } from "@executor/runtime-dynamic-worker"; +import type { Tool, ToolListFilter } from "@executor/sdk"; import { withExecutionUsageTracking } from "../api/execution-usage"; import { AutumnService } from "./autumn"; import { createScopedExecutor } from "./executor"; +// In-memory tools.list cache on the DO. `tools.search` scans the full +// list on every call, so caching it collapses the N calls in a session +// into one DB fetch. DO lifetime caps cache staleness at +// SESSION_TIMEOUT_MS (5m idle) — no TTL needed. +const makeToolsListCache = ( + inner: (filter?: ToolListFilter) => Effect.Effect, +) => { + const cache = new Map(); + return (filter?: ToolListFilter) => + Effect.gen(function* () { + const key = JSON.stringify(filter ?? null); + const hit = cache.get(key); + if (hit) { + yield* Effect.annotateCurrentSpan({ "cache.state": "hit" }); + return hit; + } + const value = yield* inner(filter); + cache.set(key, value); + yield* Effect.annotateCurrentSpan({ "cache.state": "miss" }); + return value; + }).pipe(Effect.withSpan("executor.tools.list.cached")); +}; + export const makeExecutionStack = ( userId: string, organizationId: string, organizationName: string, ) => Effect.gen(function* () { - const executor = yield* createScopedExecutor( + const rawExecutor = yield* createScopedExecutor( userId, organizationId, organizationName, ).pipe(Effect.withSpan("McpSessionDO.createScopedExecutor")); + const executor = { + ...rawExecutor, + tools: { + ...rawExecutor.tools, + list: makeToolsListCache(rawExecutor.tools.list), + }, + }; const codeExecutor = makeDynamicWorkerExecutor({ loader: env.LOADER }); const autumn = yield* AutumnService; const engine = withExecutionUsageTracking( diff --git a/packages/core/sdk/src/executor.ts b/packages/core/sdk/src/executor.ts index d9a89a2cc..b2640ad11 100644 --- a/packages/core/sdk/src/executor.ts +++ b/packages/core/sdk/src/executor.ts @@ -1762,7 +1762,16 @@ export const createExecutor = < const listTools = (filter?: ToolListFilter) => Effect.gen(function* () { - const dynamic = yield* core.findMany({ model: "tool" }); + // Push sourceId down to storage so we don't pull every tool row + // in the stack when the caller only wants one source (e.g. the + // `/sources/:id/tools` endpoint). `query` is a free-text match + // that still needs the row contents, so we apply it in memory. + const dynamic = yield* core.findMany({ + model: "tool", + where: filter?.sourceId + ? [{ field: "source_id", value: filter.sourceId }] + : undefined, + }); // Dedup by tool id, innermost scope winning — same reason as // `listSources` above: a shadowed id must surface as one entry // (the inner one), not two. diff --git a/packages/plugins/graphql/src/sdk/plugin.ts b/packages/plugins/graphql/src/sdk/plugin.ts index df03f95f6..dab503049 100644 --- a/packages/plugins/graphql/src/sdk/plugin.ts +++ b/packages/plugins/graphql/src/sdk/plugin.ts @@ -366,6 +366,7 @@ export const graphqlPlugin = definePlugin( const storedOps: StoredOperation[] = prepared.map((p) => ({ toolId: `${namespace}.${p.toolPath}`, sourceId: namespace, + scopeId: config.scope, binding: p.binding, })); @@ -527,22 +528,19 @@ export const graphqlPlugin = definePlugin( // toolRows for a single (plugin_id, source_id) group can still // straddle multiple scopes when the source is shadowed (e.g. an // org-level GraphQL source plus a per-user override that - // re-registers the same tool ids). Run one listOperationsBySource - // per distinct scope so each lookup pins {source_id, scope_id} - // and we don't fall through to the wrong scope's bindings. - const scopes = new Set(); - for (const row of toolRows as readonly ToolRow[]) { - scopes.add(row.scope_id as string); - } + // re-registers the same tool ids). One query returns every op + // row in the caller's stack (the scoped adapter pins + // `scope_id IN (stack)`); partition by `op.scopeId` so each + // tool row resolves against its own scope's binding. + const ops = yield* ctx.storage.listOperationsBySource(sourceId); const byScope = new Map>(); - for (const scope of scopes) { - const ops = yield* ctx.storage.listOperationsBySource( - sourceId, - scope, - ); - const byId = new Map(); - for (const op of ops) byId.set(op.toolId, op.binding); - byScope.set(scope, byId); + for (const op of ops) { + let byId = byScope.get(op.scopeId); + if (!byId) { + byId = new Map(); + byScope.set(op.scopeId, byId); + } + byId.set(op.toolId, op.binding); } const out: Record = {}; diff --git a/packages/plugins/graphql/src/sdk/store.ts b/packages/plugins/graphql/src/sdk/store.ts index 3eb160ee9..9b0c9013a 100644 --- a/packages/plugins/graphql/src/sdk/store.ts +++ b/packages/plugins/graphql/src/sdk/store.ts @@ -50,6 +50,7 @@ export interface StoredGraphqlSource { export interface StoredOperation { readonly toolId: string; readonly sourceId: string; + readonly scopeId: string; readonly binding: OperationBinding; } @@ -126,7 +127,6 @@ export interface GraphqlStore { readonly listOperationsBySource: ( sourceId: string, - scope: string, ) => Effect.Effect; readonly removeSource: ( @@ -153,6 +153,7 @@ export const makeDefaultGraphqlStore = ({ const rowToOperation = (row: Record): StoredOperation => ({ toolId: row.id as string, sourceId: row.source_id as string, + scopeId: row.scope_id as string, binding: decodeBinding(row.binding), }); @@ -257,14 +258,11 @@ export const makeDefaultGraphqlStore = ({ }) .pipe(Effect.map((row) => (row ? rowToOperation(row) : null))), - listOperationsBySource: (sourceId, scope) => + listOperationsBySource: (sourceId) => db .findMany({ model: "graphql_operation", - where: [ - { field: "source_id", value: sourceId }, - { field: "scope_id", value: scope }, - ], + where: [{ field: "source_id", value: sourceId }], }) .pipe(Effect.map((rows) => rows.map(rowToOperation))), diff --git a/packages/plugins/openapi/src/sdk/plugin.ts b/packages/plugins/openapi/src/sdk/plugin.ts index 46e815aca..3af373f4e 100644 --- a/packages/plugins/openapi/src/sdk/plugin.ts +++ b/packages/plugins/openapi/src/sdk/plugin.ts @@ -408,6 +408,7 @@ export const openApiPlugin = definePlugin( const storedOps: StoredOperation[] = definitions.map((def) => ({ toolId: `${namespace}.${def.toolPath}`, sourceId: namespace, + scopeId: input.scope, binding: toBinding(def), })); @@ -949,19 +950,19 @@ export const openApiPlugin = definePlugin( // toolRows for a single (plugin_id, source_id) group can still // straddle multiple scopes when the source is shadowed (e.g. an // org-level openapi source plus a per-user override that - // re-registers the same tool ids). Run one listOperationsBySource - // per distinct scope so each lookup pins {source_id, scope_id} - // and we don't fall through to the wrong scope's bindings. - const scopes = new Set(); - for (const row of toolRows as readonly ToolRow[]) { - scopes.add(row.scope_id as string); - } + // re-registers the same tool ids). One query returns every op + // row in the caller's stack (the scoped adapter pins + // `scope_id IN (stack)`); partition by `op.scopeId` so each + // tool row resolves against its own scope's binding. + const ops = yield* ctx.storage.listOperationsBySource(sourceId); const byScope = new Map>(); - for (const scope of scopes) { - const ops = yield* ctx.storage.listOperationsBySource(sourceId, scope); - const byId = new Map(); - for (const op of ops) byId.set(op.toolId, op.binding); - byScope.set(scope, byId); + for (const op of ops) { + let byId = byScope.get(op.scopeId); + if (!byId) { + byId = new Map(); + byScope.set(op.scopeId, byId); + } + byId.set(op.toolId, op.binding); } const out: Record = {}; diff --git a/packages/plugins/openapi/src/sdk/store.ts b/packages/plugins/openapi/src/sdk/store.ts index e993282c5..1a57abc12 100644 --- a/packages/plugins/openapi/src/sdk/store.ts +++ b/packages/plugins/openapi/src/sdk/store.ts @@ -119,6 +119,7 @@ export type StoredSourceSchemaType = typeof StoredSourceSchema.Type; export interface StoredOperation { readonly toolId: string; readonly sourceId: string; + readonly scopeId: string; readonly binding: OperationBinding; } @@ -202,7 +203,6 @@ export interface OpenapiStore { readonly listOperationsBySource: ( sourceId: string, - scope: string, ) => Effect.Effect; readonly removeSource: ( @@ -255,6 +255,7 @@ export const makeDefaultOpenapiStore = ({ const rowToOperation = (row: Record): StoredOperation => ({ toolId: row.id as string, sourceId: row.source_id as string, + scopeId: row.scope_id as string, binding: decodeBinding( typeof row.binding === "string" ? JSON.parse(row.binding) : row.binding, ), @@ -388,14 +389,11 @@ export const makeDefaultOpenapiStore = ({ }) .pipe(Effect.map((row) => (row ? rowToOperation(row) : null))), - listOperationsBySource: (sourceId, scope) => + listOperationsBySource: (sourceId) => adapter .findMany({ model: "openapi_operation", - where: [ - { field: "source_id", value: sourceId }, - { field: "scope_id", value: scope }, - ], + where: [{ field: "source_id", value: sourceId }], }) .pipe(Effect.map((rows) => rows.map(rowToOperation))),