diff --git a/packages/opencode/src/cli/cmd/tui/app.tsx b/packages/opencode/src/cli/cmd/tui/app.tsx index 530ba3ff239a..b94e8e40e959 100644 --- a/packages/opencode/src/cli/cmd/tui/app.tsx +++ b/packages/opencode/src/cli/cmd/tui/app.tsx @@ -939,6 +939,60 @@ function App(props: { onSnapshot?: () => Promise }) { return render({ params: route.data.data }) }) + sdk.event.on("event", (evt) => { + if (evt.payload.type === "mcp.resource.updated") { + toast.show({ + title: "Resource Updated", + message: `${evt.payload.properties.uri} (${evt.payload.properties.server})`, + variant: "info", + duration: 5000, + }) + + // If autoprompt is enabled for this server, trigger AI with updated resource info + const mcp = sync.data.config.mcp?.[evt.payload.properties.server] + if (mcp && typeof mcp === "object" && "autoprompt" in mcp && mcp.autoprompt) { + const prompt = { + system: `An MCP resource has been updated. Resource URI: "${evt.payload.properties.uri}" from server "${evt.payload.properties.server}". Read the resource to review the latest content and take appropriate action.`, + parts: [ + { + type: "text" as const, + text: `Resource updated: ${evt.payload.properties.uri} (${evt.payload.properties.server})`, + }, + ], + } + if (route.data.type === "session") { + const status = sync.data.session_status?.[route.data.sessionID] + if (!status || status.type === "idle") { + sdk.client.session + .promptAsync({ sessionID: route.data.sessionID, ...prompt }) + .catch((e) => console.error("failed to trigger AI for resource update", e)) + } + return + } + sdk.client.session + .create({}) + .then((res) => { + const id = res.data?.id + if (!id) return + route.navigate({ type: "session", sessionID: id }) + sdk.client.session + .promptAsync({ sessionID: id, ...prompt }) + .catch((e) => console.error("failed to trigger AI for resource update", e)) + }) + .catch((e) => console.error("failed to create session for resource update", e)) + } + return + } + + if (evt.payload.type === "mcp.resource.list.changed") { + toast.show({ + title: "MCP Resources Changed", + message: `Server "${evt.payload.properties.server}" resource list updated`, + variant: "info", + duration: 3000, + }) + } + }) return ( @@ -51,6 +57,12 @@ export const Remote = Schema.Struct({ timeout: Schema.optional(PositiveInt).annotate({ description: "Timeout in ms for MCP server requests. Defaults to 5000 (5 seconds) if not specified.", }), + subscriptions: Schema.optional(Schema.mutable(Schema.Array(Schema.String))).annotate({ + description: "Resource URIs to automatically subscribe to for update notifications", + }), + autoprompt: Schema.optional(Schema.Boolean).annotate({ + description: "Automatically prompt the AI when a subscribed resource is updated. Defaults to false.", + }), }).annotate({ identifier: "McpRemoteConfig" }) export type Remote = Schema.Schema.Type diff --git a/packages/opencode/src/mcp/index.ts b/packages/opencode/src/mcp/index.ts index efd72c0c1f71..75fb7ff59b21 100644 --- a/packages/opencode/src/mcp/index.ts +++ b/packages/opencode/src/mcp/index.ts @@ -9,6 +9,8 @@ import { CallToolResultSchema, ListToolsResultSchema, ToolSchema, + ResourceListChangedNotificationSchema, + ResourceUpdatedNotificationSchema, type Tool as MCPToolDef, ToolListChangedNotificationSchema, } from "@modelcontextprotocol/sdk/types.js" @@ -63,6 +65,21 @@ export const BrowserOpenFailed = BusEvent.define( }), ) +export const ResourceUpdated = BusEvent.define( + "mcp.resource.updated", + Schema.Struct({ + server: Schema.String, + uri: Schema.String, + }), +) + +export const ResourceListChanged = BusEvent.define( + "mcp.resource.list.changed", + Schema.Struct({ + server: Schema.String, + }), +) + export const Failed = NamedError.create("MCPFailed", { name: Schema.String, }) @@ -219,6 +236,10 @@ function fetchFromClient( ) } +function supportsSubscriptions(client: MCPClient) { + return client.getServerCapabilities()?.resources?.subscribe === true +} + interface CreateResult { mcpClient?: MCPClient status: Status @@ -237,6 +258,7 @@ interface State { status: Record clients: Record defs: Record + subscriptions: Map> } export interface Interface { @@ -257,6 +279,9 @@ export interface Interface { clientName: string, resourceUri: string, ) => Effect.Effect> | undefined> + readonly subscribe: (clientName: string, uri: string) => Effect.Effect + readonly unsubscribe: (clientName: string, uri: string) => Effect.Effect + readonly subscriptions: () => Effect.Effect> readonly startAuth: ( mcpName: string, ) => Effect.Effect<{ authorizationUrl: string; oauthState: string }, NotFoundError> @@ -517,6 +542,19 @@ export const layer = Layer.effect( s.defs[name] = listed await bridge.promise(bus.publish(ToolsChanged, { server: name }).pipe(Effect.ignore)) }) + client.setNotificationHandler(ResourceUpdatedNotificationSchema, async (msg) => { + log.info("resource updated notification received", { + server: name, + uri: msg.params.uri, + }) + if (s.clients[name] !== client || s.status[name]?.status !== "connected") return + await bridge.promise(bus.publish(ResourceUpdated, { server: name, uri: msg.params.uri }).pipe(Effect.ignore)) + }) + client.setNotificationHandler(ResourceListChangedNotificationSchema, async () => { + log.info("resource list changed notification received", { server: name }) + if (s.clients[name] !== client || s.status[name]?.status !== "connected") return + await bridge.promise(bus.publish(ResourceListChanged, { server: name }).pipe(Effect.ignore)) + }) } const state = yield* InstanceState.make( @@ -528,6 +566,7 @@ export const layer = Layer.effect( status: {}, clients: {}, defs: {}, + subscriptions: new Map(), } yield* Effect.forEach( @@ -749,10 +788,91 @@ export const layer = Layer.effect( }) }) + const subscribe = Effect.fn("MCP.subscribe")(function* (clientName: string, uri: string) { + const s = yield* InstanceState.get(state) + const client = s.clients[clientName] + if (!client) { + log.warn("client not found for subscription", { clientName }) + return false + } + + if (!supportsSubscriptions(client)) { + log.debug("server does not support resource subscriptions", { clientName }) + return false + } + + if (s.subscriptions.get(clientName)?.has(uri)) return true + + return yield* Effect.tryPromise({ + try: () => client.subscribeResource({ uri }), + catch: (e) => { + log.error("failed to subscribe to resource", { + clientName, + uri, + error: e instanceof Error ? e.message : String(e), + }) + return e + }, + }).pipe( + Effect.map(() => { + if (!s.subscriptions.has(clientName)) s.subscriptions.set(clientName, new Set()) + s.subscriptions.get(clientName)?.add(uri) + log.info("subscribed to resource", { clientName, uri }) + return true + }), + Effect.orElseSucceed(() => false), + ) + }) + + const unsubscribe = Effect.fn("MCP.unsubscribe")(function* (clientName: string, uri: string) { + const s = yield* InstanceState.get(state) + const client = s.clients[clientName] + s.subscriptions.get(clientName)?.delete(uri) + + if (!client) { + log.warn("client not found for unsubscription", { clientName }) + return false + } + + if (!supportsSubscriptions(client)) return true + + return yield* Effect.tryPromise({ + try: () => client.unsubscribeResource({ uri }), + catch: (e) => { + log.error("failed to unsubscribe from resource", { + clientName, + uri, + error: e instanceof Error ? e.message : String(e), + }) + return e + }, + }).pipe( + Effect.map(() => { + log.info("unsubscribed from resource", { clientName, uri }) + return true + }), + Effect.orElseSucceed(() => false), + ) + }) + + const subscriptionsList = Effect.fn("MCP.subscriptions")(function* () { + const s = yield* InstanceState.get(state) + return Object.fromEntries( + [...s.subscriptions].filter(([, uris]) => uris.size > 0).map(([server, uris]) => [server, [...uris]]), + ) + }) + const readResource = Effect.fn("MCP.readResource")(function* (clientName: string, resourceUri: string) { - return yield* withClient(clientName, (client) => client.readResource({ uri: resourceUri }), "readResource", { - resourceUri, - }) + const result = yield* withClient( + clientName, + (client) => client.readResource({ uri: resourceUri }), + "readResource", + { + resourceUri, + }, + ) + if (result) yield* subscribe(clientName, resourceUri) + return result }) const getMcpConfig = Effect.fnUntraced(function* (mcpName: string) { @@ -945,6 +1065,9 @@ export const layer = Layer.effect( disconnect, getPrompt, readResource, + subscribe, + unsubscribe, + subscriptions: subscriptionsList, startAuth, authenticate, finishAuth, diff --git a/packages/opencode/test/mcp/headers.test.ts b/packages/opencode/test/mcp/headers.test.ts index c51ed00d32f6..d1bd2d9dcdb0 100644 --- a/packages/opencode/test/mcp/headers.test.ts +++ b/packages/opencode/test/mcp/headers.test.ts @@ -48,6 +48,10 @@ beforeEach(() => { const { MCP } = await import("../../src/mcp/index") const it = testEffect(MCP.defaultLayer) +function targetCalls() { + return transportCalls.filter((call) => call.url === "https://example.com/mcp") +} + describe("mcp.headers", () => { it.instance("headers are passed to transports when oauth is enabled (default)", () => Effect.gen(function* () { @@ -64,9 +68,10 @@ describe("mcp.headers", () => { .pipe(Effect.catch(() => Effect.void)) // Both transports should have been created with headers - expect(transportCalls.length).toBeGreaterThanOrEqual(1) + const calls = targetCalls() + expect(calls.length).toBeGreaterThanOrEqual(1) - for (const call of transportCalls) { + for (const call of calls) { expect(call.options.requestInit).toBeDefined() expect(call.options.requestInit?.headers).toEqual({ Authorization: "Bearer test-token", @@ -92,9 +97,10 @@ describe("mcp.headers", () => { }) .pipe(Effect.catch(() => Effect.void)) - expect(transportCalls.length).toBeGreaterThanOrEqual(1) + const calls = targetCalls() + expect(calls.length).toBeGreaterThanOrEqual(1) - for (const call of transportCalls) { + for (const call of calls) { expect(call.options.requestInit).toBeDefined() expect(call.options.requestInit?.headers).toEqual({ Authorization: "Bearer test-token", @@ -115,9 +121,10 @@ describe("mcp.headers", () => { }) .pipe(Effect.catch(() => Effect.void)) - expect(transportCalls.length).toBeGreaterThanOrEqual(1) + const calls = targetCalls() + expect(calls.length).toBeGreaterThanOrEqual(1) - for (const call of transportCalls) { + for (const call of calls) { // No headers means requestInit should be undefined expect(call.options.requestInit).toBeUndefined() } diff --git a/packages/opencode/test/mcp/lifecycle.test.ts b/packages/opencode/test/mcp/lifecycle.test.ts index dddfd9875e39..cf00ad52b692 100644 --- a/packages/opencode/test/mcp/lifecycle.test.ts +++ b/packages/opencode/test/mcp/lifecycle.test.ts @@ -14,6 +14,9 @@ interface MockClientState { listToolsError: string listPromptsShouldFail: boolean listResourcesShouldFail: boolean + supportsSubscriptions: boolean + subscribeCalls: string[] + unsubscribeCalls: string[] prompts: Array<{ name: string; description?: string }> resources: Array<{ name: string; uri: string; description?: string }> closed: boolean @@ -42,6 +45,9 @@ function getOrCreateClientState(name?: string): MockClientState { listToolsError: "listTools failed", listPromptsShouldFail: false, listResourcesShouldFail: false, + supportsSubscriptions: false, + subscribeCalls: [], + unsubscribeCalls: [], prompts: [], resources: [], closed: false, @@ -133,6 +139,11 @@ void mock.module("@modelcontextprotocol/sdk/client/index.js", () => ({ this._state?.notificationHandlers.set(schema, handler) } + getServerCapabilities() { + if (!this._state?.supportsSubscriptions) return {} + return { resources: { subscribe: true } } + } + async listTools() { if (this._state) this._state.listToolsCalls++ if (this._state?.listToolsShouldFail) { @@ -161,6 +172,14 @@ void mock.module("@modelcontextprotocol/sdk/client/index.js", () => ({ return { resources: this._state?.resources ?? [] } } + async subscribeResource(input: { uri: string }) { + this._state?.subscribeCalls.push(input.uri) + } + + async unsubscribeResource(input: { uri: string }) { + this._state?.unsubscribeCalls.push(input.uri) + } + async close() { if (this._state) this._state.closed = true } @@ -598,6 +617,49 @@ it.instance( }, ) +it.instance( + "service exposes subscribe, unsubscribe, and subscriptions for resource updates", + () => + MCP.Service.use((mcp: MCPNS.Interface) => + Effect.gen(function* () { + lastCreatedClientName = "sub-server" + const serverState = getOrCreateClientState("sub-server") + serverState.supportsSubscriptions = true + + expect(typeof mcp.subscribe).toBe("function") + expect(typeof mcp.unsubscribe).toBe("function") + expect(typeof mcp.subscriptions).toBe("function") + + yield* mcp.add("sub-server", { + type: "local", + command: ["echo", "test"], + }) + + const subscribe = mcp.subscribe + const unsubscribe = mcp.unsubscribe + const subscriptions = mcp.subscriptions + + expect(yield* subscribe("sub-server", "file:///notes.md")).toBe(true) + expect(yield* subscriptions()).toEqual({ "sub-server": ["file:///notes.md"] }) + expect(yield* unsubscribe("sub-server", "file:///notes.md")).toBe(true) + expect(yield* subscriptions()).toEqual({}) + + expect(serverState.subscribeCalls).toEqual(["file:///notes.md"]) + expect(serverState.unsubscribeCalls).toEqual(["file:///notes.md"]) + }), + ), + { + config: { + mcp: { + "sub-server": { + type: "local", + command: ["echo", "test"], + }, + }, + }, + }, +) + it.instance( "prompts() skips disconnected servers", () => @@ -738,6 +800,13 @@ it.live("McpOAuthCallback.cancelPending is keyed by mcpName but pendingAuths use (callback) => Effect.gen(function* () { McpOAuthCallback.cancelPending("my-mcp-server") + let resolved = false + callback.then( + () => { + resolved = true + }, + () => {}, + ) const exit = yield* Effect.tryPromise({ try: () => callback, @@ -751,6 +820,7 @@ it.live("McpOAuthCallback.cancelPending is keyed by mcpName but pendingAuths use ) expect(Exit.isFailure(exit)).toBe(true) + expect(resolved).toBe(false) }), () => Effect.promise(() => McpOAuthCallback.stop()).pipe(Effect.ignore), ), diff --git a/packages/opencode/test/session/prompt.test.ts b/packages/opencode/test/session/prompt.test.ts index 4c4647457814..4bf50069647a 100644 --- a/packages/opencode/test/session/prompt.test.ts +++ b/packages/opencode/test/session/prompt.test.ts @@ -122,6 +122,9 @@ const mcp = Layer.succeed( disconnect: () => Effect.void, getPrompt: () => Effect.succeed(undefined), readResource: () => Effect.succeed(undefined), + subscribe: () => Effect.succeed(false), + unsubscribe: () => Effect.succeed(false), + subscriptions: () => Effect.succeed({}), startAuth: () => Effect.die("unexpected MCP auth in prompt-effect tests"), authenticate: () => Effect.die("unexpected MCP auth in prompt-effect tests"), finishAuth: () => Effect.die("unexpected MCP auth in prompt-effect tests"), diff --git a/packages/opencode/test/session/snapshot-tool-race.test.ts b/packages/opencode/test/session/snapshot-tool-race.test.ts index 89ed11613e15..a6f50886c343 100644 --- a/packages/opencode/test/session/snapshot-tool-race.test.ts +++ b/packages/opencode/test/session/snapshot-tool-race.test.ts @@ -79,6 +79,9 @@ const mcp = Layer.succeed( disconnect: () => Effect.void, getPrompt: () => Effect.succeed(undefined), readResource: () => Effect.succeed(undefined), + subscribe: () => Effect.succeed(false), + unsubscribe: () => Effect.succeed(false), + subscriptions: () => Effect.succeed({}), startAuth: () => Effect.die("unexpected MCP auth"), authenticate: () => Effect.die("unexpected MCP auth"), finishAuth: () => Effect.die("unexpected MCP auth"), diff --git a/packages/sdk/js/src/v2/gen/types.gen.ts b/packages/sdk/js/src/v2/gen/types.gen.ts index aae1b06ad320..e0da12a8aefe 100644 --- a/packages/sdk/js/src/v2/gen/types.gen.ts +++ b/packages/sdk/js/src/v2/gen/types.gen.ts @@ -29,6 +29,8 @@ export type Event = | EventSessionIdle | EventMcpToolsChanged | EventMcpBrowserOpenFailed + | EventMcpResourceUpdated + | EventMcpResourceListChanged | EventCommandExecuted | EventProjectUpdated | EventSessionCompacted @@ -830,6 +832,8 @@ export type GlobalEvent = { | EventSessionIdle | EventMcpToolsChanged | EventMcpBrowserOpenFailed + | EventMcpResourceUpdated + | EventMcpResourceListChanged | EventCommandExecuted | EventProjectUpdated | EventSessionCompacted @@ -2671,6 +2675,23 @@ export type EventMcpBrowserOpenFailed = { } } +export type EventMcpResourceUpdated = { + id: string + type: "mcp.resource.updated" + properties: { + server: string + uri: string + } +} + +export type EventMcpResourceListChanged = { + id: string + type: "mcp.resource.list.changed" + properties: { + server: string + } +} + export type EventCommandExecuted = { id: string type: "command.executed"