From edfd2a210b3060be9003ef66bdc1caa443b77655 Mon Sep 17 00:00:00 2001 From: Roast-2007 <2732882245@qq.com> Date: Wed, 24 Jun 2026 02:09:25 +0800 Subject: [PATCH] fix: reject Python MCP stdio script archives --- astrbot/core/agent/mcp_client.py | 68 +++++++++++++-- astrbot/core/computer/computer_client.py | 4 +- astrbot/dashboard/services/tools_service.py | 3 +- tests/unit/test_mcp_client_schema.py | 97 ++++++++++++++++++++- 4 files changed, 159 insertions(+), 13 deletions(-) diff --git a/astrbot/core/agent/mcp_client.py b/astrbot/core/agent/mcp_client.py index dad9e5be4e..d7e1b8a1d9 100644 --- a/astrbot/core/agent/mcp_client.py +++ b/astrbot/core/agent/mcp_client.py @@ -81,7 +81,17 @@ } ) _SHELL_META_RE = re.compile(r"[\r\n\x00;&|<>`$]") +_PYTHON_MODULE_REQUIRED_MSG = ( + "MCP stdio Python servers must be launched from a module or package; " + "local Python script/archive paths are not allowed." +) +_PYTHON_INLINE_CODE_DENIED_MSG = ( + "MCP stdio Python servers must be launched from a module or package; " + "inline code flags such as -c are not allowed." +) _PYTHON_INLINE_CODE_FLAGS = frozenset({"-c"}) +_PYTHON_OPTIONS_WITH_VALUE = frozenset({"-W", "-X", "--check-hash-based-pycs"}) +_PYTHON_OPTION_PREFIXES_WITH_VALUE = ("-W", "-X", "--check-hash-based-pycs=") _JS_INLINE_CODE_FLAGS = frozenset({"-e", "--eval", "-p", "--print"}) _DENIED_DOCKER_ARGS = frozenset( { @@ -145,6 +155,50 @@ def _normalize_stdio_command_name(command: str) -> str: return command_name +def _is_python_option_with_inline_value(arg: str) -> bool: + return any( + arg.startswith(prefix) and arg != prefix + for prefix in _PYTHON_OPTION_PREFIXES_WITH_VALUE + ) + + +def _ensure_valid_python_module(module_name: str | None) -> None: + if not module_name or module_name.startswith("-"): + raise ValueError(_PYTHON_MODULE_REQUIRED_MSG) + + +def _validate_python_stdio_args(args: list[str]) -> None: + index = 0 + while index < len(args): + arg = args[index] + if arg == "--": + index += 1 + break + if arg in _PYTHON_OPTIONS_WITH_VALUE: + index += 2 + continue + if _is_python_option_with_inline_value(arg): + index += 1 + continue + if arg == "-m": + module_name = args[index + 1] if index + 1 < len(args) else None + _ensure_valid_python_module(module_name) + return + if arg.startswith("-m") and not arg.startswith("--"): + _ensure_valid_python_module(arg[2:]) + return + if arg in _PYTHON_INLINE_CODE_FLAGS or ( + arg.startswith("-") and not arg.startswith("--") and "c" in arg + ): + raise ValueError(_PYTHON_INLINE_CODE_DENIED_MSG) + if arg.startswith("-"): + index += 1 + continue + break + + raise ValueError(_PYTHON_MODULE_REQUIRED_MSG) + + def _get_stdio_command_allowlist() -> set[str]: allowed = set(_DEFAULT_STDIO_COMMAND_ALLOWLIST) configured = os.environ.get(_STDIO_ALLOWLIST_ENV, "") @@ -163,7 +217,10 @@ def _is_stdio_config(config: dict) -> bool: def _validate_stdio_args(command_name: str, args: object) -> None: + is_python_command = command_name.startswith("python") or command_name == "py" if args is None: + if is_python_command: + raise ValueError(_PYTHON_MODULE_REQUIRED_MSG) return if not isinstance(args, list) or not all(isinstance(arg, str) for arg in args): raise ValueError("MCP stdio args must be a list of strings.") @@ -172,15 +229,8 @@ def _validate_stdio_args(command_name: str, args: object) -> None: if "\x00" in arg or "\r" in arg or "\n" in arg: raise ValueError("MCP stdio args cannot contain control characters.") - if command_name.startswith("python") or command_name == "py": - if any( - arg == "-c" - or (arg.startswith("-") and not arg.startswith("--") and "c" in arg) - for arg in args - ): - raise ValueError( - "MCP stdio Python servers must be launched from a module or file; inline code flags such as -c are not allowed." - ) + if is_python_command: + _validate_python_stdio_args(args) elif command_name in {"node", "deno", "bun"} or command_name.startswith("node"): if any( arg in _JS_INLINE_CODE_FLAGS diff --git a/astrbot/core/computer/computer_client.py b/astrbot/core/computer/computer_client.py index 9be646265e..4e085e3492 100644 --- a/astrbot/core/computer/computer_client.py +++ b/astrbot/core/computer/computer_client.py @@ -504,10 +504,10 @@ async def _sync_skills_to_sandbox(booter: ComputerBooter) -> None: for skill_name, skill_dir in sync_skill_dirs: shutil.copytree(skill_dir, bundle_root / skill_name) shutil.make_archive(str(zip_base), "zip", str(bundle_root)) - remote_zip = Path(SANDBOX_SKILLS_ROOT) / "skills.zip" + remote_zip = f"{SANDBOX_SKILLS_ROOT}/skills.zip" logger.info("Uploading skills bundle to sandbox...") await booter.shell.exec(f"mkdir -p {SANDBOX_SKILLS_ROOT}") - upload_result = await booter.upload_file(str(zip_path), str(remote_zip)) + upload_result = await booter.upload_file(str(zip_path), remote_zip) if not upload_result.get("success", False): raise RuntimeError("Failed to upload skills bundle to sandbox.") else: diff --git a/astrbot/dashboard/services/tools_service.py b/astrbot/dashboard/services/tools_service.py index efd487185d..026b1d2c6e 100644 --- a/astrbot/dashboard/services/tools_service.py +++ b/astrbot/dashboard/services/tools_service.py @@ -165,7 +165,8 @@ async def update_mcp_server(self, server_data: Any) -> str: old_config, active, ) - self._validate_server_config(server_config) + if active or not only_update_active: + self._validate_server_config(server_config) if is_rename: config["mcpServers"].pop(old_name) diff --git a/tests/unit/test_mcp_client_schema.py b/tests/unit/test_mcp_client_schema.py index 0c3d9bc6ae..b0813815f9 100644 --- a/tests/unit/test_mcp_client_schema.py +++ b/tests/unit/test_mcp_client_schema.py @@ -1,7 +1,13 @@ from types import SimpleNamespace from unittest.mock import MagicMock -from astrbot.core.agent.mcp_client import MCPTool, _normalize_mcp_input_schema +import pytest + +from astrbot.core.agent.mcp_client import ( + MCPTool, + _normalize_mcp_input_schema, + validate_mcp_stdio_config, +) class TestNormalizeMcpInputSchema: @@ -115,3 +121,92 @@ def test_mcp_tool_accepts_property_level_required_booleans(self): assert tool.parameters["required"] == ["stock_code"] assert "required" not in tool.parameters["properties"]["stock_code"] assert "required" not in tool.parameters["properties"]["market"] + + +class TestValidateMcpStdioConfig: + @pytest.mark.parametrize("suffix", [".py", ".pyw", ".pyc", ".pyo", ".pyz"]) + def test_rejects_python_local_script_targets(self, suffix: str) -> None: + config = {"command": "python", "args": [f"./payload{suffix}"]} + + with pytest.raises(ValueError, match="local Python script/archive"): + validate_mcp_stdio_config(config) + + def test_rejects_windows_python_archive_target_case_insensitively(self) -> None: + config = { + "command": "python3", + "args": ["-I", "-u", "C:\\Users\\Public\\payload.PYZ"], + } + + with pytest.raises(ValueError, match="local Python script/archive"): + validate_mcp_stdio_config(config) + + def test_rejects_python_script_target_after_option_separator(self) -> None: + config = {"command": "python", "args": ["--", "payload.py"]} + + with pytest.raises(ValueError, match="local Python script/archive"): + validate_mcp_stdio_config(config) + + def test_rejects_python_local_target_without_known_suffix(self) -> None: + config = {"command": "python", "args": ["./payload"]} + + with pytest.raises(ValueError, match="local Python script/archive"): + validate_mcp_stdio_config(config) + + @pytest.mark.parametrize( + "config", + [ + {"command": "python", "args": ["-m", "mcp_server"]}, + {"command": "python3", "args": ["-I", "-m", "package.module"]}, + {"command": "python3", "args": ["-mmcp_server"]}, + {"command": "py", "args": ["-3.12", "-m", "mcp_server"]}, + { + "command": "python", + "args": ["-m", "mcp_server", "--config", "server.py"], + }, + { + "mcpServers": { + "demo": {"command": "python", "args": ["-m", "mcp_server"]} + } + }, + ], + ) + def test_allows_python_module_launches(self, config: dict) -> None: + validate_mcp_stdio_config(config) + + def test_rejects_python_inline_code_flags(self) -> None: + config = {"command": "python", "args": ["-c", "print('x')"]} + + with pytest.raises(ValueError, match="inline code flags"): + validate_mcp_stdio_config(config) + + def test_rejects_python_compact_inline_code_flags(self) -> None: + config = {"command": "python", "args": ["-Ic", "print('x')"]} + + with pytest.raises(ValueError, match="inline code flags"): + validate_mcp_stdio_config(config) + + def test_rejects_python_missing_module_name(self) -> None: + config = {"command": "python", "args": ["-m"]} + + with pytest.raises(ValueError, match="module or package"): + validate_mcp_stdio_config(config) + + def test_rejects_python_option_only_launch(self) -> None: + config = {"command": "python", "args": ["-I", "-u"]} + + with pytest.raises(ValueError, match="module or package"): + validate_mcp_stdio_config(config) + + def test_rejects_python_launch_without_args(self) -> None: + config = {"command": "python"} + + with pytest.raises(ValueError, match="module or package"): + validate_mcp_stdio_config(config) + + def test_allows_module_owned_dash_c_argument(self) -> None: + config = { + "command": "python", + "args": ["-m", "mcp_server", "-c", "config.toml"], + } + + validate_mcp_stdio_config(config)