Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 197 additions & 3 deletions apps/cloud/src/mcp-flow.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,31 @@
},
});

const seedOrg = async (id: string, name = "MCP Flow Org"): Promise<void> => {
const seedOrg = async (
id: string,
name = "MCP Flow Org",
): Promise<{ handle: string }> => {
const response = await SELF.fetch(`${BASE}/__test__/seed-org`, {
method: "POST",
headers: { "content-type": CONTENT_TYPE_JSON },
body: JSON.stringify({ id, name }),
});
expect(response.status).toBe(204);
expect(response.status).toBe(200);
return (await response.json()) as { handle: string };
};

const seedWorkspace = async (input: {
organizationId: string;
name: string;
slug?: string;
}): Promise<{ id: string; slug: string }> => {
const response = await SELF.fetch(`${BASE}/__test__/seed-workspace`, {
method: "POST",
headers: { "content-type": CONTENT_TYPE_JSON },
body: JSON.stringify(input),
});
expect(response.status).toBe(200);
return (await response.json()) as { id: string; slug: string };
};

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -320,7 +338,7 @@
expect(notificationResponse.status).toBe(202);
expect(notificationResponse.headers.get("content-type")).toBeNull();
expect(await notificationResponse.text()).toBe("");
});
}, 15_000);
});

describe("/mcp session restore", () => {
Expand Down Expand Up @@ -393,7 +411,7 @@
body: TOOLS_LIST_REQUEST,
}),
new Promise<never>((_, reject) =>
setTimeout(() => reject(new Error("POST did not return after GET restore")), 5_000),

Check failure on line 414 in apps/cloud/src/mcp-flow.test.ts

View workflow job for this annotation

GitHub Actions / Test

src/mcp-flow.test.ts > /mcp session restore > keeps JSON POST responses after a session is restored by a GET reconnect

Error: POST did not return after GET restore ❯ src/mcp-flow.test.ts:414:33
),
]);
expect(response.status).toBe(200);
Expand Down Expand Up @@ -549,3 +567,179 @@
expect(stored.alarm).toBeNull();
});
});

// ---------------------------------------------------------------------------
// /:org/mcp + /:org/:workspace/mcp — context-addressed MCP routes
// ---------------------------------------------------------------------------
//
// Sister coverage to the legacy `/mcp` fallback above. The plan moves the
// "active org/workspace" off hidden state and onto the URL — these tests
// confirm the worker:
// 1. classifies the new path shapes,
// 2. resolves URL `:org` (and optional `:workspace`) to org/workspace rows,
// 3. seeds session-meta from the URL (NOT from the JWT's org_id claim),
// 4. publishes a per-context `oauth-protected-resource/:org(/:workspace)/mcp`
// metadata document.

describe("/:org/mcp context routes", () => {
it("publishes per-org protected-resource metadata pointing at /:org/mcp", async () => {
const orgId = nextOrgId();
const { handle } = await seedOrg(orgId, `MCP Org ${orgId}`);

const response = await SELF.fetch(
`${BASE}/.well-known/oauth-protected-resource/${handle}/mcp`,
);
expect(response.status).toBe(200);
const body = (await response.json()) as Record<string, unknown>;
expect(body.resource).toBe(`${BASE}/${handle}/mcp`);
expect(body.authorization_servers).toEqual(["https://test-authkit.example.com"]);
});

it("creates a session bound to the URL-resolved org (not the JWT claim)", async () => {
const urlOrgId = nextOrgId();
const jwtOrgId = nextOrgId();
const accountId = nextAccountId();
const { handle } = await seedOrg(urlOrgId, `URL Org ${urlOrgId}`);
// The JWT carries a different org id. The URL is the source of truth,
// so the session's stored organizationId should be the URL one.
await seedOrg(jwtOrgId, `JWT Org ${jwtOrgId}`);

const response = await SELF.fetch(`${BASE}/${handle}/mcp`, {
method: "POST",
headers: {
"content-type": CONTENT_TYPE_JSON,
accept: JSON_AND_SSE,
authorization: `Bearer ${makeTestBearer(accountId, jwtOrgId)}`,
},
body: JSON.stringify(INITIALIZE_REQUEST),
});

expect(response.status).toBe(200);
const sessionId = response.headers.get("mcp-session-id");
expect(sessionId).toBeTruthy();

const stub = env.MCP_SESSION.get(env.MCP_SESSION.idFromString(sessionId!));
const stored = await runInDurableObject(stub, async (_instance, state) => ({
sessionMeta: await state.storage.get<{
readonly organizationId: string;
readonly userId: string;
readonly workspaceId?: string;
}>(SESSION_META_KEY),
}));
expect(stored.sessionMeta?.organizationId).toBe(urlOrgId);
expect(stored.sessionMeta?.workspaceId).toBeUndefined();
expect(stored.sessionMeta?.userId).toBe(accountId);
}, 15_000);

it("returns 404 for an unknown org handle", async () => {
const response = await SELF.fetch(`${BASE}/no-such-org-here/mcp`, {
method: "POST",
headers: {
"content-type": CONTENT_TYPE_JSON,
accept: JSON_AND_SSE,
authorization: `Bearer ${makeTestBearer(nextAccountId(), nextOrgId())}`,
},
body: JSON.stringify(INITIALIZE_REQUEST),
});
expect(response.status).toBe(404);
const body = (await response.json()) as {
error?: { message?: string };
};
expect(body.error?.message ?? "").toMatch(/not found/i);
});
});

