From 6d4132d58dc035473bea13e8d6bc9e73d6e2c9fd Mon Sep 17 00:00:00 2001 From: Stran Dutton Date: Tue, 2 Jun 2026 10:29:31 -0500 Subject: [PATCH 1/4] feat: add OpenHound support for log upload & download (BED-7975) - Add management polling via GET /api/v2/clients/management before each job poll cycle so management operations take priority over collections. - Add support_bundle module: collect_log_files() gathers openhound.log and ext_*.log files; create_support_bundle() zips them with a timestamped filename ({collector_name}_support_bundle_%Y-%m-%d-%H-%M-%S.zip). - Add upload_support_bundle() on BloodHoundEnterprise client, POSTing the zip to /api/v2/clients/management/artifacts. - Add ManagementOperationType and ManagementOperationStatus as StrEnums (values confirmed via BED-8268 DB migration CHECK constraints). - Add ManagementOperation and ManagementAvailable Pydantic models with optional nullable fields from the DB schema (requested_by_user_id, started_at, completed_at, execution_time). - Add check_management() and _send_support_bundle() to scheduler Service. - Extend _poll() to check management before jobs; management errors are logged and swallowed so job collection continues uninterrupted. - Add 14 new tests covering management polling, poll ordering, log collection, bundle creation, upload, and cleanup. TODO(BED-8266): Confirm response field names once GET /api/v2/clients/management is fully implemented in BHE. 
--- .../core/clients/bloodhound_enterprise.py | 36 ++ src/openhound/core/clients/models/jobs.py | 37 ++ src/openhound/core/support_bundle.py | 94 +++++ src/openhound/scheduler/service.py | 102 +++++- src/scheduler.py | 10 +- tests/test_bhe_job_scheduling.py | 331 +++++++++++++++++- .../management_available_empty.json | 3 + .../management_available_with_operation.json | 10 + 8 files changed, 607 insertions(+), 16 deletions(-) create mode 100644 src/openhound/core/support_bundle.py create mode 100644 tests/test_data/api/management/management_available_empty.json create mode 100644 tests/test_data/api/management/management_available_with_operation.json diff --git a/src/openhound/core/clients/bloodhound_enterprise.py b/src/openhound/core/clients/bloodhound_enterprise.py index a76d3cb..677f27b 100644 --- a/src/openhound/core/clients/bloodhound_enterprise.py +++ b/src/openhound/core/clients/bloodhound_enterprise.py @@ -1,6 +1,7 @@ import gzip import json from enum import Enum +from pathlib import Path from openhound.core.clients.bloodhound import BloodHound from openhound.core.clients.models.jobs import ( @@ -8,6 +9,7 @@ JobsCurrent, JobsEnd, JobStart, + ManagementAvailable, ) @@ -52,3 +54,37 @@ def ingest(self, data: str) -> None: self.request( method="POST", path=path, body=compressed_data, extra_headers=headers ) + + @property + def management_available(self) -> ManagementAvailable: + # Endpoint confirmed via BHADR-6 ADR (BED-8268). + # TODO(BED-8266): Confirm response field names match the Go handler once + # GET /api/v2/clients/management is fully implemented in BHE. + path = "/api/v2/clients/management" + response = self.request(method="GET", path=path) + return ManagementAvailable.model_validate(response.json()) + + def upload_support_bundle(self, bundle_path: Path) -> None: + """Upload a support bundle zip file to BHE. + + Reads the entire zip file into memory and POSTs it as raw bytes with + Content-Type: application/zip. BHE returns 202 Accepted on success. 
+ + Args: + bundle_path: Path to the zip file to upload. + + # TODO(BED-7968): Confirm upload endpoint path and Content-Type header once + # POST /api/v2/clients/management/artifacts is merged and deployed. + # TODO: For bundles >= 1 GB consider streaming instead of reading into memory. + # This requires refactoring BloodHound._request() to accept a file-like body + # since the HMAC signature currently requires the full body bytes. + """ + path = "/api/v2/clients/management/artifacts" + with bundle_path.open("rb") as f: + body = f.read() + self.request( + method="POST", + path=path, + body=body, + extra_headers={"Content-Type": "application/zip"}, + ) diff --git a/src/openhound/core/clients/models/jobs.py b/src/openhound/core/clients/models/jobs.py index b02e654..ad2ed29 100644 --- a/src/openhound/core/clients/models/jobs.py +++ b/src/openhound/core/clients/models/jobs.py @@ -1,5 +1,6 @@ from pydantic import BaseModel from datetime import datetime +from enum import StrEnum from typing import Union @@ -51,3 +52,39 @@ class JobsCurrent(BaseModel): class JobsEnd(BaseModel): data: Job + + +# Values match the CHECK constraint on collector_management_operations.type in BHE. +# See: lib/go/database/migration/migrations/20260529140822_v9_collector_artifacts.sql +class ManagementOperationType(StrEnum): + SUPPORT_BUNDLE = "support_bundle" + + +# Values match the CHECK constraint on collector_management_operations.status in BHE. +# See: lib/go/database/migration/migrations/20260529140822_v9_collector_artifacts.sql +class ManagementOperationStatus(StrEnum): + QUEUED = "queued" + RUNNING = "running" + SUCCEEDED = "succeeded" + FAILED = "failed" + CANCELED = "canceled" + TIMED_OUT = "timed_out" + EXPIRED = "expired" + + +class ManagementOperation(BaseModel): + # Fields sourced from the collector_management_operations DB schema (BED-8268). 
+ # TODO(BED-8266): Confirm all field names match the actual BHE JSON response shape + # once GET /api/v2/clients/management is implemented in BHE. + id: str + type: str + status: str + created_at: datetime + requested_by_user_id: str | None = None + started_at: datetime | None = None + completed_at: datetime | None = None + execution_time: datetime | None = None + + +class ManagementAvailable(BaseModel): + data: list[ManagementOperation] diff --git a/src/openhound/core/support_bundle.py b/src/openhound/core/support_bundle.py new file mode 100644 index 0000000..8473ed6 --- /dev/null +++ b/src/openhound/core/support_bundle.py @@ -0,0 +1,94 @@ +import logging +import tempfile +import zipfile +from datetime import UTC, datetime +from pathlib import Path + +logger = logging.getLogger(__name__) + +# Glob patterns that match the platform log and all rotated backups. +# The CustomLogger writes to /openhound.log and rotates to +# /openhound.log.YYYY-MM-DD_HH(-MM-SS)?. +_PLATFORM_LOG_PATTERNS = ["openhound.log", "openhound.log.*"] + +# Glob patterns that match extension/job run logs and all rotated backups. +# The CustomLogger writes to /ext_.log and rotates to +# /ext_.log.YYYY-MM-DD_HH(-MM-SS)?. +_JOB_LOG_PATTERNS = ["ext_*.log", "ext_*.log.*"] + + +def collect_log_files(log_base_path: Path) -> list[Path]: + """Collect all current and rotated log files from the log directory. + + Finds the platform log (openhound.log) and all job run logs (ext_*.log), + including any rotated backup files produced by CustomLogger's + RotatingFileHandler. + + Args: + log_base_path: The directory where OpenHound writes its log files. + This is CustomLogger.base_path after setup() has been called. + + Returns: + Sorted list of Path objects for each log file found. Empty if the + directory does not exist or contains no matching files. 
+ """ + if not log_base_path.is_dir(): + logger.warning( + f"Log directory does not exist, support bundle will be empty: {log_base_path}" + ) + return [] + + found: set[Path] = set() + for pattern in _PLATFORM_LOG_PATTERNS + _JOB_LOG_PATTERNS: + found.update(log_base_path.glob(pattern)) + + log_files = sorted(f for f in found if f.is_file()) + + if not log_files: + logger.warning( + f"No log files found in {log_base_path}; support bundle will be empty." + ) + else: + logger.debug(f"Collected {len(log_files)} log file(s) for support bundle.") + + return log_files + + +def create_support_bundle(collector_name: str, log_base_path: Path) -> Path: + """Collect all log files and zip them into a named support bundle. + + The zip file is written to a temporary directory so it does not pollute + the log directory. The caller is responsible for deleting the file after + it has been uploaded. + + Filename format: _support_bundle_YYYY-MM-DD-HH-MM-SS.zip + (UTC timestamp, dashes as separators to match the acceptance criteria.) + + Files inside the zip are stored flat (basename only, no directory prefix). + If two rotated backups share the same basename they will collide; this is + not expected given CustomLogger's naming conventions. + + Args: + collector_name: The configured collector name (used in the zip filename). + log_base_path: The directory where OpenHound writes its log files. + + Returns: + Path to the created zip file inside a temporary directory. + """ + timestamp = datetime.now(UTC).strftime("%Y-%m-%d-%H-%M-%S") + zip_name = f"{collector_name}_support_bundle_{timestamp}.zip" + + tmp_dir = Path(tempfile.mkdtemp()) + zip_path = tmp_dir / zip_name + + log_files = collect_log_files(log_base_path) + + with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf: + for log_file in log_files: + zf.write(log_file, arcname=log_file.name) + + logger.info( + f"Created support bundle '{zip_name}' with {len(log_files)} log file(s) " + f"at {zip_path}." 
+ ) + return zip_path diff --git a/src/openhound/scheduler/service.py b/src/openhound/scheduler/service.py index a1a17c3..db7101e 100644 --- a/src/openhound/scheduler/service.py +++ b/src/openhound/scheduler/service.py @@ -3,11 +3,18 @@ import time from concurrent.futures import Future, ProcessPoolExecutor from dataclasses import dataclass +from pathlib import Path import openhound.core.logging # noqa: F401 from openhound.core.clients.bloodhound_enterprise import BloodHoundEnterprise, JobStatus -from openhound.core.clients.models.jobs import Job +from openhound.core.clients.models.jobs import ( + Job, + ManagementOperation, + ManagementOperationType, +) +from openhound.core.logging import CustomLogger from openhound.core.manager import CollectorManager +from openhound.core.support_bundle import create_support_bundle from openhound.scheduler import dataflow logger = logging.getLogger(__name__) @@ -57,8 +64,9 @@ def _subprocess_collect(collector_name: str, job_id: int) -> Result: class Service: """Base scheduler service that checks for available jobs in BloodHound Enterprise. - Runs on a simple loop every X seconds and checks for available jobs. If a job is available, - a subprocess is started for the configured collector to run the DLT/OpenHound pipeline. + Runs on a simple loop every X seconds. On each cycle the management endpoint + is checked first (per BHADR-6); if a management operation is pending it is + executed before any collection job is started. """ def __init__( @@ -68,6 +76,7 @@ def __init__( token_id: str, collector_name: str, interval: int = 5, + log_base_path: Path | None = None, ): # BHE client settings self.bhe_uri = bhe_uri @@ -78,6 +87,10 @@ def __init__( # Interval how often to check for a job self.interval = interval + # Directory from which log files are collected for support bundles. + # Falls back to the platform default if not supplied explicitly. 
+ self.log_base_path: Path = log_base_path or CustomLogger.default_platform_path() + # Stores the ID of currently running BHE job self.job_running: int | None = None @@ -99,6 +112,55 @@ def _shutdown(self) -> None: self.executor.shutdown(wait=True, cancel_futures=True) logger.info("Collection service stopped.") + def check_management(self) -> ManagementOperation | None: + """Check the BHE management endpoint for pending operations. + + Per BHADR-6 this is always called before checking for collection jobs so + that management operations (e.g. support-bundle upload) take priority. + + Returns: + The first pending support-bundle operation, or None if there are none. + + # TODO(BED-8266): Validate the response field names once + # GET /api/v2/clients/management is fully implemented in BHE. + # Endpoint path and enum values confirmed via BHADR-6 ADR (BED-8268). + """ + logger.info("Checking for management operations in BloodHound Enterprise.") + management = self.client.management_available + for operation in management.data: + if operation.type == ManagementOperationType.SUPPORT_BUNDLE: + logger.info( + f"Support bundle operation found: {operation.id}" + ) + return operation + return None + + def _send_support_bundle(self, operation: ManagementOperation) -> None: + """Collect all log files, zip them, and upload the bundle to BHE. + + Runs synchronously in the poll thread so that no collection jobs are + started while the upload is in progress (per BED-7975 acceptance criteria). + The temporary zip file is always cleaned up regardless of upload success. + + Args: + operation: The management operation that triggered this upload. + """ + logger.info( + f"Starting support bundle upload for operation {operation.id}." + ) + bundle_path = create_support_bundle(self.collector_name, self.log_base_path) + try: + self.client.upload_support_bundle(bundle_path) + logger.info( + f"Support bundle uploaded successfully for operation {operation.id}." 
+ ) + # TODO(BED-8266): Determine whether OH needs to notify BHE that the + # management operation is complete (e.g. PATCH status to "succeeded"). + # If a completion callback is required it should be called here. + finally: + bundle_path.unlink(missing_ok=True) + logger.debug(f"Cleaned up temporary support bundle at {bundle_path}.") + def check_jobs(self) -> Job | None: """Checks BloodHound enterprise for available jobs. These can either be new jobs or jobs currently started and not finished/stopped. @@ -176,8 +238,17 @@ def _handle_completed_job(self, future: Future[Result]) -> None: self.job_running = None def _poll(self) -> None: - """Checks if jobs are completed and if a job should be run.""" - # If a job is currently running check if the future is done and handle completion + """Checks if jobs are completed and if a job or management operation should be run. + + Poll sequence (per BHADR-6): + 1. If a job future is done, handle its completion. + 2. If no job is currently running: + a. Check the management endpoint FIRST. + b. If a support-bundle operation is pending, send the bundle and + skip the job check for this cycle (job polling resumes next cycle). + c. Otherwise check for available collection jobs and start one if found. + """ + # Step 1: Handle completion of any currently running job. try: if self.future is not None and self.future.done(): self._handle_completed_job(self.future) @@ -186,12 +257,23 @@ def _poll(self) -> None: self.future = None self.job_running = None - # If no job is currently running, check for new jobs available and start the job + if self.job_running is not None: + return + + # Step 2a-b: Check management operations first. + try: + management_op = self.check_management() + if management_op: + self._send_support_bundle(management_op) + return + except Exception: + logger.exception("Error checking or executing management operations.") + + # Step 2c: No management work — check for collection jobs. 
try: - if self.job_running is None: - available_job = self.check_jobs() - if available_job: - self._start_job(available_job) + available_job = self.check_jobs() + if available_job: + self._start_job(available_job) except Exception: logger.exception("Error checking for or starting jobs.") diff --git a/src/scheduler.py b/src/scheduler.py index 9e37573..b87dad1 100644 --- a/src/scheduler.py +++ b/src/scheduler.py @@ -3,6 +3,7 @@ import dlt import typer +from openhound.core.logging import logger_override from openhound.scheduler.service import Service logger = logging.getLogger(__name__) @@ -22,13 +23,20 @@ def start(): token_id = dlt.secrets["destination.bloodhoundenterprise.token_id"] # Start the service - logger.info(f"Initializing service for collector '{collector_name}'.") + # logger_override.base_path is resolved during module-level setup() in logging.py + # and reflects any log_path override set in dlt config. + log_base_path = logger_override.base_path + logger.info( + f"Initializing service for collector '{collector_name}' " + f"(log_base_path={log_base_path})." 
+ ) svc = Service( bhe_uri=bhe_uri, token_key=token_key, token_id=token_id, collector_name=collector_name, interval=interval, + log_base_path=log_base_path, ) svc.start() diff --git a/tests/test_bhe_job_scheduling.py b/tests/test_bhe_job_scheduling.py index b7e976f..193c12e 100644 --- a/tests/test_bhe_job_scheduling.py +++ b/tests/test_bhe_job_scheduling.py @@ -1,5 +1,6 @@ import gzip import json +import zipfile from concurrent.futures import Future from pathlib import Path from urllib.parse import urlsplit @@ -9,7 +10,9 @@ from fastapi.testclient import TestClient from openhound.core.clients.bloodhound_enterprise import JobStatus +from openhound.core.clients.models.jobs import ManagementOperationType from openhound.core.models.graph import Graph +from openhound.core.support_bundle import collect_log_files, create_support_bundle from openhound.scheduler import service as scheduler_service from openhound.scheduler.service import ( ExtensionNotFoundError, @@ -18,11 +21,12 @@ _subprocess_collect, ) -TEST_DATA_DIR = Path(__file__).parent / "test_data" / "api" / "jobs" +JOBS_DATA_DIR = Path(__file__).parent / "test_data" / "api" / "jobs" +MANAGEMENT_DATA_DIR = Path(__file__).parent / "test_data" / "api" / "management" -def load_json(filename: str) -> dict: - with open(TEST_DATA_DIR / filename, "r") as f: +def load_json(filename: str, data_dir: Path = JOBS_DATA_DIR) -> dict: + with open(data_dir / filename, "r") as f: return json.load(f) @@ -41,6 +45,12 @@ def mock_bloodhound_api(): app.state.start_payload = None app.state.ingested_edges = 0 app.state.ingested_nodes = 0 + # Management / support-bundle state + # TODO(BED-8266): Update management_operations fixture data once the real + # GET /api/v2/clients/management/available response shape is confirmed. 
+ app.state.management_operations = [] # list of dicts; set per-test to inject ops + app.state.bundle_uploaded = False + app.state.bundle_content = b"" @app.get("/api/v2/jobs/available") async def jobs_available(): @@ -73,16 +83,35 @@ async def ingest(request: Request): app.state.ingested_edges += len(validate_graph.graph.edges) return {"status": "success"} + # Endpoint path confirmed via BHADR-6 ADR (BED-8268). + # TODO(BED-8266): Confirm response field names once GET /api/v2/clients/management + # is fully implemented in BHE. + @app.get("/api/v2/clients/management") + async def management_available(): + return {"data": app.state.management_operations} + + # TODO(BED-7968): Confirm endpoint path and accepted Content-Types once + # POST /api/v2/clients/management/artifacts is merged into BHE main. + @app.post("/api/v2/clients/management/artifacts") + async def upload_artifact(request: Request): + app.state.bundle_uploaded = True + app.state.bundle_content = await request.body() + return Response(status_code=202) + return TestClient(app) @pytest.fixture -def mock_service(mock_bloodhound_api, monkeypatch): - """Patches requests.requests so that our mocked BloodHound API will be used for testing the service. +def mock_service(mock_bloodhound_api, monkeypatch, tmp_path): + """Patches requests.request so that our mocked BloodHound API will be used for testing the service. + + Also provides a temporary log directory so support bundle tests have a + real path without touching the host filesystem. Args: mock_bloodhound_api (TestClient): A TestClient instance for the mocked BloodHound API. monkeypatch (pytest.MonkeyPatch): A pytest fixture for monkeypatching. + tmp_path (Path): A temporary directory provided by pytest. 
""" class DummyExecutor: @@ -115,6 +144,7 @@ def mock_request(method, url, **kwargs): token_id="test-id", interval=1, collector_name="openhound-faker", + log_base_path=tmp_path, ) @@ -214,3 +244,294 @@ def test_scheduler_ingest_opengraph(mock_service, mock_bloodhound_api, monkeypat assert result.job_id == 123 assert mock_bloodhound_api.app.state.ingested_nodes == 1000 assert mock_bloodhound_api.app.state.ingested_edges == 10000 + + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_support_bundle_op() -> dict: + """Return a management operation dict that represents a queued support-bundle request. + + # TODO(BED-8266): Update field names and enum values once the real + # GET /api/v2/clients/management/available response shape is confirmed. + """ + return load_json( + "management_available_with_operation.json", data_dir=MANAGEMENT_DATA_DIR + )["data"][0] + + +def _create_log_files(log_dir: Path) -> list[Path]: + """Write a small set of log files into log_dir and return their paths.""" + platform_log = log_dir / "openhound.log" + platform_log.write_text('{"level": "INFO", "message": "started"}') + + rotated_platform = log_dir / "openhound.log.2026-05-28_10" + rotated_platform.write_text('{"level": "INFO", "message": "rotated"}') + + ext_log = log_dir / "ext_faker.log" + ext_log.write_text('{"level": "INFO", "message": "collection started"}') + + return [platform_log, rotated_platform, ext_log] + + +# --------------------------------------------------------------------------- +# Management endpoint unit tests +# --------------------------------------------------------------------------- + +def test_check_management_returns_support_bundle_op(mock_service, mock_bloodhound_api): + """check_management() returns the first support-bundle operation when one is queued.""" + mock_bloodhound_api.app.state.management_operations = 
[_make_support_bundle_op()] + + op = mock_service.check_management() + + assert op is not None + assert op.type == ManagementOperationType.SUPPORT_BUNDLE + + +def test_check_management_returns_none_when_empty(mock_service, mock_bloodhound_api): + """check_management() returns None when the management endpoint returns an empty list.""" + mock_bloodhound_api.app.state.management_operations = [] + + assert mock_service.check_management() is None + + +# --------------------------------------------------------------------------- +# Poll loop ordering tests +# --------------------------------------------------------------------------- + +def test_poll_checks_management_before_jobs(mock_service, mock_bloodhound_api, monkeypatch): + """_poll() performs the support-bundle upload before starting a collection job. + + Both a management operation and a collection job are available; only the + bundle upload should happen this cycle (job is skipped). + """ + mock_bloodhound_api.app.state.management_operations = [_make_support_bundle_op()] + + # Stub _send_support_bundle so we don't need real log files for this test. + bundle_sent = [] + monkeypatch.setattr( + mock_service, "_send_support_bundle", lambda op: bundle_sent.append(op) + ) + + mock_service._poll() + + assert len(bundle_sent) == 1 + assert bundle_sent[0].type == ManagementOperationType.SUPPORT_BUNDLE + # Job check must NOT have been triggered this cycle. 
+ assert mock_bloodhound_api.app.state.job_started is False + + +def test_poll_proceeds_to_jobs_when_management_empty(mock_service, mock_bloodhound_api, monkeypatch): + """_poll() falls through to job check when there are no management operations.""" + mock_bloodhound_api.app.state.management_operations = [] + + submitted = Future() + monkeypatch.setattr(mock_service.executor, "submit", lambda *args: submitted) + + mock_service._poll() + + assert mock_service.job_running == 123 + assert mock_bloodhound_api.app.state.job_started is True + + +def test_poll_proceeds_to_jobs_when_management_check_fails( + mock_service, mock_bloodhound_api, monkeypatch +): + """_poll() falls through to job check if the management endpoint raises an exception. + + This guards against BED-8266 not being deployed yet or transient errors. + """ + # Patch check_management on the instance level so that it raises. + monkeypatch.setattr( + mock_service, + "check_management", + lambda: (_ for _ in ()).throw(RuntimeError("endpoint not found")), + ) + + submitted = Future() + monkeypatch.setattr(mock_service.executor, "submit", lambda *args: submitted) + + # Should not raise; error is caught and logged. 
+ mock_service._poll() + + assert mock_service.job_running == 123 + assert mock_bloodhound_api.app.state.job_started is True + + +def test_poll_skips_job_while_management_op_runs(mock_service, mock_bloodhound_api, monkeypatch): + """_poll() does not start a collection job in the same cycle as a bundle upload.""" + mock_bloodhound_api.app.state.management_operations = [_make_support_bundle_op()] + + monkeypatch.setattr(mock_service, "_send_support_bundle", lambda op: None) + + def fail_submit(*args, **kwargs): + raise AssertionError("submit should not be called during management cycle") + + monkeypatch.setattr(mock_service.executor, "submit", fail_submit) + + mock_service._poll() # must not raise + + +# --------------------------------------------------------------------------- +# Support-bundle creation tests +# --------------------------------------------------------------------------- + +def test_collect_log_files_finds_all_logs(tmp_path): + """collect_log_files() finds the platform log, rotated backups, and job run logs.""" + created = _create_log_files(tmp_path) + + found = collect_log_files(tmp_path) + + assert set(found) == set(created) + + +def test_collect_log_files_ignores_non_log_files(tmp_path): + """collect_log_files() does not include unrelated files in the log directory.""" + _create_log_files(tmp_path) + (tmp_path / "unrelated.txt").write_text("ignore me") + (tmp_path / "openhound.db").write_bytes(b"\x00\x01") + + found = collect_log_files(tmp_path) + + names = {f.name for f in found} + assert "unrelated.txt" not in names + assert "openhound.db" not in names + + +def test_collect_log_files_returns_empty_for_missing_dir(tmp_path): + """collect_log_files() returns an empty list when the log directory does not exist.""" + assert collect_log_files(tmp_path / "nonexistent") == [] + + +def test_create_support_bundle_filename_format(tmp_path): + """create_support_bundle() produces a zip with the correct filename format. 
+ + Expected: {collector_name}_support_bundle_YYYY-MM-DD-HH-MM-SS.zip + """ + _create_log_files(tmp_path) + + bundle = create_support_bundle("openhound-faker", tmp_path) + try: + assert bundle.suffix == ".zip" + assert bundle.stem.startswith("openhound-faker_support_bundle_") + # Timestamp portion: YYYY-MM-DD-HH-MM-SS (19 chars) + timestamp_part = bundle.stem[len("openhound-faker_support_bundle_"):] + assert len(timestamp_part) == 19, f"Unexpected timestamp: {timestamp_part}" + finally: + bundle.unlink(missing_ok=True) + + +def test_create_support_bundle_contains_log_files(tmp_path): + """create_support_bundle() zips all collected log files with flat names inside the zip.""" + created = _create_log_files(tmp_path) + + bundle = create_support_bundle("openhound-faker", tmp_path) + try: + with zipfile.ZipFile(bundle, "r") as zf: + zip_names = set(zf.namelist()) + expected_names = {f.name for f in created} + assert zip_names == expected_names + finally: + bundle.unlink(missing_ok=True) + + +# --------------------------------------------------------------------------- +# End-to-end upload tests (via mock API) +# --------------------------------------------------------------------------- + +def test_send_support_bundle_uploads_to_bhe(mock_service, mock_bloodhound_api, tmp_path): + """_send_support_bundle() creates a zip and POSTs it to the artifacts endpoint.""" + _create_log_files(tmp_path) + mock_service.log_base_path = tmp_path + + from openhound.core.clients.models.jobs import ManagementOperation, ManagementOperationStatus + from datetime import datetime, UTC + op = ManagementOperation( + id="a1b2c3d4-e5f6-7890-abcd-ef1234567890", + type=ManagementOperationType.SUPPORT_BUNDLE, + status=ManagementOperationStatus.QUEUED, + created_at=datetime.now(UTC), + ) + + mock_service._send_support_bundle(op) + + assert mock_bloodhound_api.app.state.bundle_uploaded is True + # Verify BHE received a valid zip + with zipfile.ZipFile( + 
__import__("io").BytesIO(mock_bloodhound_api.app.state.bundle_content) + ) as zf: + assert len(zf.namelist()) > 0 + + +def test_send_support_bundle_cleans_up_temp_file(mock_service, mock_bloodhound_api, tmp_path, monkeypatch): + """_send_support_bundle() deletes the temp zip after a successful upload.""" + _create_log_files(tmp_path) + mock_service.log_base_path = tmp_path + + captured_bundle_path = [] + + original_create = scheduler_service.create_support_bundle + + def capturing_create(collector_name, log_base_path): + path = original_create(collector_name, log_base_path) + captured_bundle_path.append(path) + return path + + monkeypatch.setattr(scheduler_service, "create_support_bundle", capturing_create) + + from openhound.core.clients.models.jobs import ManagementOperation, ManagementOperationStatus + from datetime import datetime, UTC + op = ManagementOperation( + id="test-op-id", + type=ManagementOperationType.SUPPORT_BUNDLE, + status=ManagementOperationStatus.QUEUED, + created_at=datetime.now(UTC), + ) + + mock_service._send_support_bundle(op) + + assert len(captured_bundle_path) == 1 + assert not captured_bundle_path[0].exists(), "Temp bundle file was not cleaned up" + + +def test_send_support_bundle_cleans_up_on_upload_failure(mock_service, monkeypatch, tmp_path): + """_send_support_bundle() deletes the temp zip even when the upload raises.""" + _create_log_files(tmp_path) + mock_service.log_base_path = tmp_path + + captured_bundle_path = [] + + original_create = scheduler_service.create_support_bundle + + def capturing_create(collector_name, log_base_path): + path = original_create(collector_name, log_base_path) + captured_bundle_path.append(path) + return path + + monkeypatch.setattr(scheduler_service, "create_support_bundle", capturing_create) + monkeypatch.setattr( + mock_service.client, + "upload_support_bundle", + lambda path: (_ for _ in ()).throw(RuntimeError("upload failed")), + ) + + from openhound.core.clients.models.jobs import 
ManagementOperation, ManagementOperationStatus + from datetime import datetime, UTC + op = ManagementOperation( + id="test-op-id", + type=ManagementOperationType.SUPPORT_BUNDLE, + status=ManagementOperationStatus.QUEUED, + created_at=datetime.now(UTC), + ) + + # _send_support_bundle itself re-raises; the outer _poll() catch swallows it. + # Here we just verify the cleanup happened. + try: + mock_service._send_support_bundle(op) + except RuntimeError: + pass + + assert len(captured_bundle_path) == 1 + assert not captured_bundle_path[0].exists(), "Temp bundle file was not cleaned up after failure" diff --git a/tests/test_data/api/management/management_available_empty.json b/tests/test_data/api/management/management_available_empty.json new file mode 100644 index 0000000..268c73f --- /dev/null +++ b/tests/test_data/api/management/management_available_empty.json @@ -0,0 +1,3 @@ +{ + "data": [] +} diff --git a/tests/test_data/api/management/management_available_with_operation.json b/tests/test_data/api/management/management_available_with_operation.json new file mode 100644 index 0000000..d8f6d91 --- /dev/null +++ b/tests/test_data/api/management/management_available_with_operation.json @@ -0,0 +1,10 @@ +{ + "data": [ + { + "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890", + "type": "support_bundle", + "status": "queued", + "created_at": "2026-05-28T10:00:00Z" + } + ] +} From df580e9a080dc389a9f3122fb99cad82315c2cc2 Mon Sep 17 00:00:00 2001 From: Stran Dutton Date: Tue, 2 Jun 2026 14:21:20 -0500 Subject: [PATCH 2/4] docs: add AGENTS.md with branch naming convention --- AGENTS.md | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 AGENTS.md diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..f37953d --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,20 @@ +# Agent Guidelines + +## Branch Naming + +All branches **must** follow one of these formats or the CI pipeline will reject the push: + +``` +fix/ +patch/ +feature/ +minor/ +major/ +``` + 
+**Examples:** +- `feature/BED-1234-add-support-bundle-upload` +- `fix/BED-5678-correct-management-endpoint-path` +- `patch/bump-pydantic-version` + +Use a lowercase, hyphen-separated description after the prefix. Include the ticket ID when one exists. From d9fd1fe04d21ec27c56d7539aa98e8e636918281 Mon Sep 17 00:00:00 2001 From: Stran Dutton Date: Tue, 9 Jun 2026 10:27:00 -0500 Subject: [PATCH 3/4] fix: correct management poll path to /api/v2/clients/management/available (BED-8266) --- src/openhound/core/clients/bloodhound_enterprise.py | 6 +++--- src/openhound/scheduler/service.py | 4 ++-- tests/test_bhe_job_scheduling.py | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/openhound/core/clients/bloodhound_enterprise.py b/src/openhound/core/clients/bloodhound_enterprise.py index 677f27b..97eff68 100644 --- a/src/openhound/core/clients/bloodhound_enterprise.py +++ b/src/openhound/core/clients/bloodhound_enterprise.py @@ -57,10 +57,10 @@ def ingest(self, data: str) -> None: @property def management_available(self) -> ManagementAvailable: - # Endpoint confirmed via BHADR-6 ADR (BED-8268). + # Path confirmed by BED-8266 ticket spec. # TODO(BED-8266): Confirm response field names match the Go handler once - # GET /api/v2/clients/management is fully implemented in BHE. - path = "/api/v2/clients/management" + # GET /api/v2/clients/management/available is fully implemented in BHE. + path = "/api/v2/clients/management/available" response = self.request(method="GET", path=path) return ManagementAvailable.model_validate(response.json()) diff --git a/src/openhound/scheduler/service.py b/src/openhound/scheduler/service.py index db7101e..86265df 100644 --- a/src/openhound/scheduler/service.py +++ b/src/openhound/scheduler/service.py @@ -122,8 +122,8 @@ def check_management(self) -> ManagementOperation | None: The first pending support-bundle operation, or None if there are none.
# TODO(BED-8266): Validate the response field names once - # GET /api/v2/clients/management is fully implemented in BHE. - # Endpoint path and enum values confirmed via BHADR-6 ADR (BED-8268). + # GET /api/v2/clients/management/available is fully implemented in BHE. + # Endpoint path confirmed by BED-8266 ticket spec; enum values confirmed via BED-8268. """ logger.info("Checking for management operations in BloodHound Enterprise.") management = self.client.management_available diff --git a/tests/test_bhe_job_scheduling.py b/tests/test_bhe_job_scheduling.py index 193c12e..ef6b785 100644 --- a/tests/test_bhe_job_scheduling.py +++ b/tests/test_bhe_job_scheduling.py @@ -83,10 +83,10 @@ async def ingest(request: Request): app.state.ingested_edges += len(validate_graph.graph.edges) return {"status": "success"} - # Endpoint path confirmed via BHADR-6 ADR (BED-8268). - # TODO(BED-8266): Confirm response field names once GET /api/v2/clients/management + # Path confirmed by BED-8266 ticket spec. + # TODO(BED-8266): Confirm response field names once GET /api/v2/clients/management/available # is fully implemented in BHE. 
- @app.get("/api/v2/clients/management") + @app.get("/api/v2/clients/management/available") async def management_available(): return {"data": app.state.management_operations} From 7a12620f07e1bfe918d16b050acb8150d6546f2e Mon Sep 17 00:00:00 2001 From: Stran Dutton Date: Thu, 11 Jun 2026 13:49:42 -0500 Subject: [PATCH 4/4] feat: implement /start and /end lifecycle calls for management operations (BED-7975) --- .../core/clients/bloodhound_enterprise.py | 27 +++++++ src/openhound/core/clients/models/jobs.py | 5 ++ src/openhound/scheduler/service.py | 28 ++++++-- tests/test_bhe_job_scheduling.py | 70 +++++++++++++++---- 4 files changed, 110 insertions(+), 20 deletions(-) diff --git a/src/openhound/core/clients/bloodhound_enterprise.py b/src/openhound/core/clients/bloodhound_enterprise.py index 97eff68..b81519c 100644 --- a/src/openhound/core/clients/bloodhound_enterprise.py +++ b/src/openhound/core/clients/bloodhound_enterprise.py @@ -10,6 +10,8 @@ JobsEnd, JobStart, ManagementAvailable, + ManagementOperationResult, + ManagementOperationStatus, ) @@ -64,6 +66,31 @@ def management_available(self) -> ManagementAvailable: response = self.request(method="GET", path=path) return ManagementAvailable.model_validate(response.json()) + def start_operation(self, operation_id: str) -> ManagementOperationResult: + """Claim a queued management operation, moving it to running. + + Args: + operation_id: The UUID of the management operation to claim. + """ + path = "/api/v2/clients/management/start" + body = json.dumps({"id": operation_id}) + response = self.request(method="POST", path=path, body=body.encode()) + return ManagementOperationResult.model_validate(response.json()) + + def end_operation( + self, operation_id: str, status: ManagementOperationStatus + ) -> ManagementOperationResult: + """Mark a running management operation as succeeded, failed, or canceled. + + Args: + operation_id: The UUID of the management operation to complete. 
+ status: The final status — typically SUCCEEDED or FAILED. + """ + path = "/api/v2/clients/management/end" + body = json.dumps({"id": operation_id, "status": status}) + response = self.request(method="POST", path=path, body=body.encode()) + return ManagementOperationResult.model_validate(response.json()) + def upload_support_bundle(self, bundle_path: Path) -> None: """Upload a support bundle zip file to BHE. diff --git a/src/openhound/core/clients/models/jobs.py b/src/openhound/core/clients/models/jobs.py index ad2ed29..73d956b 100644 --- a/src/openhound/core/clients/models/jobs.py +++ b/src/openhound/core/clients/models/jobs.py @@ -88,3 +88,8 @@ class ManagementOperation(BaseModel): class ManagementAvailable(BaseModel): data: list[ManagementOperation] + + +class ManagementOperationResult(BaseModel): + """Response wrapper returned by POST /start and POST /end.""" + data: ManagementOperation diff --git a/src/openhound/scheduler/service.py b/src/openhound/scheduler/service.py index 86265df..62a03a6 100644 --- a/src/openhound/scheduler/service.py +++ b/src/openhound/scheduler/service.py @@ -10,6 +10,7 @@ from openhound.core.clients.models.jobs import ( Job, ManagementOperation, + ManagementOperationStatus, ManagementOperationType, ) from openhound.core.logging import CustomLogger @@ -136,7 +137,7 @@ def check_management(self) -> ManagementOperation | None: return None def _send_support_bundle(self, operation: ManagementOperation) -> None: - """Collect all log files, zip them, and upload the bundle to BHE. + """Claim the operation, collect log files, zip and upload them, then report completion. Runs synchronously in the poll thread so that no collection jobs are started while the upload is in progress (per BED-7975 acceptance criteria). @@ -148,18 +149,31 @@ def _send_support_bundle(self, operation: ManagementOperation) -> None: logger.info( f"Starting support bundle upload for operation {operation.id}." 
) - bundle_path = create_support_bundle(self.collector_name, self.log_base_path) + self.client.start_operation(operation.id) + + bundle_path = None try: + bundle_path = create_support_bundle(self.collector_name, self.log_base_path) self.client.upload_support_bundle(bundle_path) + self.client.end_operation(operation.id, ManagementOperationStatus.SUCCEEDED) logger.info( f"Support bundle uploaded successfully for operation {operation.id}." ) - # TODO(BED-8266): Determine whether OH needs to notify BHE that the - # management operation is complete (e.g. PATCH status to "succeeded"). - # If a completion callback is required it should be called here. + except Exception: + logger.exception( + f"Support bundle upload failed for operation {operation.id}." + ) + try: + self.client.end_operation(operation.id, ManagementOperationStatus.FAILED) + except Exception: + logger.exception( + f"Failed to mark operation {operation.id} as failed in BHE." + ) + raise finally: - bundle_path.unlink(missing_ok=True) - logger.debug(f"Cleaned up temporary support bundle at {bundle_path}.") + if bundle_path is not None: + bundle_path.unlink(missing_ok=True) + logger.debug(f"Cleaned up temporary support bundle at {bundle_path}.") def check_jobs(self) -> Job | None: """Checks BloodHound enterprise for available jobs. These can either be new jobs or jobs currently started and not finished/stopped. diff --git a/tests/test_bhe_job_scheduling.py b/tests/test_bhe_job_scheduling.py index ef6b785..532d13b 100644 --- a/tests/test_bhe_job_scheduling.py +++ b/tests/test_bhe_job_scheduling.py @@ -46,11 +46,13 @@ def mock_bloodhound_api(): app.state.ingested_edges = 0 app.state.ingested_nodes = 0 # Management / support-bundle state - # TODO(BED-8266): Update management_operations fixture data once the real - # GET /api/v2/clients/management/available response shape is confirmed. 
app.state.management_operations = [] # list of dicts; set per-test to inject ops app.state.bundle_uploaded = False app.state.bundle_content = b"" + app.state.operation_started = False + app.state.operation_start_payload = None + app.state.operation_ended = False + app.state.operation_end_payload = None @app.get("/api/v2/jobs/available") async def jobs_available(): @@ -83,21 +85,28 @@ async def ingest(request: Request): app.state.ingested_edges += len(validate_graph.graph.edges) return {"status": "success"} - # Path confirmed by BED-8266 ticket spec. - # TODO(BED-8266): Confirm response field names once GET /api/v2/clients/management/available - # is fully implemented in BHE. @app.get("/api/v2/clients/management/available") async def management_available(): return {"data": app.state.management_operations} - # TODO(BED-7968): Confirm endpoint path and accepted Content-Types once - # POST /api/v2/clients/management/artifacts is merged into BHE main. + @app.post("/api/v2/clients/management/start") + async def start_operation(body: dict): + app.state.operation_started = True + app.state.operation_start_payload = body + return {"data": {"id": body.get("id"), "type": "support_bundle", "status": "running", "created_at": "2026-01-01T00:00:00Z"}} + @app.post("/api/v2/clients/management/artifacts") async def upload_artifact(request: Request): app.state.bundle_uploaded = True app.state.bundle_content = await request.body() return Response(status_code=202) + @app.post("/api/v2/clients/management/end") + async def end_operation(body: dict): + app.state.operation_ended = True + app.state.operation_end_payload = body + return {"data": {"id": body.get("id"), "type": "support_bundle", "status": body.get("status"), "created_at": "2026-01-01T00:00:00Z"}} + return TestClient(app) @@ -442,7 +451,7 @@ def test_create_support_bundle_contains_log_files(tmp_path): # --------------------------------------------------------------------------- def 
test_send_support_bundle_uploads_to_bhe(mock_service, mock_bloodhound_api, tmp_path): - """_send_support_bundle() creates a zip and POSTs it to the artifacts endpoint.""" + """_send_support_bundle() claims the op, uploads a zip, and reports success.""" _create_log_files(tmp_path) mock_service.log_base_path = tmp_path @@ -457,8 +466,11 @@ def test_send_support_bundle_uploads_to_bhe(mock_service, mock_bloodhound_api, t mock_service._send_support_bundle(op) + assert mock_bloodhound_api.app.state.operation_started is True + assert mock_bloodhound_api.app.state.operation_start_payload == {"id": op.id} assert mock_bloodhound_api.app.state.bundle_uploaded is True - # Verify BHE received a valid zip + assert mock_bloodhound_api.app.state.operation_ended is True + assert mock_bloodhound_api.app.state.operation_end_payload == {"id": op.id, "status": "succeeded"} with zipfile.ZipFile( __import__("io").BytesIO(mock_bloodhound_api.app.state.bundle_content) ) as zf: @@ -496,8 +508,8 @@ def capturing_create(collector_name, log_base_path): assert not captured_bundle_path[0].exists(), "Temp bundle file was not cleaned up" -def test_send_support_bundle_cleans_up_on_upload_failure(mock_service, monkeypatch, tmp_path): - """_send_support_bundle() deletes the temp zip even when the upload raises.""" +def test_send_support_bundle_cleans_up_on_upload_failure(mock_service, mock_bloodhound_api, monkeypatch, tmp_path): + """_send_support_bundle() deletes the temp zip and reports failure when upload raises.""" _create_log_files(tmp_path) mock_service.log_base_path = tmp_path @@ -526,8 +538,6 @@ def capturing_create(collector_name, log_base_path): created_at=datetime.now(UTC), ) - # _send_support_bundle itself re-raises; the outer _poll() catch swallows it. - # Here we just verify the cleanup happened. 
try: mock_service._send_support_bundle(op) except RuntimeError: @@ -535,3 +545,37 @@ def capturing_create(collector_name, log_base_path): assert len(captured_bundle_path) == 1 assert not captured_bundle_path[0].exists(), "Temp bundle file was not cleaned up after failure" + assert mock_bloodhound_api.app.state.operation_started is True + assert mock_bloodhound_api.app.state.operation_ended is True + assert mock_bloodhound_api.app.state.operation_end_payload == {"id": op.id, "status": "failed"} + + +def test_send_support_bundle_reports_failed_when_end_also_fails(mock_service, mock_bloodhound_api, monkeypatch, tmp_path): + """_send_support_bundle() still cleans up and re-raises even if /end itself fails.""" + _create_log_files(tmp_path) + mock_service.log_base_path = tmp_path + + monkeypatch.setattr( + mock_service.client, + "upload_support_bundle", + lambda path: (_ for _ in ()).throw(RuntimeError("upload failed")), + ) + monkeypatch.setattr( + mock_service.client, + "end_operation", + lambda op_id, status: (_ for _ in ()).throw(RuntimeError("end failed")), + ) + + from openhound.core.clients.models.jobs import ManagementOperation, ManagementOperationStatus + from datetime import datetime, UTC + op = ManagementOperation( + id="test-op-id", + type=ManagementOperationType.SUPPORT_BUNDLE, + status=ManagementOperationStatus.QUEUED, + created_at=datetime.now(UTC), + ) + + with pytest.raises(RuntimeError, match="upload failed"): + mock_service._send_support_bundle(op) + + assert mock_bloodhound_api.app.state.operation_started is True