diff --git a/docs/copilot-bot.md b/docs/copilot-bot.md
new file mode 100644
index 0000000..ca379fc
--- /dev/null
+++ b/docs/copilot-bot.md
@@ -0,0 +1,336 @@
+# CopilotBot
+
+CopilotBot is a Microbots agent that delegates the entire agent loop to the **GitHub Copilot runtime**. Unlike other Microbots bots (ReadingBot, WritingBot, etc.) where Microbots manages the LLM ↔ tool loop, CopilotBot lets the Copilot runtime handle planning, tool invocation, file edits, shell commands, and multi-turn reasoning — all within a secure Docker sandbox.
+
+## Prerequisites
+
+- **Docker** — a running Docker daemon
+- **Python 3.11+** (Microbots requires `python >= 3.11`)
+- **One of the following** for authentication:
+ - A GitHub Copilot subscription (for native Copilot auth), **or**
+ - API credentials for any OpenAI-compatible, Azure OpenAI, or Anthropic endpoint (BYOK — no Copilot subscription needed)
+
+## Installation
+
+```bash
+pip install microbots[ghcp]
+```
+
+This installs the `github-copilot-sdk` package alongside Microbots.
+
+!!! note
+ You do **not** need to install `copilot-cli` on your host machine. Microbots automatically installs and runs it inside the Docker container during initialization.
+
+## Quick Start
+
+```python
+from microbots.bot.CopilotBot import CopilotBot
+
+bot = CopilotBot(
+ model="gpt-4.1",
+ folder_to_mount="/path/to/your/project",
+ github_token="ghp_your_github_token",
+)
+
+result = bot.run("Fix the failing unit tests and make sure all tests pass.")
+
+print(result.status) # True if successful
+print(result.result) # The agent's final response
+print(result.error) # Error message if status is False
+
+bot.stop()
+```
+
+## Authentication Methods
+
+CopilotBot supports multiple authentication methods. Only the first requires a GitHub Copilot subscription; the BYOK methods do not.
+
+### 1. GitHub Token (Native Copilot Auth)
+
+Pass a GitHub token directly or let Microbots discover it from the environment.
+
+```python
+# Option A: Pass explicitly
+bot = CopilotBot(
+ model="gpt-4.1",
+ folder_to_mount="/path/to/project",
+ github_token="ghp_your_token",
+)
+
+# Option B: Set environment variable (auto-discovered)
+# export GITHUB_TOKEN="ghp_your_token"
+# — or —
+# export COPILOT_GITHUB_TOKEN="ghp_your_token"
+# — or —
+# export GH_TOKEN="ghp_your_token"
+
+bot = CopilotBot(
+ model="gpt-4.1",
+ folder_to_mount="/path/to/project",
+)
+```
+
+If no token is provided and no environment variable is set, Microbots will attempt to retrieve a token from a local GitHub Copilot login (e.g. via `copilot login`).
+
+**Token discovery order:** explicit `github_token` → `COPILOT_GITHUB_TOKEN` → `GITHUB_TOKEN` → `GH_TOKEN` → local Copilot login.
+
+!!! note
+ The local Copilot login fallback requires `copilot-cli` to be installed on your **host** machine and a valid login session in your home directory (e.g. via `copilot login`). If `copilot-cli` is not installed or no login is found, this step is skipped.
+
+### 2. BYOK — API Key (No Copilot Subscription Required)
+
+Use your own API key and endpoint. This works with any OpenAI-compatible API, Anthropic, or Azure OpenAI — no GitHub Copilot subscription needed.
+
+#### OpenAI
+
+```python
+bot = CopilotBot(
+ model="gpt-4.1",
+ folder_to_mount="/path/to/project",
+ api_key="sk-your-openai-key",
+ base_url="https://api.openai.com/v1",
+ provider_type="openai", # default, can be omitted
+)
+```
+
+#### Anthropic
+
+```python
+bot = CopilotBot(
+ model="claude-sonnet-4.5",
+ folder_to_mount="/path/to/project",
+ api_key="sk-ant-your-key",
+ base_url="https://api.anthropic.com",
+ provider_type="anthropic",
+)
+```
+
+#### Azure OpenAI
+
+```python
+bot = CopilotBot(
+ model="my-gpt4-deployment",
+ folder_to_mount="/path/to/project",
+ api_key="your-azure-api-key",
+ base_url="https://your-resource.openai.azure.com",
+ provider_type="azure",
+ azure_api_version="2024-10-21",
+)
+```
+
+#### Using `wire_api` for newer models
+
+For models that use the Responses API (e.g. GPT-5 series), set `wire_api="responses"`:
+
+```python
+bot = CopilotBot(
+ model="gpt-5",
+ folder_to_mount="/path/to/project",
+ api_key="sk-your-key",
+ base_url="https://api.openai.com/v1",
+ wire_api="responses",
+)
+```
+
+### 3. BYOK — Bearer Token
+
+If your provider uses bearer token authentication instead of an API key:
+
+```python
+bot = CopilotBot(
+ model="gpt-4.1",
+ folder_to_mount="/path/to/project",
+ bearer_token="your-bearer-token",
+ base_url="https://your-endpoint.com/v1",
+)
+```
+
+!!! note
+ When both `api_key` and `bearer_token` are provided, `bearer_token` takes precedence.
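The precedence can be seen in how the provider config picks a credential (a simplified standalone sketch of the selection step in `_build_provider_config` later in this diff; the function name here is illustrative, not part of the public API):

```python
def pick_credential(api_key=None, bearer_token=None):
    # bearer_token wins when both are supplied, per the note above
    if bearer_token:
        return {"bearer_token": bearer_token}
    if api_key:
        return {"api_key": api_key}
    return {}
```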
+
+### 4. BYOK — Token Provider (e.g. Azure AD)
+
+For environments that use dynamic token authentication (such as Azure AD managed identity), pass a callable that returns a fresh token:
+
+```python
+from azure.identity import DefaultAzureCredential
+
+credential = DefaultAzureCredential()
+
+def get_token():
+ return credential.get_token("https://cognitiveservices.azure.com/.default").token
+
+bot = CopilotBot(
+ model="my-gpt4-deployment",
+ folder_to_mount="/path/to/project",
+ base_url="https://your-resource.openai.azure.com",
+ provider_type="azure",
+ azure_api_version="2024-10-21",
+ token_provider=get_token,
+)
+```
+
+The `token_provider` must be a callable that returns a non-empty string. It is called once at initialization time.
+
+### 5. BYOK — Environment Variables
+
+Configure BYOK entirely through environment variables without changing any code:
+
+```bash
+export COPILOT_BYOK_BASE_URL="https://api.openai.com/v1"
+export COPILOT_BYOK_API_KEY="sk-your-key"
+export COPILOT_BYOK_PROVIDER_TYPE="openai" # optional, defaults to "openai"
+export COPILOT_BYOK_MODEL="gpt-4.1" # optional, overrides the model param
+export COPILOT_BYOK_WIRE_API="completions" # optional
+export COPILOT_BYOK_AZURE_API_VERSION="2024-10-21" # optional, for Azure only
+```
+
+Then create the bot without any auth parameters:
+
+```python
+bot = CopilotBot(
+ folder_to_mount="/path/to/project",
+)
+```
+
+You can also use `COPILOT_BYOK_BEARER_TOKEN` instead of `COPILOT_BYOK_API_KEY` for bearer-token authentication.
+
+## Authentication Priority
+
+When multiple auth methods are configured simultaneously, CopilotBot resolves them in this order:
+
+| Priority | Method | Condition |
+|----------|--------|-----------|
+| 1 | Explicit API key / bearer token | `api_key` or `bearer_token` parameter is set |
+| 2 | Environment variables | `COPILOT_BYOK_BASE_URL` + `COPILOT_BYOK_API_KEY` or `COPILOT_BYOK_BEARER_TOKEN` |
+| 3 | Token provider | `token_provider` parameter is set |
+| 4 | Native GitHub Copilot | `github_token` or `GITHUB_TOKEN` / `COPILOT_GITHUB_TOKEN` / `GH_TOKEN` env vars |
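The table above can be condensed into a tiny standalone resolver (a simplified illustration only — the return values are descriptive labels, and the real `resolve_auth_config` additionally validates `base_url` and builds a full provider dict):

```python
import os

def sketch_resolve(api_key=None, bearer_token=None,
                   token_provider=None, github_token=None):
    # 1. Explicit BYOK credentials win
    if api_key or bearer_token:
        return "byok-explicit"
    # 2. Then the COPILOT_BYOK_* environment variables
    if os.environ.get("COPILOT_BYOK_BASE_URL") and (
        os.environ.get("COPILOT_BYOK_API_KEY")
        or os.environ.get("COPILOT_BYOK_BEARER_TOKEN")
    ):
        return "byok-env"
    # 3. Then a dynamic token provider (e.g. Azure AD)
    if token_provider is not None:
        return "byok-token-provider"
    # 4. Finally, native GitHub Copilot auth
    return "native-copilot"
```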
+
+## Parameters
+
+### Constructor
+
+| Parameter | Type | Default | Description |
+|-----------|------|---------|-------------|
+| `model` | `str` | `"gpt-4.1"` | Model name (e.g. `"gpt-4.1"`, `"claude-sonnet-4.5"`). No provider prefix needed. |
+| `folder_to_mount` | `str` | `None` | Absolute path to the folder to mount into the sandbox. |
+| `permission` | `PermissionLabels` | `READ_WRITE` | Mount permission — `READ_ONLY` or `READ_WRITE`. |
+| `environment` | `LocalDockerEnvironment` | `None` | Pre-created Docker environment. Auto-created if not provided. |
+| `additional_tools` | `list[ToolAbstract]` | `[]` | Extra tools to install in the sandbox. |
+| `github_token` | `str` | `None` | GitHub token for native Copilot auth. |
+| `api_key` | `str` | `None` | API key for BYOK. |
+| `bearer_token` | `str` | `None` | Bearer token for BYOK. |
+| `base_url` | `str` | `None` | API endpoint URL for BYOK. |
+| `provider_type` | `str` | `"openai"` | BYOK provider: `"openai"`, `"azure"`, or `"anthropic"`. |
+| `wire_api` | `str` | `None` | API format: `"completions"` or `"responses"`. |
+| `azure_api_version` | `str` | `None` | Azure API version (for `provider_type="azure"` only). |
+| `token_provider` | `Callable[[], str]` | `None` | Callable returning a bearer token string. |
+
+### `run()` method
+
+| Parameter | Type | Default | Description |
+|-----------|------|---------|-------------|
+| `task` | `str` | *(required)* | Natural-language description of the task. |
+| `additional_mounts` | `list[Mount]` | `None` | Extra folders to copy into the container. |
+| `timeout_in_seconds` | `int` | `600` | Maximum wall-clock time for the agent run. |
+| `streaming` | `bool` | `False` | Enable streaming delta events (logged at DEBUG level). |
+
+### Return value — `BotRunResult`
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `status` | `bool` | `True` if the agent completed successfully. |
+| `result` | `str` or `None` | The agent's final response text. |
+| `error` | `str` or `None` | Error description if `status` is `False`. |
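A typical pattern is to branch on `status` before reading `result`. Sketched here with a stand-in dataclass mirroring the fields above (the real `BotRunResult` is imported from `microbots.MicroBot`; `summarize` is a hypothetical helper):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class BotRunResult:
    # Stand-in mirroring the documented fields
    status: bool
    result: Optional[str]
    error: Optional[str]

def summarize(run: BotRunResult) -> str:
    # Read `result` only on success; otherwise surface the error
    if run.status:
        return f"OK: {run.result}"
    return f"FAILED: {run.error}"
```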
+
+## Examples
+
+### Read-only code analysis
+
+```python
+from microbots.bot.CopilotBot import CopilotBot
+from microbots.constants import PermissionLabels
+
+bot = CopilotBot(
+ model="gpt-4.1",
+ folder_to_mount="/path/to/project",
+ permission=PermissionLabels.READ_ONLY,
+ github_token="ghp_your_token",
+)
+
+result = bot.run("Analyze the codebase and list all public API endpoints.")
+print(result.result)
+bot.stop()
+```
+
+### Fix a bug with BYOK (OpenAI)
+
+```python
+from microbots.bot.CopilotBot import CopilotBot
+
+bot = CopilotBot(
+ model="gpt-4.1",
+ folder_to_mount="/path/to/project",
+ api_key="sk-your-openai-key",
+ base_url="https://api.openai.com/v1",
+)
+
+result = bot.run(
+ "The login form crashes when email contains a '+'. Fix the validation logic.",
+ timeout_in_seconds=300,
+)
+print(result.result)
+bot.stop()
+```
+
+### Using additional tools
+
+```python
+from microbots.bot.CopilotBot import CopilotBot
+from microbots.tools.internal_tool import InternalTool
+
+my_tool = InternalTool(tool_definition_path="path/to/tool.yaml")
+
+bot = CopilotBot(
+ model="gpt-4.1",
+ folder_to_mount="/path/to/project",
+ github_token="ghp_your_token",
+ additional_tools=[my_tool],
+)
+
+result = bot.run("Use the custom tool to lint and then fix all issues.")
+bot.stop()
+```
+
+!!! warning
+ `ExternalTool` is **not supported** with CopilotBot. Only tools that run inside the Docker container (internal tools) can be used.
+
+### Mounting additional folders at runtime
+
+```python
+from microbots.bot.CopilotBot import CopilotBot
+from microbots.extras.mount import Mount
+from microbots.constants import PermissionLabels
+
+bot = CopilotBot(
+    model="gpt-4.1",
+    folder_to_mount="/path/to/project",
+    github_token="ghp_your_token",
+)
+
+extra = Mount("/path/to/test-data", "/workdir/test-data", PermissionLabels.READ_ONLY)
+result = bot.run(
+ "Run the integration tests using the data in /workdir/test-data.",
+ additional_mounts=[extra],
+)
+bot.stop()
+```
+
+## Cleanup
+
+Always call `bot.stop()` when you are done. This tears down the SDK client, the CLI server, and the Docker container:
+
+```python
+bot.stop()
+```
+
+`stop()` is idempotent — calling it multiple times is safe. It is also called automatically when the object is garbage-collected, but explicit cleanup is recommended.
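Because `stop()` is idempotent, a `try`/`finally` wrapper (or a small context manager like the hypothetical one below) guarantees cleanup even when `run()` raises. The `_FakeBot` is a stand-in for illustration; in real use you would pass a `CopilotBot` instance:

```python
from contextlib import contextmanager

@contextmanager
def managed(bot):
    # Always call stop(), even if the body raises
    try:
        yield bot
    finally:
        bot.stop()

class _FakeBot:
    # Hypothetical stand-in used only to demonstrate the pattern
    def __init__(self):
        self.stopped = False
    def stop(self):
        self.stopped = True

fake = _FakeBot()
with managed(fake):
    pass  # bot.run(...) would go here
```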
diff --git a/mkdocs.yml b/mkdocs.yml
index d7e50a8..9fa483b 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -41,6 +41,9 @@ markdown_extensions:
nav:
- Getting Started:
- Home: index.md
+ - Guides:
+ - CopilotBot: copilot-bot.md
+ - Authentication: authentication.md
- Blogs:
- blog/index.md
- "Microbots : Safety First Agentic Workflow": blog/microbots-safety-first-ai-agent.md
diff --git a/pyproject.toml b/pyproject.toml
index becf06a..fd5f0ef 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -21,6 +21,10 @@ classifiers = [
]
requires-python = ">=3.11"
+[project.optional-dependencies]
+ghcp = ["github-copilot-sdk"]
+azure_ad = ["azure-identity>=1.15.0"]
+
[tool.setuptools.dynamic]
dependencies = { file = ["requirements.txt"] }
@@ -30,9 +34,6 @@ dependencies = { file = ["requirements.txt"] }
[tool.setuptools.packages.find]
where = ["src"]
-[project.optional-dependencies]
-azure_ad = ["azure-identity>=1.15.0"]
-
[project.urls]
"Source Repo" = "https://github.com/microsoft/minions"
Issues = "https://github.com/microsoft/minions/issues"
\ No newline at end of file
diff --git a/src/microbots/__init__.py b/src/microbots/__init__.py
index cc344c0..7415ffd 100644
--- a/src/microbots/__init__.py
+++ b/src/microbots/__init__.py
@@ -1,4 +1,4 @@
-from microbots.bot import ReadingBot, WritingBot, LogAnalysisBot, BrowsingBot, AgentBoss
+from microbots.bot import ReadingBot, WritingBot, LogAnalysisBot, BrowsingBot, AgentBoss, CopilotBot
from microbots.MicroBot import MicroBot, BotRunResult
__all__ = [
"ReadingBot",
@@ -6,6 +6,7 @@
"LogAnalysisBot",
"BrowsingBot",
"AgentBoss",
+ "CopilotBot",
"MicroBot",
"BotRunResult"
]
diff --git a/src/microbots/bot/CopilotBot.py b/src/microbots/bot/CopilotBot.py
new file mode 100644
index 0000000..bb8e656
--- /dev/null
+++ b/src/microbots/bot/CopilotBot.py
@@ -0,0 +1,716 @@
+"""
+CopilotBot — A wrapper around the GitHub Copilot SDK.
+
+Unlike MicroBot (which manages its own LLM ↔ shell agent loop), CopilotBot
+delegates the **entire agent loop to the Copilot runtime**. Copilot handles
+planning, tool invocation (file edits, shell commands, web requests, etc.)
+and multi-turn reasoning autonomously.
+
+CopilotBot reuses the Microbots infrastructure:
+ - Docker sandbox (LocalDockerEnvironment) for isolated execution
+ - Mount system for folder access control
+ - ToolAbstract lifecycle (install → verify → setup) for additional tools
+ - copilot-cli is installed **inside** the container and run in headless
+ server mode; the SDK on the host connects to it via TCP.
+
+Architecture:
+
+ Host Docker Container
+ ───── ────────────────
+ CopilotBot copilot-cli --headless --port
+ │ │
+ ├── Copilot SDK ──TCP──────────┘
+ │ (ExternalServerConfig)
+ │
+ ├── additional tools
+ │ (define_tool → SDK session)
+ │
+ └── BotRunResult
+
+Prerequisites:
+ - pip install microbots[ghcp] (github-copilot-sdk)
+ - Docker daemon running
+ - GitHub authentication (GITHUB_TOKEN / COPILOT_GITHUB_TOKEN or copilot login)
+"""
+
+import asyncio
+import os
+import time
+import threading
+from collections.abc import Callable
+from logging import getLogger
+from typing import Optional
+
+from microbots.constants import (
+ DOCKER_WORKING_DIR,
+ PermissionLabels,
+)
+from microbots.environment.local_docker.LocalDockerEnvironment import (
+ LocalDockerEnvironment,
+)
+from microbots.extras.mount import Mount, MountType
+from microbots.MicroBot import BotRunResult
+from microbots.tools.external_tool import ExternalTool
+from microbots.tools.tool import ToolAbstract
+from microbots.utils.copilot_auth import get_copilot_token
+from microbots.utils.network import get_free_port # still used for _create_environment
+
+logger = getLogger(" CopilotBot ")
+
+# Default model when none is specified (just the deployment name, no provider prefix)
+_DEFAULT_MODEL = "gpt-4.1"
+
+# Time (seconds) to wait for copilot-cli to start inside the container
+_CLI_STARTUP_TIMEOUT = 60
+
+# copilot-cli port inside the container
+_CONTAINER_CLI_PORT = 4321
+
+# Environment variable names for BYOK configuration
+_BYOK_ENV_PROVIDER_TYPE = "COPILOT_BYOK_PROVIDER_TYPE"
+_BYOK_ENV_BASE_URL = "COPILOT_BYOK_BASE_URL"
+_BYOK_ENV_API_KEY = "COPILOT_BYOK_API_KEY"
+_BYOK_ENV_BEARER_TOKEN = "COPILOT_BYOK_BEARER_TOKEN"
+_BYOK_ENV_WIRE_API = "COPILOT_BYOK_WIRE_API"
+_BYOK_ENV_AZURE_API_VERSION = "COPILOT_BYOK_AZURE_API_VERSION"
+_BYOK_ENV_MODEL = "COPILOT_BYOK_MODEL"
+
+
+def resolve_auth_config(
+ model: str = _DEFAULT_MODEL,
+ github_token: Optional[str] = None,
+ api_key: Optional[str] = None,
+ bearer_token: Optional[str] = None,
+ base_url: Optional[str] = None,
+ provider_type: Optional[str] = None,
+ wire_api: Optional[str] = None,
+ azure_api_version: Optional[str] = None,
+ token_provider: Optional[Callable[[], str]] = None,
+) -> tuple[str, Optional[str], Optional[dict]]:
+ """Resolve authentication and provider configuration for CopilotBot.
+
+ Determines whether to use BYOK (Bring Your Own Key) or native GitHub
+ Copilot authentication, and builds the appropriate provider config.
+
+ Priority order:
+ 1. Explicit ``api_key`` or ``bearer_token`` with ``base_url`` → BYOK
+ 2. Environment variables (``COPILOT_BYOK_*``) → BYOK
+ 3. ``token_provider`` (e.g. Azure AD token provider) → BYOK with bearer token
+ 4. GitHub token → native Copilot authentication
+
+ Parameters
+ ----------
+ model : str
+ Model name (e.g. ``"gpt-4.1"``, ``"claude-sonnet-4.5"``).
+ github_token : Optional[str]
+ GitHub token for native Copilot auth.
+ api_key : Optional[str]
+ API key for BYOK provider.
+ bearer_token : Optional[str]
+ Bearer token for BYOK (takes precedence over ``api_key``).
+ base_url : Optional[str]
+ API endpoint URL for BYOK provider.
+ provider_type : Optional[str]
+ Provider type: ``"openai"``, ``"azure"``, or ``"anthropic"``.
+ wire_api : Optional[str]
+ API format: ``"completions"`` or ``"responses"``.
+ azure_api_version : Optional[str]
+ Azure API version (only for ``type: "azure"``).
+ token_provider : Optional[Callable[[], str]]
+ Callable that returns a bearer token string (e.g. Azure AD
+ token provider). The token is fetched once at config resolution
+ time. For long-running sessions, create a new session with a
+ refreshed token.
+
+ Returns
+ -------
+ tuple[str, Optional[str], Optional[dict]]
+ ``(model, github_token, provider_config)`` where
+ ``provider_config`` is *None* for native Copilot auth or a dict
+ suitable for the ``provider`` kwarg of ``create_session``.
+
+ Raises
+ ------
+ ValueError
+ If BYOK is requested but ``base_url`` is missing, or if
+ ``token_provider`` is not a valid callable.
+ """
+
+ # ── 1. Explicit api_key / bearer_token ───────────────────────────
+ if api_key or bearer_token:
+ if not base_url:
+ raise ValueError(
+ "BYOK requires a base_url when api_key or bearer_token is provided."
+ )
+ provider = _build_provider_config(
+ provider_type=provider_type or "openai",
+ base_url=base_url,
+ api_key=api_key,
+ bearer_token=bearer_token,
+ wire_api=wire_api,
+ azure_api_version=azure_api_version,
+ )
+ logger.info("🔑 BYOK auth resolved via explicit credentials (type=%s)", provider["type"])
+ return model, None, provider
+
+ # ── 2. Environment variables ─────────────────────────────────────
+ env_base_url = os.environ.get(_BYOK_ENV_BASE_URL)
+ env_api_key = os.environ.get(_BYOK_ENV_API_KEY)
+ env_bearer_token = os.environ.get(_BYOK_ENV_BEARER_TOKEN)
+
+ if env_base_url and (env_api_key or env_bearer_token):
+ env_model = os.environ.get(_BYOK_ENV_MODEL, model)
+ provider = _build_provider_config(
+ provider_type=os.environ.get(_BYOK_ENV_PROVIDER_TYPE, "openai"),
+ base_url=env_base_url,
+ api_key=env_api_key,
+ bearer_token=env_bearer_token,
+ wire_api=os.environ.get(_BYOK_ENV_WIRE_API),
+ azure_api_version=os.environ.get(_BYOK_ENV_AZURE_API_VERSION),
+ )
+ logger.info("🔑 BYOK auth resolved via environment variables (type=%s)", provider["type"])
+ return env_model, None, provider
+
+ # ── 3. Token provider (e.g. Azure AD) ────────────────────────────
+ if token_provider:
+ if not callable(token_provider):
+ raise ValueError("token_provider must be a callable that returns a string token.")
+ resolved_url = base_url or env_base_url
+ if not resolved_url:
+ raise ValueError(
+ "BYOK with token_provider requires a base_url (pass it directly "
+ "or set COPILOT_BYOK_BASE_URL)."
+ )
+ try:
+ token = token_provider()
+ except Exception as e:
+ raise ValueError(f"token_provider failed during validation: {e}") from e
+ if not isinstance(token, str) or not token:
+ raise ValueError("token_provider must return a non-empty string token.")
+
+ provider = _build_provider_config(
+ provider_type=provider_type or os.environ.get(_BYOK_ENV_PROVIDER_TYPE, "openai"),
+ base_url=resolved_url,
+ bearer_token=token,
+ wire_api=wire_api or os.environ.get(_BYOK_ENV_WIRE_API),
+ azure_api_version=azure_api_version or os.environ.get(_BYOK_ENV_AZURE_API_VERSION),
+ )
+ logger.info("🔑 BYOK auth resolved via token_provider (type=%s)", provider["type"])
+ return model, None, provider
+
+ # ── 4. Native GitHub Copilot auth ────────────────────────────────
+ resolved_github_token = (
+ github_token
+ or os.environ.get("COPILOT_GITHUB_TOKEN")
+ or os.environ.get("GITHUB_TOKEN")
+ or os.environ.get("GH_TOKEN")
+ or get_copilot_token()
+ )
+ logger.info("🔑 Using native GitHub Copilot authentication")
+ return model, resolved_github_token, None
+
+
+def _build_provider_config(
+ provider_type: str,
+ base_url: str,
+ api_key: Optional[str] = None,
+ bearer_token: Optional[str] = None,
+ wire_api: Optional[str] = None,
+ azure_api_version: Optional[str] = None,
+) -> dict:
+ """Build the ``provider`` dict accepted by ``create_session``."""
+ config: dict = {
+ "type": provider_type,
+ "base_url": base_url,
+ }
+ # bearer_token takes precedence over api_key per SDK docs
+ if bearer_token:
+ config["bearer_token"] = bearer_token
+ elif api_key:
+ config["api_key"] = api_key
+
+ if wire_api:
+ config["wire_api"] = wire_api
+
+ if provider_type == "azure" and azure_api_version:
+ config["azure"] = {"api_version": azure_api_version}
+
+ return config
+
+
+class CopilotBot:
+ """Wrapper around the GitHub Copilot SDK with a sandboxed Docker environment.
+
+ The Copilot runtime manages the agent loop (planning, tool calls,
+ multi-turn reasoning). CopilotBot sets up the sandbox, installs
+ copilot-cli inside it, connects the SDK, and exposes a simple
+ ``run(task)`` interface.
+
+ Parameters
+ ----------
+ model : str
+ Copilot model name (e.g. ``"gpt-4.1"``, ``"claude-sonnet-4.5"``).
+ Unlike MicroBot, no ``/`` prefix is needed.
+ folder_to_mount : str
+ Absolute host path to mount into the sandbox.
+ permission : PermissionLabels
+ Mount permission — READ_ONLY or READ_WRITE. Defaults to READ_WRITE.
+ environment : Optional[LocalDockerEnvironment]
+ Pre-created environment. One is created automatically when *None*.
+ additional_tools : Optional[list[ToolAbstract]]
+ Extra Microbots tools to install in the sandbox. Their
+ ``usage_instructions_to_llm`` are appended to the system message
+ and, where possible, they are registered as SDK custom tools.
+ github_token : Optional[str]
+ Explicit GitHub token. Falls back to ``GITHUB_TOKEN`` /
+ ``COPILOT_GITHUB_TOKEN`` env vars. Used only when BYOK is not
+ configured.
+ api_key : Optional[str]
+ API key for BYOK provider. When provided with ``base_url``,
+ bypasses GitHub Copilot auth and uses the key directly.
+ bearer_token : Optional[str]
+ Bearer token for BYOK provider. Takes precedence over ``api_key``.
+ base_url : Optional[str]
+ API endpoint URL for BYOK (e.g.
+ ``"https://api.openai.com/v1"``).
+ provider_type : Optional[str]
+ BYOK provider type: ``"openai"``, ``"azure"``, or
+ ``"anthropic"``. Defaults to ``"openai"``.
+ wire_api : Optional[str]
+ API format: ``"completions"`` (default) or ``"responses"``
+ (for GPT-5 series).
+ azure_api_version : Optional[str]
+ Azure API version string (only for ``provider_type="azure"``).
+ token_provider : Optional[Callable[[], str]]
+ A callable returning a bearer token (e.g. Azure AD token
+ provider). Requires ``base_url``.
+ """
+
+ def __init__(
+ self,
+ model: str = _DEFAULT_MODEL,
+ folder_to_mount: Optional[str] = None,
+ permission: PermissionLabels = PermissionLabels.READ_WRITE,
+ environment: Optional[LocalDockerEnvironment] = None,
+ additional_tools: Optional[list[ToolAbstract]] = None,
+ github_token: Optional[str] = None,
+ api_key: Optional[str] = None,
+ bearer_token: Optional[str] = None,
+ base_url: Optional[str] = None,
+ provider_type: Optional[str] = None,
+ wire_api: Optional[str] = None,
+ azure_api_version: Optional[str] = None,
+ token_provider: Optional[Callable[[], str]] = None,
+ ):
+ try:
+ from copilot import CopilotClient, ExternalServerConfig
+ from copilot.types import PermissionHandler
+ except ImportError:
+ raise ImportError(
+ "CopilotBot requires the github-copilot-sdk package. "
+ "Install with: pip install microbots[ghcp]"
+ )
+
+ self.additional_tools = additional_tools or []
+
+ # ── Resolve auth: BYOK vs native GitHub Copilot ─────────────
+ self.model, self.github_token, self._provider_config = resolve_auth_config(
+ model=model,
+ github_token=github_token,
+ api_key=api_key,
+ bearer_token=bearer_token,
+ base_url=base_url,
+ provider_type=provider_type,
+ wire_api=wire_api,
+ azure_api_version=azure_api_version,
+ token_provider=token_provider,
+ )
+
+ # ── Mount setup ─────────────────────────────────────────────
+ self.folder_to_mount: Optional[Mount] = None
+ if folder_to_mount:
+ sandbox_path = f"/{DOCKER_WORKING_DIR}/{os.path.basename(folder_to_mount)}"
+ self.folder_to_mount = Mount(folder_to_mount, sandbox_path, permission)
+
+ # ── Docker environment ──────────────────────────────────────
+ self.environment = environment
+ if not self.environment:
+ self._create_environment()
+
+        # ── Validate tools (ExternalTool is not supported) and ──────
+        # ── install additional tools inside the container ───────────
+ for tool in self.additional_tools:
+ if isinstance(tool, ExternalTool):
+ raise ValueError(
+ f"CopilotBot does not support ExternalTool '{tool.name}'. "
+ f"copilot-cli runs inside the Docker container, so only "
+ f"internal (container-side) tools are allowed."
+ )
+
+ logger.info("🔧 Installing additional tool '%s'...", tool.name)
+ tool.install_tool(self.environment)
+ tool.verify_tool_installation(self.environment)
+ logger.info("✅ Tool '%s' installed and verified", tool.name)
+
+ # ── Install & start copilot-cli inside the container ────────
+ self._install_copilot_cli()
+ self._start_copilot_cli_server()
+
+ # ── Background event loop for async SDK calls ───────────────
+ self._loop = asyncio.new_event_loop()
+ self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
+ self._thread.start()
+
+ # ── Connect SDK to in-container CLI ─────────────────────────
+ container_ip = self.environment.get_ipv4_address()
+ self._client = CopilotClient(
+ ExternalServerConfig(url=f"{container_ip}:{_CONTAINER_CLI_PORT}")
+ )
+ self._run_async(self._client.start())
+ self._PermissionHandler = PermissionHandler
+
+ logger.info(
+ "✅ CopilotBot initialised — model=%s, cli=%s:%d",
+ self.model,
+ container_ip,
+ _CONTAINER_CLI_PORT,
+ )
+
+ # ──────────────────────────────────────────────────────────────────
+ # Public API
+ # ──────────────────────────────────────────────────────────────────
+
+ def run(
+ self,
+ task: str,
+ additional_mounts: Optional[list[Mount]] = None,
+ timeout_in_seconds: int = 600,
+ streaming: bool = False,
+ ) -> BotRunResult:
+ """Send *task* to the Copilot agent and wait for completion.
+
+ The Copilot runtime manages the full agent loop autonomously —
+ planning, tool invocation, multi-turn reasoning, and file edits
+ all happen inside the sandboxed environment.
+
+ Parameters
+ ----------
+ task : str
+ A natural-language description of the task.
+ additional_mounts : Optional[list[Mount]]
+ Extra folders to copy into the container before running.
+ timeout_in_seconds : int
+ Maximum wall-clock time for the agent run.
+ streaming : bool
+ Whether to enable streaming delta events (logged at DEBUG level).
+
+ Returns
+ -------
+ BotRunResult
+ status=True on success with the agent's final message in *result*,
+ or status=False with an error description.
+ """
+ logger.info("🚀 Starting CopilotBot run — task: %.120s...", task)
+
+ # Setup additional tools (env vars, files, setup_commands)
+ for tool in self.additional_tools:
+ logger.info("⚙️ Setting up tool '%s'", tool.name)
+ tool.setup_tool(self.environment)
+
+ # Mount additional folders
+ for mount in additional_mounts or []:
+ self._mount_additional(mount)
+
+ # Build system message with tool instructions
+ system_content = self._build_system_message()
+
+ try:
+ result_text = self._run_async(
+ self._execute_session(
+ task=task,
+ system_content=system_content,
+ timeout=timeout_in_seconds,
+ streaming=streaming,
+ )
+ )
+ logger.info("✅ CopilotBot run completed successfully")
+ return BotRunResult(status=True, result=result_text, error=None)
+ except Exception as e:
+ logger.exception("❌ CopilotBot run failed: %s", e)
+ return BotRunResult(status=False, result=None, error=str(e))
+
+ def stop(self):
+ """Tear down the SDK client, CLI server, and Docker environment."""
+ if getattr(self, "_stopped", False):
+ return
+ self._stopped = True
+
+ # Stop the SDK client (best-effort, with timeout to avoid deadlock)
+ try:
+ if self._loop.is_running():
+ future = asyncio.run_coroutine_threadsafe(
+ self._client.stop(), self._loop
+ )
+ future.result(timeout=10)
+ except Exception:
+ pass
+
+ # Shut down the background event loop
+ try:
+ self._loop.call_soon_threadsafe(self._loop.stop)
+ self._thread.join(timeout=5)
+ except Exception:
+ pass
+
+ if self.environment:
+ self.environment.stop()
+ self.environment = None
+ logger.info("🛑 CopilotBot stopped")
+
+ def __del__(self):
+ try:
+ self.stop()
+ except Exception:
+ pass
+
+ # ──────────────────────────────────────────────────────────────────
+ # Private — environment & CLI setup
+ # ──────────────────────────────────────────────────────────────────
+
+ def _create_environment(self):
+ free_port = get_free_port()
+ self.environment = LocalDockerEnvironment(
+ port=free_port,
+ folder_to_mount=self.folder_to_mount,
+ )
+
+ def _install_copilot_cli(self):
+ """Install copilot-cli inside the Docker container."""
+ logger.info("📦 Installing copilot-cli inside container...")
+
+ # Install Node.js (required for copilot-cli via npm)
+ install_commands = [
+ # Remove stale third-party repos that may have expired GPG keys
+ "rm -f /etc/apt/sources.list.d/yarn.list",
+ # Install Node.js 22.x (copilot-cli requires Node 22+)
+ "apt-get update -qq && apt-get install -y -qq curl ca-certificates > /dev/null 2>&1",
+ "curl -fsSL https://deb.nodesource.com/setup_22.x | bash - > /dev/null 2>&1",
+ "apt-get install -y -qq nodejs > /dev/null 2>&1",
+ # Install copilot-cli globally
+ "npm install -g @github/copilot > /dev/null 2>&1",
+ ]
+
+ for cmd in install_commands:
+ result = self.environment.execute(cmd, timeout=300)
+ if result.return_code != 0:
+ raise RuntimeError(
+ f"Failed to install copilot-cli: {cmd}\n"
+ f"stdout: {result.stdout}\nstderr: {result.stderr}"
+ )
+
+ # Verify installation
+ verify = self.environment.execute("copilot --version")
+ if verify.return_code != 0:
+ raise RuntimeError(
+ f"copilot-cli installation verification failed: {verify.stderr}"
+ )
+ logger.info("✅ copilot-cli installed: %s", verify.stdout.strip())
+
+ def _start_copilot_cli_server(self):
+ """Start copilot-cli in headless server mode inside the container.
+
+ The CLI listens on ``_CONTAINER_CLI_PORT`` inside the container.
+ The host connects directly to the container's bridge-network IP.
+ Authentication is handled via the GITHUB_TOKEN environment variable
+ injected into the container.
+ """
+ # Inject the GitHub token into the container for native Copilot auth.
+ # When BYOK is active, authentication is handled via the provider
+ # config passed to create_session — no container-side token needed.
+ if self.github_token and not self._provider_config:
+ self.environment.execute(
+ f'export GITHUB_TOKEN="{self.github_token}"', sensitive=True
+ )
+ self.environment.execute(
+ f'export COPILOT_GITHUB_TOKEN="{self.github_token}"', sensitive=True
+ )
+
+ # Start copilot in headless mode in the background
+ # Using nohup + & to run it as a background process inside the container's shell
+ start_cmd = (
+ f"nohup copilot --headless --port {_CONTAINER_CLI_PORT} "
+ f"> /var/log/copilot-cli.log 2>&1 &"
+ )
+ result = self.environment.execute(start_cmd)
+ if result.return_code != 0:
+ raise RuntimeError(
+ f"Failed to start copilot-cli server: {result.stderr}"
+ )
+
+ # Wait for the server to be ready
+ self._wait_for_cli_ready()
+ logger.info(
+ "✅ copilot-cli headless server running on container port %d",
+ _CONTAINER_CLI_PORT,
+ )
+
+ def _wait_for_cli_ready(self):
+ """Poll until the copilot-cli server is accepting connections."""
+ import socket as _socket
+
+ container_ip = self.environment.get_ipv4_address()
+ deadline = time.time() + _CLI_STARTUP_TIMEOUT
+ while time.time() < deadline:
+ try:
+ sock = _socket.create_connection(
+ (container_ip, _CONTAINER_CLI_PORT), timeout=2
+ )
+ sock.close()
+ return
+ except (ConnectionRefusedError, OSError):
+ time.sleep(1)
+ raise TimeoutError(
+ f"copilot-cli did not become ready within {_CLI_STARTUP_TIMEOUT}s "
+ f"on {container_ip}:{_CONTAINER_CLI_PORT}"
+ )
+
+ # ──────────────────────────────────────────────────────────────────
+ # Private — SDK session & execution
+ # ──────────────────────────────────────────────────────────────────
+
+ def _run_async(self, coro):
+ """Submit an async coroutine to the background loop and block."""
+ future = asyncio.run_coroutine_threadsafe(coro, self._loop)
+ return future.result()
+
+ async def _execute_session(
+ self,
+ task: str,
+ system_content: str,
+ timeout: int,
+ streaming: bool,
+ ) -> str:
+ """Create a Copilot session, send the task, and collect the result."""
+ from copilot.generated.session_events import SessionEventType
+
+ session_kwargs = {
+ "model": self.model,
+ "on_permission_request": self._PermissionHandler.approve_all,
+ "streaming": streaming,
+ "hooks": {
+ "on_pre_tool_use": self._on_pre_tool_use,
+ "on_post_tool_use": self._on_post_tool_use,
+ },
+ }
+
+ if self._provider_config:
+ session_kwargs["provider"] = self._provider_config
+
+ if system_content:
+ session_kwargs["system_message"] = {"content": system_content}
+
+ logger.info("📡 Creating Copilot session (model=%s, streaming=%s, byok=%s)", self.model, streaming, self._provider_config is not None)
+ logger.debug("Session kwargs: %s", session_kwargs)
+ session = await self._client.create_session(**session_kwargs)
+
+ collected_text = []
+ done_event = asyncio.Event()
+
+ def _on_event(event):
+ if event.type == SessionEventType.ASSISTANT_MESSAGE:
+ if event.data and event.data.content:
+ collected_text.append(event.data.content)
+ logger.info("💬 Assistant message received (%d chars)", len(event.data.content))
+ elif event.type == SessionEventType.ASSISTANT_MESSAGE_DELTA:
+ if event.data and event.data.delta_content:
+ logger.debug("📝 %s", event.data.delta_content)
+ elif event.type == SessionEventType.SESSION_IDLE:
+ logger.info("⏹️ Session idle — agent finished processing")
+ done_event.set()
+ else:
+ logger.debug("📨 Session event: %s", event.type)
+
+ session.on(_on_event)
+
+ # Send the task prompt and wait for completion
+ logger.info("📤 Sending task to Copilot agent...")
+ logger.debug("Task content: %s", task)
+ response = await session.send_and_wait(task, timeout=float(timeout))
+
+ # If send_and_wait returned a full response, use it
+ if response and response.data and response.data.content:
+ logger.info("✅ Received response from send_and_wait with %d chars", len(response.data.content))
+ logger.debug("Response content: %s", response.data.content)
+ await session.disconnect()
+ return response.data.content
+
+ # Otherwise wait for the collected events
+ if not collected_text:
+ try:
+ await asyncio.wait_for(done_event.wait(), timeout=float(timeout))
+ except asyncio.TimeoutError:
+ logger.warning("⏱️ Timed out waiting for session idle after %ds", timeout)
+
+ await session.disconnect()
+
+ if collected_text:
+ return collected_text[-1] # Return the last assistant message
+
+ return "Agent completed without producing a final message."
+
+ def _build_system_message(self) -> str:
+ """Compose the system message from mount info and tool instructions."""
+ parts = []
+
+ if self.folder_to_mount:
+ parts.append(
+ f"The working directory is mounted at {self.folder_to_mount.sandbox_path}. "
+ f"You can access files using paths relative to or absolute from that directory."
+ )
+
+ for tool in self.additional_tools:
+ if tool.usage_instructions_to_llm:
+ parts.append(tool.usage_instructions_to_llm)
+
+ return "\n\n".join(parts)
+
+ # ──────────────────────────────────────────────────────────────────
+ # Private — SDK hooks for tool-use logging
+ # ──────────────────────────────────────────────────────────────────
+
+ async def _on_pre_tool_use(self, input_data, invocation):
+ """Hook called before each tool execution — log the call."""
+ tool_name = input_data.get("toolName", "unknown")
+ tool_args = input_data.get("toolArgs", {})
+ logger.info("➡️ Tool call: %s — args: %s", tool_name, tool_args)
+ return {"permissionDecision": "allow"}
+
+ async def _on_post_tool_use(self, input_data, invocation):
+ """Hook called after each tool execution — log the result."""
+ tool_name = input_data.get("toolName", "unknown")
+ result = input_data.get("toolResult", "")
+ result_str = str(result)
+ logger.debug("Tool '%s' full execution result: %s", tool_name, result_str)
+ # Truncate long results so the info-level log stays readable
+ if len(result_str) > 500:
+ result_str = result_str[:500] + "... (truncated)"
+ logger.info("⬅️ Tool result: %s — output: %s", tool_name, result_str)
+ return {}
+
+ # ──────────────────────────────────────────────────────────────────
+ # Private — mount helpers
+ # ──────────────────────────────────────────────────────────────────
+
+ def _mount_additional(self, mount: Mount):
+ """Copy an additional folder into the running container."""
+ if mount.mount_type != MountType.COPY:
+ raise ValueError(
+ "Only COPY mount type is supported for additional mounts"
+ )
+ if not self.environment.copy_to_container(
+ mount.host_path_info.abs_path, mount.sandbox_path
+ ):
+ raise ValueError(
+ f"Failed to copy additional mount: "
+ f"{mount.host_path_info.abs_path} -> {mount.sandbox_path}"
+ )
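The TCP readiness poll in `_wait_for_cli_ready` generalizes to any service started in the background of a container. A minimal standalone sketch (the helper name `wait_for_port` and its parameters are illustrative, not part of the Microbots API):

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 10.0, interval: float = 0.1) -> bool:
    """Poll until a TCP endpoint accepts connections; False if the deadline passes."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            # create_connection performs the full TCP handshake, so success
            # means the server's listen socket is actually up
            with socket.create_connection((host, port), timeout=2):
                return True
        except (ConnectionRefusedError, OSError):
            time.sleep(interval)
    return False
```

Note that `_wait_for_cli_ready` raises `TimeoutError` instead of returning `False`, so a CLI that never comes up fails bot initialization loudly rather than silently.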
diff --git a/src/microbots/bot/__init__.py b/src/microbots/bot/__init__.py
index fd6cd62..25e61b5 100644
--- a/src/microbots/bot/__init__.py
+++ b/src/microbots/bot/__init__.py
@@ -3,5 +3,6 @@
from .WritingBot import WritingBot
from .BrowsingBot import BrowsingBot
from .AgentBoss import AgentBoss
+from .CopilotBot import CopilotBot
-__all__ = ["LogAnalysisBot", "ReadingBot", "WritingBot", "BrowsingBot", "AgentBoss"]
+__all__ = ["LogAnalysisBot", "ReadingBot", "WritingBot", "BrowsingBot", "AgentBoss", "CopilotBot"]
diff --git a/src/microbots/environment/Environment.py b/src/microbots/environment/Environment.py
index f2fdabd..83df871 100644
--- a/src/microbots/environment/Environment.py
+++ b/src/microbots/environment/Environment.py
@@ -33,3 +33,24 @@ def copy_from_container(self, src_path: str, dest_path: str) -> bool:
f"{self.__class__.__name__} does not support copying files from container. "
f"This is an optional feature - only implement if needed for your use case."
)
+
+ def get_ipv4_address(self) -> str:
+ """Return the IPv4 address of the running environment.
+
+ This allows host-side code to connect directly to services
+ running inside the environment without port forwarding.
+
+ Returns
+ -------
+ str
+ The IPv4 address of the environment.
+
+ Raises
+ ------
+ NotImplementedError
+ If the environment does not support retrieving its IP address.
+ """
+ raise NotImplementedError(
+ f"{self.__class__.__name__} does not support retrieving its IP address. "
+ f"This is an optional feature - only implement if needed for your use case."
+ )
diff --git a/src/microbots/environment/local_docker/LocalDockerEnvironment.py b/src/microbots/environment/local_docker/LocalDockerEnvironment.py
index 874b80d..d91e98e 100644
--- a/src/microbots/environment/local_docker/LocalDockerEnvironment.py
+++ b/src/microbots/environment/local_docker/LocalDockerEnvironment.py
@@ -153,6 +153,18 @@ def _teardown_overlay_mount(self):
except Exception as e:
logger.error("❌ Failed to teardown overlay mount: %s", e)
+ def get_ipv4_address(self) -> str:
+ """Return the container's IPv4 address on the Docker bridge network."""
+ if not self.container:
+ raise RuntimeError("No active container to get IP address from")
+
+ self.container.reload()
+ networks = self.container.attrs["NetworkSettings"]["Networks"]
+ container_ip = next(iter(networks.values()))["IPAddress"]
+ if not container_ip:
+ raise RuntimeError("Could not determine container IP address")
+ return container_ip
+
def stop(self):
"""Stop and remove the container"""
if self.container:
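The attribute navigation in `get_ipv4_address` can be isolated into a pure function, which makes the single-network assumption explicit. The helper below is an illustrative sketch over the `NetworkSettings.Networks` shape that docker-py exposes via `container.attrs`; the function name is not part of Microbots:

```python
def first_network_ip(network_settings: dict) -> str:
    """Return the IPAddress of the first network a container is attached to.

    Assumes the container sits on exactly one network (the default bridge);
    with multiple networks, dict insertion order decides which address wins.
    """
    networks = network_settings["Networks"]
    ip = next(iter(networks.values()))["IPAddress"]
    if not ip:
        raise RuntimeError("Could not determine container IP address")
    return ip
```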
diff --git a/src/microbots/utils/copilot_auth.py b/src/microbots/utils/copilot_auth.py
new file mode 100644
index 0000000..4d3aeeb
--- /dev/null
+++ b/src/microbots/utils/copilot_auth.py
@@ -0,0 +1,40 @@
+"""Utility to read GitHub Copilot CLI credentials from ~/.copilot/config.json."""
+
+import json
+from logging import getLogger
+from pathlib import Path
+from typing import Optional
+
+logger = getLogger(__name__)
+
+COPILOT_CONFIG_PATH = Path.home() / ".copilot" / "config.json"
+
+
+def get_copilot_token(config_path: Path = COPILOT_CONFIG_PATH) -> Optional[str]:
+ """Extract the OAuth token from the Copilot CLI config file.
+
+ The Copilot CLI stores credentials in ``~/.copilot/config.json`` after
+ ``copilot auth login``. This function reads the first available token
+ from the ``copilot_tokens`` map.
+
+ Returns ``None`` if the file doesn't exist or contains no tokens.
+ """
+ if not config_path.is_file():
+ logger.debug("Copilot config not found at %s", config_path)
+ return None
+
+ try:
+ data = json.loads(config_path.read_text(encoding="utf-8"))
+ except (json.JSONDecodeError, OSError) as exc:
+ logger.warning("Failed to read Copilot config at %s: %s", config_path, exc)
+ return None
+
+ tokens = data.get("copilot_tokens", {})
+ if not tokens:
+ logger.debug("No copilot_tokens found in %s", config_path)
+ return None
+
+ # Return the first available token
+ token = next(iter(tokens.values()))
+ logger.debug("Resolved Copilot token from %s", config_path)
+ return token
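The lookup above can be exercised end to end against a throwaway config file. This sketch re-implements the same logic inline so it runs standalone; the sample host key and token value are made up:

```python
import json
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Optional


def read_first_copilot_token(config_path: Path) -> Optional[str]:
    # Same contract as get_copilot_token: first value of copilot_tokens, else None
    if not config_path.is_file():
        return None
    try:
        data = json.loads(config_path.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, OSError):
        return None
    tokens = data.get("copilot_tokens", {})
    return next(iter(tokens.values())) if tokens else None


with TemporaryDirectory() as tmp:
    cfg = Path(tmp) / "config.json"
    cfg.write_text(json.dumps({"copilot_tokens": {"github.com": "tok_example"}}))
    print(read_first_copilot_token(cfg))                 # tok_example
    print(read_first_copilot_token(Path(tmp) / "nope"))  # None
```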
diff --git a/src/microbots/utils/multi_agent_log_parser.py b/src/microbots/utils/multi_agent_log_parser.py
index ebdf9fb..0183e61 100644
--- a/src/microbots/utils/multi_agent_log_parser.py
+++ b/src/microbots/utils/multi_agent_log_parser.py
@@ -3,19 +3,21 @@
Parse microbots info.log files into markdown trajectory files.
Usage:
- python multi_agent_log_parser.py <name>_info.log [output_dir]
+ python multi_agent_log_parser.py <name>_info.log [output_dir] [--single-file]
-Creates:
- <name>_trajectory/
+Creates either:
+ <name>_trajectory/
main_agent.md
sub_agent_1.md
sub_agent_2.md
...
+Or with --single-file:
+ <name>_trajectory.md
-The info.log file should be named as <name>_info.log.
-A directory <name>_trajectory will be created with all the markdown files.
+The log file name (minus _info.log or .log suffix) determines the output name.
"""
+import argparse
import re
import os
import sys
@@ -26,6 +28,18 @@
# ─────────────────────────── Data Classes ───────────────────────────
+@dataclass
+class SetupInfo:
+ """Captured setup information before the agent starts working."""
+ container_id: str = ""
+ image: str = ""
+ host_port: str = ""
+ working_dir: str = ""
+ volume_mappings: List[str] = field(default_factory=list)
+ tools_installed: List[str] = field(default_factory=list)
+ files_copied: List[str] = field(default_factory=list)
+
+
@dataclass
class Step:
"""Represents a single step in an agent's execution."""
@@ -39,7 +53,6 @@ class Step:
is_sub_agent_call: bool = False
sub_agent_task: str = ""
sub_agent_index: int = -1 # index into the test case's sub_agents list
- is_final: bool = False # True if this represents LLM final thoughts
@dataclass
@@ -51,6 +64,7 @@ class Agent:
final_thoughts: str = ""
completed: bool = False
max_iterations_reached: bool = False
+ error_message: str = ""
@dataclass
@@ -59,12 +73,21 @@ class TestCase:
name: str = ""
main_agent: Optional[Agent] = None
sub_agents: List[Agent] = field(default_factory=list)
+ setup: SetupInfo = field(default_factory=SetupInfo)
# ─────────────────────────── Log Parsing ───────────────────────────
-# Regex for parsing log line timestamps
+# Format: TIMESTAMP MODULE LEVEL CONTENT
+# e.g. "2026-03-26 12:45:20,277 microbots.environment.local_docker.LocalDockerEnvironment INFO ..."
+# e.g. "2026-03-26 12:46:35,819 MicroBot INFO ℹ️ TASK STARTED : ..."
+# e.g. "2026-03-26 12:49:30,653 🤖 MicroBot-Sub INFO Sub-agent completed..."
LOG_LINE_RE = re.compile(
+ r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})\s+(.*?)\s+(INFO|ERROR|WARNING|DEBUG)\s(.*)$'
+)
+
+# Legacy format: TIMESTAMP [LEVEL] CONTENT
+LOG_LINE_LEGACY_RE = re.compile(
r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) \[(INFO|ERROR|WARNING|DEBUG)\] (.*)$'
)
@@ -73,8 +96,11 @@ def parse_log_entries(log_path: str) -> List[dict]:
"""
Parse a log file into a list of entries.
Multi-line log entries (continuation lines without timestamps) are joined.
+ Supports both the current log format (TIMESTAMP MODULE LEVEL CONTENT) and
+ the legacy format (TIMESTAMP [LEVEL] CONTENT).
- Returns a list of dicts: {'timestamp': str, 'level': str, 'content': str, 'line_num': int}
+ Returns a list of dicts:
+ {'timestamp': str, 'level': str, 'module': str, 'content': str, 'line_num': int}
"""
entries = []
current_entry = None
@@ -82,24 +108,36 @@ def parse_log_entries(log_path: str) -> List[dict]:
with open(log_path, 'r', encoding='utf-8', errors='replace') as f:
for line_num, raw_line in enumerate(f, 1):
line = raw_line.rstrip('\n')
+
+ # Try current format first, then legacy
match = LOG_LINE_RE.match(line)
if match:
- # Save previous entry
if current_entry is not None:
entries.append(current_entry)
current_entry = {
'timestamp': match.group(1),
- 'level': match.group(2),
- 'content': match.group(3),
+ 'module': match.group(2).strip(),
+ 'level': match.group(3),
+ 'content': match.group(4),
'line_num': line_num,
}
else:
- # Continuation of previous entry
- if current_entry is not None:
- current_entry['content'] += '\n' + line
- # else: lines before any log entry (skip)
+ legacy = LOG_LINE_LEGACY_RE.match(line)
+ if legacy:
+ if current_entry is not None:
+ entries.append(current_entry)
+ current_entry = {
+ 'timestamp': legacy.group(1),
+ 'module': '',
+ 'level': legacy.group(2),
+ 'content': legacy.group(3),
+ 'line_num': line_num,
+ }
+ else:
+ # Continuation of previous entry
+ if current_entry is not None:
+ current_entry['content'] += '\n' + line
- # Don't forget the last entry
if current_entry is not None:
entries.append(current_entry)
@@ -111,33 +149,26 @@ def parse_log_entries(log_path: str) -> List[dict]:
def extract_task_from_microbot_sub(command: str) -> str:
"""Extract the --task argument from a microbot_sub command."""
- # Normalize escaped quotes: \" -> "
normalized = command.replace('\\"', '"').replace('\\n', '\n')
- # Try to find --task "..." followed by " --iterations or end
match = re.search(r'--task\s+"(.*?)"\s+--(?:iterations|timeout)', normalized, re.DOTALL)
if match:
return match.group(1).strip()
- # Try to find --task "..." at end of command
match = re.search(r'--task\s+"(.*?)"\s*$', normalized, re.DOTALL)
if match:
return match.group(1).strip()
- # Try single quotes
match = re.search(r"--task\s+'(.*?)'\s+--(?:iterations|timeout)", normalized, re.DOTALL)
if match:
return match.group(1).strip()
- # Fallback: grab everything after --task " until the last " before --iterations
match = re.search(r'--task\s+"(.+)', normalized, re.DOTALL)
if match:
text = match.group(1)
- # Try to find closing quote before --iterations or --timeout
iter_match = re.search(r'"\s+--(?:iterations|timeout)', text)
if iter_match:
return text[:iter_match.start()].strip()
- # Try the last quote
quote_end = text.rfind('"')
if quote_end > 0:
return text[:quote_end].strip()
@@ -145,6 +176,50 @@ def extract_task_from_microbot_sub(command: str) -> str:
return command
+def _extract_setup_info(entries: List[dict]) -> SetupInfo:
+ """Extract environment setup information from log entries before the first TASK STARTED."""
+ setup = SetupInfo()
+ for entry in entries:
+ content = entry['content']
+ if 'TASK STARTED' in content:
+ break
+
+ # Container start
+ m = re.search(r'Started container (\w+) with image (\S+) on host port (\d+)', content)
+ if m:
+ setup.container_id = m.group(1)
+ setup.image = m.group(2)
+ setup.host_port = m.group(3)
+ continue
+
+ # Working directory
+ m = re.search(r'Created working directory at (\S+)', content)
+ if m:
+ setup.working_dir = m.group(1)
+ continue
+
+ # Volume mapping
+ if 'Volume mapping:' in content:
+ setup.volume_mappings.append(content.split('Volume mapping:', 1)[1].strip())
+ continue
+
+ # Tool installed
+ m = re.search(r'Successfully (?:installed|set up|setup) (?:external )?tool:\s*(\S+)', content)
+ if m:
+ tool_name = m.group(1)
+ if tool_name not in setup.tools_installed:
+ setup.tools_installed.append(tool_name)
+ continue
+
+ # Files copied to container
+ m = re.search(r'Successfully copied (.+?) to container:(.+)', content)
+ if m:
+ setup.files_copied.append(f"{m.group(1).strip()} → {m.group(2).strip()}")
+ continue
+
+ return setup
+
+
def build_test_cases(entries: List[dict]) -> List[TestCase]:
"""
Walk through log entries and build a list of TestCase objects,
@@ -153,11 +228,10 @@ def build_test_cases(entries: List[dict]) -> List[TestCase]:
test_cases = []
current_test: Optional[TestCase] = None
- # Agent tracking
- agent_stack: List[Agent] = [] # stack: [main_agent, sub_agent, ...]
+ agent_stack: List[Agent] = []
current_step: Optional[Step] = None
- pending_sub_agent_step: Optional[Step] = None # main agent step that called microbot_sub
- current_field: Optional[str] = None # track what we're collecting multi-line for
+ pending_sub_agent_step: Optional[Step] = None
+ current_field: Optional[str] = None
def current_agent() -> Optional[Agent]:
return agent_stack[-1] if agent_stack else None
@@ -191,22 +265,18 @@ def finalize_test_case():
continue
# ── Task started ──
- if 'ℹ️ TASK STARTED' in content:
+ if 'TASK STARTED' in content:
task_text = content.split('TASK STARTED', 1)[1].lstrip(' :').strip()
new_agent = Agent(task=task_text)
if not current_test:
- # No test case context yet, create one from filename
current_test = TestCase(name="unknown")
if not current_test.main_agent:
- # First agent = main agent
new_agent.is_main = True
current_test.main_agent = new_agent
agent_stack = [new_agent]
else:
- # Sub-agent
- # Use the task from the microbot_sub command if available
if pending_sub_agent_step and pending_sub_agent_step.sub_agent_task:
new_agent.task = pending_sub_agent_step.sub_agent_task
elif task_text:
@@ -215,7 +285,6 @@ def finalize_test_case():
sub_idx = len(current_test.sub_agents)
current_test.sub_agents.append(new_agent)
- # Link the parent step to this sub-agent
if pending_sub_agent_step:
pending_sub_agent_step.sub_agent_index = sub_idx
pending_sub_agent_step = None
@@ -227,16 +296,15 @@ def finalize_test_case():
continue
# ── Task completed ──
- if '🔚 TASK COMPLETED' in content:
+ if 'TASK COMPLETED' in content:
agent = current_agent()
if agent:
agent.completed = True
- current_field = None # Stop accumulating text
+ current_field = None
continue
# ── Sub-agent completed message ──
if 'Sub-agent completed successfully with output:' in content:
- # Pop sub-agent from stack
if len(agent_stack) > 1:
agent_stack.pop()
current_step = None
@@ -249,13 +317,22 @@ def finalize_test_case():
if agent and not agent.is_main:
agent.max_iterations_reached = True
agent.completed = False
- # Pop sub-agent from stack
+ agent.error_message = content
if len(agent_stack) > 1:
agent_stack.pop()
current_step = None
current_field = None
continue
+ # ── Failed to parse sub-agent command ──
+ if level == 'ERROR' and 'Failed to parse microbot_sub command' in content:
+ if current_step:
+ current_step.is_blocked = True
+ current_step.blocked_reason = content
+ pending_sub_agent_step = None
+ current_field = None
+ continue
+
# ── Max iterations reached ──
if level == 'ERROR' and 'Max iterations' in content:
agent = current_agent()
@@ -275,8 +352,8 @@ def finalize_test_case():
continue
# ── LLM final thoughts ──
- if '💭 LLM final thoughts:' in content:
- text = content.split('💭 LLM final thoughts:', 1)[1].strip()
+ if 'LLM final thoughts:' in content:
+ text = content.split('LLM final thoughts:', 1)[1].strip()
agent = current_agent()
if agent:
agent.final_thoughts = text
@@ -284,17 +361,16 @@ def finalize_test_case():
continue
# ── LLM thoughts ──
- if '💭 LLM thoughts:' in content:
- text = content.split('💭 LLM thoughts:', 1)[1].strip()
+ if 'LLM thoughts:' in content and 'final' not in content.split('LLM thoughts:')[0].lower():
+ text = content.split('LLM thoughts:', 1)[1].strip()
if current_step:
current_step.thought = text
current_field = 'thought'
continue
# ── LLM tool call ──
- if '➡️ LLM tool call :' in content:
- cmd = content.split('➡️ LLM tool call :', 1)[1].strip()
- # Remove surrounding quotes if present
+ if 'LLM tool call' in content and ':' in content.split('LLM tool call')[1]:
+ cmd = content.split('LLM tool call', 1)[1].split(':', 1)[1].strip()
if cmd.startswith('"') and cmd.endswith('"'):
cmd = cmd[1:-1]
if current_step:
@@ -307,22 +383,29 @@ def finalize_test_case():
continue
# ── Command output ──
- if '⬅️ Command output:' in content:
- text = content.split('⬅️ Command output:', 1)[1].strip()
+ if 'Command output:' in content:
+ text = content.split('Command output:', 1)[1].strip()
if current_step:
current_step.output = text
current_field = 'output'
continue
# ── Dangerous command blocked ──
- if '⚠️ Dangerous command detected' in content:
+ if 'Dangerous command detected' in content:
if current_step:
current_step.is_blocked = True
- current_step.blocked_reason = content
+ # Parse REASON/ALTERNATIVE from multi-line content
+ lines = content.split('\n')
+ current_step.blocked_reason = lines[0]
+ for bline in lines[1:]:
+ if bline.startswith('REASON:'):
+ current_step.blocked_reason = bline
+ elif bline.startswith('ALTERNATIVE:'):
+ current_step.blocked_alternative = bline
current_field = 'blocked'
continue
- # ── REASON / ALTERNATIVE for blocked commands ──
+ # ── REASON / ALTERNATIVE for blocked commands (separate entries) ──
if current_field == 'blocked' and current_step:
if content.startswith('REASON:'):
current_step.blocked_reason = content
@@ -332,7 +415,10 @@ def finalize_test_case():
# ── Invoking MicroBotSubAgent ──
if 'Invoking MicroBotSubAgent with task:' in content:
- # This is just a log message; the sub-agent TASK STARTED follows
+ continue
+
+ # ── Memory tool operations ──
+ if 'Memory file created:' in content or 'Memory file updated:' in content:
continue
# ── Multi-line continuation for known fields ──
@@ -366,20 +452,13 @@ def finalize_test_case():
agent.final_thoughts = content
continue
- # Finalize last test case
finalize_test_case()
-
return test_cases
# ─────────────────────────── Markdown Generation ───────────────────────────
-def escape_md(text: str) -> str:
- """Escape text for markdown display (minimal escaping for code blocks)."""
- return text
-
-
def truncate_text(text: str, max_lines: int = 200) -> str:
"""Truncate text if it exceeds max_lines."""
lines = text.split('\n')
@@ -388,7 +467,34 @@ def truncate_text(text: str, max_lines: int = 200) -> str:
return text
-def generate_step_md(step: Step, sub_agent_filename: str = "") -> str:
+def generate_setup_md(setup: SetupInfo) -> str:
+ """Generate markdown for the setup/environment section."""
+ if not setup.container_id and not setup.tools_installed:
+ return ""
+
+ md = "## 🔧 Environment Setup\n\n"
+
+ if setup.container_id:
+ md += f"- **Container:** `{setup.container_id}` (image: `{setup.image}`, port: {setup.host_port})\n"
+ if setup.working_dir:
+ md += f"- **Working directory:** `{setup.working_dir}`\n"
+ for vol in setup.volume_mappings:
+ md += f"- **Volume:** {vol}\n"
+
+ if setup.tools_installed:
+ md += f"- **Tools:** {', '.join(setup.tools_installed)}\n"
+
+ if setup.files_copied:
+ md += "\n<details>\n<summary>Files copied to container</summary>\n\n"
+ for fc in setup.files_copied:
+ md += f"- {fc}\n"
+ md += "\n</details>\n"
+
+ md += "\n---\n\n"
+ return md
+
+
+def generate_step_md(step: Step, sub_agent_filename: str = "", heading_level: int = 3) -> str:
"""Generate markdown for a single step as a collapsible details section."""
status = "🚫 Blocked" if step.is_blocked else ""
if step.is_sub_agent_call:
@@ -396,39 +502,36 @@ def generate_step_md(step: Step, sub_agent_filename: str = "") -> str:
summary = f"Step {step.number}"
if status:
- summary += f" - {status}"
+ summary += f" — {status}"
- # Build brief description from the thought (first sentence)
if step.thought:
first_line = step.thought.split('\n')[0]
if len(first_line) > 120:
first_line = first_line[:117] + "..."
summary += f": {first_line}"
+ h = '#' * heading_level
+
+ md = f"<details>\n<summary>{summary}</summary>\n\n"
- # Thought section
if step.thought:
- md += "### 💭 Thought\n\n"
+ md += f"{h} 💭 Thought\n\n"
md += f"{step.thought}\n\n"
- # Blocked command warning
if step.is_blocked:
- md += "### ⚠️ Command Blocked\n\n"
+ md += f"{h} ⚠️ Command Blocked\n\n"
if step.blocked_reason:
md += f"> {step.blocked_reason}\n"
if step.blocked_alternative:
md += f"> {step.blocked_alternative}\n"
md += "\n"
- # Command section
if step.command:
- md += "### ➡️ Command\n\n"
+ md += f"{h} ➡️ Command\n\n"
if step.is_sub_agent_call:
md += "**Sub-agent invocation:**\n\n"
if sub_agent_filename:
md += f"📎 **[View Sub-Agent Trajectory]({sub_agent_filename})**\n\n"
- # Show the task
if step.sub_agent_task:
+ md += "\n<details>\n<summary>Sub-agent task description</summary>\n\n"
md += f"```\n{step.sub_agent_task}\n```\n\n"
@@ -436,9 +539,8 @@ def generate_step_md(step: Step, sub_agent_filename: str = "") -> str:
else:
md += f"```bash\n{step.command}\n```\n\n"
- # Output section
if step.output:
- md += "### ⬅️ Output\n\n"
+ md += f"{h} ⬅️ Output\n\n"
output_text = truncate_text(step.output)
md += f"```\n{output_text}\n```\n\n"
@@ -446,9 +548,19 @@ def generate_step_md(step: Step, sub_agent_filename: str = "") -> str:
return md
+def _agent_status_str(agent: Agent) -> str:
+ if agent.completed:
+ return "✅ Completed"
+ if agent.max_iterations_reached:
+ return "❌ Failed (max iterations / timeout)"
+ return "❓ Unknown"
+
+
def generate_main_agent_md(test_case: TestCase) -> str:
"""Generate the main agent markdown file content."""
- md = f"# 🤖 Main Agent Trajectory: {test_case.name}\n\n"
+ md = f"# 🤖 Agent Trajectory: {test_case.name}\n\n"
+
+ md += generate_setup_md(test_case.setup)
if test_case.main_agent and test_case.main_agent.task:
md += "## Task\n\n"
@@ -471,7 +583,6 @@ def generate_main_agent_md(test_case: TestCase) -> str:
md += "---\n\n"
- # Summary
if agent.completed:
md += "## ✅ Task Completed\n\n"
if agent.final_thoughts:
@@ -480,7 +591,6 @@ def generate_main_agent_md(test_case: TestCase) -> str:
md += "## ❌ Max Iterations Reached\n\n"
md += "The agent did not complete the task within the maximum allowed iterations.\n\n"
- # Sub-agent index
if test_case.sub_agents:
md += "## 📋 Sub-Agents\n\n"
md += "| # | Task | Status | Link |\n"
@@ -490,7 +600,7 @@ def generate_main_agent_md(test_case: TestCase) -> str:
first_line = clean.split('\n')[0]
task_summary = first_line[:80] + "..." if len(first_line) > 80 else first_line
task_summary = task_summary.replace('|', '\\|')
- status = "✅ Completed" if sub.completed else "❌ Failed"
+ status = _agent_status_str(sub)
link = f"[sub_agent_{i + 1}.md](sub_agent_{i + 1}.md)"
md += f"| {i + 1} | {task_summary} | {status} | {link} |\n"
md += "\n"
@@ -501,22 +611,18 @@ def generate_main_agent_md(test_case: TestCase) -> str:
def clean_task_text(task: str) -> str:
"""Clean up a task string: remove microbot_sub prefix, escaped quotes, etc."""
text = task.strip()
- # Remove microbot_sub --task "..." wrapper if present
if text.startswith('microbot_sub'):
match = re.search(r'--task\s+["\'](.+)', text, re.DOTALL)
if match:
text = match.group(1)
- # Remove trailing quote + flags
text = re.sub(r'["\']\s*--(?:iterations|timeout).*$', '', text, flags=re.DOTALL)
text = text.strip().strip('"').strip("'").strip()
- # Unescape
- text = text.replace('\\"', '"').replace('\\n', '\n').replace("\\'" , "'")
+ text = text.replace('\\"', '"').replace('\\n', '\n').replace("\\'", "'")
return text
def generate_sub_agent_md(sub_agent: Agent, index: int, test_case_name: str) -> str:
"""Generate a sub-agent markdown file content."""
- # Clean and use the first line of the task as heading
clean_task = clean_task_text(sub_agent.task)
task_heading = clean_task.split('\n')[0] if clean_task else f"Sub-Agent {index + 1}"
if len(task_heading) > 150:
@@ -539,14 +645,114 @@ def generate_sub_agent_md(sub_agent: Agent, index: int, test_case_name: str) ->
md += "---\n\n"
- # Summary
if sub_agent.completed:
md += "## ✅ Task Completed\n\n"
if sub_agent.final_thoughts:
md += f"{sub_agent.final_thoughts}\n\n"
elif sub_agent.max_iterations_reached:
md += "## ❌ Max Iterations Reached\n\n"
- md += "The sub-agent did not complete the task within the maximum allowed iterations.\n\n"
+ if sub_agent.error_message:
+ md += f"> {sub_agent.error_message}\n\n"
+ else:
+ md += "The sub-agent did not complete the task within the maximum allowed iterations.\n\n"
+
+ return md
+
+
+# ─────────────────────────── Single-File Mode ───────────────────────────
+
+
+def generate_single_file_md(test_case: TestCase) -> str:
+ """Generate a single markdown file containing the main agent and all sub-agents."""
+ md = f"# 🤖 Agent Trajectory: {test_case.name}\n\n"
+
+ md += generate_setup_md(test_case.setup)
+
+ # Table of contents
+ if test_case.sub_agents:
+ md += "## 📑 Table of Contents\n\n"
+ md += "- [Main Agent](#main-agent)\n"
+ for i, sub in enumerate(test_case.sub_agents):
+ clean = clean_task_text(sub.task)
+ first_line = clean.split('\n')[0][:60]
+ md += f"- [Sub-Agent {i + 1}: {first_line}](#sub-agent-{i + 1})\n"
+ md += "\n---\n\n"
+
+ # Main agent section
+ md += "## Main Agent\n\n"
+
+ if test_case.main_agent and test_case.main_agent.task:
+ md += "### Task\n\n"
+ task_text = test_case.main_agent.task
+ if len(task_text) > 500:
+ md += f"<details>\n<summary>Full task description</summary>\n\n{task_text}\n\n</details>\n\n"
+ else:
+ md += f"{task_text}\n\n"
+
+ md += "---\n\n"
+ md += "### Steps\n\n"
+
+ if test_case.main_agent:
+ agent = test_case.main_agent
+ for step in agent.steps:
+ sub_ref = ""
+ if step.is_sub_agent_call and step.sub_agent_index >= 0:
+ sub_ref = f"#sub-agent-{step.sub_agent_index + 1}"
+ md += generate_step_md(step, sub_agent_filename=sub_ref, heading_level=4)
+
+ md += "---\n\n"
+
+ if agent.completed:
+ md += "### ✅ Task Completed\n\n"
+ if agent.final_thoughts:
+ md += f"{agent.final_thoughts}\n\n"
+ elif agent.max_iterations_reached:
+ md += "### ❌ Max Iterations Reached\n\n"
+
+ # Sub-agent summary table
+ if test_case.sub_agents:
+ md += "### 📋 Sub-Agents Summary\n\n"
+ md += "| # | Task | Status |\n"
+ md += "|---|------|--------|\n"
+ for i, sub in enumerate(test_case.sub_agents):
+ clean = clean_task_text(sub.task)
+ first_line = clean.split('\n')[0]
+ task_summary = first_line[:80] + "..." if len(first_line) > 80 else first_line
+ task_summary = task_summary.replace('|', '\\|')
+ status = _agent_status_str(sub)
+ md += f"| [{i + 1}](#sub-agent-{i + 1}) | {task_summary} | {status} |\n"
+ md += "\n"
+
+ # Sub-agent sections
+ for i, sub in enumerate(test_case.sub_agents):
+ clean_task = clean_task_text(sub.task)
+ task_heading = clean_task.split('\n')[0] if clean_task else f"Sub-Agent {i + 1}"
+ if len(task_heading) > 120:
+ task_heading = task_heading[:117] + "..."
+
+ md += f"\n---\n\n## Sub-Agent {i + 1}\n\n"
+ md += f"**{task_heading}**\n\n"
+
+ if clean_task and '\n' in clean_task:
+ md += "<details>\n<summary>Full task description</summary>\n\n"
+ md += f"```\n{clean_task}\n```\n\n"
+ md += "</details>\n\n"
+
+ md += "### Steps\n\n"
+
+ for step in sub.steps:
+ md += generate_step_md(step, heading_level=4)
+
+ md += "---\n\n"
+
+ if sub.completed:
+ md += "### ✅ Task Completed\n\n"
+ if sub.final_thoughts:
+ md += f"{sub.final_thoughts}\n\n"
+ elif sub.max_iterations_reached:
+ md += "### ❌ Max Iterations Reached\n\n"
+ if sub.error_message:
+ md += f"> {sub.error_message}\n\n"
return md
@@ -554,19 +760,19 @@ def generate_sub_agent_md(sub_agent: Agent, index: int, test_case_name: str) ->
# ─────────────────────────── Main ───────────────────────────
-def parse_and_generate(log_path: str, output_base_dir: str = None):
+def parse_and_generate(log_path: str, output_base_dir: str = None, single_file: bool = False):
"""
Parse an info.log file and generate markdown trajectory files.
Args:
log_path: Path to the info.log file
output_base_dir: Base directory for output. If None, uses the log file's directory.
+ single_file: If True, generate a single markdown file instead of a directory.
"""
if not os.path.isfile(log_path):
print(f"Error: Log file not found: {log_path}")
sys.exit(1)
- # Derive test case name from filename
basename = os.path.basename(log_path)
if basename.endswith('_info.log'):
default_test_name = basename[:-len('_info.log')]
@@ -580,44 +786,59 @@ def parse_and_generate(log_path: str, output_base_dir: str = None):
print(f"Parsing log file: {log_path}")
- # Parse
entries = parse_log_entries(log_path)
print(f" Parsed {len(entries)} log entries")
+ # Extract setup info before building test cases
+ setup = _extract_setup_info(entries)
+
test_cases = build_test_cases(entries)
print(f" Found {len(test_cases)} test case(s)")
if not test_cases:
- # If no test case boundaries found, create a single test case
print(" No test case boundaries found, treating entire log as one test case")
- tc = TestCase(name=default_test_name)
- # Re-parse with a dummy test case
test_cases = _build_single_test_case(entries, default_test_name)
- for tc in test_cases:
- # Create output directory
- trajectory_dir = os.path.join(output_base_dir, f"{tc.name}_trajectory")
- os.makedirs(trajectory_dir, exist_ok=True)
- print(f"\n Test case: {tc.name}")
- print(f" Output directory: {trajectory_dir}")
-
- # Generate main agent markdown
- main_md = generate_main_agent_md(tc)
- main_path = os.path.join(trajectory_dir, "main_agent.md")
- with open(main_path, 'w', encoding='utf-8') as f:
- f.write(main_md)
- main_steps = len(tc.main_agent.steps) if tc.main_agent else 0
- print(f" Created: main_agent.md ({main_steps} steps)")
+ # Attach setup info to first test case
+ if test_cases:
+ test_cases[0].setup = setup
- # Generate sub-agent markdowns
- for i, sub in enumerate(tc.sub_agents):
- sub_md = generate_sub_agent_md(sub, i, tc.name)
- sub_path = os.path.join(trajectory_dir, f"sub_agent_{i + 1}.md")
- with open(sub_path, 'w', encoding='utf-8') as f:
- f.write(sub_md)
- print(f" Created: sub_agent_{i + 1}.md ({len(sub.steps)} steps)")
+ for tc in test_cases:
+ tc_name = tc.name if tc.name != "unknown" else default_test_name
+ tc.name = tc_name
- print(f"\nDone! Generated trajectory files for {len(test_cases)} test case(s).")
+ main_steps = len(tc.main_agent.steps) if tc.main_agent else 0
+ sub_count = len(tc.sub_agents)
+ print(f"\n Test case: {tc_name}")
+ print(f" Main agent: {main_steps} steps, {sub_count} sub-agent(s)")
+
+ if single_file:
+ # Single file mode
+ md = generate_single_file_md(tc)
+ out_path = os.path.join(output_base_dir, f"{tc_name}_trajectory.md")
+ with open(out_path, 'w', encoding='utf-8') as f:
+ f.write(md)
+ print(f" Created: {out_path}")
+ else:
+ # Multi-file mode
+ trajectory_dir = os.path.join(output_base_dir, f"{tc_name}_trajectory")
+ os.makedirs(trajectory_dir, exist_ok=True)
+ print(f" Output directory: {trajectory_dir}")
+
+ main_md = generate_main_agent_md(tc)
+ main_path = os.path.join(trajectory_dir, "main_agent.md")
+ with open(main_path, 'w', encoding='utf-8') as f:
+ f.write(main_md)
+ print(f" Created: main_agent.md ({main_steps} steps)")
+
+ for i, sub in enumerate(tc.sub_agents):
+ sub_md = generate_sub_agent_md(sub, i, tc_name)
+ sub_path = os.path.join(trajectory_dir, f"sub_agent_{i + 1}.md")
+ with open(sub_path, 'w', encoding='utf-8') as f:
+ f.write(sub_md)
+ print(f" Created: sub_agent_{i + 1}.md ({len(sub.steps)} steps)")
+
+ print(f"\nDone! Generated trajectory for {len(test_cases)} test case(s).")
return test_cases
@@ -628,6 +849,7 @@ def _build_single_test_case(entries: List[dict], name: str) -> List[TestCase]:
"""
fake_boundary = {
'timestamp': '2000-01-01 00:00:00,000',
+ 'module': '',
'level': 'INFO',
'content': f'Test directory set up at: /fake/{name}',
'line_num': 0,
@@ -636,17 +858,17 @@ def _build_single_test_case(entries: List[dict], name: str) -> List[TestCase]:
def main():
- if len(sys.argv) < 2:
- print("Usage: python multi_agent_log_parser.py _info.log [output_dir]")
- print("\nParses an info.log file and generates markdown trajectory files.")
- print("The log file should be named as _info.log.")
- print("A directory _trajectory will be created with all markdown files.")
- sys.exit(1)
-
- log_path = sys.argv[1]
- output_dir = sys.argv[2] if len(sys.argv) > 2 else None
-
- parse_and_generate(log_path, output_dir)
+ parser = argparse.ArgumentParser(
+ description="Parse microbots info.log files into markdown trajectory files."
+ )
+ parser.add_argument("log_file", help="Path to the info.log file to parse")
+ parser.add_argument("output_dir", nargs="?", default=None,
+ help="Output directory (default: same directory as log file)")
+ parser.add_argument("--single-file", action="store_true",
+ help="Generate a single markdown file instead of a directory with separate files")
+
+ args = parser.parse_args()
+ parse_and_generate(args.log_file, args.output_dir, args.single_file)
if __name__ == '__main__':
diff --git a/test/bot/test_copilot_auth.py b/test/bot/test_copilot_auth.py
new file mode 100644
index 0000000..57dd871
--- /dev/null
+++ b/test/bot/test_copilot_auth.py
@@ -0,0 +1,67 @@
+"""Unit tests for microbots.utils.copilot_auth.get_copilot_token."""
+
+import json
+import os
+import sys
+from pathlib import Path
+
+import pytest
+
+sys.path.insert(
+ 0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../src"))
+)
+
+from microbots.utils.copilot_auth import get_copilot_token
+
+
+# ---------------------------------------------------------------------------
+# Unit tests
+# ---------------------------------------------------------------------------
+
+@pytest.mark.unit
+class TestGetCopilotToken:
+ """Tests for get_copilot_token()."""
+
+ def test_returns_none_when_file_missing(self, tmp_path):
+ """Returns None when the config file does not exist."""
+ missing = tmp_path / "nonexistent.json"
+ assert get_copilot_token(config_path=missing) is None
+
+ def test_returns_none_on_invalid_json(self, tmp_path):
+ """Returns None and logs a warning when the file contains invalid JSON."""
+ bad_file = tmp_path / "config.json"
+ bad_file.write_text("this is not json", encoding="utf-8")
+ assert get_copilot_token(config_path=bad_file) is None
+
+ def test_returns_none_when_no_copilot_tokens_key(self, tmp_path):
+ """Returns None when the JSON has no 'copilot_tokens' key."""
+ cfg = tmp_path / "config.json"
+ cfg.write_text(json.dumps({"other_key": "value"}), encoding="utf-8")
+ assert get_copilot_token(config_path=cfg) is None
+
+ def test_returns_none_when_copilot_tokens_empty(self, tmp_path):
+ """Returns None when 'copilot_tokens' is an empty dict."""
+ cfg = tmp_path / "config.json"
+ cfg.write_text(json.dumps({"copilot_tokens": {}}), encoding="utf-8")
+ assert get_copilot_token(config_path=cfg) is None
+
+ def test_returns_first_token(self, tmp_path):
+ """Returns the first token value from 'copilot_tokens'."""
+ cfg = tmp_path / "config.json"
+ cfg.write_text(
+ json.dumps({"copilot_tokens": {"host1": "token-abc", "host2": "token-xyz"}}),
+ encoding="utf-8",
+ )
+ token = get_copilot_token(config_path=cfg)
+ assert token == "token-abc"
+
+    @pytest.mark.skipif(
+        hasattr(os, "geteuid") and os.geteuid() == 0,
+        reason="running as root bypasses file permissions, so the OSError path is not exercised",
+    )
+    def test_returns_none_on_os_error(self, tmp_path):
+ """Returns None when the file cannot be read (OSError)."""
+ cfg = tmp_path / "config.json"
+ cfg.write_text("{}", encoding="utf-8")
+ cfg.chmod(0o000) # remove read permission
+ try:
+ result = get_copilot_token(config_path=cfg)
+ assert result is None
+ finally:
+ cfg.chmod(0o644) # restore permissions for cleanup
diff --git a/test/bot/test_copilot_bot.py b/test/bot/test_copilot_bot.py
new file mode 100644
index 0000000..5804d4c
--- /dev/null
+++ b/test/bot/test_copilot_bot.py
@@ -0,0 +1,1441 @@
+"""
+Unit and integration tests for CopilotBot.
+
+Unit tests mock the copilot SDK and Docker environment to verify the
+wiring and lifecycle. Integration tests (marked ``@pytest.mark.integration``)
+require a real Docker daemon, copilot-cli, and GitHub authentication.
+"""
+
+import importlib
+import os
+import shutil
+import subprocess
+import sys
+from unittest.mock import AsyncMock, MagicMock, Mock, patch
+
+import pytest
+
+sys.path.insert(
+ 0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../src"))
+)
+
+# ---------------------------------------------------------------------------
+# Mock the copilot SDK before importing CopilotBot (optional dependency)
+# ---------------------------------------------------------------------------
+_mock_copilot = MagicMock()
+_mock_copilot.CopilotClient = MagicMock
+_mock_copilot.ExternalServerConfig = MagicMock
+
+_mock_session = MagicMock()
+_mock_session.PermissionRequestResult = MagicMock
+
+_mock_events = MagicMock()
+_mock_events.SessionEventType = MagicMock()
+_mock_events.SessionEventType.ASSISTANT_MESSAGE = "assistant.message"
+_mock_events.SessionEventType.ASSISTANT_MESSAGE_DELTA = "assistant.message_delta"
+_mock_events.SessionEventType.SESSION_IDLE = "session.idle"
+
+_mock_tools = MagicMock()
+_mock_tools.Tool = MagicMock
+_mock_tools.ToolInvocation = MagicMock
+_mock_tools.ToolResult = MagicMock
+_mock_tools.define_tool = MagicMock
+
+_mock_types = MagicMock()
+_mock_types.PermissionHandler = MagicMock()
+_mock_types.PermissionHandler.approve_all = MagicMock()
+
+sys.modules.setdefault("copilot", _mock_copilot)
+sys.modules.setdefault("copilot.session", _mock_session)
+sys.modules.setdefault("copilot.generated.session_events", _mock_events)
+sys.modules.setdefault("copilot.tools", _mock_tools)
+sys.modules.setdefault("copilot.types", _mock_types)
+
+# Reload to pick up mock
+if "microbots.bot.CopilotBot" in sys.modules:
+ importlib.reload(sys.modules["microbots.bot.CopilotBot"])
+
+from microbots.MicroBot import BotRunResult
+
+
+def _restore_real_copilot_modules():
+ """Remove mock copilot modules from sys.modules and reload CopilotBot.
+
+ This allows integration tests to use the real copilot SDK instead of
+ the mocks injected at module level for unit tests.
+ """
+ mock_keys = [k for k in sys.modules if k == "copilot" or k.startswith("copilot.")]
+ for key in mock_keys:
+ del sys.modules[key]
+ # Also force CopilotBot to re-import the real SDK on next import
+ if "microbots.bot.CopilotBot" in sys.modules:
+ del sys.modules["microbots.bot.CopilotBot"]
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+def _copilot_cli_available():
+ return shutil.which("copilot") is not None
+
+
+def _copilot_sdk_installed():
+ try:
+ from importlib.metadata import version
+ version("github-copilot-sdk")
+ return True
+ except Exception:
+ return False
+
+
+def _copilot_auth_available():
+ if os.environ.get("GITHUB_TOKEN") or os.environ.get("COPILOT_GITHUB_TOKEN"):
+ return True
+ if shutil.which("gh"):
+ try:
+ result = subprocess.run(
+ ["gh", "auth", "status"], capture_output=True, timeout=10
+ )
+ return result.returncode == 0
+ except Exception:
+ pass
+ return False
+
+
+# ---------------------------------------------------------------------------
+# Unit test fixtures
+# ---------------------------------------------------------------------------
+
+@pytest.fixture
+def mock_environment():
+ """Create a mock LocalDockerEnvironment."""
+ env = MagicMock()
+ env.port = 9000
+ env.container_port = 8080
+ env.container = MagicMock()
+ env.container.id = "abc123def456"
+ env.image = "kavyasree261002/shell_server:latest"
+ env.working_dir = "/tmp/mock_workdir"
+ env.folder_to_mount = None
+ env.overlay_mount = False
+
+ # Make execute return success by default
+ success_return = MagicMock()
+ success_return.return_code = 0
+ success_return.stdout = "copilot version 1.0.0"
+ success_return.stderr = ""
+ env.execute = MagicMock(return_value=success_return)
+ env.copy_to_container = MagicMock(return_value=True)
+ env.stop = MagicMock()
+ env.get_ipv4_address = MagicMock(return_value="172.17.0.2")
+ return env
+
+
+@pytest.fixture
+def mock_copilot_session():
+ """Mock Copilot SDK session."""
+ session = AsyncMock()
+ session.disconnect = AsyncMock()
+
+ response = Mock()
+ response.data = Mock()
+ response.data.content = "Task completed successfully."
+ session.send_and_wait = AsyncMock(return_value=response)
+ session.on = MagicMock()
+ return session
+
+
+@pytest.fixture
+def mock_copilot_client(mock_copilot_session):
+ """Mock CopilotClient."""
+ client = AsyncMock()
+ client.start = AsyncMock()
+ client.stop = AsyncMock()
+ client.create_session = AsyncMock(return_value=mock_copilot_session)
+ return client
+
+
+@pytest.fixture
+def copilot_bot(mock_environment, mock_copilot_client):
+ """Create a CopilotBot with all external dependencies mocked."""
+ with (
+ patch("microbots.bot.CopilotBot.LocalDockerEnvironment", return_value=mock_environment),
+ patch("microbots.bot.CopilotBot.get_free_port", side_effect=[9000]),
+ patch("microbots.bot.CopilotBot.CopilotBot._install_copilot_cli"),
+ patch("microbots.bot.CopilotBot.CopilotBot._start_copilot_cli_server"),
+ patch("microbots.bot.CopilotBot.CopilotBot._wait_for_cli_ready"),
+ patch("copilot.CopilotClient", return_value=mock_copilot_client),
+ patch("copilot.ExternalServerConfig", return_value=MagicMock()),
+ ):
+ from microbots.bot.CopilotBot import CopilotBot
+ bot = CopilotBot(
+ model="gpt-4.1",
+ environment=mock_environment,
+ github_token="ghp_test_token_123",
+ )
+ yield bot
+ # Stop the event loop thread properly before teardown
+ try:
+ bot._loop.call_soon_threadsafe(bot._loop.stop)
+ bot._thread.join(timeout=2)
+ except Exception:
+ pass
+ bot.environment = None # Prevent stop() from trying env.stop() again
+
+
+# ---------------------------------------------------------------------------
+# Unit tests
+# ---------------------------------------------------------------------------
+
+@pytest.mark.unit
+class TestCopilotBotInit:
+ """Tests for CopilotBot initialisation."""
+
+ def test_stores_model(self, copilot_bot):
+ assert copilot_bot.model == "gpt-4.1"
+
+ def test_stores_github_token(self, copilot_bot):
+ assert copilot_bot.github_token == "ghp_test_token_123"
+
+ def test_environment_assigned(self, copilot_bot, mock_environment):
+ assert copilot_bot.environment is mock_environment
+
+ def test_additional_tools_default_empty(self, copilot_bot):
+ assert copilot_bot.additional_tools == []
+
+ def test_rejects_external_tool(self, mock_environment, mock_copilot_client):
+ """CopilotBot raises ValueError if an ExternalTool is passed."""
+ from microbots.tools.external_tool import ExternalTool
+
+ ext_tool = MagicMock(spec=ExternalTool)
+ ext_tool.name = "my_external"
+
+ with (
+ patch("microbots.bot.CopilotBot.LocalDockerEnvironment", return_value=mock_environment),
+ patch("microbots.bot.CopilotBot.get_free_port", side_effect=[9000]),
+ patch("microbots.bot.CopilotBot.CopilotBot._install_copilot_cli"),
+ patch("microbots.bot.CopilotBot.CopilotBot._start_copilot_cli_server"),
+ patch("microbots.bot.CopilotBot.CopilotBot._wait_for_cli_ready"),
+ patch("copilot.CopilotClient", return_value=mock_copilot_client),
+ patch("copilot.ExternalServerConfig", return_value=MagicMock()),
+ ):
+ from microbots.bot.CopilotBot import CopilotBot
+ with pytest.raises(ValueError, match="does not support ExternalTool"):
+ CopilotBot(
+ model="gpt-4.1",
+ environment=mock_environment,
+ additional_tools=[ext_tool],
+ github_token="ghp_test",
+ )
+
+ def test_import_error_without_sdk(self):
+ """CopilotBot raises ImportError when copilot SDK is not installed."""
+ # Temporarily remove the mock so the import fails
+ saved = sys.modules.get("copilot")
+ try:
+ sys.modules["copilot"] = None # Force ImportError on import
+ # Need to reload the module
+ if "microbots.bot.CopilotBot" in sys.modules:
+ importlib.reload(sys.modules["microbots.bot.CopilotBot"])
+ from microbots.bot.CopilotBot import CopilotBot as CB
+ with pytest.raises(ImportError, match="github-copilot-sdk"):
+ CB(model="gpt-4.1")
+ finally:
+ sys.modules["copilot"] = saved
+ if "microbots.bot.CopilotBot" in sys.modules:
+ importlib.reload(sys.modules["microbots.bot.CopilotBot"])
+
+
+@pytest.mark.unit
+class TestCopilotBotRun:
+ """Tests for CopilotBot.run()."""
+
+ def test_run_returns_bot_run_result(self, copilot_bot):
+ result = copilot_bot.run("Fix the bug in main.py")
+ assert isinstance(result, BotRunResult)
+
+ def test_run_success(self, copilot_bot):
+ result = copilot_bot.run("Fix the bug in main.py")
+ assert result.status is True
+ assert result.error is None
+ assert result.result is not None
+
+ def test_run_calls_tool_setup(self, copilot_bot, mock_environment):
+ mock_tool = MagicMock()
+ mock_tool.usage_instructions_to_llm = "Use tool X"
+ mock_tool.install_commands = []
+ mock_tool.verify_commands = []
+ copilot_bot.additional_tools = [mock_tool]
+
+ copilot_bot.run("test task")
+ mock_tool.setup_tool.assert_called_once_with(mock_environment)
+
+ def test_run_handles_exception(self, copilot_bot):
+ """Run returns failure BotRunResult on exceptions."""
+ with patch.object(copilot_bot, "_run_async", side_effect=RuntimeError("boom")):
+ result = copilot_bot.run("test")
+ assert result.status is False
+ assert "boom" in result.error
+
+
+@pytest.mark.unit
+class TestCopilotBotSystemMessage:
+ """Tests for system message construction."""
+
+ def test_system_message_empty_no_mount_no_tools(self, copilot_bot):
+ msg = copilot_bot._build_system_message()
+ assert msg == ""
+
+ def test_system_message_includes_mount_path(self, mock_environment, mock_copilot_client):
+ with (
+ patch("microbots.bot.CopilotBot.LocalDockerEnvironment", return_value=mock_environment),
+ patch("microbots.bot.CopilotBot.get_free_port", side_effect=[9000]),
+ patch("microbots.bot.CopilotBot.CopilotBot._install_copilot_cli"),
+ patch("microbots.bot.CopilotBot.CopilotBot._start_copilot_cli_server"),
+ patch("microbots.bot.CopilotBot.CopilotBot._wait_for_cli_ready"),
+ patch("copilot.CopilotClient", return_value=mock_copilot_client),
+ patch("copilot.ExternalServerConfig", return_value=MagicMock()),
+ ):
+ from microbots.bot.CopilotBot import CopilotBot
+ from microbots.extras.mount import Mount
+ mount = Mount("/tmp/test_repo", "/workdir/test_repo", "READ_WRITE")
+ bot = CopilotBot(
+ model="gpt-4.1",
+ environment=mock_environment,
+ github_token="ghp_test",
+ )
+ bot.folder_to_mount = mount
+ msg = bot._build_system_message()
+ assert "/workdir/test_repo" in msg
+ bot.stop()
+
+ def test_system_message_includes_tool_instructions(self, copilot_bot):
+ mock_tool = MagicMock()
+ mock_tool.usage_instructions_to_llm = "# Use browser command"
+ copilot_bot.additional_tools = [mock_tool]
+
+ msg = copilot_bot._build_system_message()
+ assert "browser" in msg
+
+
+@pytest.mark.unit
+class TestCopilotBotStop:
+ """Tests for CopilotBot.stop()."""
+
+ def test_stop_cleans_environment(self, copilot_bot, mock_environment):
+ copilot_bot.stop()
+ mock_environment.stop.assert_called_once()
+
+ def test_stop_idempotent(self, copilot_bot, mock_environment):
+ copilot_bot.stop()
+ copilot_bot.stop() # Should not raise
+
+
+@pytest.mark.unit
+class TestCopilotBotCLIInstall:
+ """Tests for copilot-cli installation logic."""
+
+ def test_install_cli_calls_execute(self, mock_environment):
+ from microbots.bot.CopilotBot import CopilotBot
+
+ with (
+ patch("microbots.bot.CopilotBot.get_free_port", side_effect=[9000]),
+ patch("microbots.bot.CopilotBot.CopilotBot._start_copilot_cli_server"),
+ patch("microbots.bot.CopilotBot.CopilotBot._wait_for_cli_ready"),
+ patch("copilot.CopilotClient", return_value=AsyncMock()),
+ patch("copilot.ExternalServerConfig", return_value=MagicMock()),
+ ):
+ bot = CopilotBot(
+ model="gpt-4.1",
+ environment=mock_environment,
+ github_token="ghp_test",
+ )
+ # _install_copilot_cli was called during __init__
+ # Verify that execute was called with npm install command
+            calls = [str(c) for c in mock_environment.execute.call_args_list]
+            install_calls = [c for c in calls if "npm install" in c or "copilot" in c]
+            assert len(install_calls) > 0, "Expected copilot-cli install commands"
+ bot.stop()
+
+ def test_install_cli_raises_on_failure(self, mock_environment):
+ from microbots.bot.CopilotBot import CopilotBot
+
+ fail_return = MagicMock()
+ fail_return.return_code = 1
+ fail_return.stdout = ""
+ fail_return.stderr = "npm ERR! not found"
+ mock_environment.execute = MagicMock(return_value=fail_return)
+
+ with (
+ patch("microbots.bot.CopilotBot.get_free_port", side_effect=[9000]),
+ patch("microbots.bot.CopilotBot.CopilotBot._start_copilot_cli_server"),
+ patch("microbots.bot.CopilotBot.CopilotBot._wait_for_cli_ready"),
+ patch("copilot.CopilotClient", return_value=AsyncMock()),
+ patch("copilot.ExternalServerConfig", return_value=MagicMock()),
+ ):
+ with pytest.raises(RuntimeError, match="Failed to install copilot-cli"):
+ CopilotBot(
+ model="gpt-4.1",
+ environment=mock_environment,
+ github_token="ghp_test",
+ )
+
+
+# ---------------------------------------------------------------------------
+# Unit tests — resolve_auth_config and BYOK
+# ---------------------------------------------------------------------------
+
+@pytest.mark.unit
+class TestResolveAuthConfig:
+ """Tests for the standalone resolve_auth_config function."""
+
+ def test_explicit_api_key_returns_byok_provider(self):
+ from microbots.bot.CopilotBot import resolve_auth_config
+
+ model, gh_token, provider = resolve_auth_config(
+ model="gpt-4.1",
+ api_key="sk-test-key",
+ base_url="https://api.openai.com/v1",
+ )
+ assert model == "gpt-4.1"
+ assert gh_token is None
+ assert provider is not None
+ assert provider["type"] == "openai"
+ assert provider["base_url"] == "https://api.openai.com/v1"
+ assert provider["api_key"] == "sk-test-key"
+ assert "bearer_token" not in provider
+
+ def test_explicit_bearer_token_takes_precedence_over_api_key(self):
+ from microbots.bot.CopilotBot import resolve_auth_config
+
+ _, _, provider = resolve_auth_config(
+ model="gpt-4.1",
+ api_key="sk-key",
+ bearer_token="my-bearer",
+ base_url="https://api.openai.com/v1",
+ )
+ assert provider["bearer_token"] == "my-bearer"
+ assert "api_key" not in provider
+
+ def test_explicit_api_key_without_base_url_raises(self):
+ from microbots.bot.CopilotBot import resolve_auth_config
+
+ with pytest.raises(ValueError, match="base_url"):
+ resolve_auth_config(model="gpt-4.1", api_key="sk-test")
+
+ def test_azure_provider_type_includes_api_version(self):
+ from microbots.bot.CopilotBot import resolve_auth_config
+
+ _, _, provider = resolve_auth_config(
+ model="gpt-4.1",
+ api_key="azure-key",
+ base_url="https://my-resource.openai.azure.com",
+ provider_type="azure",
+ azure_api_version="2024-10-21",
+ )
+ assert provider["type"] == "azure"
+ assert provider["azure"] == {"api_version": "2024-10-21"}
+
+ def test_wire_api_included_when_set(self):
+ from microbots.bot.CopilotBot import resolve_auth_config
+
+ _, _, provider = resolve_auth_config(
+ model="gpt-5",
+ api_key="key",
+ base_url="https://endpoint.com/v1",
+ wire_api="responses",
+ )
+ assert provider["wire_api"] == "responses"
+
+ def test_env_vars_resolve_byok(self, monkeypatch):
+ from microbots.bot.CopilotBot import resolve_auth_config
+
+ monkeypatch.setenv("COPILOT_BYOK_BASE_URL", "https://env-endpoint.com/v1")
+ monkeypatch.setenv("COPILOT_BYOK_API_KEY", "env-key")
+ monkeypatch.setenv("COPILOT_BYOK_PROVIDER_TYPE", "anthropic")
+ monkeypatch.setenv("COPILOT_BYOK_MODEL", "claude-sonnet-4.5")
+
+ model, gh_token, provider = resolve_auth_config(model="gpt-4.1")
+ assert model == "claude-sonnet-4.5"
+ assert gh_token is None
+ assert provider["type"] == "anthropic"
+ assert provider["base_url"] == "https://env-endpoint.com/v1"
+ assert provider["api_key"] == "env-key"
+
+ def test_env_vars_bearer_token(self, monkeypatch):
+ from microbots.bot.CopilotBot import resolve_auth_config
+
+ monkeypatch.setenv("COPILOT_BYOK_BASE_URL", "https://endpoint.com/v1")
+ monkeypatch.setenv("COPILOT_BYOK_BEARER_TOKEN", "env-bearer")
+
+ _, _, provider = resolve_auth_config(model="gpt-4.1")
+ assert provider["bearer_token"] == "env-bearer"
+ assert "api_key" not in provider
+
+ def test_env_vars_ignored_when_explicit_key_provided(self, monkeypatch):
+ from microbots.bot.CopilotBot import resolve_auth_config
+
+ monkeypatch.setenv("COPILOT_BYOK_BASE_URL", "https://env-endpoint.com/v1")
+ monkeypatch.setenv("COPILOT_BYOK_API_KEY", "env-key")
+
+ _, _, provider = resolve_auth_config(
+ model="gpt-4.1",
+ api_key="explicit-key",
+ base_url="https://explicit.com/v1",
+ )
+ assert provider["api_key"] == "explicit-key"
+ assert provider["base_url"] == "https://explicit.com/v1"
+
+ def test_token_provider_returns_byok_with_bearer(self):
+ from microbots.bot.CopilotBot import resolve_auth_config
+
+ _, _, provider = resolve_auth_config(
+ model="gpt-4.1",
+ base_url="https://azure.endpoint.com/v1",
+ token_provider=lambda: "ad-token-123",
+ )
+ assert provider["bearer_token"] == "ad-token-123"
+ assert "api_key" not in provider
+
+ def test_token_provider_without_base_url_raises(self):
+ from microbots.bot.CopilotBot import resolve_auth_config
+
+ with pytest.raises(ValueError, match="base_url"):
+ resolve_auth_config(
+ model="gpt-4.1",
+ token_provider=lambda: "token",
+ )
+
+ def test_token_provider_not_callable_raises(self):
+ from microbots.bot.CopilotBot import resolve_auth_config
+
+ with pytest.raises(ValueError, match="callable"):
+ resolve_auth_config(
+ model="gpt-4.1",
+ base_url="https://endpoint.com/v1",
+ token_provider="not-a-callable",
+ )
+
+ def test_token_provider_returning_empty_raises(self):
+ from microbots.bot.CopilotBot import resolve_auth_config
+
+ with pytest.raises(ValueError, match="non-empty"):
+ resolve_auth_config(
+ model="gpt-4.1",
+ base_url="https://endpoint.com/v1",
+ token_provider=lambda: "",
+ )
+
+ def test_token_provider_exception_raises(self):
+ from microbots.bot.CopilotBot import resolve_auth_config
+
+ def bad_provider():
+ raise RuntimeError("auth failed")
+
+ with pytest.raises(ValueError, match="auth failed"):
+ resolve_auth_config(
+ model="gpt-4.1",
+ base_url="https://endpoint.com/v1",
+ token_provider=bad_provider,
+ )
+
+ def test_fallback_to_github_token(self):
+ from microbots.bot.CopilotBot import resolve_auth_config
+
+ model, gh_token, provider = resolve_auth_config(
+ model="gpt-4.1",
+ github_token="ghp_test123",
+ )
+ assert model == "gpt-4.1"
+ assert gh_token == "ghp_test123"
+ assert provider is None
+
+ def test_default_provider_type_is_openai(self):
+ from microbots.bot.CopilotBot import resolve_auth_config
+
+ _, _, provider = resolve_auth_config(
+ model="m", api_key="k", base_url="https://x.com/v1"
+ )
+ assert provider["type"] == "openai"
+
+ def test_anthropic_provider_type(self):
+ from microbots.bot.CopilotBot import resolve_auth_config
+
+ _, _, provider = resolve_auth_config(
+ model="claude-sonnet-4.5",
+ api_key="ant-key",
+ base_url="https://api.anthropic.com",
+ provider_type="anthropic",
+ )
+ assert provider["type"] == "anthropic"
+
+
+@pytest.mark.unit
+class TestCopilotBotBYOKInit:
+ """Tests for CopilotBot initialisation with BYOK parameters."""
+
+ def test_byok_api_key_sets_provider_config(self, mock_environment, mock_copilot_client):
+ with (
+ patch("microbots.bot.CopilotBot.LocalDockerEnvironment", return_value=mock_environment),
+ patch("microbots.bot.CopilotBot.get_free_port", side_effect=[9000]),
+ patch("microbots.bot.CopilotBot.CopilotBot._install_copilot_cli"),
+ patch("microbots.bot.CopilotBot.CopilotBot._start_copilot_cli_server"),
+ patch("microbots.bot.CopilotBot.CopilotBot._wait_for_cli_ready"),
+ patch("copilot.CopilotClient", return_value=mock_copilot_client),
+ patch("copilot.ExternalServerConfig", return_value=MagicMock()),
+ ):
+ from microbots.bot.CopilotBot import CopilotBot
+ bot = CopilotBot(
+ model="gpt-4.1",
+ environment=mock_environment,
+ api_key="sk-byok-key",
+ base_url="https://api.openai.com/v1",
+ )
+ assert bot._provider_config is not None
+ assert bot._provider_config["api_key"] == "sk-byok-key"
+ assert bot.github_token is None
+ bot.stop()
+
+ def test_byok_token_provider_sets_provider_config(self, mock_environment, mock_copilot_client):
+ with (
+ patch("microbots.bot.CopilotBot.LocalDockerEnvironment", return_value=mock_environment),
+ patch("microbots.bot.CopilotBot.get_free_port", side_effect=[9000]),
+ patch("microbots.bot.CopilotBot.CopilotBot._install_copilot_cli"),
+ patch("microbots.bot.CopilotBot.CopilotBot._start_copilot_cli_server"),
+ patch("microbots.bot.CopilotBot.CopilotBot._wait_for_cli_ready"),
+ patch("copilot.CopilotClient", return_value=mock_copilot_client),
+ patch("copilot.ExternalServerConfig", return_value=MagicMock()),
+ ):
+ from microbots.bot.CopilotBot import CopilotBot
+ bot = CopilotBot(
+ model="gpt-4.1",
+ environment=mock_environment,
+ base_url="https://azure.endpoint.com/v1",
+ token_provider=lambda: "ad-token-xyz",
+ )
+ assert bot._provider_config is not None
+ assert bot._provider_config["bearer_token"] == "ad-token-xyz"
+ assert bot.github_token is None
+ bot.stop()
+
+ def test_native_auth_has_no_provider_config(self, copilot_bot):
+ assert copilot_bot._provider_config is None
+ assert copilot_bot.github_token == "ghp_test_token_123"
+
+
+# ---------------------------------------------------------------------------
+# Helper context manager shared by several new test classes
+# ---------------------------------------------------------------------------
+
+def _standard_init_patches(mock_environment, mock_copilot_client):
+    """Return the standard tuple of patch context managers for CopilotBot init."""
+ return (
+ patch("microbots.bot.CopilotBot.LocalDockerEnvironment", return_value=mock_environment),
+ patch("microbots.bot.CopilotBot.get_free_port", side_effect=[9000]),
+ patch("microbots.bot.CopilotBot.CopilotBot._install_copilot_cli"),
+ patch("microbots.bot.CopilotBot.CopilotBot._start_copilot_cli_server"),
+ patch("microbots.bot.CopilotBot.CopilotBot._wait_for_cli_ready"),
+ patch("copilot.CopilotClient", return_value=mock_copilot_client),
+ patch("copilot.ExternalServerConfig", return_value=MagicMock()),
+ )
+
+
+# ---------------------------------------------------------------------------
+# Unit tests — __init__ with folder_to_mount and auto-created environment
+# ---------------------------------------------------------------------------
+
+@pytest.mark.unit
+class TestCopilotBotInitFolderMount:
+ """Tests for __init__ paths: folder_to_mount string and auto environment."""
+
+ def test_folder_to_mount_creates_mount_object(self, mock_environment, mock_copilot_client):
+ """When folder_to_mount string is provided, a Mount is stored."""
+ with (
+ patch("microbots.bot.CopilotBot.LocalDockerEnvironment", return_value=mock_environment),
+ patch("microbots.bot.CopilotBot.get_free_port", side_effect=[9000]),
+ patch("microbots.bot.CopilotBot.CopilotBot._install_copilot_cli"),
+ patch("microbots.bot.CopilotBot.CopilotBot._start_copilot_cli_server"),
+ patch("microbots.bot.CopilotBot.CopilotBot._wait_for_cli_ready"),
+ patch("copilot.CopilotClient", return_value=mock_copilot_client),
+ patch("copilot.ExternalServerConfig", return_value=MagicMock()),
+ ):
+ from microbots.bot.CopilotBot import CopilotBot
+ bot = CopilotBot(
+ model="gpt-4.1",
+ environment=mock_environment,
+ github_token="ghp_test",
+ folder_to_mount="/tmp/test_repo",
+ )
+ assert bot.folder_to_mount is not None
+ assert "test_repo" in bot.folder_to_mount.sandbox_path
+ bot.stop()
+
+ def test_auto_creates_environment_when_none(self, mock_environment, mock_copilot_client):
+ """When environment=None, LocalDockerEnvironment is instantiated."""
+ mock_environment.get_ipv4_address = MagicMock(return_value="172.17.0.2")
+ with (
+ patch(
+ "microbots.bot.CopilotBot.LocalDockerEnvironment",
+ return_value=mock_environment,
+ ) as mock_lde,
+ patch("microbots.bot.CopilotBot.get_free_port", return_value=9000),
+ patch("microbots.bot.CopilotBot.CopilotBot._install_copilot_cli"),
+ patch("microbots.bot.CopilotBot.CopilotBot._start_copilot_cli_server"),
+ patch("microbots.bot.CopilotBot.CopilotBot._wait_for_cli_ready"),
+ patch("copilot.CopilotClient", return_value=mock_copilot_client),
+ patch("copilot.ExternalServerConfig", return_value=MagicMock()),
+ ):
+ from microbots.bot.CopilotBot import CopilotBot
+ bot = CopilotBot(
+ model="gpt-4.1",
+ github_token="ghp_test",
+ )
+ mock_lde.assert_called_once()
+ assert bot.environment is mock_environment
+ bot.stop()
+
+
+# ---------------------------------------------------------------------------
+# Unit tests — non-ExternalTool installation in __init__
+# ---------------------------------------------------------------------------
+
+@pytest.mark.unit
+class TestCopilotBotNonExternalToolInstall:
+ """Tests that non-ExternalTool tools are installed during __init__."""
+
+ def test_non_external_tool_install_and_verify_called(
+ self, mock_environment, mock_copilot_client
+ ):
+ """install_tool and verify_tool_installation are called for regular tools."""
+ from microbots.tools.tool import ToolAbstract
+
+ mock_tool = MagicMock(spec=ToolAbstract)
+ mock_tool.name = "my_tool"
+ mock_tool.usage_instructions_to_llm = "Use my_tool"
+
+ with (
+ patch("microbots.bot.CopilotBot.LocalDockerEnvironment", return_value=mock_environment),
+ patch("microbots.bot.CopilotBot.get_free_port", side_effect=[9000]),
+ patch("microbots.bot.CopilotBot.CopilotBot._install_copilot_cli"),
+ patch("microbots.bot.CopilotBot.CopilotBot._start_copilot_cli_server"),
+ patch("microbots.bot.CopilotBot.CopilotBot._wait_for_cli_ready"),
+ patch("copilot.CopilotClient", return_value=mock_copilot_client),
+ patch("copilot.ExternalServerConfig", return_value=MagicMock()),
+ ):
+ from microbots.bot.CopilotBot import CopilotBot
+ bot = CopilotBot(
+ model="gpt-4.1",
+ environment=mock_environment,
+ github_token="ghp_test",
+ additional_tools=[mock_tool],
+ )
+ mock_tool.install_tool.assert_called_once_with(mock_environment)
+ mock_tool.verify_tool_installation.assert_called_once_with(mock_environment)
+ bot.stop()
+
+
+# ---------------------------------------------------------------------------
+# Unit tests — _install_copilot_cli verification failure
+# ---------------------------------------------------------------------------
+
+@pytest.mark.unit
+class TestCopilotBotCLIVerification:
+ """Tests that copilot-cli verification failure raises RuntimeError."""
+
+ def test_install_cli_verify_fails_raises(self, mock_environment):
+ """RuntimeError raised when install commands succeed but 'copilot --version' fails."""
+ from microbots.bot.CopilotBot import CopilotBot
+
+ success_ret = MagicMock()
+ success_ret.return_code = 0
+ success_ret.stdout = ""
+ success_ret.stderr = ""
+
+ fail_ret = MagicMock()
+ fail_ret.return_code = 1
+ fail_ret.stdout = ""
+ fail_ret.stderr = "command not found: copilot"
+
+ def side_effect(cmd, **kwargs):
+ if "copilot --version" in cmd:
+ return fail_ret
+ return success_ret
+
+ mock_environment.execute = MagicMock(side_effect=side_effect)
+
+ with (
+ patch("microbots.bot.CopilotBot.get_free_port", side_effect=[9000]),
+ patch("microbots.bot.CopilotBot.CopilotBot._start_copilot_cli_server"),
+ patch("microbots.bot.CopilotBot.CopilotBot._wait_for_cli_ready"),
+ patch("copilot.CopilotClient", return_value=AsyncMock()),
+ patch("copilot.ExternalServerConfig", return_value=MagicMock()),
+ ):
+ with pytest.raises(RuntimeError, match="verification failed"):
+ CopilotBot(
+ model="gpt-4.1",
+ environment=mock_environment,
+ github_token="ghp_test",
+ )
+
+
+# ---------------------------------------------------------------------------
+# Unit tests — _start_copilot_cli_server
+# ---------------------------------------------------------------------------
+
+@pytest.mark.unit
+class TestCopilotBotStartServer:
+ """Tests for _start_copilot_cli_server()."""
+
+ def _make_bot_for_server_test(self, mock_environment, mock_copilot_client, github_token=None, provider_config=None):
+ """Build a CopilotBot with _start_copilot_cli_server NOT patched."""
+ with (
+ patch("microbots.bot.CopilotBot.LocalDockerEnvironment", return_value=mock_environment),
+ patch("microbots.bot.CopilotBot.get_free_port", side_effect=[9000]),
+ patch("microbots.bot.CopilotBot.CopilotBot._install_copilot_cli"),
+ patch("microbots.bot.CopilotBot.CopilotBot._wait_for_cli_ready"),
+ patch("copilot.CopilotClient", return_value=mock_copilot_client),
+ patch("copilot.ExternalServerConfig", return_value=MagicMock()),
+ ):
+ from microbots.bot.CopilotBot import CopilotBot
+ if github_token:
+ bot = CopilotBot(
+ model="gpt-4.1",
+ environment=mock_environment,
+ github_token=github_token,
+ )
+ elif provider_config:
+ bot = CopilotBot(
+ model="gpt-4.1",
+ environment=mock_environment,
+ api_key="sk-key",
+ base_url="https://api.openai.com/v1",
+ )
+ else:
+ bot = CopilotBot(
+ model="gpt-4.1",
+ environment=mock_environment,
+ )
+ return bot
+
+ def test_start_server_injects_github_token(self, mock_environment, mock_copilot_client):
+ """Token injection calls when github_token is set without provider_config."""
+ bot = self._make_bot_for_server_test(
+ mock_environment, mock_copilot_client, github_token="ghp_server_test"
+ )
+ execute_args = [str(c) for c in mock_environment.execute.call_args_list]
+ assert any("GITHUB_TOKEN" in a for a in execute_args)
+ assert any("COPILOT_GITHUB_TOKEN" in a for a in execute_args)
+ bot.stop()
+
+ def test_start_server_skips_token_injection_for_byok(
+ self, mock_environment, mock_copilot_client
+ ):
+ """No token injection when BYOK provider_config is active."""
+ bot = self._make_bot_for_server_test(
+ mock_environment, mock_copilot_client, provider_config=True
+ )
+ execute_args = [str(c) for c in mock_environment.execute.call_args_list]
+ assert not any("GITHUB_TOKEN" in a for a in execute_args)
+ bot.stop()
+
+ def test_start_server_raises_on_execute_failure(self, mock_environment, mock_copilot_client):
+ """RuntimeError raised when start_cmd execute fails."""
+ fail_ret = MagicMock()
+ fail_ret.return_code = 1
+ fail_ret.stderr = "failed to start"
+
+ success_ret = MagicMock()
+ success_ret.return_code = 0
+ success_ret.stdout = ""
+ success_ret.stderr = ""
+
+ def side_effect(cmd, **kwargs):
+ if "copilot --headless" in cmd:
+ return fail_ret
+ return success_ret
+
+ mock_environment.execute = MagicMock(side_effect=side_effect)
+
+ with (
+ patch("microbots.bot.CopilotBot.LocalDockerEnvironment", return_value=mock_environment),
+ patch("microbots.bot.CopilotBot.get_free_port", side_effect=[9000]),
+ patch("microbots.bot.CopilotBot.CopilotBot._install_copilot_cli"),
+ patch("microbots.bot.CopilotBot.CopilotBot._wait_for_cli_ready"),
+ patch("copilot.CopilotClient", return_value=mock_copilot_client),
+ patch("copilot.ExternalServerConfig", return_value=MagicMock()),
+ ):
+ from microbots.bot.CopilotBot import CopilotBot
+ with pytest.raises(RuntimeError, match="Failed to start copilot-cli server"):
+ CopilotBot(
+ model="gpt-4.1",
+ environment=mock_environment,
+ github_token="ghp_test",
+ )
+
+
+# ---------------------------------------------------------------------------
+# Unit tests — _wait_for_cli_ready
+# ---------------------------------------------------------------------------
+
+@pytest.mark.unit
+class TestCopilotBotWaitReady:
+ """Tests for _wait_for_cli_ready() called directly on a minimal instance."""
+
+ def _make_minimal_bot(self):
+ """Return a bare CopilotBot instance with only environment set."""
+ from microbots.bot.CopilotBot import CopilotBot
+
+ bot = object.__new__(CopilotBot)
+ mock_env = MagicMock()
+ mock_env.get_ipv4_address = MagicMock(return_value="127.0.0.1")
+ bot.environment = mock_env
+ return bot
+
+ def test_wait_for_cli_ready_success(self):
+ """Returns immediately when socket connection succeeds."""
+ bot = self._make_minimal_bot()
+ mock_sock = MagicMock()
+ with patch("socket.create_connection", return_value=mock_sock) as mock_conn:
+ bot._wait_for_cli_ready()
+ mock_conn.assert_called_once()
+ mock_sock.close.assert_called_once()
+
+ def test_wait_for_cli_ready_timeout(self):
+ """Raises TimeoutError when connections always fail past the deadline."""
+ bot = self._make_minimal_bot()
+ with (
+ patch("socket.create_connection", side_effect=ConnectionRefusedError()),
+ patch("microbots.bot.CopilotBot.time") as mock_time,
+ ):
+ # First call sets deadline (0 + _CLI_STARTUP_TIMEOUT), second exceeds it
+ mock_time.time.side_effect = [0, 9999]
+ mock_time.sleep = MagicMock()
+ with pytest.raises(TimeoutError, match="copilot-cli did not become ready"):
+ bot._wait_for_cli_ready()
+
+ def test_wait_for_cli_ready_oserror_retries(self):
+ """OSError is caught and retried like ConnectionRefusedError."""
+ bot = self._make_minimal_bot()
+ mock_sock = MagicMock()
+ # First attempt raises OSError, second attempt succeeds
+ with patch("socket.create_connection", side_effect=[OSError("network error"), mock_sock]):
+ with patch("microbots.bot.CopilotBot.time") as mock_time:
+ mock_time.time.side_effect = [0, 1, 2]
+ mock_time.sleep = MagicMock()
+ bot._wait_for_cli_ready()
+ mock_sock.close.assert_called_once()
+
+
+# ---------------------------------------------------------------------------
+# Unit tests — run() with additional_mounts
+# ---------------------------------------------------------------------------
+
+@pytest.mark.unit
+class TestCopilotBotRunMounts:
+ """Tests for run() with additional_mounts parameter."""
+
+ def test_run_with_additional_mounts_calls_mount_additional(self, copilot_bot):
+ """_mount_additional is called for each mount in additional_mounts."""
+ from microbots.extras.mount import Mount, MountType
+
+ mock_mount = MagicMock(spec=Mount)
+ mock_mount.mount_type = MountType.COPY
+ mock_mount.host_path_info = MagicMock()
+ mock_mount.host_path_info.abs_path = "/tmp/extra"
+ mock_mount.sandbox_path = "/workdir/extra"
+
+ copilot_bot.environment.copy_to_container = MagicMock(return_value=True)
+
+ with patch.object(copilot_bot, "_mount_additional") as mock_ma:
+ copilot_bot.run("test task", additional_mounts=[mock_mount])
+ mock_ma.assert_called_once_with(mock_mount)
+
+
+# ---------------------------------------------------------------------------
+# Unit tests — _execute_session
+# ---------------------------------------------------------------------------
+
+@pytest.mark.unit
+class TestCopilotBotExecuteSession:
+ """Tests for _execute_session() paths."""
+
+ def test_execute_session_includes_provider_config(self, mock_environment, mock_copilot_client):
+ """provider is added to session kwargs when _provider_config is set."""
+ import asyncio
+
+ session = AsyncMock()
+ response = Mock()
+ response.data = Mock()
+ response.data.content = "done"
+ session.send_and_wait = AsyncMock(return_value=response)
+ session.on = MagicMock()
+ session.disconnect = AsyncMock()
+ mock_copilot_client.create_session = AsyncMock(return_value=session)
+
+ with (
+ patch("microbots.bot.CopilotBot.LocalDockerEnvironment", return_value=mock_environment),
+ patch("microbots.bot.CopilotBot.get_free_port", side_effect=[9000]),
+ patch("microbots.bot.CopilotBot.CopilotBot._install_copilot_cli"),
+ patch("microbots.bot.CopilotBot.CopilotBot._start_copilot_cli_server"),
+ patch("microbots.bot.CopilotBot.CopilotBot._wait_for_cli_ready"),
+ patch("copilot.CopilotClient", return_value=mock_copilot_client),
+ patch("copilot.ExternalServerConfig", return_value=MagicMock()),
+ ):
+ from microbots.bot.CopilotBot import CopilotBot
+ bot = CopilotBot(
+ model="gpt-4.1",
+ environment=mock_environment,
+ api_key="sk-key",
+ base_url="https://api.openai.com/v1",
+ )
+ result = asyncio.run(
+ bot._execute_session(
+ task="do something",
+ system_content="",
+ timeout=30,
+ streaming=False,
+ )
+ )
+ _, call_kwargs = mock_copilot_client.create_session.call_args
+ assert "provider" in call_kwargs
+ assert result == "done"
+ bot.stop()
+
+ def test_execute_session_includes_system_message(self, mock_environment, mock_copilot_client):
+ """system_message is added to session kwargs when system_content is non-empty."""
+ import asyncio
+
+ session = AsyncMock()
+ response = Mock()
+ response.data = Mock()
+ response.data.content = "done"
+ session.send_and_wait = AsyncMock(return_value=response)
+ session.on = MagicMock()
+ session.disconnect = AsyncMock()
+ mock_copilot_client.create_session = AsyncMock(return_value=session)
+
+ with (
+ patch("microbots.bot.CopilotBot.LocalDockerEnvironment", return_value=mock_environment),
+ patch("microbots.bot.CopilotBot.get_free_port", side_effect=[9000]),
+ patch("microbots.bot.CopilotBot.CopilotBot._install_copilot_cli"),
+ patch("microbots.bot.CopilotBot.CopilotBot._start_copilot_cli_server"),
+ patch("microbots.bot.CopilotBot.CopilotBot._wait_for_cli_ready"),
+ patch("copilot.CopilotClient", return_value=mock_copilot_client),
+ patch("copilot.ExternalServerConfig", return_value=MagicMock()),
+ ):
+ from microbots.bot.CopilotBot import CopilotBot
+ bot = CopilotBot(
+ model="gpt-4.1",
+ environment=mock_environment,
+ github_token="ghp_test",
+ )
+ asyncio.run(
+ bot._execute_session(
+ task="do something",
+ system_content="You are a helper.",
+ timeout=30,
+ streaming=False,
+ )
+ )
+ _, call_kwargs = mock_copilot_client.create_session.call_args
+ assert "system_message" in call_kwargs
+ assert call_kwargs["system_message"]["content"] == "You are a helper."
+ bot.stop()
+
+ def test_execute_session_returns_collected_event_text(
+ self, mock_environment, mock_copilot_client
+ ):
+ """Returns last collected text when send_and_wait returns no content."""
+ import asyncio
+
+ session = AsyncMock()
+ # send_and_wait returns response with no content
+ empty_response = Mock()
+ empty_response.data = Mock()
+ empty_response.data.content = ""
+ session.send_and_wait = AsyncMock(return_value=empty_response)
+ session.disconnect = AsyncMock()
+
+ captured = []
+
+ def capture_on(callback):
+ captured.append(callback)
+
+ session.on = MagicMock(side_effect=capture_on)
+ mock_copilot_client.create_session = AsyncMock(return_value=session)
+
+ with (
+ patch("microbots.bot.CopilotBot.LocalDockerEnvironment", return_value=mock_environment),
+ patch("microbots.bot.CopilotBot.get_free_port", side_effect=[9000]),
+ patch("microbots.bot.CopilotBot.CopilotBot._install_copilot_cli"),
+ patch("microbots.bot.CopilotBot.CopilotBot._start_copilot_cli_server"),
+ patch("microbots.bot.CopilotBot.CopilotBot._wait_for_cli_ready"),
+ patch("copilot.CopilotClient", return_value=mock_copilot_client),
+ patch("copilot.ExternalServerConfig", return_value=MagicMock()),
+ ):
+ from microbots.bot.CopilotBot import CopilotBot
+ from copilot.generated.session_events import SessionEventType
+
+ bot = CopilotBot(
+ model="gpt-4.1",
+ environment=mock_environment,
+ github_token="ghp_test",
+ )
+
+ # Simulate an ASSISTANT_MESSAGE event arriving before send_and_wait returns
+ async def _send_and_wait_with_event(task, timeout):
+ if captured:
+ msg_event = Mock()
+ msg_event.type = SessionEventType.ASSISTANT_MESSAGE
+ msg_event.data = Mock()
+ msg_event.data.content = "from event"
+ captured[0](msg_event)
+ return empty_response
+
+ session.send_and_wait = _send_and_wait_with_event
+
+ result = asyncio.run(
+ bot._execute_session(
+ task="do something",
+ system_content="",
+ timeout=30,
+ streaming=False,
+ )
+ )
+ assert result == "from event"
+ bot.stop()
+
+ def test_execute_session_returns_fallback_when_no_content(
+ self, mock_environment, mock_copilot_client
+ ):
+ """Returns fallback message when no text is collected at all."""
+ import asyncio
+
+ session = AsyncMock()
+ empty_response = Mock()
+ empty_response.data = Mock()
+ empty_response.data.content = ""
+ session.send_and_wait = AsyncMock(return_value=empty_response)
+ session.on = MagicMock()
+ session.disconnect = AsyncMock()
+ mock_copilot_client.create_session = AsyncMock(return_value=session)
+
+ with (
+ patch("microbots.bot.CopilotBot.LocalDockerEnvironment", return_value=mock_environment),
+ patch("microbots.bot.CopilotBot.get_free_port", side_effect=[9000]),
+ patch("microbots.bot.CopilotBot.CopilotBot._install_copilot_cli"),
+ patch("microbots.bot.CopilotBot.CopilotBot._start_copilot_cli_server"),
+ patch("microbots.bot.CopilotBot.CopilotBot._wait_for_cli_ready"),
+ patch("copilot.CopilotClient", return_value=mock_copilot_client),
+ patch("copilot.ExternalServerConfig", return_value=MagicMock()),
+ ):
+ from microbots.bot.CopilotBot import CopilotBot
+ bot = CopilotBot(
+ model="gpt-4.1",
+ environment=mock_environment,
+ github_token="ghp_test",
+ )
+ result = asyncio.run(
+ bot._execute_session(
+ task="do something",
+ system_content="",
+ timeout=1,
+ streaming=False,
+ )
+ )
+ assert "without producing" in result
+ bot.stop()
+
+ def test_execute_session_on_event_handlers(self, mock_environment, mock_copilot_client):
+ """_on_event handles ASSISTANT_MESSAGE_DELTA, SESSION_IDLE, and unknown types."""
+ import asyncio
+
+ session = AsyncMock()
+ empty_response = Mock()
+ empty_response.data = Mock()
+ empty_response.data.content = ""
+ session.disconnect = AsyncMock()
+
+ captured = []
+
+ def capture_on(callback):
+ captured.append(callback)
+
+ session.on = MagicMock(side_effect=capture_on)
+
+ async def _send_and_wait_with_events(task, timeout):
+ if captured:
+ cb = captured[0]
+ # ASSISTANT_MESSAGE_DELTA with delta_content
+ delta_event = Mock()
+ delta_event.type = "assistant.message_delta"
+ delta_event.data = Mock()
+ delta_event.data.delta_content = "partial"
+ cb(delta_event)
+
+ # SESSION_IDLE
+ idle_event = Mock()
+ idle_event.type = "session.idle"
+ cb(idle_event)
+
+ # Unknown event type
+ unknown_event = Mock()
+ unknown_event.type = "some.other.event"
+ cb(unknown_event)
+
+ # ASSISTANT_MESSAGE with no content (data.content is empty)
+ msg_empty = Mock()
+ msg_empty.type = "assistant.message"
+ msg_empty.data = Mock()
+ msg_empty.data.content = ""
+ cb(msg_empty)
+
+ return empty_response
+
+ session.send_and_wait = _send_and_wait_with_events
+ mock_copilot_client.create_session = AsyncMock(return_value=session)
+
+ with (
+ patch("microbots.bot.CopilotBot.LocalDockerEnvironment", return_value=mock_environment),
+ patch("microbots.bot.CopilotBot.get_free_port", side_effect=[9000]),
+ patch("microbots.bot.CopilotBot.CopilotBot._install_copilot_cli"),
+ patch("microbots.bot.CopilotBot.CopilotBot._start_copilot_cli_server"),
+ patch("microbots.bot.CopilotBot.CopilotBot._wait_for_cli_ready"),
+ patch("copilot.CopilotClient", return_value=mock_copilot_client),
+ patch("copilot.ExternalServerConfig", return_value=MagicMock()),
+ ):
+ from microbots.bot.CopilotBot import CopilotBot
+ bot = CopilotBot(
+ model="gpt-4.1",
+ environment=mock_environment,
+ github_token="ghp_test",
+ )
+ # Should not raise — covers all branches of _on_event
+ asyncio.run(
+ bot._execute_session(
+ task="do something",
+ system_content="",
+ timeout=5,
+ streaming=False,
+ )
+ )
+ bot.stop()
+
+
+# ---------------------------------------------------------------------------
+# Unit tests — _on_pre_tool_use and _on_post_tool_use hooks
+# ---------------------------------------------------------------------------
+
+@pytest.mark.unit
+class TestCopilotBotToolUseHooks:
+ """Tests for _on_pre_tool_use() and _on_post_tool_use() async hooks."""
+
+    def test_on_pre_tool_use_returns_allow(self, copilot_bot):
+        """The pre-tool-use hook allows every tool invocation."""
+        import asyncio
+
+ result = asyncio.run(
+ copilot_bot._on_pre_tool_use(
+ {"toolName": "bash", "toolArgs": {"command": "ls"}},
+ None,
+ )
+ )
+ assert result == {"permissionDecision": "allow"}
+
+    def test_on_pre_tool_use_missing_keys(self, copilot_bot):
+        """An event payload with missing keys still yields an allow decision."""
+        import asyncio
+
+ result = asyncio.run(copilot_bot._on_pre_tool_use({}, None))
+ assert result == {"permissionDecision": "allow"}
+
+    def test_on_post_tool_use_returns_empty_dict(self, copilot_bot):
+        """The post-tool-use hook returns an empty dict for a normal result."""
+        import asyncio
+
+ result = asyncio.run(
+ copilot_bot._on_post_tool_use(
+ {"toolName": "bash", "toolResult": "output here"},
+ None,
+ )
+ )
+ assert result == {}
+
+    def test_on_post_tool_use_truncates_long_result(self, copilot_bot):
+        """A very long tool result is handled without raising."""
+        import asyncio
+
+ long_result = "x" * 600
+ # Should not raise even with a very long result string
+ result = asyncio.run(
+ copilot_bot._on_post_tool_use(
+ {"toolName": "bash", "toolResult": long_result},
+ None,
+ )
+ )
+ assert result == {}
+
+    def test_on_post_tool_use_missing_keys(self, copilot_bot):
+        """An event payload with missing keys returns an empty dict."""
+        import asyncio
+
+ result = asyncio.run(copilot_bot._on_post_tool_use({}, None))
+ assert result == {}
+
+
+# ---------------------------------------------------------------------------
+# Unit tests — _mount_additional
+# ---------------------------------------------------------------------------
+
+@pytest.mark.unit
+class TestCopilotBotMountAdditional:
+ """Tests for _mount_additional()."""
+
+ def test_mount_additional_non_copy_raises(self, copilot_bot):
+ """ValueError raised for non-COPY mount type."""
+        from microbots.extras.mount import MountType
+
+        mock_mount = MagicMock()
+        mock_mount.mount_type = MountType.MOUNT  # not COPY
+ with pytest.raises(ValueError, match="Only COPY mount type"):
+ copilot_bot._mount_additional(mock_mount)
+
+ def test_mount_additional_copy_fails_raises(self, copilot_bot):
+ """ValueError raised when copy_to_container returns False."""
+ from microbots.extras.mount import MountType
+
+ mock_mount = MagicMock()
+ mock_mount.mount_type = MountType.COPY
+ mock_mount.host_path_info = MagicMock()
+ mock_mount.host_path_info.abs_path = "/host/path"
+ mock_mount.sandbox_path = "/workdir/path"
+
+ copilot_bot.environment.copy_to_container = MagicMock(return_value=False)
+ with pytest.raises(ValueError, match="Failed to copy additional mount"):
+ copilot_bot._mount_additional(mock_mount)
+
+ def test_mount_additional_copy_succeeds(self, copilot_bot):
+ """No error raised when copy_to_container succeeds."""
+ from microbots.extras.mount import MountType
+
+ mock_mount = MagicMock()
+ mock_mount.mount_type = MountType.COPY
+ mock_mount.host_path_info = MagicMock()
+ mock_mount.host_path_info.abs_path = "/host/path"
+ mock_mount.sandbox_path = "/workdir/path"
+
+ copilot_bot.environment.copy_to_container = MagicMock(return_value=True)
+ copilot_bot._mount_additional(mock_mount) # should not raise
+
+
+# ---------------------------------------------------------------------------
+# Integration tests — require real Docker + copilot-cli + auth
+# ---------------------------------------------------------------------------
+
+_skip_no_copilot_cli = pytest.mark.skipif(
+ not _copilot_cli_available(),
+ reason="GitHub Copilot CLI not installed (copilot not in PATH)",
+)
+
+_skip_no_copilot_sdk = pytest.mark.skipif(
+ not _copilot_sdk_installed(),
+ reason="github-copilot-sdk not installed (pip install microbots[ghcp])",
+)
+
+_skip_no_copilot_auth = pytest.mark.skipif(
+ not _copilot_auth_available(),
+ reason="No GitHub auth available (set GITHUB_TOKEN or run 'gh auth login')",
+)
+
+
+@_skip_no_copilot_cli
+@_skip_no_copilot_sdk
+@_skip_no_copilot_auth
+@pytest.mark.integration
+@pytest.mark.slow
+class TestCopilotBotIntegration:
+ """End-to-end integration tests with real Copilot SDK."""
+
+ def test_simple_task(self, test_repo, issue_1):
+ """CopilotBot can fix a simple syntax error."""
+ _restore_real_copilot_modules()
+ from microbots.bot.CopilotBot import CopilotBot
+
+ issue_text = issue_1[0]
+ verify_function = issue_1[1]
+
+ bot = CopilotBot(
+ model="gpt-4.1",
+ folder_to_mount=str(test_repo),
+ permission="READ_WRITE",
+ )
+
+ try:
+ result = bot.run(
+ issue_text,
+ timeout_in_seconds=300,
+ )
+ assert result.status is True, f"CopilotBot failed: {result.error}"
+ verify_function(test_repo)
+ finally:
+ bot.stop()
+
+
+# ---------------------------------------------------------------------------
+# BYOK helpers
+# ---------------------------------------------------------------------------
+
+def _byok_openai_available():
+ """Check if OpenAI BYOK credentials are configured via env vars."""
+ return bool(
+ os.environ.get("OPEN_AI_KEY")
+ and os.environ.get("OPEN_AI_END_POINT")
+ )
+
+
+_skip_no_byok_openai = pytest.mark.skipif(
+ not _byok_openai_available(),
+ reason="OpenAI BYOK not configured (set OPEN_AI_KEY and OPEN_AI_END_POINT)",
+)
+
+
+@_skip_no_copilot_cli
+@_skip_no_copilot_sdk
+@_skip_no_byok_openai
+@pytest.mark.integration
+@pytest.mark.slow
+class TestCopilotBotBYOKOpenAIIntegration:
+ """End-to-end integration tests for CopilotBot with OpenAI BYOK."""
+
+ def test_byok_openai_simple_task(self, test_repo, issue_1):
+ """CopilotBot can fix a simple syntax error using OpenAI BYOK credentials."""
+ _restore_real_copilot_modules()
+ from microbots.bot.CopilotBot import CopilotBot
+
+ issue_text = issue_1[0]
+ verify_function = issue_1[1]
+
+ api_key = os.environ["OPEN_AI_KEY"]
+ base_url = os.environ["OPEN_AI_END_POINT"]
+ model = os.getenv(
+ "AZURE_OPENAI_DEPLOYMENT_NAME", "mini-swe-agent-gpt5"
+ )
+
+ bot = CopilotBot(
+ model=model,
+ folder_to_mount=str(test_repo),
+ permission="READ_WRITE",
+ api_key=api_key,
+ base_url=base_url,
+ provider_type="openai",
+ )
+
+ try:
+ assert bot._provider_config is not None
+ assert bot._provider_config["type"] == "openai"
+ assert bot.github_token is None
+
+ result = bot.run(
+ issue_text,
+ timeout_in_seconds=300,
+ )
+ assert result.status is True, f"CopilotBot BYOK run failed: {result.error}"
+ verify_function(test_repo)
+ finally:
+ bot.stop()
diff --git a/test/bot/test_writing_bot_copilot.py b/test/bot/test_writing_bot_copilot.py
new file mode 100644
index 0000000..c515c17
--- /dev/null
+++ b/test/bot/test_writing_bot_copilot.py
@@ -0,0 +1,130 @@
+"""
+Integration test for CopilotApi — end-to-end code fix using GitHub Copilot.
+
+This test uses the WritingBot with the `github-copilot` provider to fix
+a real syntax error (missing colon) from the SWE-agent test repository.
+
+Prerequisites:
+ - GitHub Copilot CLI installed and in PATH (`copilot --version`)
+ - Authenticated via `copilot` login or GITHUB_TOKEN / GH_TOKEN env var
+ - Active GitHub Copilot subscription
+ - Install the ghcp extra: `pip install microbots[ghcp]`
+ - Docker daemon running
+
+Usage:
+------
+ # Run the integration test:
+ pytest test/bot/test_writing_bot_copilot.py -v -m "integration"
+"""
+
+import os
+import shutil
+import subprocess
+import sys
+
+import pytest
+
+sys.path.insert(
+ 0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../src"))
+)
+
+import logging
+
+logging.basicConfig(level=logging.INFO)
+
+from microbots import WritingBot, BotRunResult
+
+
+def _copilot_cli_available():
+ """Check if the Copilot CLI is installed and accessible."""
+ return shutil.which("copilot") is not None
+
+
+def _copilot_sdk_installed():
+ """Check if the github-copilot-sdk package is installed."""
+ try:
+ import copilot # noqa: F401
+ return True
+ except ImportError:
+ return False
+
+
+def _copilot_auth_available():
+ """Check if GitHub authentication is available for Copilot."""
+ if os.environ.get("GITHUB_TOKEN"):
+ return True
+ # Check if gh CLI is authenticated
+ if shutil.which("gh"):
+ try:
+ result = subprocess.run(
+ ["gh", "auth", "status"],
+ capture_output=True, timeout=10,
+ )
+ return result.returncode == 0
+ except Exception:
+ pass
+ return False
+
+
+skip_no_copilot_cli = pytest.mark.skipif(
+ not _copilot_cli_available(),
+ reason="GitHub Copilot CLI not installed (copilot not in PATH)",
+)
+
+skip_no_copilot_sdk = pytest.mark.skipif(
+ not _copilot_sdk_installed(),
+ reason="github-copilot-sdk not installed (pip install microbots[ghcp])",
+)
+
+skip_no_copilot_auth = pytest.mark.skipif(
+ not _copilot_auth_available(),
+ reason="No GitHub auth available (set GITHUB_TOKEN or run 'gh auth login')",
+)
+
+
+@skip_no_copilot_cli
+@skip_no_copilot_sdk
+@skip_no_copilot_auth
+@pytest.mark.integration
+@pytest.mark.slow
+def test_writing_bot_copilot_fixes_syntax_error(test_repo, issue_1):
+ """
+ End-to-end test: WritingBot with GitHub Copilot fixes a syntax error.
+
+ The test-repo contains `tests/missing_colon.py` with a SyntaxError
+ (missing colon on a function definition). The WritingBot should:
+ 1. Read the error description
+ 2. Find the faulty file
+ 3. Fix the syntax error (add the missing colon)
+ 4. Verify the fix by running the script
+
+ After the bot completes, `verify_function` confirms the fix by
+ executing the script and asserting returncode == 0.
+ """
+ issue_text = issue_1[0]
+ verify_function = issue_1[1]
+
+ model = "github-copilot/gpt-4.1"
+
+ writing_bot = WritingBot(
+ model=model,
+ folder_to_mount=str(test_repo),
+ )
+
+ response: BotRunResult = writing_bot.run(
+ issue_text,
+ max_iterations=25,
+ timeout_in_seconds=300,
+ )
+
+ print(
+ f"Status: {response.status}, "
+ f"Result: {response.result}, "
+ f"Error: {response.error}"
+ )
+
+ assert response.status is True, (
+ f"WritingBot did not complete the task. Error: {response.error}"
+ )
+
+ # Verify the fix actually works: run the script, expect exit code 0
+ verify_function(test_repo)
diff --git a/test/environment/local_docker/test_local_docker_environment.py b/test/environment/local_docker/test_local_docker_environment.py
index 266f9f2..e67f495 100644
--- a/test/environment/local_docker/test_local_docker_environment.py
+++ b/test/environment/local_docker/test_local_docker_environment.py
@@ -492,3 +492,54 @@ def test_execute_redacts_command_when_sensitive(self, caplog):
assert "" in caplog.text
assert "SECRET_TOKEN" not in caplog.text
assert "abc123" not in caplog.text
+
+
+@pytest.mark.unit
+class TestGetIpv4Address:
+ """Unit tests for LocalDockerEnvironment.get_ipv4_address"""
+
+ def _make_env(self):
+ """Create a bare LocalDockerEnvironment without calling __init__"""
+ env = LocalDockerEnvironment.__new__(LocalDockerEnvironment)
+ env.deleted = True
+ env.container = None
+ return env
+
+ def test_returns_ip_address_from_container_networks(self):
+ """get_ipv4_address returns the IP from the first Docker network"""
+ env = self._make_env()
+ env.container = Mock()
+ env.container.attrs = {
+ "NetworkSettings": {
+ "Networks": {
+ "bridge": {"IPAddress": "172.17.0.2"}
+ }
+ }
+ }
+
+ result = env.get_ipv4_address()
+
+ env.container.reload.assert_called_once()
+ assert result == "172.17.0.2"
+
+ def test_raises_runtime_error_when_no_container(self):
+ """get_ipv4_address raises RuntimeError when container is None"""
+ env = self._make_env()
+
+ with pytest.raises(RuntimeError, match="No active container"):
+ env.get_ipv4_address()
+
+ def test_raises_runtime_error_when_ip_is_empty(self):
+ """get_ipv4_address raises RuntimeError when IP address is empty"""
+ env = self._make_env()
+ env.container = Mock()
+ env.container.attrs = {
+ "NetworkSettings": {
+ "Networks": {
+ "bridge": {"IPAddress": ""}
+ }
+ }
+ }
+
+ with pytest.raises(RuntimeError, match="Could not determine container IP address"):
+ env.get_ipv4_address()
diff --git a/test/swe-bench-test/run_swe_bench.py b/test/swe-bench-test/run_swe_bench.py
index ee3da52..ea6c01d 100644
--- a/test/swe-bench-test/run_swe_bench.py
+++ b/test/swe-bench-test/run_swe_bench.py
@@ -11,10 +11,55 @@
0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../src"))
)
-from microbots import AgentBoss
+from microbots import AgentBoss, CopilotBot
+
+LOG_DIR = Path(__file__).parent.resolve() / "logs"
+LOG_DIR.mkdir(parents=True, exist_ok=True)
+
+LOG_FORMAT = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s")
logger = logging.getLogger(__name__)
-logging.basicConfig(level=logging.INFO)
+logger.setLevel(logging.DEBUG)
+
+# Console output (always active)
+console_handler = logging.StreamHandler()
+console_handler.setLevel(logging.INFO)
+console_handler.setFormatter(LOG_FORMAT)
+logger.addHandler(console_handler)
+
+# Track per-instance file handlers so they can be swapped between test cases
+_active_file_handlers: list[logging.Handler] = []
+
+
+def setup_instance_logging(instance_id: str):
+ """Create per-instance log directory and swap file handlers."""
+ root = logging.getLogger()
+
+ # Remove previous instance file handlers
+ for h in _active_file_handlers:
+ root.removeHandler(h)
+ h.close()
+ _active_file_handlers.clear()
+
+ instance_log_dir = LOG_DIR / instance_id
+ instance_log_dir.mkdir(parents=True, exist_ok=True)
+
+ # Info log file
+ info_handler = logging.FileHandler(instance_log_dir / "info.log")
+ info_handler.setLevel(logging.INFO)
+ info_handler.setFormatter(LOG_FORMAT)
+
+ # Debug log file
+ debug_handler = logging.FileHandler(instance_log_dir / "debug.log")
+ debug_handler.setLevel(logging.DEBUG)
+ debug_handler.setFormatter(LOG_FORMAT)
+
+ root.setLevel(logging.DEBUG)
+ root.addHandler(info_handler)
+ root.addHandler(debug_handler)
+ _active_file_handlers.extend([info_handler, debug_handler])
+
+ logger.info("Logging for instance %s -> %s", instance_id, instance_log_dir)
# Verification method
# `pip install swebench`
@@ -94,6 +139,23 @@ def run_agent(dataset):
)
+def run_copilot_agent(dataset):
+ bot = CopilotBot(
+        model="gpt-4.1",
+ folder_to_mount=str(TEST_DIR / dataset['instance_id']),
+ permission="READ_WRITE",
+ )
+ try:
+ result = bot.run(
+            task=dataset['problem_statement']
+            + (f"\n\nHint: {dataset['hints_text']}" if dataset['hints_text'] else ""),
+ timeout_in_seconds=3600 * 4, # 4 hours
+ )
+ if not result.status:
+ logger.error(f"CopilotBot failed on {dataset['instance_id']}: {result.error}")
+ finally:
+ bot.stop()
+
+
def generate_prediction(dataset):
repo_path = TEST_DIR / dataset['instance_id']
diff_output = subprocess.run(
@@ -136,6 +198,7 @@ def test_swe_bench():
datasets = load_dataset(SWE_BENCH_SUITE, split="test")
for instance in selected_dataset:
+ setup_instance_logging(instance)
dataset = datasets.filter(lambda x: x['instance_id'] == instance)[0]
logger.info(f"DATASET: {pprint(dataset)}")
setup_test_directory(dataset)
@@ -146,5 +209,19 @@ def test_swe_bench():
verify_fix()
+def test_swe_bench_copilot():
+ datasets = load_dataset(SWE_BENCH_SUITE, split="test")
+
+ for instance in selected_dataset:
+ setup_instance_logging(instance)
+ dataset = datasets.filter(lambda x: x['instance_id'] == instance)[0]
+        logger.info(f"DATASET: {dataset}")  # pprint() returns None, so interpolate the dict directly
+ setup_test_directory(dataset)
+ run_copilot_agent(dataset)
+ generate_prediction(dataset)
+
+ verify_fix()
+
+
if __name__ == "__main__":
- test_swe_bench()
\ No newline at end of file
+ test_swe_bench_copilot()
\ No newline at end of file