describe("/:org/:workspace/mcp context routes", () => {
it("creates a session bound to the URL-resolved workspace", async () => {
const orgId = nextOrgId();
const accountId = nextAccountId();
const { handle } = await seedOrg(orgId, `WS Org ${orgId}`);
const workspace = await seedWorkspace({
organizationId: orgId,
name: "Production",
});

const response = await SELF.fetch(
`${BASE}/${handle}/${workspace.slug}/mcp`,
{
method: "POST",
headers: {
"content-type": CONTENT_TYPE_JSON,
accept: JSON_AND_SSE,
authorization: `Bearer ${makeTestBearer(accountId, orgId)}`,
},
body: JSON.stringify(INITIALIZE_REQUEST),
},
);
expect(response.status).toBe(200);
const sessionId = response.headers.get("mcp-session-id");
expect(sessionId).toBeTruthy();

const stub = env.MCP_SESSION.get(env.MCP_SESSION.idFromString(sessionId!));
const stored = await runInDurableObject(stub, async (_instance, state) => ({
sessionMeta: await state.storage.get<{
readonly organizationId: string;
readonly workspaceId?: string;
readonly workspaceName?: string;
}>(SESSION_META_KEY),
}));
expect(stored.sessionMeta?.organizationId).toBe(orgId);
expect(stored.sessionMeta?.workspaceId).toBe(workspace.id);
expect(stored.sessionMeta?.workspaceName).toBe("Production");
}, 15_000);

it("rejects a session-id that was bound to a different workspace", async () => {
const orgId = nextOrgId();
const accountId = nextAccountId();
const { handle } = await seedOrg(orgId, `Cross WS ${orgId}`);
const wsA = await seedWorkspace({ organizationId: orgId, name: "Alpha" });
const wsB = await seedWorkspace({ organizationId: orgId, name: "Beta" });

const initA = await SELF.fetch(`${BASE}/${handle}/${wsA.slug}/mcp`, {
method: "POST",
headers: {
"content-type": CONTENT_TYPE_JSON,
accept: JSON_AND_SSE,
authorization: `Bearer ${makeTestBearer(accountId, orgId)}`,
},
body: JSON.stringify(INITIALIZE_REQUEST),
});
expect(initA.status).toBe(200);
const sessionId = initA.headers.get("mcp-session-id");
expect(sessionId).toBeTruthy();

const stolen = await SELF.fetch(`${BASE}/${handle}/${wsB.slug}/mcp`, {
method: "POST",
headers: {
"content-type": CONTENT_TYPE_JSON,
accept: JSON_AND_SSE,
authorization: `Bearer ${makeTestBearer(accountId, orgId)}`,
"mcp-session-id": sessionId!,
},
body: JSON.stringify(TOOLS_LIST_REQUEST),
});
expect(stolen.status).toBe(403);
const body = (await stolen.json()) as {
readonly error?: { readonly code: number };
};
expect(body.error?.code).toBe(-32003);
}, 15_000);

it("returns 404 for an unknown workspace slug", async () => {
const orgId = nextOrgId();
const { handle } = await seedOrg(orgId, `WS 404 ${orgId}`);
const response = await SELF.fetch(
`${BASE}/${handle}/nope/mcp`,
{
method: "POST",
headers: {
"content-type": CONTENT_TYPE_JSON,
accept: JSON_AND_SSE,
authorization: `Bearer ${makeTestBearer(nextAccountId(), orgId)}`,
},
body: JSON.stringify(INITIALIZE_REQUEST),
},
);
expect(response.status).toBe(404);
});
});
2 changes: 1 addition & 1 deletion apps/cloud/src/mcp-miniflare.e2e.node.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ const WorkerLive = Layer.effect(Worker)(Effect.gen(function* () {
headers: { "content-type": "application/json" },
body: JSON.stringify({ id, name }),
});
if (res.status !== 204) {
if (res.status !== 200 && res.status !== 204) {
throw new Error(`seed-org failed: ${res.status} ${await res.text()}`);
}
},
Expand Down
69 changes: 56 additions & 13 deletions apps/cloud/src/mcp-session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ import { DoTelemetryLive } from "./services/telemetry";

export type McpSessionInit = {
organizationId: string;
organizationName: string;
userId: string;
workspaceId?: string;
workspaceName?: string;
};

export type IncomingTraceHeaders = {
Expand All @@ -54,6 +57,7 @@ const SESSION_META_KEY = "session-meta";
const LAST_ACTIVITY_KEY = "last-activity-ms";
const INTERNAL_ACCOUNT_ID_HEADER = "x-executor-mcp-account-id";
const INTERNAL_ORGANIZATION_ID_HEADER = "x-executor-mcp-organization-id";
const INTERNAL_WORKSPACE_ID_HEADER = "x-executor-mcp-workspace-id";

// ---------------------------------------------------------------------------
// Errors
Expand Down Expand Up @@ -119,6 +123,8 @@ type SessionMeta = {
readonly organizationId: string;
readonly organizationName: string;
readonly userId: string;
readonly workspaceId?: string;
readonly workspaceName?: string;
};

/**
Expand Down Expand Up @@ -170,18 +176,34 @@ const makeResolveOrganizationServices = (dbHandle: DbHandle) => {
// at the DO method boundary.
const makeSessionServices = (dbHandle: DbHandle) => makeResolveOrganizationServices(dbHandle);

// The worker resolves the URL `:org` (and optional `:workspace`) before
// calling `init`, so we get the org row's id+name directly. We still fall
// back to a local lookup if the caller passed an id without a name —
// preserves the old single-arg call shape used by older test paths.
const resolveSessionMeta = Effect.fn("McpSessionDO.resolveSessionMeta")(function* (
organizationId: string,
userId: string,
init: McpSessionInit,
) {
const org = yield* resolveOrganization(organizationId);
if (init.organizationName) {
return {
organizationId: init.organizationId,
organizationName: init.organizationName,
userId: init.userId,
...(init.workspaceId
? { workspaceId: init.workspaceId, workspaceName: init.workspaceName ?? "" }
: {}),
} satisfies SessionMeta;
}
const org = yield* resolveOrganization(init.organizationId);
if (!org) {
return yield* new OrganizationNotFoundError({ organizationId });
return yield* new OrganizationNotFoundError({ organizationId: init.organizationId });
}
return {
organizationId: org.id,
organizationName: org.name,
userId,
userId: init.userId,
...(init.workspaceId
? { workspaceId: init.workspaceId, workspaceName: init.workspaceName ?? "" }
: {}),
} satisfies SessionMeta;
});

Expand Down Expand Up @@ -282,11 +304,21 @@ export class McpSessionDO extends DurableObject {
) {
const self = this;
return Effect.gen(function* () {
const { executor, engine } = yield* makeExecutionStack({
userId: sessionMeta.userId,
organizationId: sessionMeta.organizationId,
organizationName: sessionMeta.organizationName,
});
const { executor, engine } = yield* makeExecutionStack(
sessionMeta.workspaceId
? {
userId: sessionMeta.userId,
organizationId: sessionMeta.organizationId,
organizationName: sessionMeta.organizationName,
workspaceId: sessionMeta.workspaceId,
workspaceName: sessionMeta.workspaceName ?? "",
}
: {
userId: sessionMeta.userId,
organizationId: sessionMeta.organizationId,
organizationName: sessionMeta.organizationName,
},
);
// Build the description here so the postgres query it runs
// (`executor.sources.list`) lands as a child of
// `McpSessionDO.createRuntime`. host-mcp would otherwise call
Expand Down Expand Up @@ -420,11 +452,19 @@ export class McpSessionDO extends DurableObject {

const accountId = request.headers.get(INTERNAL_ACCOUNT_ID_HEADER);
const organizationId = request.headers.get(INTERNAL_ORGANIZATION_ID_HEADER);
// The header carries an empty string when the request is hitting an
// org-only context. Treat "" identically to undefined.
const headerWorkspaceId =
request.headers.get(INTERNAL_WORKSPACE_ID_HEADER) || null;
const sessionWorkspaceId = sessionMeta.workspaceId ?? null;
const matches =
accountId === sessionMeta.userId && organizationId === sessionMeta.organizationId;
accountId === sessionMeta.userId &&
organizationId === sessionMeta.organizationId &&
headerWorkspaceId === sessionWorkspaceId;

yield* Effect.annotateCurrentSpan({
"mcp.session.owner_match": matches,
"mcp.session.workspace_id": sessionWorkspaceId ?? "",
});

return matches ? null : sessionOwnerMismatch();
Expand All @@ -436,7 +476,7 @@ export class McpSessionDO extends DurableObject {
return Effect.gen(function* () {
const dbHandle = makeEphemeralDb();
try {
const sessionMeta = yield* resolveSessionMeta(token.organizationId, token.userId).pipe(
const sessionMeta = yield* resolveSessionMeta(token).pipe(
Effect.provide(makeResolveOrganizationServices(dbHandle)),
);
yield* Effect.promise(() => self.saveSessionMeta(sessionMeta)).pipe(
Expand All @@ -459,7 +499,10 @@ export class McpSessionDO extends DurableObject {
yield* self.doInit(token);
}).pipe(
Effect.withSpan("McpSessionDO.init", {
attributes: { "mcp.auth.organization_id": token.organizationId },
attributes: {
"mcp.auth.organization_id": token.organizationId,
"mcp.auth.workspace_id": token.workspaceId ?? "",
},
}),
(eff) => withIncomingParent(incoming, eff),
Effect.provide(DoTelemetryLive),
Expand Down
Loading
Loading