Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 59 additions & 9 deletions astrbot/core/agent/mcp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,17 @@
}
)
_SHELL_META_RE = re.compile(r"[\r\n\x00;&|<>`$]")
_PYTHON_MODULE_REQUIRED_MSG = (
"MCP stdio Python servers must be launched from a module or package; "
"local Python script/archive paths are not allowed."
)
_PYTHON_INLINE_CODE_DENIED_MSG = (
"MCP stdio Python servers must be launched from a module or package; "
"inline code flags such as -c are not allowed."
)
_PYTHON_INLINE_CODE_FLAGS = frozenset({"-c"})
_PYTHON_OPTIONS_WITH_VALUE = frozenset({"-W", "-X", "--check-hash-based-pycs"})
_PYTHON_OPTION_PREFIXES_WITH_VALUE = ("-W", "-X", "--check-hash-based-pycs=")
_JS_INLINE_CODE_FLAGS = frozenset({"-e", "--eval", "-p", "--print"})
_DENIED_DOCKER_ARGS = frozenset(
{
Expand Down Expand Up @@ -145,6 +155,50 @@ def _normalize_stdio_command_name(command: str) -> str:
return command_name


def _is_python_option_with_inline_value(arg: str) -> bool:
return any(
arg.startswith(prefix) and arg != prefix
for prefix in _PYTHON_OPTION_PREFIXES_WITH_VALUE
)


def _ensure_valid_python_module(module_name: str | None) -> None:
if not module_name or module_name.startswith("-"):
raise ValueError(_PYTHON_MODULE_REQUIRED_MSG)


def _validate_python_stdio_args(args: list[str]) -> None:
Comment thread
sourcery-ai[bot] marked this conversation as resolved.
index = 0
while index < len(args):
arg = args[index]
if arg == "--":
index += 1
break
if arg in _PYTHON_OPTIONS_WITH_VALUE:
index += 2
continue
if _is_python_option_with_inline_value(arg):
index += 1
continue
if arg == "-m":
module_name = args[index + 1] if index + 1 < len(args) else None
_ensure_valid_python_module(module_name)
return
if arg.startswith("-m") and not arg.startswith("--"):
_ensure_valid_python_module(arg[2:])
return
if arg in _PYTHON_INLINE_CODE_FLAGS or (
arg.startswith("-") and not arg.startswith("--") and "c" in arg
):
raise ValueError(_PYTHON_INLINE_CODE_DENIED_MSG)
if arg.startswith("-"):
index += 1
continue
break

raise ValueError(_PYTHON_MODULE_REQUIRED_MSG)


def _get_stdio_command_allowlist() -> set[str]:
allowed = set(_DEFAULT_STDIO_COMMAND_ALLOWLIST)
configured = os.environ.get(_STDIO_ALLOWLIST_ENV, "")
Expand All @@ -163,7 +217,10 @@ def _is_stdio_config(config: dict) -> bool:


def _validate_stdio_args(command_name: str, args: object) -> None:
is_python_command = command_name.startswith("python") or command_name == "py"
if args is None:
if is_python_command:
raise ValueError(_PYTHON_MODULE_REQUIRED_MSG)
return
if not isinstance(args, list) or not all(isinstance(arg, str) for arg in args):
raise ValueError("MCP stdio args must be a list of strings.")
Expand All @@ -172,15 +229,8 @@ def _validate_stdio_args(command_name: str, args: object) -> None:
if "\x00" in arg or "\r" in arg or "\n" in arg:
raise ValueError("MCP stdio args cannot contain control characters.")

if command_name.startswith("python") or command_name == "py":
if any(
arg == "-c"
or (arg.startswith("-") and not arg.startswith("--") and "c" in arg)
for arg in args
):
raise ValueError(
"MCP stdio Python servers must be launched from a module or file; inline code flags such as -c are not allowed."
)
if is_python_command:
_validate_python_stdio_args(args)
elif command_name in {"node", "deno", "bun"} or command_name.startswith("node"):
if any(
arg in _JS_INLINE_CODE_FLAGS
Expand Down
4 changes: 2 additions & 2 deletions astrbot/core/computer/computer_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,10 +504,10 @@ async def _sync_skills_to_sandbox(booter: ComputerBooter) -> None:
for skill_name, skill_dir in sync_skill_dirs:
shutil.copytree(skill_dir, bundle_root / skill_name)
shutil.make_archive(str(zip_base), "zip", str(bundle_root))
remote_zip = Path(SANDBOX_SKILLS_ROOT) / "skills.zip"
remote_zip = f"{SANDBOX_SKILLS_ROOT}/skills.zip"
logger.info("Uploading skills bundle to sandbox...")
await booter.shell.exec(f"mkdir -p {SANDBOX_SKILLS_ROOT}")
upload_result = await booter.upload_file(str(zip_path), str(remote_zip))
upload_result = await booter.upload_file(str(zip_path), remote_zip)
if not upload_result.get("success", False):
raise RuntimeError("Failed to upload skills bundle to sandbox.")
else:
Expand Down
3 changes: 2 additions & 1 deletion astrbot/dashboard/services/tools_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ async def update_mcp_server(self, server_data: Any) -> str:
old_config,
active,
)
self._validate_server_config(server_config)
if active or not only_update_active:
self._validate_server_config(server_config)

if is_rename:
config["mcpServers"].pop(old_name)
Expand Down
97 changes: 96 additions & 1 deletion tests/unit/test_mcp_client_schema.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
from types import SimpleNamespace
from unittest.mock import MagicMock

from astrbot.core.agent.mcp_client import MCPTool, _normalize_mcp_input_schema
import pytest

from astrbot.core.agent.mcp_client import (
MCPTool,
_normalize_mcp_input_schema,
validate_mcp_stdio_config,
)


class TestNormalizeMcpInputSchema:
Expand Down Expand Up @@ -115,3 +121,92 @@ def test_mcp_tool_accepts_property_level_required_booleans(self):
assert tool.parameters["required"] == ["stock_code"]
assert "required" not in tool.parameters["properties"]["stock_code"]
assert "required" not in tool.parameters["properties"]["market"]


class TestValidateMcpStdioConfig:
@pytest.mark.parametrize("suffix", [".py", ".pyw", ".pyc", ".pyo", ".pyz"])
def test_rejects_python_local_script_targets(self, suffix: str) -> None:
config = {"command": "python", "args": [f"./payload{suffix}"]}

with pytest.raises(ValueError, match="local Python script/archive"):
validate_mcp_stdio_config(config)

def test_rejects_windows_python_archive_target_case_insensitively(self) -> None:
config = {
"command": "python3",
"args": ["-I", "-u", "C:\\Users\\Public\\payload.PYZ"],
}

with pytest.raises(ValueError, match="local Python script/archive"):
validate_mcp_stdio_config(config)

def test_rejects_python_script_target_after_option_separator(self) -> None:
config = {"command": "python", "args": ["--", "payload.py"]}

with pytest.raises(ValueError, match="local Python script/archive"):
validate_mcp_stdio_config(config)

def test_rejects_python_local_target_without_known_suffix(self) -> None:
config = {"command": "python", "args": ["./payload"]}

with pytest.raises(ValueError, match="local Python script/archive"):
validate_mcp_stdio_config(config)

@pytest.mark.parametrize(
"config",
[
{"command": "python", "args": ["-m", "mcp_server"]},
{"command": "python3", "args": ["-I", "-m", "package.module"]},
{"command": "python3", "args": ["-mmcp_server"]},
{"command": "py", "args": ["-3.12", "-m", "mcp_server"]},
{
"command": "python",
"args": ["-m", "mcp_server", "--config", "server.py"],
},
{
"mcpServers": {
"demo": {"command": "python", "args": ["-m", "mcp_server"]}
}
},
],
)
def test_allows_python_module_launches(self, config: dict) -> None:
validate_mcp_stdio_config(config)

def test_rejects_python_inline_code_flags(self) -> None:
config = {"command": "python", "args": ["-c", "print('x')"]}

with pytest.raises(ValueError, match="inline code flags"):
validate_mcp_stdio_config(config)

def test_rejects_python_compact_inline_code_flags(self) -> None:
config = {"command": "python", "args": ["-Ic", "print('x')"]}

with pytest.raises(ValueError, match="inline code flags"):
validate_mcp_stdio_config(config)

def test_rejects_python_missing_module_name(self) -> None:
config = {"command": "python", "args": ["-m"]}

with pytest.raises(ValueError, match="module or package"):
validate_mcp_stdio_config(config)

def test_rejects_python_option_only_launch(self) -> None:
config = {"command": "python", "args": ["-I", "-u"]}

with pytest.raises(ValueError, match="module or package"):
validate_mcp_stdio_config(config)

def test_rejects_python_launch_without_args(self) -> None:
config = {"command": "python"}

with pytest.raises(ValueError, match="module or package"):
validate_mcp_stdio_config(config)

def test_allows_module_owned_dash_c_argument(self) -> None:
config = {
"command": "python",
"args": ["-m", "mcp_server", "-c", "config.toml"],
}

validate_mcp_stdio_config(config)