Skip to content

fix: mcp cancel scope cross task spin (#9068)#9070

Open
GlowingBrick wants to merge 3 commits into
AstrBotDevs:masterfrom
GlowingBrick:fix/mcp-cancel-scope
Open

fix: mcp cancel scope cross task spin (#9068)#9070
GlowingBrick wants to merge 3 commits into
AstrBotDevs:masterfrom
GlowingBrick:fix/mcp-cancel-scope

Conversation

@GlowingBrick

@GlowingBrick GlowingBrick commented Jun 28, 2026

Copy link
Copy Markdown

#9068

修复了

Modifications / 改动点

修复了禁用mcp服务器时由于作用域不匹配出现的处理器空转问题

  • This is NOT a breaking change. / 这不是一个破坏性变更。

Screenshots or Test Results / 运行截图或测试结果

禁用一个MCP,在Debug下:

[21:25:48.931] [Core] [INFO] [provider.func_tool_manager:720]: Connected to MCP server paddleocr, Tools: ['paddleocr_vl']
[21:25:52.805] [Core] [INFO] [provider.func_tool_manager:729]: Received shutdown signal for MCP client paddleocr
[21:25:53.138] [Core] [INFO] [provider.func_tool_manager:848]: Disconnected from MCP server paddleocr

可以注意到不再出现Error closing current exit stack错误


Checklist / 检查清单

  • 😊 If there are new features added in the PR, I have discussed it with the authors through issues/emails, etc.
    / 如果 PR 中有新加入的功能,已经通过 Issue / 邮件等方式和作者讨论过。

  • 👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
    / 我的更改经过了良好的测试,并已在上方提供了“验证步骤”和“运行截图”

  • 🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in requirements.txt and pyproject.toml.
    / 我确保没有引入新依赖库,或者引入了新依赖库的同时将其添加到 requirements.txtpyproject.toml 文件相应位置。

  • 😮 My changes do not introduce malicious code.
    / 我的更改没有引入恶意代码。

Summary by Sourcery

Improve MCP client startup and shutdown handling to prevent idle spinning when a server is disabled and ensure proper cleanup on errors, timeouts, and cancellation.

Bug Fixes:

  • Prevent MCP client tasks from spinning when an MCP server is disabled due to mismatched cancellation scope.
  • Ensure MCP client runtimes are correctly removed and cleaned up on connection errors, timeouts, or task cancellation.

Enhancements:

  • Unify MCP client connection, tool registration, lifecycle management, and cleanup into a single task for safer resource handling.
  • Refine MCP startup bookkeeping by updating _mcp_starting and _mcp_server_runtime only after connection completes or fails.
  • Remove the separate MCP client initialization helper in favor of the integrated lifecycle flow.

@dosubot dosubot Bot added size:L This PR changes 100-499 lines, ignoring generated files. area:provider The bug / feature is about AI Provider, Models, LLM Agent, LLM Agent Runner. labels Jun 28, 2026

@sourcery-ai sourcery-ai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - I've found 1 issue

Prompt for AI Agents
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location path="astrbot/core/provider/func_tool_manager.py" line_range="675-684" />
<code_context>
+        connect_done = asyncio.Event()
+        connect_error: BaseException | None = None
+
+        async def connect_and_lifecycle() -> None:
+            #Single task that handles connect, lifecycle, and cleanup.
+
+            nonlocal connect_error
+            try:
+                await mcp_client.connect_to_server(cfg, name)
+                await mcp_client.list_tools_and_save()
+            except asyncio.CancelledError:
+                # cleanup on cancellation
+                try:
+                    await mcp_client.cleanup()
+                except BaseException:
+                    pass
+                raise
+            except Exception as e:
+                connect_error = e
+                try:
+                    await mcp_client.cleanup()
+                except Exception:
+                    pass
+                connect_done.set()
+                return
+
+            # Register tools
+            self.func_list = [
+                f
</code_context>
<issue_to_address>
**issue (bug_risk):** Tool registration and logging errors won’t propagate via connect_error, leaving the caller to see only a timeout.

Because the `try` ends at `await mcp_client.list_tools_and_save()`, any exception during tool registration or logging won’t set `connect_error` or `connect_done`. The caller will then hit the timeout and raise `MCPInitTimeoutError` instead of the real failure. Please extend the `try` to include the registration/logging block, or add a dedicated `try/except` there that sets `connect_error` and signals `connect_done` on error.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines +675 to +684
async def connect_and_lifecycle() -> None:
#Single task that handles connect, lifecycle, and cleanup.

nonlocal connect_error
try:
await mcp_client.connect_to_server(cfg, name)
await mcp_client.list_tools_and_save()
except asyncio.CancelledError:
# cleanup on cancellation
try:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Tool registration and logging errors won’t propagate via connect_error, leaving the caller to see only a timeout.

Because the try ends at await mcp_client.list_tools_and_save(), any exception during tool registration or logging won’t set connect_error or connect_done. The caller will then hit the timeout and raise MCPInitTimeoutError instead of the real failure. Please extend the try to include the registration/logging block, or add a dedicated try/except there that sets connect_error and signals connect_done on error.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors the MCP server startup and lifecycle management in func_tool_manager.py by consolidating connection, lifecycle, and cleanup logic into a single background task (connect_and_lifecycle). Feedback on these changes highlights two critical concurrency issues: first, a potential leak of the background lifecycle_task and uncleaned runtime state if _start_mcp_server is cancelled while waiting for the connection; second, the risk of leaving the manager in an inconsistent state if the cleanup call in the finally block is interrupted by cancellation. Suggestions were provided to catch asyncio.CancelledError during the connection wait and to wrap the termination cleanup in asyncio.shield.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment thread astrbot/core/provider/func_tool_manager.py Outdated
Comment thread astrbot/core/provider/func_tool_manager.py Outdated
GlowingBrick and others added 2 commits June 28, 2026 21:56
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
@GlowingBrick GlowingBrick force-pushed the fix/mcp-cancel-scope branch from 92328a1 to ff99de4 Compare June 28, 2026 13:57
@zouyonghe

Copy link
Copy Markdown
Member

@sourcery-ai review

@sourcery-ai sourcery-ai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - I've found 1 issue

Prompt for AI Agents
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location path="astrbot/core/provider/func_tool_manager.py" line_range="672" />
<code_context>
+        mcp_client = MCPClient()
+        mcp_client.name = name
+
+        connect_done = asyncio.Event()
+        connect_error: BaseException | None = None
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider replacing the `connect_done`/`connect_error` pattern with a single connect `Future` and centralized cleanup to simplify the MCP client startup lifecycle and error handling.

The main added complexity comes from `connect_done` + `connect_error` plus duplicated cleanup inside `connect_and_lifecycle`. You can keep the “single task owns anyio contexts and cleanup” behavior while simplifying the handshake and reusing the existing cleanup helper.

### 1. Replace `connect_done` / `connect_error` with a single `Future`

Use a single `Future` to signal completion/failure of the connect phase, instead of an `Event` + shared exception variable:

```python
mcp_client = MCPClient()
mcp_client.name = name

loop = asyncio.get_running_loop()
connect_future: asyncio.Future[None] = loop.create_future()

async def connect_and_lifecycle() -> None:
    try:
        await mcp_client.connect_to_server(cfg, name)
        await mcp_client.list_tools_and_save()
    except asyncio.CancelledError as e:
        # Ensure connect_future is completed
        if not connect_future.done():
            connect_future.set_exception(e)
        try:
            await self._cleanup_mcp_client_safely(mcp_client, name)
        except BaseException:
            pass
        raise
    except Exception as e:
        if not connect_future.done():
            connect_future.set_exception(e)
        try:
            await self._cleanup_mcp_client_safely(mcp_client, name)
        except BaseException:
            pass
        return

    # Register tools (same as current behavior)
    self.func_list = [
        f
        for f in self.func_list
        if not (isinstance(f, MCPTool) and f.mcp_server_name == name)
    ]
    for tool in mcp_client.tools:
        func_tool = MCPTool(
            mcp_tool=tool,
            mcp_client=mcp_client,
            mcp_server_name=name,
        )
        self.func_list.append(func_tool)

    logger.info(
        f"Connected to MCP server {name}, "
        f"Tools: {[t.name for t in mcp_client.tools]}"
    )

    # Signal successful connect
    if not connect_future.done():
        connect_future.set_result(None)

    try:
        await shutdown_event.wait()
        logger.info(f"Received shutdown signal for MCP client {name}")
    except asyncio.CancelledError:
        logger.debug(f"MCP client {name} task was cancelled")
        raise
    finally:
        await asyncio.shield(self._terminate_mcp_client(name))
```

Then the outer timeout / error handling becomes simpler and more idiomatic:

```python
lifecycle_task = asyncio.create_task(
    connect_and_lifecycle(), name=f"mcp-client:{name}"
)

async with self._runtime_lock:
    self._mcp_server_runtime[name] = _MCPServerRuntime(
        name=name,
        client=mcp_client,
        shutdown_event=shutdown_event,
        lifecycle_task=lifecycle_task,
    )
    self._mcp_starting.discard(name)

try:
    await asyncio.wait_for(connect_future, timeout=timeout)
except asyncio.TimeoutError as exc:
    lifecycle_task.cancel()
    await asyncio.gather(lifecycle_task, return_exceptions=True)
    async with self._runtime_lock:
        self._mcp_starting.discard(name)
        self._mcp_server_runtime.pop(name, None)
    raise MCPInitTimeoutError(
        f"Connected to MCP server {name} timeout ({timeout:g} seconds)"
    ) from exc
except asyncio.CancelledError:
    lifecycle_task.cancel()
    await asyncio.gather(lifecycle_task, return_exceptions=True)
    async with self._runtime_lock:
        self._mcp_starting.discard(name)
        self._mcp_server_runtime.pop(name, None)
    raise
except Exception:
    lifecycle_task.cancel()
    await asyncio.gather(lifecycle_task, return_exceptions=True)
    async with self._runtime_lock:
        self._mcp_starting.discard(name)
        self._mcp_server_runtime.pop(name, None)
    raise
```

This keeps:

- A single task owning connection, tool registration, shutdown wait, and `_terminate_mcp_client`.
- Precise timeout on the connect phase.
- Centralized cleanup via `_cleanup_mcp_client_safely` and `_terminate_mcp_client`.

But removes the manual `connect_done` signaling and `connect_error` shared state, making the control flow and error propagation easier to reason about.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

mcp_client = MCPClient()
mcp_client.name = name

connect_done = asyncio.Event()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider replacing the connect_done/connect_error pattern with a single connect Future and centralized cleanup to simplify the MCP client startup lifecycle and error handling.

The main added complexity comes from connect_done + connect_error plus duplicated cleanup inside connect_and_lifecycle. You can keep the “single task owns anyio contexts and cleanup” behavior while simplifying the handshake and reusing the existing cleanup helper.

1. Replace connect_done / connect_error with a single Future

Use a single Future to signal completion/failure of the connect phase, instead of an Event + shared exception variable:

mcp_client = MCPClient()
mcp_client.name = name

loop = asyncio.get_running_loop()
connect_future: asyncio.Future[None] = loop.create_future()

async def connect_and_lifecycle() -> None:
    try:
        await mcp_client.connect_to_server(cfg, name)
        await mcp_client.list_tools_and_save()
    except asyncio.CancelledError as e:
        # Ensure connect_future is completed
        if not connect_future.done():
            connect_future.set_exception(e)
        try:
            await self._cleanup_mcp_client_safely(mcp_client, name)
        except BaseException:
            pass
        raise
    except Exception as e:
        if not connect_future.done():
            connect_future.set_exception(e)
        try:
            await self._cleanup_mcp_client_safely(mcp_client, name)
        except BaseException:
            pass
        return

    # Register tools (same as current behavior)
    self.func_list = [
        f
        for f in self.func_list
        if not (isinstance(f, MCPTool) and f.mcp_server_name == name)
    ]
    for tool in mcp_client.tools:
        func_tool = MCPTool(
            mcp_tool=tool,
            mcp_client=mcp_client,
            mcp_server_name=name,
        )
        self.func_list.append(func_tool)

    logger.info(
        f"Connected to MCP server {name}, "
        f"Tools: {[t.name for t in mcp_client.tools]}"
    )

    # Signal successful connect
    if not connect_future.done():
        connect_future.set_result(None)

    try:
        await shutdown_event.wait()
        logger.info(f"Received shutdown signal for MCP client {name}")
    except asyncio.CancelledError:
        logger.debug(f"MCP client {name} task was cancelled")
        raise
    finally:
        await asyncio.shield(self._terminate_mcp_client(name))

Then the outer timeout / error handling becomes simpler and more idiomatic:

lifecycle_task = asyncio.create_task(
    connect_and_lifecycle(), name=f"mcp-client:{name}"
)

async with self._runtime_lock:
    self._mcp_server_runtime[name] = _MCPServerRuntime(
        name=name,
        client=mcp_client,
        shutdown_event=shutdown_event,
        lifecycle_task=lifecycle_task,
    )
    self._mcp_starting.discard(name)

try:
    await asyncio.wait_for(connect_future, timeout=timeout)
except asyncio.TimeoutError as exc:
    lifecycle_task.cancel()
    await asyncio.gather(lifecycle_task, return_exceptions=True)
    async with self._runtime_lock:
        self._mcp_starting.discard(name)
        self._mcp_server_runtime.pop(name, None)
    raise MCPInitTimeoutError(
        f"Connected to MCP server {name} timeout ({timeout:g} seconds)"
    ) from exc
except asyncio.CancelledError:
    lifecycle_task.cancel()
    await asyncio.gather(lifecycle_task, return_exceptions=True)
    async with self._runtime_lock:
        self._mcp_starting.discard(name)
        self._mcp_server_runtime.pop(name, None)
    raise
except Exception:
    lifecycle_task.cancel()
    await asyncio.gather(lifecycle_task, return_exceptions=True)
    async with self._runtime_lock:
        self._mcp_starting.discard(name)
        self._mcp_server_runtime.pop(name, None)
    raise

This keeps:

  • A single task owning connection, tool registration, shutdown wait, and _terminate_mcp_client.
  • Precise timeout on the connect phase.
  • Centralized cleanup via _cleanup_mcp_client_safely and _terminate_mcp_client.

But removes the manual connect_done signaling and connect_error shared state, making the control flow and error propagation easier to reason about.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:provider The bug / feature is about AI Provider, Models, LLM Agent, LLM Agent Runner. size:L This PR changes 100-499 lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants