diff --git a/README.md b/README.md index 1a82b02..f28bf00 100644 --- a/README.md +++ b/README.md @@ -82,6 +82,31 @@ uv run hyrule-knowledge observe --profile safe-health `.github/workflows/ingest.yml` runs nightly and on manual dispatch. When source knowledge changes, it opens a refresh PR. The PR must be reviewed before merge; generated knowledge is never auto-merged. +## Knowledge Loop agent + +`hyrule-knowledge loop --once` is the governed producer loop for the knowledge repo. It is separate from the read-only Knowledge MCP server: the MCP serves context; the loop refreshes, validates, enriches/imports when explicitly enabled, and opens reviewable PRs. + +Phase 1 is deterministic by default: + +```bash +uv run hyrule-knowledge loop --once --dry-run +``` + +It acquires a singleton lock, updates a per-day ledger, runs ingest plus the standard validation/export/eval/ledger/secret-scan gates, and reports JSON. Set `--create-pr` or `HYRULE_KNOWLEDGE_LOOP_CREATE_PR=1` to push a branch and open a PR when changes exist. + +Phase 2 is opt-in producer work: + +```bash +uv run hyrule-knowledge loop --once \ + --create-pr \ + --enrich-target infrastructure \ + --enrich-live \ + --max-openrouter-calls-per-day 1 \ + --learning-event /var/lib/engineering-loop/learning-events +``` + +Live enrichment requires the Knowledge Loop credential scope (`OPENROUTER_API_KEY` rendered from `kv/knowledge-loop`) and an explicit daily OpenRouter call budget. Learning-event inputs may be files, directories, or globs; imported artifacts remain review-gated ledger proposals. + ## Governed agent consumption Humans can browse `okf/index.md`. Agents should first read `okf/index.md`, then directory indexes, then individual concepts. Machine consumers should prefer `exports/knowledge.sqlite` and the JSONL exports: diff --git a/docs/plans/2026-06-23-knowledge-loop-agent-phases.md b/docs/plans/2026-06-23-knowledge-loop-agent-phases.md new file mode 100644 index 0000000..a842669 --- /dev/null +++ b/docs/plans/2026-06-23-knowledge-loop-agent-phases.md @@ -0,0 +1,69 @@ +# Knowledge Loop Agent — Phases 1 and 2 + +## Intent + +Turn `AS215932/knowledge` from a manual/tooling repository plus read-only MCP server into a governed autonomous Knowledge Loop agent, comparable in shape to Engineering Loop and NOC Agent. + +The read-only Knowledge MCP remains a serving dependency for Engineering Loop. The Knowledge Loop is a separate producer agent that refreshes, validates, enriches, imports learning artifacts, and opens reviewable PRs/issues. It must never auto-merge and must not promote generated synthesis to canonical truth without human review. + +## Runtime boundaries + +- Run on `loop` as a dedicated unprivileged `knowledge-loop` runtime. +- Use a systemd one-shot service plus timer. +- Use Vault-backed runtime credentials scoped to Knowledge Loop only. +- Do not mount fleet SSH credentials, Docker socket, app runtime credentials, wallet data, or broad Vault access. +- Do not write directly to `main`. +- All repository mutation exits as a PR. +- All generated synthesis remains advisory unless reviewed/promoted by humans. + +## Phase 1 — deterministic loop skeleton + +Build `hyrule-knowledge loop --once` with: + +1. Singleton state lock under the Knowledge Loop state directory. +2. Per-day ledger with cycle and PR counters. +3. Deterministic refresh and validation commands: + - `hyrule-knowledge ingest` + - `ruff check src tests` + - `mypy --strict src` + - `pytest` + - `hyrule-knowledge validate okf` + - `hyrule-knowledge quality --check` + - `hyrule-knowledge export --check` + - `hyrule-knowledge eval --check` + - `hyrule-knowledge ledger --check` + - `hyrule-knowledge ledger lifecycle --check` + - `hyrule-knowledge scan-secrets okf exports reports evals ledger schema` +4. Optional dry-run enrichment plumbing for proving source-pack/writer paths without provider calls. +5. Git branch/commit/push/PR creation when there are changes. +6. JSON report output for every cycle. +7. Optional passive Icinga heartbeat. +8. Tests for lock, budget, command sequencing, PR gating, and heartbeat behavior. + +## Phase 2 — Knowledge production work + +Extend the loop with opt-in bounded producer work: + +1. OpenRouter LLM enrichment targets using the Knowledge Loop credential scope. +2. Learning-event import from sanitized artifact directories. +3. Budget gates for provider calls and imported artifacts. +4. Validation/export/eval/secret-scan after phase-2 mutation. +5. PR output for enrichment/import changes. +6. Tests that prove phase-2 command sequencing and budget behavior without live provider calls. + +## Deployment follow-up + +Add NetOps source-managed runtime: + +- `knowledge_loop` Ansible role. +- Vault Agent template for `/etc/knowledge-loop/knowledge-loop.env`. +- systemd service/timer. +- host_vars on `loop` for disabled-by-default canary rollout. +- monitoring passive check for loop freshness/status. + +## Human gates + +- Humans review Knowledge Loop PRs. +- Humans merge Knowledge PRs. +- Humans decide curated A1 promotion. +- Deployment of updated Knowledge MCP remains a NetOps pin-bump/apply path. diff --git a/src/hyrule_knowledge/cli.py b/src/hyrule_knowledge/cli.py index 8adfbe6..89a0cb1 100644 --- a/src/hyrule_knowledge/cli.py +++ b/src/hyrule_knowledge/cli.py @@ -26,6 +26,8 @@ from .evals import eval_check, load_eval_cases, write_eval_reports from .exporter import exports_match, write_exports from .github_source import collect_snapshot +from .knowledge_loop import KnowledgeLoopConfig +from .knowledge_loop import run_once as run_knowledge_loop_once from .learning_ledger import ( LearningLedgerError, import_learning_events, @@ -767,6 +769,69 @@ def cmd_eval(args: argparse.Namespace) -> int: return 0 +def _truthy(value: str | None) -> bool: + return value is not None and value.strip().lower() in {"1", "true", "yes", "on"} + + +def _env_paths(name: str) -> list[Path]: + value = os.environ.get(name, "").strip() + if not value: + return [] + return [Path(item) for item in value.split(os.pathsep) if item] + + +def _env_csv(name: str) -> list[str]: + value = os.environ.get(name, "").strip() + if not value: + return [] + return [item.strip() for item in value.split(",") if item.strip()] + + +def _env_int(name: str, default: int) -> int: + value = os.environ.get(name, "").strip() + if not value: + return default + try: + return int(value) + except ValueError: + return default + + +def _env_str(name: str, default: str) -> str: + value = os.environ.get(name, "").strip() + return value or default + + +def cmd_loop(args: argparse.Namespace) -> int: + if not args.once: + print("knowledge loop currently requires --once", file=sys.stderr) + return 1 + config = KnowledgeLoopConfig( + repo_path=Path(args.repo_path), + config_path=Path(args.config), + state_dir=Path((args.state_dir or "").strip() or _env_str("HYRULE_KNOWLEDGE_LOOP_STATE_DIR", ".cache/hyrule-knowledge/loop-state")), + branch_prefix=args.branch_prefix, + base_branch=args.base_branch, + create_pr=bool(args.create_pr or _truthy(os.environ.get("HYRULE_KNOWLEDGE_LOOP_CREATE_PR"))), + dry_run=bool(args.dry_run), + git_user_name=args.git_user_name, + git_user_email=args.git_user_email, + max_cycles_per_day=args.max_cycles_per_day, + max_prs_per_day=args.max_prs_per_day, + max_openrouter_calls_per_day=args.max_openrouter_calls_per_day, + max_learning_events_per_day=args.max_learning_events_per_day, + enrich_targets=tuple(args.enrich_target or _env_csv("HYRULE_KNOWLEDGE_LOOP_ENRICH_TARGETS")), + enrich_live=bool(args.enrich_live or _truthy(os.environ.get("HYRULE_KNOWLEDGE_LOOP_ENRICH_LIVE"))), + dry_run_enrich_targets=tuple(args.dry_run_enrich_target or _env_csv("HYRULE_KNOWLEDGE_LOOP_DRY_RUN_ENRICH_TARGETS")), + learning_event_paths=tuple([*(Path(path) for path in (args.learning_event or [])), *_env_paths("HYRULE_KNOWLEDGE_LOOP_LEARNING_EVENTS")]), + replace_learning_events=bool(args.replace_learning_events), + run_validation=not args.skip_validation, + ) + report = run_knowledge_loop_once(config) + print_json(report.as_json()) + return 0 if report.outcome in {"published", "changes_detected", "idle", "locked"} else 1 + + def cmd_mcp(args: argparse.Namespace) -> int: from .mcp_server import main as mcp_main @@ -929,6 +994,28 @@ def build_parser() -> argparse.ArgumentParser: ledger.add_argument("--replace", action="store_true", help="replace existing proposed event on ledger import") ledger.set_defaults(func=cmd_ledger) + loop = subparsers.add_parser("loop") + loop.add_argument("--once", action="store_true", help="run exactly one Knowledge Loop cycle") + loop.add_argument("--repo-path", default=".") + loop.add_argument("--state-dir") + loop.add_argument("--branch-prefix", default="bot/knowledge-loop") + loop.add_argument("--base-branch", default="main") + loop.add_argument("--create-pr", action="store_true", help="push a branch and open a PR when changes are detected") + loop.add_argument("--dry-run", action="store_true", help="run the cycle but do not commit/push/open PR") + loop.add_argument("--skip-validation", action="store_true") + loop.add_argument("--git-user-name", default=os.environ.get("HYRULE_KNOWLEDGE_LOOP_GIT_USER_NAME", "hyrule-knowledge-loop[bot]")) + loop.add_argument("--git-user-email", default=os.environ.get("HYRULE_KNOWLEDGE_LOOP_GIT_USER_EMAIL", "knowledge-loop@as215932.net")) + loop.add_argument("--max-cycles-per-day", type=int, default=_env_int("HYRULE_KNOWLEDGE_LOOP_MAX_CYCLES_PER_DAY", 1)) + loop.add_argument("--max-prs-per-day", type=int, default=_env_int("HYRULE_KNOWLEDGE_LOOP_MAX_PRS_PER_DAY", 1)) + loop.add_argument("--dry-run-enrich-target", action="append", help="phase-1 enrichment plumbing target; never calls a provider") + loop.add_argument("--enrich-target", action="append", help="phase-2 enrichment target; dry-run unless --enrich-live is set") + loop.add_argument("--enrich-live", action="store_true", help="allow live OpenRouter enrichment for --enrich-target") + loop.add_argument("--max-openrouter-calls-per-day", type=int, default=_env_int("HYRULE_KNOWLEDGE_LOOP_MAX_OPENROUTER_CALLS_PER_DAY", 0)) + loop.add_argument("--learning-event", action="append", help="phase-2 learning-event file, directory, or glob to import") + loop.add_argument("--replace-learning-events", action="store_true") + loop.add_argument("--max-learning-events-per-day", type=int, default=_env_int("HYRULE_KNOWLEDGE_LOOP_MAX_LEARNING_EVENTS_PER_DAY", 100)) + loop.set_defaults(func=cmd_loop) + mcp = subparsers.add_parser("mcp") mcp.add_argument("--transport", default="stdio", choices=["stdio", "sse", "streamable-http", "http"]) mcp.add_argument("--db", help="SQLite export path (default: configured exports/knowledge.sqlite)") diff --git a/src/hyrule_knowledge/knowledge_loop.py b/src/hyrule_knowledge/knowledge_loop.py new file mode 100644 index 0000000..cc44c27 --- /dev/null +++ b/src/hyrule_knowledge/knowledge_loop.py @@ -0,0 +1,838 @@ +"""Governed Knowledge Loop daemon. + +This is the producer-side counterpart to the read-only Knowledge MCP server. One +cycle refreshes/validates the OKF bundle and, when explicitly enabled, performs +bounded enrichment or learning-event imports before opening a reviewable PR. +""" + +from __future__ import annotations + +import json +import os +import re +import subprocess +import sys +import time +import urllib.request +from base64 import b64encode +from collections.abc import Callable, Mapping, Sequence +from dataclasses import dataclass, field +from datetime import UTC, datetime +from pathlib import Path +from typing import Any, Literal + +from .learning_ledger import load_learning_events + +DEFAULT_LOCK_MAX_AGE_SECONDS = 2 * 60 * 60 + +# Generated/exported paths the loop is allowed to refresh and republish. +_MANAGED_DIRTY_PREFIXES = ("okf", "exports", "reports", "evals", "ledger", "schema") +# Binary artifacts that bake in wall-clock state (SQLite `datetime('now')`) and are +# excluded from byte comparison everywhere, so they are always treated as volatile. +_VOLATILE_BINARY_PATHS = frozenset({"exports/knowledge.sqlite"}) +# Generation-metadata timestamp fields stamped by `utc_now()`/`utc_now_iso()` at +# write time. Scoped by key (not a blanket ISO match) so a corrected source field +# such as a learning event's `event_time` under `--replace-learning-events` is +# never mistaken for timestamp churn and silently discarded. +_VOLATILE_TS_KEYS = ( + "last_verified_at", + "generated_at", + "extracted_at", + "claim_extracted_at", + "requested_at", + "created_at", +) +_VOLATILE_TS_RE = re.compile( + r'("?(?:' + "|".join(_VOLATILE_TS_KEYS) + r')"?\s*[:=]\s*["\']?)' + r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?(?:Z|[+-]\d{2}:\d{2})" +) +# `run_id` values: the loop wall-clock `local-YYYYMMDDHHMMSS` seed plus any value +# derived from it (e.g. the eval run-id hash that seeds off the manifest run id). +_RUN_ID_VALUE_RE = re.compile(r'("run_id"\s*:\s*)"[^"]*"') +_LOCAL_RUN_ID_RE = re.compile(r"local-\d{14}") + +LoopOutcome = Literal[ + "published", + "changes_detected", + "idle", + "locked", + "over_budget", + "error", +] + + +@dataclass(frozen=True) +class CommandResult: + argv: tuple[str, ...] + returncode: int + stdout: str = "" + stderr: str = "" + + +CommandRunner = Callable[[Sequence[str], Path, Mapping[str, str] | None], CommandResult] +Poster = Callable[[str, dict[str, Any]], None] + + +@dataclass(frozen=True) +class KnowledgeLoopConfig: + repo_path: Path = Path(".") + config_path: Path = Path("knowledge.config.yml") + state_dir: Path = Path(".cache/hyrule-knowledge/loop-state") + branch_prefix: str = "bot/knowledge-loop" + base_branch: str = "main" + git_user_name: str = "hyrule-knowledge-loop[bot]" + git_user_email: str = "knowledge-loop@as215932.net" + create_pr: bool = False + dry_run: bool = False + max_cycles_per_day: int = 1 + max_prs_per_day: int = 1 + max_openrouter_calls_per_day: int = 0 + max_learning_events_per_day: int = 100 + enrich_targets: tuple[str, ...] = () + enrich_live: bool = False + dry_run_enrich_targets: tuple[str, ...] = () + learning_event_paths: tuple[Path, ...] = () + replace_learning_events: bool = False + run_validation: bool = True + run_id: str | None = None + + def resolved_repo_path(self) -> Path: + return self.repo_path.expanduser().resolve() + + def resolved_state_dir(self) -> Path: + if self.state_dir.is_absolute(): + return self.state_dir + return self.resolved_repo_path() / self.state_dir + + +@dataclass +class KnowledgeLoopReport: + outcome: LoopOutcome + detail: str = "" + run_id: str = "" + branch: str | None = None + pr_url: str | None = None + changed: bool = False + commands: list[dict[str, Any]] = field(default_factory=list) + ledger: dict[str, Any] = field(default_factory=dict) + learning_events_seen: int = 0 + openrouter_calls_requested: int = 0 + openrouter_calls_run: int = 0 + wall_clock_seconds: float = 0.0 + notifications: list[str] = field(default_factory=list) + + def as_json(self) -> dict[str, Any]: + return { + "outcome": self.outcome, + "detail": self.detail, + "run_id": self.run_id, + "branch": self.branch, + "pr_url": self.pr_url, + "changed": self.changed, + "commands": self.commands, + "ledger": self.ledger, + "learning_events_seen": self.learning_events_seen, + "openrouter_calls_requested": self.openrouter_calls_requested, + "openrouter_calls_run": self.openrouter_calls_run, + "wall_clock_seconds": round(self.wall_clock_seconds, 1), + "notifications": self.notifications, + } + + +class KnowledgeLoopError(RuntimeError): + """Raised when a Knowledge Loop cycle fails.""" + + +# --- lock and ledger ----------------------------------------------------- + + +def today() -> str: + return datetime.now(UTC).strftime("%Y-%m-%d") + + +def default_run_id() -> str: + return os.environ.get("GITHUB_RUN_ID") or f"local-{datetime.now(UTC).strftime('%Y%m%d%H%M%S')}" + + +def _pid_alive(pid: int) -> bool: + try: + os.kill(pid, 0) + except OSError: + return False + return True + + +def acquire_lock(state_dir: Path, *, max_age_seconds: int = DEFAULT_LOCK_MAX_AGE_SECONDS) -> Path | None: + state_dir.mkdir(parents=True, exist_ok=True) + lock_path = state_dir / "knowledge-loop.lock" + payload = json.dumps({"pid": os.getpid(), "started_at": time.time()}) + for _attempt in range(2): + try: + fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o600) + except FileExistsError: + if _lock_is_live(lock_path, max_age_seconds=max_age_seconds): + return None + try: + lock_path.unlink() + except FileNotFoundError: + pass + continue + with os.fdopen(fd, "w", encoding="utf-8") as handle: + handle.write(payload) + return lock_path + return None + + +def _lock_is_live(lock_path: Path, *, max_age_seconds: int) -> bool: + try: + holder = json.loads(lock_path.read_text(encoding="utf-8")) + pid = int(holder.get("pid", -1)) + except (json.JSONDecodeError, ValueError): + try: + return (time.time() - lock_path.stat().st_mtime) < max_age_seconds + except OSError: + return False + except OSError: + return False + if pid > 0 and _pid_alive(pid): + return True + return False + + +def release_lock(lock_path: Path | None) -> None: + if lock_path is not None: + lock_path.unlink(missing_ok=True) + + +def _empty_ledger() -> dict[str, Any]: + return { + "cycles": 0, + "prs_opened": 0, + "openrouter_calls": 0, + "learning_events_imported": 0, + } + + +def _ledger_path(state_dir: Path, day: str) -> Path: + return state_dir / f"ledger-{day}.json" + + +def load_ledger(state_dir: Path, day: str | None = None) -> dict[str, Any]: + day = day or today() + path = _ledger_path(state_dir, day) + if not path.exists(): + return _empty_ledger() + try: + loaded = json.loads(path.read_text(encoding="utf-8")) + except (json.JSONDecodeError, OSError): + return _empty_ledger() + if not isinstance(loaded, dict): + return _empty_ledger() + merged = _empty_ledger() + for key in merged: + merged[key] = loaded.get(key, merged[key]) + return merged + + +def update_ledger( + state_dir: Path, + day: str | None = None, + *, + cycles: int = 0, + prs_opened: int = 0, + openrouter_calls: int = 0, + learning_events_imported: int = 0, +) -> dict[str, Any]: + day = day or today() + ledger = load_ledger(state_dir, day) + ledger["cycles"] = int(ledger.get("cycles", 0)) + cycles + ledger["prs_opened"] = int(ledger.get("prs_opened", 0)) + prs_opened + ledger["openrouter_calls"] = int(ledger.get("openrouter_calls", 0)) + openrouter_calls + ledger["learning_events_imported"] = int(ledger.get("learning_events_imported", 0)) + learning_events_imported + state_dir.mkdir(parents=True, exist_ok=True) + _ledger_path(state_dir, day).write_text(json.dumps(ledger, indent=2, sort_keys=True) + "\n", encoding="utf-8") + return ledger + + +# --- cycle --------------------------------------------------------------- + + +def run_once( + config: KnowledgeLoopConfig, + *, + runner: CommandRunner | None = None, + poster: Poster | None = None, +) -> KnowledgeLoopReport: + start = time.time() + repo = config.resolved_repo_path() + state_dir = config.resolved_state_dir() + run_id = config.run_id or default_run_id() + report = KnowledgeLoopReport(outcome="idle", run_id=run_id) + active_runner = runner or _default_runner + phase2_learning_events_to_charge = 0 + phase2_learning_charged = False + phase2_openrouter_charged = False + lock = acquire_lock(state_dir) + if lock is None: + report.outcome = "locked" + report.detail = "another Knowledge Loop cycle holds the lock" + report.wall_clock_seconds = time.time() - start + _safe_notify(report, poster=poster) + return report + try: + ledger = load_ledger(state_dir) + gate_reason = _budget_gate(config, ledger) + if gate_reason: + report.outcome = "over_budget" + report.detail = gate_reason + report.ledger = ledger + return report + + learning_event_paths = _resolve_learning_event_paths(config, repo) + learning_event_count = _learning_event_count(learning_event_paths) + report.learning_events_seen = learning_event_count + openrouter_requested = len(config.enrich_targets) if config.enrich_live else 0 + report.openrouter_calls_requested = openrouter_requested + phase2_gate = _phase2_budget_gate(config, ledger, learning_event_count, openrouter_requested) + if phase2_gate: + report.outcome = "over_budget" + report.detail = phase2_gate + report.ledger = ledger + return report + + _prepare_base_checkout(repo, config, active_runner, report) + _run_phase1_refresh(repo, config, active_runner, report) + phase2_learning_events_to_charge = learning_event_count + imported_count = _run_phase2_imports(repo, config, learning_event_paths, active_runner, report) + if imported_count: + report.ledger = update_ledger(state_dir, learning_events_imported=imported_count) + phase2_learning_charged = True + openrouter_run = _run_phase2_enrichment(repo, config, active_runner, report) + if openrouter_run: + report.ledger = update_ledger(state_dir, openrouter_calls=openrouter_run) + phase2_openrouter_charged = True + + if _git_dirty(repo, state_dir, active_runner, report): + _run_generation_refresh(repo, config, active_runner, report) + + if config.run_validation: + _run_validation(repo, config, active_runner, report) + + _drop_volatile_only_changes(repo, state_dir, active_runner, report) + report.changed = _git_dirty(repo, state_dir, active_runner, report) + if not report.changed: + report.outcome = "idle" + report.detail = "no knowledge changes detected" + report.ledger = update_ledger( + state_dir, + cycles=1, + openrouter_calls=0, + learning_events_imported=0, + ) + return report + + pr_gate_reason = _pr_budget_gate(config, load_ledger(state_dir)) + if pr_gate_reason: + report.outcome = "over_budget" + report.detail = pr_gate_reason + report.ledger = update_ledger(state_dir, cycles=1) + return report + + branch, pr_url = _publish_pr(repo, config, active_runner, report, run_id=run_id) + report.branch = branch + report.pr_url = pr_url + if pr_url: + report.outcome = "published" + report.detail = "opened Knowledge Loop refresh PR" + prs_opened = 1 + else: + report.outcome = "changes_detected" + report.detail = "changes detected; PR creation disabled or dry-run" + prs_opened = 0 + report.ledger = update_ledger( + state_dir, + cycles=1, + prs_opened=prs_opened, + openrouter_calls=0, + learning_events_imported=0, + ) + return report + except Exception as exc: + if phase2_learning_events_to_charge and not phase2_learning_charged: + update_ledger(state_dir, learning_events_imported=phase2_learning_events_to_charge) + if report.openrouter_calls_run and not phase2_openrouter_charged: + update_ledger(state_dir, openrouter_calls=report.openrouter_calls_run) + report.outcome = "error" + report.detail = str(exc)[:800] + # Count the failed cycle so a broken ingest/validation run is throttled by the + # daily cycle budget instead of being retried by the timer all day. + report.ledger = update_ledger(state_dir, cycles=1) + return report + finally: + release_lock(lock) + report.wall_clock_seconds = time.time() - start + _safe_notify(report, poster=poster) + + +def _budget_gate(config: KnowledgeLoopConfig, ledger: dict[str, Any]) -> str | None: + if int(ledger.get("cycles", 0)) >= config.max_cycles_per_day: + return f"daily cycle budget reached ({config.max_cycles_per_day})" + return None + + +def _pr_budget_gate(config: KnowledgeLoopConfig, ledger: dict[str, Any]) -> str | None: + if not config.create_pr or config.dry_run: + return None + if int(ledger.get("prs_opened", 0)) >= config.max_prs_per_day: + return f"daily PR budget reached ({config.max_prs_per_day})" + return None + + +def _phase2_budget_gate( + config: KnowledgeLoopConfig, + ledger: dict[str, Any], + learning_event_count: int, + openrouter_requested: int, +) -> str | None: + if openrouter_requested: + used = int(ledger.get("openrouter_calls", 0)) + remaining = config.max_openrouter_calls_per_day - used + if openrouter_requested > remaining: + return f"daily OpenRouter budget would be exceeded ({used}+{openrouter_requested}/{config.max_openrouter_calls_per_day})" + if learning_event_count: + used = int(ledger.get("learning_events_imported", 0)) + remaining = config.max_learning_events_per_day - used + if learning_event_count > remaining: + return f"daily learning-event import budget would be exceeded ({used}+{learning_event_count}/{config.max_learning_events_per_day})" + return None + + +def _learning_event_count(paths: tuple[Path, ...]) -> int: + if not paths: + return 0 + return len(load_learning_events(list(paths))) + + +def _resolve_learning_event_paths(config: KnowledgeLoopConfig, repo: Path) -> tuple[Path, ...]: + """Resolve relative learning-event inputs against the repo checkout. + + The budget preflight counts events in this supervisor process while the actual + `ledger import` runs as a subprocess with ``cwd=repo``. Resolving relative paths + against ``repo`` keeps both sides pointed at the same files when the loop is + invoked from outside the checkout. + """ + resolved: list[Path] = [] + for path in config.learning_event_paths: + candidate = path.expanduser() + if not candidate.is_absolute(): + candidate = repo / candidate + resolved.append(candidate) + return tuple(resolved) + + +def _prepare_base_checkout( + repo: Path, + config: KnowledgeLoopConfig, + runner: CommandRunner, + report: KnowledgeLoopReport, +) -> None: + if not config.create_pr or config.dry_run: + return + _run_checked(["git", "fetch", "origin", config.base_branch], repo, runner, report) + _run_checked(["git", "checkout", config.base_branch], repo, runner, report) + _run_checked(["git", "reset", "--hard", f"origin/{config.base_branch}"], repo, runner, report) + + +def _run_phase1_refresh( + repo: Path, + config: KnowledgeLoopConfig, + runner: CommandRunner, + report: KnowledgeLoopReport, +) -> None: + _run_checked(_knowledge_cmd(config, "ingest"), repo, runner, report) + for target in config.dry_run_enrich_targets: + _run_checked(_knowledge_cmd(config, "enrich", "--target", target, "--dry-run"), repo, runner, report) + + +def _run_phase2_imports( + repo: Path, + config: KnowledgeLoopConfig, + learning_event_paths: tuple[Path, ...], + runner: CommandRunner, + report: KnowledgeLoopReport, +) -> int: + if not learning_event_paths: + return 0 + argv = [*_knowledge_cmd(config, "ledger", "import"), *(path.as_posix() for path in learning_event_paths)] + if config.replace_learning_events: + argv.append("--replace") + _run_checked(argv, repo, runner, report) + return report.learning_events_seen + + +def _run_phase2_enrichment( + repo: Path, + config: KnowledgeLoopConfig, + runner: CommandRunner, + report: KnowledgeLoopReport, +) -> int: + if not config.enrich_targets: + return 0 + calls = 0 + for target in config.enrich_targets: + argv = _knowledge_cmd(config, "enrich", "--target", target) + if not config.enrich_live: + argv.append("--dry-run") + if config.enrich_live: + calls += 1 + report.openrouter_calls_run = calls + _run_checked(argv, repo, runner, report, keep_openrouter=config.enrich_live) + return calls + + +def _run_generation_refresh( + repo: Path, + config: KnowledgeLoopConfig, + runner: CommandRunner, + report: KnowledgeLoopReport, +) -> None: + for argv in ( + _knowledge_cmd(config, "quality", "--write"), + _knowledge_cmd(config, "export"), + _knowledge_cmd(config, "eval", "--write"), + _knowledge_cmd(config, "export"), + ): + _run_checked(argv, repo, runner, report) + + +def _run_validation( + repo: Path, + config: KnowledgeLoopConfig, + runner: CommandRunner, + report: KnowledgeLoopReport, +) -> None: + commands = [ + _tool_cmd("ruff", "check", "src", "tests"), + _tool_cmd("mypy", "--strict", "src"), + _tool_cmd("pytest"), + _knowledge_cmd(config, "validate", "okf"), + _knowledge_cmd(config, "quality", "--check"), + _knowledge_cmd(config, "export", "--check"), + _knowledge_cmd(config, "eval", "--check"), + _knowledge_cmd(config, "ledger", "--check"), + _knowledge_cmd(config, "ledger", "lifecycle", "--check"), + _knowledge_cmd(config, "scan-secrets", "okf", "exports", "reports", "evals", "ledger", "schema"), + ] + for argv in commands: + _run_checked(argv, repo, runner, report) + + +def _publish_pr( + repo: Path, + config: KnowledgeLoopConfig, + runner: CommandRunner, + report: KnowledgeLoopReport, + *, + run_id: str, +) -> tuple[str, str | None]: + branch = f"{config.branch_prefix}/{run_id}".replace("//", "/") + if config.dry_run or not config.create_pr: + return branch, None + _run_checked(["git", "fetch", "origin", config.base_branch], repo, runner, report) + _run_checked(["git", "checkout", "-B", branch, f"origin/{config.base_branch}"], repo, runner, report) + _run_checked(["git", "add", *_MANAGED_DIRTY_PREFIXES], repo, runner, report) + staged = _run_checked(["git", "diff", "--cached", "--quiet"], repo, runner, report, allow_exit_codes={0, 1}) + if staged.returncode == 0: + return branch, None + title = f"knowledge: loop refresh {run_id}" + _run_checked( + [ + "git", + "-c", + f"user.name={config.git_user_name}", + "-c", + f"user.email={config.git_user_email}", + "commit", + "-m", + title, + ], + repo, + runner, + report, + ) + _run_checked(["git", "push", "-u", "origin", branch], repo, runner, report) + body = _write_pr_body(config, run_id=run_id) + result = _run_checked( + [ + "gh", + "pr", + "create", + "--title", + title, + "--body-file", + body.as_posix(), + "--base", + config.base_branch, + "--head", + branch, + ], + repo, + runner, + report, + ) + url = result.stdout.strip().splitlines()[-1] if result.stdout.strip() else None + return branch, url + + +def _write_pr_body(config: KnowledgeLoopConfig, *, run_id: str) -> Path: + state_dir = config.resolved_state_dir() + state_dir.mkdir(parents=True, exist_ok=True) + path = state_dir / f"pr-body-{run_id}.md" + phase2_bits: list[str] = [] + if config.learning_event_paths: + phase2_bits.append("learning-event import") + if config.enrich_targets: + mode = "live OpenRouter" if config.enrich_live else "dry-run" + phase2_bits.append(f"{mode} enrichment") + phase2 = ", ".join(phase2_bits) if phase2_bits else "none" + if config.run_validation: + validation = "The Knowledge Loop ran the configured validation suite before opening this PR.\n" + else: + validation = ( + "Validation was skipped for this run (`--skip-validation`); the standard gates did " + "not execute and must be run before merge.\n" + ) + path.write_text( + "## Summary\n" + "- automated Knowledge Loop refresh\n" + f"- run id: `{run_id}`\n" + f"- phase-2 work: {phase2}\n\n" + "## Safety\n" + "- generated knowledge remains review-gated\n" + "- generated enrichment remains advisory unless separately reviewed\n" + "- source repositories continue to win conflicts\n\n" + "## Validation\n" + f"{validation}", + encoding="utf-8", + ) + return path + + +# --- command helpers ----------------------------------------------------- + + +def _knowledge_cmd(config: KnowledgeLoopConfig, *args: str) -> list[str]: + return [ + sys.executable, + "-m", + "hyrule_knowledge.cli", + "--config", + config.config_path.as_posix(), + *args, + ] + + +def _tool_cmd(name: str, *args: str) -> list[str]: + candidate = Path(sys.executable).with_name(name) + executable = candidate.as_posix() if candidate.exists() else name + return [executable, *args] + + +def _scrub_env(*, keep_openrouter: bool = False) -> dict[str, str]: + env = dict(os.environ) + if not keep_openrouter: + env["OPENROUTER_API_KEY"] = "" + return env + + +def _default_runner(argv: Sequence[str], cwd: Path, env: Mapping[str, str] | None) -> CommandResult: + completed = subprocess.run( + list(argv), + cwd=cwd, + text=True, + capture_output=True, + check=False, + env=dict(env) if env is not None else None, + ) + return CommandResult(tuple(str(part) for part in argv), completed.returncode, completed.stdout, completed.stderr) + + +def _run_checked( + argv: Sequence[str], + repo: Path, + runner: CommandRunner, + report: KnowledgeLoopReport, + *, + keep_openrouter: bool = False, + allow_exit_codes: set[int] | None = None, +) -> CommandResult: + allowed = allow_exit_codes or {0} + result = runner(argv, repo, _scrub_env(keep_openrouter=keep_openrouter)) + report.commands.append( + { + "argv": list(result.argv), + "returncode": result.returncode, + "stdout_tail": result.stdout[-500:], + "stderr_tail": result.stderr[-500:], + } + ) + if result.returncode not in allowed: + raise KnowledgeLoopError( + f"command failed ({result.returncode}): {' '.join(result.argv)}\n{result.stderr[-800:] or result.stdout[-800:]}" + ) + return result + + +def _git_dirty(repo: Path, state_dir: Path, runner: CommandRunner, report: KnowledgeLoopReport) -> bool: + result = _run_checked(["git", "status", "--porcelain"], repo, runner, report) + ignored_prefix = _state_dir_status_prefix(repo, state_dir) + for line in result.stdout.splitlines(): + path = _porcelain_path(line) + if ignored_prefix and (path == ignored_prefix or path.startswith(f"{ignored_prefix}/")): + continue + if path: + return True + return False + + +def _drop_volatile_only_changes( + repo: Path, + state_dir: Path, + runner: CommandRunner, + report: KnowledgeLoopReport, +) -> None: + """Discard timestamp/run-id-only regeneration churn so idle cycles stay idle. + + Every ``ingest``/``export``/``eval`` rewrites ``last_verified_at``/``run_id`` + metadata and rebuilds ``exports/knowledge.sqlite`` (which bakes in + ``datetime('now')``). Without this, an unchanged source snapshot still leaves a + dirty worktree and ``--create-pr`` would open timestamp-only refresh PRs. We + revert the churn only when *every* managed change is volatile, so genuine + knowledge changes still publish (carrying their fresh timestamps). + """ + status = _run_checked(["git", "status", "--porcelain"], repo, runner, report) + ignored_prefix = _state_dir_status_prefix(repo, state_dir) + revertable: list[str] = [] + for line in status.stdout.splitlines(): + path = _porcelain_path(line) + if not path: + continue + if ignored_prefix and (path == ignored_prefix or path.startswith(f"{ignored_prefix}/")): + continue + if not _is_managed_dirty_path(path): + return # unmanaged change present; treat the cycle as a real change + if line[:2].strip() != "M": + return # add/delete/rename/untracked under managed paths is semantic + if path in _VOLATILE_BINARY_PATHS: + revertable.append(path) + continue + diff = _run_checked(["git", "diff", "--no-color", "-U0", "--", path], repo, runner, report) + if not _diff_is_volatile_only(diff.stdout): + return # at least one semantic change; keep the whole worktree + revertable.append(path) + if revertable: + _run_checked(["git", "checkout", "--", *revertable], repo, runner, report) + + +def _is_managed_dirty_path(path: str) -> bool: + return any(path == prefix or path.startswith(f"{prefix}/") for prefix in _MANAGED_DIRTY_PREFIXES) + + +def _normalize_volatile(line: str) -> str: + line = _VOLATILE_TS_RE.sub(r"\1", line) + line = _RUN_ID_VALUE_RE.sub(r'\1""', line) + line = _LOCAL_RUN_ID_RE.sub("", line) + return line + + +def _diff_is_volatile_only(diff: str) -> bool: + """Return True when a ``git diff -U0`` only rewrites timestamps/run ids in place.""" + removed: list[str] = [] + added: list[str] = [] + in_hunk = False + for line in diff.splitlines(): + if line.startswith("@@"): + in_hunk = True + continue + if not in_hunk: + continue + if line.startswith("-"): + removed.append(line[1:]) + elif line.startswith("+"): + added.append(line[1:]) + if not removed and not added: + return False + if len(removed) != len(added): + return False + return all(_normalize_volatile(old) == _normalize_volatile(new) for old, new in zip(removed, added)) + + +def _state_dir_status_prefix(repo: Path, state_dir: Path) -> str | None: + try: + return state_dir.resolve().relative_to(repo.resolve()).as_posix() + except ValueError: + return None + + +def _porcelain_path(line: str) -> str: + if len(line) < 4: + return "" + path = line[3:].strip() + if " -> " in path: + path = path.rsplit(" -> ", 1)[1] + return path.strip('"') + + +# --- heartbeat ----------------------------------------------------------- + + +ICINGA_EXIT_STATUS: dict[LoopOutcome, int] = { + "published": 0, + "changes_detected": 0, + "idle": 0, + "locked": 0, + "over_budget": 1, + "error": 2, +} + + +def notify_icinga(report: KnowledgeLoopReport, *, poster: Poster | None = None) -> bool: + url = os.environ.get("HYRULE_KNOWLEDGE_LOOP_ICINGA_URL") or os.environ.get("HYRULE_ICINGA_URL") + user = os.environ.get("HYRULE_KNOWLEDGE_LOOP_ICINGA_USER") or os.environ.get("HYRULE_ICINGA_USER") + password = os.environ.get("HYRULE_KNOWLEDGE_LOOP_ICINGA_PASSWORD") or os.environ.get("HYRULE_ICINGA_PASSWORD") + check = os.environ.get("HYRULE_KNOWLEDGE_LOOP_ICINGA_CHECK", "loop!knowledge-loop") + if not url or not user or not password: + return False + payload: dict[str, Any] = { + "type": "Service", + "filter": f'service.__name=="{check}"', + "exit_status": ICINGA_EXIT_STATUS.get(report.outcome, 2), + "plugin_output": f"knowledge-loop {report.outcome}: {report.detail or report.run_id}"[:900], + "_basic_auth": b64encode(f"{user}:{password}".encode()).decode(), + "_headers": {"Accept": "application/json"}, + } + (poster or _default_http_post)(f"{url.rstrip('/')}/v1/actions/process-check-result", payload) + return True + + +def _default_http_post(url: str, payload: dict[str, Any]) -> None: + body = {key: value for key, value in payload.items() if not key.startswith("_")} + headers = dict(payload.get("_headers") or {}) + if payload.get("_basic_auth"): + headers["Authorization"] = f"Basic {payload['_basic_auth']}" + request = urllib.request.Request( + url, + data=json.dumps(body).encode(), + headers={"Content-Type": "application/json", **headers}, + method="POST", + ) + with urllib.request.urlopen(request, timeout=10): + pass + + +def _safe_notify(report: KnowledgeLoopReport, *, poster: Poster | None = None) -> None: + try: + if notify_icinga(report, poster=poster): + report.notifications.append("icinga") + except Exception as exc: # pragma: no cover - heartbeat must not mask cycle result + report.notifications.append(f"icinga_error:{type(exc).__name__}") diff --git a/tests/test_knowledge_loop.py b/tests/test_knowledge_loop.py new file mode 100644 index 0000000..7b89b42 --- /dev/null +++ b/tests/test_knowledge_loop.py @@ -0,0 +1,730 @@ +from __future__ import annotations + +import json +import os +import time +from collections.abc import Mapping, Sequence +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path + +from hyrule_knowledge.knowledge_loop import ( + CommandResult, + KnowledgeLoopConfig, + KnowledgeLoopReport, + acquire_lock, + notify_icinga, + release_lock, + run_once, +) + + +class FakeRunner: + def __init__(self, *, dirty: bool = False, status_stdout: str | None = None) -> None: + self.dirty = dirty + self.status_stdout = status_stdout + self.calls: list[tuple[str, ...]] = [] + self.envs: list[Mapping[str, str] | None] = [] + + def __call__(self, argv: Sequence[str], cwd: Path, env: Mapping[str, str] | None) -> CommandResult: + del cwd + call = tuple(str(part) for part in argv) + self.calls.append(call) + self.envs.append(env) + if call[:3] == ("git", "status", "--porcelain"): + stdout = self.status_stdout if self.status_stdout is not None else " M okf/generated/example.md\n" if self.dirty else "" + return CommandResult(call, 0, stdout) + if call[:4] == ("git", "diff", "--cached", "--quiet"): + return CommandResult(call, 1 if self.dirty else 0) + if call[:3] == ("gh", "pr", "create"): + return CommandResult(call, 0, "https://github.com/AS215932/knowledge/pull/999\n") + return CommandResult(call, 0, "ok\n") + + +class FailingAfterEnrichmentRunner(FakeRunner): + def __call__(self, argv: Sequence[str], cwd: Path, env: Mapping[str, str] | None) -> CommandResult: + call = tuple(str(part) for part in argv) + if Path(call[0]).name == "pytest": + self.calls.append(call) + self.envs.append(env) + return CommandResult(call, 1, "", "pytest failed after enrichment\n") + return super().__call__(argv, cwd, env) + + +class FailingEnrichmentRunner(FakeRunner): + def __call__(self, argv: Sequence[str], cwd: Path, env: Mapping[str, str] | None) -> CommandResult: + call = tuple(str(part) for part in argv) + if "enrich" in call and "infrastructure" in call: + self.calls.append(call) + self.envs.append(env) + return CommandResult(call, 1, "", "provider response validation failed after HTTP request\n") + return super().__call__(argv, cwd, env) + + +class FailingImportRunner(FakeRunner): + def __call__(self, argv: Sequence[str], cwd: Path, env: Mapping[str, str] | None) -> CommandResult: + call = tuple(str(part) for part in argv) + if "ledger" in call and "import" in call: + self.calls.append(call) + self.envs.append(env) + return CommandResult(call, 1, "", "ledger import failed after partial writes\n") + return super().__call__(argv, cwd, env) + + +class FailingIngestRunner(FakeRunner): + def __call__(self, argv: Sequence[str], cwd: Path, env: Mapping[str, str] | None) -> CommandResult: + call = tuple(str(part) for part in argv) + if "ingest" in call: + self.calls.append(call) + self.envs.append(env) + return CommandResult(call, 1, "", "ingest failed\n") + return super().__call__(argv, cwd, env) + + +class VolatileRefreshRunner(FakeRunner): + """Models ingest/export churn that only rewrites timestamps, run ids, and the + SQLite artifact, then reports a clean worktree once the loop reverts it.""" + + def __init__(self) -> None: + super().__init__() + self.reverted = False + + def __call__(self, argv: Sequence[str], cwd: Path, env: Mapping[str, str] | None) -> CommandResult: + call = tuple(str(part) for part in argv) + if call[:3] == ("git", "checkout", "--"): + self.reverted = True + self.calls.append(call) + self.envs.append(env) + return CommandResult(call, 0, "") + if call[:3] == ("git", "status", "--porcelain"): + self.calls.append(call) + self.envs.append(env) + stdout = "" if self.reverted else " M exports/manifest.json\n M exports/knowledge.sqlite\n" + return CommandResult(call, 0, stdout) + if call[:3] == ("git", "diff", "--no-color"): + self.calls.append(call) + self.envs.append(env) + diff = ( + f"diff --git a/{call[-1]} b/{call[-1]}\n" + "@@ -1 +1 @@\n" + '-{"run_id":"local-20260101000000","generated_at":"2026-01-01T00:00:00Z"}\n' + '+{"run_id":"local-20260102000000","generated_at":"2026-01-02T00:00:00Z"}\n' + ) + return CommandResult(call, 0, diff) + return super().__call__(argv, cwd, env) + + +class SemanticRefreshRunner(FakeRunner): + """Models a genuine knowledge change whose diff is not timestamp-only.""" + + def __call__(self, argv: Sequence[str], cwd: Path, env: Mapping[str, str] | None) -> CommandResult: + call = tuple(str(part) for part in argv) + if call[:3] == ("git", "status", "--porcelain"): + self.calls.append(call) + self.envs.append(env) + return CommandResult(call, 0, " M okf/generated/services/example.md\n") + if call[:3] == ("git", "diff", "--no-color"): + self.calls.append(call) + self.envs.append(env) + diff = ( + f"diff --git a/{call[-1]} b/{call[-1]}\n" + "@@ -1 +1 @@\n" + "-old description of the service\n" + "+new description of the service\n" + ) + return CommandResult(call, 0, diff) + return super().__call__(argv, cwd, env) + + +def _repo(tmp_path: Path) -> Path: + repo = tmp_path / "knowledge" + repo.mkdir() + (repo / "okf").mkdir() + (repo / "exports").mkdir() + (repo / "reports").mkdir() + (repo / "evals").mkdir() + (repo / "ledger").mkdir() + (repo / "schema").mkdir() + return repo + + +def test_knowledge_loop_phase1_runs_refresh_and_validation_when_idle(tmp_path: Path) -> None: + runner = FakeRunner(dirty=False) + report = run_once( + KnowledgeLoopConfig(repo_path=_repo(tmp_path), state_dir=tmp_path / "state", run_id="test-1"), + runner=runner, + ) + + assert report.outcome == "idle" + calls = [" ".join(call) for call in runner.calls] + assert any("hyrule_knowledge.cli --config knowledge.config.yml ingest" in call for call in calls) + assert any(call.endswith("ruff check src tests") for call in calls) + assert any("hyrule_knowledge.cli --config knowledge.config.yml scan-secrets okf exports reports evals ledger schema" in call for call in calls) + assert report.ledger["cycles"] == 1 + + +def test_knowledge_loop_prepares_base_checkout_before_publishing_cycle(tmp_path: Path) -> None: + runner = FakeRunner(dirty=False) + + report = run_once( + KnowledgeLoopConfig( + repo_path=_repo(tmp_path), + state_dir=tmp_path / "state", + run_id="test-base", + create_pr=True, + run_validation=False, + ), + runner=runner, + ) + + assert report.outcome == "idle" + assert ("git", "fetch", "origin", "main") in runner.calls + assert ("git", "checkout", "main") in runner.calls + assert ("git", "reset", "--hard", "origin/main") in runner.calls + + +def test_knowledge_loop_publishes_pr_when_enabled_and_dirty(tmp_path: Path) -> None: + runner = FakeRunner(dirty=True) + report = run_once( + KnowledgeLoopConfig( + repo_path=_repo(tmp_path), + state_dir=tmp_path / "state", + run_id="test-2", + create_pr=True, + run_validation=False, + ), + runner=runner, + ) + + assert report.outcome == "published" + assert report.pr_url == "https://github.com/AS215932/knowledge/pull/999" + assert report.branch == "bot/knowledge-loop/test-2" + assert ("git", "fetch", "origin", "main") in runner.calls + assert ("git", "checkout", "-B", "bot/knowledge-loop/test-2", "origin/main") in runner.calls + assert any(call[:6] == ("git", "-c", "user.name=hyrule-knowledge-loop[bot]", "-c", "user.email=knowledge-loop@as215932.net", "commit") for call in runner.calls) + assert any(call[:3] == ("gh", "pr", "create") for call in runner.calls) + assert report.ledger["prs_opened"] == 1 + + +def test_knowledge_loop_dirty_dry_run_does_not_publish(tmp_path: Path) -> None: + runner = FakeRunner(dirty=True) + report = run_once( + KnowledgeLoopConfig( + repo_path=_repo(tmp_path), + state_dir=tmp_path / "state", + run_id="test-3", + create_pr=True, + dry_run=True, + run_validation=False, + ), + runner=runner, + ) + + assert report.outcome == "changes_detected" + assert report.pr_url is None + assert not any(call[:3] == ("gh", "pr", "create") for call in runner.calls) + + +def test_knowledge_loop_reverts_timestamp_only_refresh_and_reports_idle(tmp_path: Path) -> None: + runner = VolatileRefreshRunner() + + report = run_once( + KnowledgeLoopConfig( + repo_path=_repo(tmp_path), + state_dir=tmp_path / "state", + run_id="idle-volatile", + create_pr=True, + run_validation=False, + ), + runner=runner, + ) + + assert report.outcome == "idle" + assert report.changed is False + # the timestamp/run-id/sqlite churn is reverted instead of published + checkout_calls = [call for call in runner.calls if call[:3] == ("git", "checkout", "--")] + assert checkout_calls + assert any("exports/knowledge.sqlite" in call for call in checkout_calls) + assert any("exports/manifest.json" in call for call in checkout_calls) + assert not any(call[:3] == ("gh", "pr", "create") for call in runner.calls) + assert report.ledger["cycles"] == 1 + + +def test_diff_classifier_keeps_corrected_event_time(tmp_path: Path) -> None: + from hyrule_knowledge.knowledge_loop import _diff_is_volatile_only + + generated_metadata = ( + "@@ -1 +1 @@\n" + '-{"run_id":"local-20260101000000","generated_at":"2026-01-01T00:00:00Z","extracted_at":"2026-01-01T00:00:00Z"}\n' + '+{"run_id":"local-20260102000000","generated_at":"2026-01-02T00:00:00Z","extracted_at":"2026-01-02T00:00:00Z"}\n' + ) + assert _diff_is_volatile_only(generated_metadata) is True + + # a corrected learning-event `event_time` (stable id unchanged) is real data, not churn + corrected_event_time = ( + "@@ -1 +1 @@\n" + '-{"id":"evt_1","event_time":"2026-01-01T00:00:00Z","summary":"x"}\n' + '+{"id":"evt_1","event_time":"2026-02-02T00:00:00Z","summary":"x"}\n' + ) + assert _diff_is_volatile_only(corrected_event_time) is False + + +def test_knowledge_loop_publish_stages_schema(tmp_path: Path) -> None: + runner = FakeRunner(dirty=True) + + run_once( + KnowledgeLoopConfig( + repo_path=_repo(tmp_path), + state_dir=tmp_path / "state", + run_id="schema-stage", + create_pr=True, + run_validation=False, + ), + runner=runner, + ) + + add_calls = [call for call in runner.calls if call[:2] == ("git", "add")] + assert add_calls + # schema is a managed dirty prefix, so a schema-only change must be staged and published + assert any("schema" in call for call in add_calls) + + +def test_knowledge_loop_publishes_pr_when_change_is_semantic(tmp_path: Path) -> None: + runner = SemanticRefreshRunner(dirty=True) + + report = run_once( + KnowledgeLoopConfig( + repo_path=_repo(tmp_path), + state_dir=tmp_path / "state", + run_id="semantic-1", + create_pr=True, + run_validation=False, + ), + runner=runner, + ) + + assert report.outcome == "published" + # a real content change must not be reverted as volatile churn + assert not any(call[:3] == ("git", "checkout", "--") for call in runner.calls) + assert any(call[:3] == ("gh", "pr", "create") for call in runner.calls) + + +def test_knowledge_loop_ignores_default_repo_local_state_dir_in_dirty_check(tmp_path: Path) -> None: + runner = FakeRunner(status_stdout="?? .cache/hyrule-knowledge/loop-state/ledger-2026-06-23.json\n") + + report = run_once( + KnowledgeLoopConfig(repo_path=_repo(tmp_path), run_id="state-clean", run_validation=False), + runner=runner, + ) + + assert report.outcome == "idle" + assert report.changed is False + + +def test_knowledge_loop_ignores_explicit_repo_local_state_dir_in_dirty_check(tmp_path: Path) -> None: + runner = FakeRunner(status_stdout="?? .knowledge-loop-state/knowledge-loop.lock\n") + + report = run_once( + KnowledgeLoopConfig( + repo_path=_repo(tmp_path), + state_dir=Path(".knowledge-loop-state"), + run_id="state-clean-explicit", + run_validation=False, + ), + runner=runner, + ) + + assert report.outcome == "idle" + assert report.changed is False + + +def test_acquire_lock_does_not_evict_old_live_holder(tmp_path: Path) -> None: + state = tmp_path / "state" + state.mkdir() + (state / "knowledge-loop.lock").write_text( + json.dumps({"pid": os.getpid(), "started_at": time.time() - 999_999}), + encoding="utf-8", + ) + + assert acquire_lock(state, max_age_seconds=1) is None + + +def test_acquire_lock_is_atomic_under_concurrency(tmp_path: Path) -> None: + state = tmp_path / "state" + + with ThreadPoolExecutor(max_workers=8) as executor: + locks = list(executor.map(lambda _item: acquire_lock(state), range(8))) + + acquired = [lock for lock in locks if lock is not None] + assert len(acquired) == 1 + release_lock(acquired[0]) + + +def test_knowledge_loop_lock_blocks_second_cycle(tmp_path: Path) -> None: + repo = _repo(tmp_path) + state = tmp_path / "state" + state.mkdir() + (state / "knowledge-loop.lock").write_text( + json.dumps({"pid": os.getpid(), "started_at": time.time()}), + encoding="utf-8", + ) + + report = run_once(KnowledgeLoopConfig(repo_path=repo, state_dir=state), runner=FakeRunner()) + + assert report.outcome == "locked" + + +def test_knowledge_loop_zero_pr_budget_allows_non_publishing_refresh(tmp_path: Path) -> None: + report = run_once( + KnowledgeLoopConfig( + repo_path=_repo(tmp_path), + state_dir=tmp_path / "state", + max_prs_per_day=0, + create_pr=False, + run_validation=False, + ), + runner=FakeRunner(dirty=False), + ) + + assert report.outcome == "idle" + assert report.ledger["cycles"] == 1 + + +def test_knowledge_loop_pr_budget_blocks_only_publish_step(tmp_path: Path) -> None: + report = run_once( + KnowledgeLoopConfig( + repo_path=_repo(tmp_path), + state_dir=tmp_path / "state", + max_prs_per_day=0, + create_pr=True, + run_validation=False, + ), + runner=FakeRunner(dirty=True), + ) + + assert report.outcome == "over_budget" + assert "daily PR budget" in report.detail + assert report.ledger["cycles"] == 1 + + +def test_knowledge_loop_failed_cycle_counts_against_daily_budget(tmp_path: Path) -> None: + report = run_once( + KnowledgeLoopConfig( + repo_path=_repo(tmp_path), + state_dir=tmp_path / "state", + run_validation=False, + ), + runner=FailingIngestRunner(), + ) + + # a failing cycle must be counted so the timer is throttled by the daily budget + assert report.outcome == "error" + assert report.ledger["cycles"] == 1 + + +def test_cmd_loop_blank_state_dir_env_falls_back_to_cache(monkeypatch, tmp_path: Path) -> None: + from hyrule_knowledge import cli + + captured: dict[str, KnowledgeLoopConfig] = {} + + def fake_run(config: KnowledgeLoopConfig) -> KnowledgeLoopReport: + captured["config"] = config + return KnowledgeLoopReport(outcome="idle", run_id="x") + + monkeypatch.setattr(cli, "run_knowledge_loop_once", fake_run) + monkeypatch.setenv("HYRULE_KNOWLEDGE_LOOP_STATE_DIR", " ") + + args = cli.build_parser().parse_args(["loop", "--once", "--repo-path", str(tmp_path)]) + assert cli.cmd_loop(args) == 0 + assert captured["config"].state_dir == Path(".cache/hyrule-knowledge/loop-state") + + +def test_knowledge_loop_daily_cycle_budget_blocks_work(tmp_path: Path) -> None: + repo = _repo(tmp_path) + state = tmp_path / "state" + state.mkdir() + (state / "ledger-2099-01-01.json").write_text("{}\n", encoding="utf-8") + today = __import__("hyrule_knowledge.knowledge_loop", fromlist=["today"]).today() + (state / f"ledger-{today}.json").write_text(json.dumps({"cycles": 1, "prs_opened": 0, "openrouter_calls": 0, "learning_events_imported": 0}), encoding="utf-8") + + report = run_once( + KnowledgeLoopConfig(repo_path=repo, state_dir=state, max_cycles_per_day=1), + runner=FakeRunner(), + ) + + assert report.outcome == "over_budget" + assert "daily cycle budget" in report.detail + + +def test_phase2_live_enrichment_requires_openrouter_budget(tmp_path: Path) -> None: + report = run_once( + KnowledgeLoopConfig( + repo_path=_repo(tmp_path), + state_dir=tmp_path / "state", + enrich_targets=("infrastructure",), + enrich_live=True, + max_openrouter_calls_per_day=0, + ), + runner=FakeRunner(), + ) + + assert report.outcome == "over_budget" + assert "OpenRouter budget" in report.detail + + +def test_phase2_live_enrichment_preserves_openrouter_env_for_enrich_only(tmp_path: Path, monkeypatch) -> None: + monkeypatch.setenv("OPENROUTER_API_KEY", "not-a-real-secret") + runner = FakeRunner(dirty=False) + report = run_once( + KnowledgeLoopConfig( + repo_path=_repo(tmp_path), + state_dir=tmp_path / "state", + enrich_targets=("infrastructure",), + enrich_live=True, + max_openrouter_calls_per_day=1, + run_validation=False, + ), + runner=runner, + ) + + assert report.outcome == "idle" + enrich_indexes = [ + index + for index, call in enumerate(runner.calls) + if "enrich" in call and "infrastructure" in call + ] + assert enrich_indexes + assert runner.envs[enrich_indexes[0]] is not None + assert runner.envs[enrich_indexes[0]]["OPENROUTER_API_KEY"] == "not-a-real-secret" + scrubbed_indexes = [index for index, call in enumerate(runner.calls) if "ingest" in call] + assert scrubbed_indexes + assert runner.envs[scrubbed_indexes[0]] is not None + assert runner.envs[scrubbed_indexes[0]]["OPENROUTER_API_KEY"] == "" + + +def test_phase2_live_enrichment_charges_attempt_when_child_enrich_fails(tmp_path: Path, monkeypatch) -> None: + monkeypatch.setenv("OPENROUTER_API_KEY", "not-a-real-secret") + runner = FailingEnrichmentRunner(dirty=False) + + report = run_once( + KnowledgeLoopConfig( + repo_path=_repo(tmp_path), + state_dir=tmp_path / "state", + enrich_targets=("infrastructure",), + enrich_live=True, + max_openrouter_calls_per_day=1, + run_validation=False, + ), + runner=runner, + ) + + assert report.outcome == "error" + assert report.openrouter_calls_run == 1 + assert report.ledger["openrouter_calls"] == 1 + + +def test_phase2_live_enrichment_charges_budget_when_later_validation_fails(tmp_path: Path, monkeypatch) -> None: + monkeypatch.setenv("OPENROUTER_API_KEY", "not-a-real-secret") + runner = FailingAfterEnrichmentRunner(dirty=False) + + report = run_once( + KnowledgeLoopConfig( + repo_path=_repo(tmp_path), + state_dir=tmp_path / "state", + enrich_targets=("infrastructure",), + enrich_live=True, + max_openrouter_calls_per_day=1, + ), + runner=runner, + ) + + assert report.outcome == "error" + assert report.openrouter_calls_run == 1 + assert report.ledger["openrouter_calls"] == 1 + + +def test_phase2_learning_import_charges_budget_when_import_command_fails(tmp_path: Path) -> None: + events = tmp_path / "events.json" + events.write_text( + json.dumps( + [ + {"ledger_version": "learning_ledger_v1", "event_type": "run_summary", "producer": "test", "subject": "one", "summary": "one"}, + {"ledger_version": "learning_ledger_v1", "event_type": "run_summary", "producer": "test", "subject": "two", "summary": "two"}, + ] + ) + + "\n", + encoding="utf-8", + ) + + report = run_once( + KnowledgeLoopConfig( + repo_path=_repo(tmp_path), + state_dir=tmp_path / "state", + learning_event_paths=(events,), + max_learning_events_per_day=2, + run_validation=False, + ), + runner=FailingImportRunner(), + ) + + assert report.outcome == "error" + assert report.learning_events_seen == 2 + assert report.ledger["learning_events_imported"] == 2 + + +def test_phase2_learning_import_counts_events_inside_list_files(tmp_path: Path) -> None: + events = tmp_path / "events.json" + events.write_text( + json.dumps( + [ + {"ledger_version": "learning_ledger_v1", "event_type": "run_summary", "producer": "test", "subject": "one", "summary": "one"}, + {"ledger_version": "learning_ledger_v1", "event_type": "run_summary", "producer": "test", "subject": "two", "summary": "two"}, + ] + ) + + "\n", + encoding="utf-8", + ) + + report = run_once( + KnowledgeLoopConfig( + repo_path=_repo(tmp_path), + state_dir=tmp_path / "state", + learning_event_paths=(events,), + max_learning_events_per_day=1, + ), + runner=FakeRunner(), + ) + + assert report.outcome == "over_budget" + assert report.learning_events_seen == 2 + assert "learning-event import budget" in report.detail + + +def test_phase2_learning_import_uses_existing_path_expansion_budget(tmp_path: Path) -> None: + events = tmp_path / "events" + events.mkdir() + (events / "one.json").write_text('{"ledger_version":"learning_ledger_v1","event_type":"run_summary","producer":"test","subject":"one","summary":"one"}\n', encoding="utf-8") + (events / "two.json").write_text('{"ledger_version":"learning_ledger_v1","event_type":"run_summary","producer":"test","subject":"two","summary":"two"}\n', encoding="utf-8") + + report = run_once( + KnowledgeLoopConfig( + repo_path=_repo(tmp_path), + state_dir=tmp_path / "state", + learning_event_paths=(events,), + max_learning_events_per_day=1, + ), + runner=FakeRunner(), + ) + + assert report.outcome == "over_budget" + assert "learning-event import budget" in report.detail + + +def test_phase2_learning_import_runs_when_budget_allows(tmp_path: Path) -> None: + events = tmp_path / "events" + events.mkdir() + (events / "one.json").write_text('{"ledger_version":"learning_ledger_v1","event_type":"run_summary","producer":"test","subject":"one","summary":"one"}\n', encoding="utf-8") + runner = FakeRunner(dirty=False) + + report = run_once( + KnowledgeLoopConfig( + repo_path=_repo(tmp_path), + state_dir=tmp_path / "state", + learning_event_paths=(events,), + max_learning_events_per_day=1, + run_validation=False, + ), + runner=runner, + ) + + assert report.outcome == "idle" + assert report.learning_events_seen == 1 + assert any("ledger" in call and "import" in call and events.as_posix() in call for call in runner.calls) + assert report.ledger["learning_events_imported"] == 1 + + +def test_phase2_learning_import_resolves_relative_event_path_against_repo(tmp_path: Path) -> None: + repo = _repo(tmp_path) + intake = repo / "intake" + intake.mkdir() + (intake / "one.json").write_text( + '{"ledger_version":"learning_ledger_v1","event_type":"run_summary","producer":"test","subject":"one","summary":"one"}\n', + encoding="utf-8", + ) + runner = FakeRunner(dirty=False) + + report = run_once( + KnowledgeLoopConfig( + repo_path=repo, + state_dir=tmp_path / "state", + learning_event_paths=(Path("intake"),), + max_learning_events_per_day=1, + run_validation=False, + ), + runner=runner, + ) + + # counted against the repo checkout, not the supervisor cwd + assert report.outcome == "idle" + assert report.learning_events_seen == 1 + expected = (repo.resolve() / "intake").as_posix() + assert any("import" in call and expected in call for call in runner.calls) + assert report.ledger["learning_events_imported"] == 1 + + +def test_pr_body_reflects_whether_validation_ran(tmp_path: Path) -> None: + from hyrule_knowledge.knowledge_loop import _write_pr_body + + repo = _repo(tmp_path) + ran = _write_pr_body( + KnowledgeLoopConfig(repo_path=repo, state_dir=tmp_path / "state-ran", run_validation=True), + run_id="ran", + ).read_text(encoding="utf-8") + skipped = _write_pr_body( + KnowledgeLoopConfig(repo_path=repo, state_dir=tmp_path / "state-skip", run_validation=False), + run_id="skip", + ).read_text(encoding="utf-8") + + assert "ran the configured validation suite" in ran + assert "ran the configured validation suite" not in skipped + assert "skipped" in skipped.lower() + assert "must be run before merge" in skipped + + +def test_notify_icinga_posts_sanitized_passive_check(monkeypatch) -> None: + monkeypatch.setenv("HYRULE_KNOWLEDGE_LOOP_ICINGA_URL", "https://icinga.example") + monkeypatch.setenv("HYRULE_KNOWLEDGE_LOOP_ICINGA_USER", "user") + monkeypatch.setenv("HYRULE_KNOWLEDGE_LOOP_ICINGA_PASSWORD", "secret") + posted: list[tuple[str, dict[str, object]]] = [] + + ok = notify_icinga( + KnowledgeLoopReport(outcome="idle", run_id="abc", detail="nothing to do"), + poster=lambda url, payload: posted.append((url, payload)), + ) + + assert ok is True + assert posted[0][0] == "https://icinga.example/v1/actions/process-check-result" + assert posted[0][1]["exit_status"] == 0 + assert "secret" not in str(posted[0][1].get("plugin_output")) + + +def test_cli_parser_tolerates_blank_loop_budget_env_for_non_loop_commands(monkeypatch) -> None: + from hyrule_knowledge.cli import build_parser + + monkeypatch.setenv("HYRULE_KNOWLEDGE_LOOP_MAX_OPENROUTER_CALLS_PER_DAY", "") + monkeypatch.setenv("HYRULE_KNOWLEDGE_LOOP_MAX_LEARNING_EVENTS_PER_DAY", "") + monkeypatch.setenv("HYRULE_KNOWLEDGE_LOOP_MAX_CYCLES_PER_DAY", "not-an-int") + + args = build_parser().parse_args(["query", "service landscape"]) + + assert args.command == "query" + + +def test_cli_loop_help_includes_phase2_flags() -> None: + from hyrule_knowledge.cli import build_parser + + help_text = build_parser().format_help() + loop_parser = build_parser()._subparsers._group_actions[0].choices["loop"] # type: ignore[attr-defined] + loop_help = loop_parser.format_help() + assert "loop" in help_text + assert "--enrich-target" in loop_help + assert "--learning-event" in loop_help