Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions apps/cloud/src/auth/workos.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// ---------------------------------------------------------------------------

import { env } from "cloudflare:workers";
import { Context, Effect, Layer } from "effect";
import { Cache, Context, Duration, Effect, Layer } from "effect";
import { WorkOS } from "@workos-inc/node/worker";
import { WorkOSError, tryPromiseService, withServiceLogging } from "./errors";

Expand Down Expand Up @@ -31,7 +31,7 @@ const make = Effect.gen(function* () {
tryPromiseService(() => fn(workos)),
);

const authenticateSealedSession = (sessionData: string) =>
const authenticateSealedSessionUncached = (sessionData: string) =>
Effect.gen(function* () {
if (!sessionData) return null;

Expand Down Expand Up @@ -77,6 +77,23 @@ const make = Effect.gen(function* () {
};
});

// `authenticateSealedSession` is called multiple times per request
// (once in `lookupOrgForRequest` to pick the org, once in the
// `OrgAuth` middleware to populate AuthContext). Each uncached call
// re-runs the WorkOS refresh POST when the access token is expired —
// two round-trips for identical input. A short-TTL keyed cache
// collapses those into one lookup per (sessionData, ~10s window).
// The cache is concurrent-safe: simultaneous `get`s for the same key
// share a single in-flight effect.
const sessionCache = yield* Cache.make({
capacity: 512,
timeToLive: Duration.seconds(10),
lookup: authenticateSealedSessionUncached,
});

const authenticateSealedSession = (sessionData: string) =>
sessionCache.get(sessionData);

return {
getAuthorizationUrl: (redirectUri: string) =>
workos.userManagement.getAuthorizationUrl({
Expand Down
33 changes: 32 additions & 1 deletion apps/cloud/src/services/execution-stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,53 @@ import { Effect } from "effect";

import { createExecutionEngine } from "@executor/execution";
import { makeDynamicWorkerExecutor } from "@executor/runtime-dynamic-worker";
import type { Tool, ToolListFilter } from "@executor/sdk";

import { withExecutionUsageTracking } from "../api/execution-usage";
import { AutumnService } from "./autumn";
import { createScopedExecutor } from "./executor";

// In-memory tools.list cache on the DO. `tools.search` scans the full
// list on every call, so caching it collapses the N calls in a session
// into one DB fetch. DO lifetime caps cache staleness at
// SESSION_TIMEOUT_MS (5m idle) — no TTL needed.
const makeToolsListCache = <E, R>(
inner: (filter?: ToolListFilter) => Effect.Effect<readonly Tool[], E, R>,
) => {
const cache = new Map<string, readonly Tool[]>();
return (filter?: ToolListFilter) =>
Effect.gen(function* () {
const key = JSON.stringify(filter ?? null);
const hit = cache.get(key);
if (hit) {
yield* Effect.annotateCurrentSpan({ "cache.state": "hit" });
return hit;
}
const value = yield* inner(filter);
cache.set(key, value);
yield* Effect.annotateCurrentSpan({ "cache.state": "miss" });
return value;
}).pipe(Effect.withSpan("executor.tools.list.cached"));
};

export const makeExecutionStack = (
userId: string,
organizationId: string,
organizationName: string,
) =>
Effect.gen(function* () {
const executor = yield* createScopedExecutor(
const rawExecutor = yield* createScopedExecutor(
userId,
organizationId,
organizationName,
).pipe(Effect.withSpan("McpSessionDO.createScopedExecutor"));
const executor = {
...rawExecutor,
tools: {
...rawExecutor.tools,
list: makeToolsListCache(rawExecutor.tools.list),
},
};
const codeExecutor = makeDynamicWorkerExecutor({ loader: env.LOADER });
const autumn = yield* AutumnService;
const engine = withExecutionUsageTracking(
Expand Down
11 changes: 10 additions & 1 deletion packages/core/sdk/src/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1762,7 +1762,16 @@ export const createExecutor = <

const listTools = (filter?: ToolListFilter) =>
Effect.gen(function* () {
const dynamic = yield* core.findMany({ model: "tool" });
// Push sourceId down to storage so we don't pull every tool row
// in the stack when the caller only wants one source (e.g. the
// `/sources/:id/tools` endpoint). `query` is a free-text match
// that still needs the row contents, so we apply it in memory.
const dynamic = yield* core.findMany({
model: "tool",
where: filter?.sourceId
? [{ field: "source_id", value: filter.sourceId }]
: undefined,
});
// Dedup by tool id, innermost scope winning — same reason as
// `listSources` above: a shadowed id must surface as one entry
// (the inner one), not two.
Expand Down
28 changes: 13 additions & 15 deletions packages/plugins/graphql/src/sdk/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ export const graphqlPlugin = definePlugin(
const storedOps: StoredOperation[] = prepared.map((p) => ({
toolId: `${namespace}.${p.toolPath}`,
sourceId: namespace,
scopeId: config.scope,
binding: p.binding,
}));

Expand Down Expand Up @@ -527,22 +528,19 @@ export const graphqlPlugin = definePlugin(
// toolRows for a single (plugin_id, source_id) group can still
// straddle multiple scopes when the source is shadowed (e.g. an
// org-level GraphQL source plus a per-user override that
// re-registers the same tool ids). Run one listOperationsBySource
// per distinct scope so each lookup pins {source_id, scope_id}
// and we don't fall through to the wrong scope's bindings.
const scopes = new Set<string>();
for (const row of toolRows as readonly ToolRow[]) {
scopes.add(row.scope_id as string);
}
// re-registers the same tool ids). One query returns every op
// row in the caller's stack (the scoped adapter pins
// `scope_id IN (stack)`); partition by `op.scopeId` so each
// tool row resolves against its own scope's binding.
const ops = yield* ctx.storage.listOperationsBySource(sourceId);
const byScope = new Map<string, Map<string, OperationBinding>>();
for (const scope of scopes) {
const ops = yield* ctx.storage.listOperationsBySource(
sourceId,
scope,
);
const byId = new Map<string, OperationBinding>();
for (const op of ops) byId.set(op.toolId, op.binding);
byScope.set(scope, byId);
for (const op of ops) {
let byId = byScope.get(op.scopeId);
if (!byId) {
byId = new Map<string, OperationBinding>();
byScope.set(op.scopeId, byId);
}
byId.set(op.toolId, op.binding);
}

const out: Record<string, ToolAnnotations> = {};
Expand Down
10 changes: 4 additions & 6 deletions packages/plugins/graphql/src/sdk/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ export interface StoredGraphqlSource {
export interface StoredOperation {
readonly toolId: string;
readonly sourceId: string;
readonly scopeId: string;
readonly binding: OperationBinding;
}

Expand Down Expand Up @@ -126,7 +127,6 @@ export interface GraphqlStore {

readonly listOperationsBySource: (
sourceId: string,
scope: string,
) => Effect.Effect<readonly StoredOperation[], StorageFailure>;

readonly removeSource: (
Expand All @@ -153,6 +153,7 @@ export const makeDefaultGraphqlStore = ({
const rowToOperation = (row: Record<string, unknown>): StoredOperation => ({
toolId: row.id as string,
sourceId: row.source_id as string,
scopeId: row.scope_id as string,
binding: decodeBinding(row.binding),
});

Expand Down Expand Up @@ -257,14 +258,11 @@ export const makeDefaultGraphqlStore = ({
})
.pipe(Effect.map((row) => (row ? rowToOperation(row) : null))),

listOperationsBySource: (sourceId, scope) =>
listOperationsBySource: (sourceId) =>
db
.findMany({
model: "graphql_operation",
where: [
{ field: "source_id", value: sourceId },
{ field: "scope_id", value: scope },
],
where: [{ field: "source_id", value: sourceId }],
})
.pipe(Effect.map((rows) => rows.map(rowToOperation))),

Expand Down
25 changes: 13 additions & 12 deletions packages/plugins/openapi/src/sdk/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ export const openApiPlugin = definePlugin(
const storedOps: StoredOperation[] = definitions.map((def) => ({
toolId: `${namespace}.${def.toolPath}`,
sourceId: namespace,
scopeId: input.scope,
binding: toBinding(def),
}));

Expand Down Expand Up @@ -949,19 +950,19 @@ export const openApiPlugin = definePlugin(
// toolRows for a single (plugin_id, source_id) group can still
// straddle multiple scopes when the source is shadowed (e.g. an
// org-level openapi source plus a per-user override that
// re-registers the same tool ids). Run one listOperationsBySource
// per distinct scope so each lookup pins {source_id, scope_id}
// and we don't fall through to the wrong scope's bindings.
const scopes = new Set<string>();
for (const row of toolRows as readonly ToolRow[]) {
scopes.add(row.scope_id as string);
}
// re-registers the same tool ids). One query returns every op
// row in the caller's stack (the scoped adapter pins
// `scope_id IN (stack)`); partition by `op.scopeId` so each
// tool row resolves against its own scope's binding.
const ops = yield* ctx.storage.listOperationsBySource(sourceId);
const byScope = new Map<string, Map<string, OperationBinding>>();
for (const scope of scopes) {
const ops = yield* ctx.storage.listOperationsBySource(sourceId, scope);
const byId = new Map<string, OperationBinding>();
for (const op of ops) byId.set(op.toolId, op.binding);
byScope.set(scope, byId);
for (const op of ops) {
let byId = byScope.get(op.scopeId);
if (!byId) {
byId = new Map<string, OperationBinding>();
byScope.set(op.scopeId, byId);
}
byId.set(op.toolId, op.binding);
}

const out: Record<string, ToolAnnotations> = {};
Expand Down
10 changes: 4 additions & 6 deletions packages/plugins/openapi/src/sdk/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ export type StoredSourceSchemaType = typeof StoredSourceSchema.Type;
export interface StoredOperation {
readonly toolId: string;
readonly sourceId: string;
readonly scopeId: string;
readonly binding: OperationBinding;
}

Expand Down Expand Up @@ -202,7 +203,6 @@ export interface OpenapiStore {

readonly listOperationsBySource: (
sourceId: string,
scope: string,
) => Effect.Effect<readonly StoredOperation[], StorageFailure>;

readonly removeSource: (
Expand Down Expand Up @@ -255,6 +255,7 @@ export const makeDefaultOpenapiStore = ({
const rowToOperation = (row: Record<string, unknown>): StoredOperation => ({
toolId: row.id as string,
sourceId: row.source_id as string,
scopeId: row.scope_id as string,
binding: decodeBinding(
typeof row.binding === "string" ? JSON.parse(row.binding) : row.binding,
),
Expand Down Expand Up @@ -388,14 +389,11 @@ export const makeDefaultOpenapiStore = ({
})
.pipe(Effect.map((row) => (row ? rowToOperation(row) : null))),

listOperationsBySource: (sourceId, scope) =>
listOperationsBySource: (sourceId) =>
adapter
.findMany({
model: "openapi_operation",
where: [
{ field: "source_id", value: sourceId },
{ field: "scope_id", value: scope },
],
where: [{ field: "source_id", value: sourceId }],
})
.pipe(Effect.map((rows) => rows.map(rowToOperation))),

Expand Down
Loading