From 3875373b299380c42d9abd9440f5e5ed2b11a42a Mon Sep 17 00:00:00 2001 From: GangGreenTemperTatum <104169244+GangGreenTemperTatum@users.noreply.github.com> Date: Mon, 4 May 2026 14:47:29 -0400 Subject: [PATCH 1/2] Add interrupted tool result recovery hook to web-security --- capabilities/web-security/capability.yaml | 5 +- .../hooks/interrupted_tool_result.py | 156 +++++++++++ .../test_interrupted_tool_result_hook.py | 259 ++++++++++++++++++ 3 files changed, 419 insertions(+), 1 deletion(-) create mode 100644 capabilities/web-security/hooks/interrupted_tool_result.py create mode 100644 capabilities/web-security/tests/test_interrupted_tool_result_hook.py diff --git a/capabilities/web-security/capability.yaml b/capabilities/web-security/capability.yaml index e695f57..e09c5eb 100644 --- a/capabilities/web-security/capability.yaml +++ b/capabilities/web-security/capability.yaml @@ -1,6 +1,6 @@ schema: 1 name: web-security -version: "1.1.4" +version: "1.1.5" description: > Web application penetration testing with 60+ attack technique playbooks covering request smuggling, cache poisoning, SSRF, SSTI, DOM @@ -11,6 +11,9 @@ description: > vulnerability verification, IP rotation helpers (Flareprox, fireprox), and archive extraction vulnerability crafting with archivealchemist. +hooks: + - hooks/interrupted_tool_result.py + mcp: servers: caido: diff --git a/capabilities/web-security/hooks/interrupted_tool_result.py b/capabilities/web-security/hooks/interrupted_tool_result.py new file mode 100644 index 0000000..58eecd8 --- /dev/null +++ b/capabilities/web-security/hooks/interrupted_tool_result.py @@ -0,0 +1,156 @@ +"""Recover from provider interruption sentinels after tool execution.""" + +from __future__ import annotations + +import asyncio +import re +from dataclasses import dataclass + +from dreadnode.agents.events import AgentEnd, GenerationStep, ToolEnd, ToolError +from dreadnode.agents.reactions import Continue +from dreadnode.core.hook import hook + +_INTERRUPTION_SENTINEL = re.compile( + r"^\[?\s*response interrupted by a tool call result\.\s*\]?$", + re.IGNORECASE, +) +_MAX_RECOVERIES_PER_AGENT = 2 +_MAX_SUMMARY_CHARS = 600 + + +@dataclass(slots=True) +class _ToolOutcome: + tool_name: str + summary: str + + +@dataclass(slots=True) +class _AgentState: + last_tool_outcome: _ToolOutcome | None = None + recoveries: int = 0 + + +_STATE_LOCK = asyncio.Lock() +_AGENT_STATE: dict[str, _AgentState] = {} + + +def _normalize_text(value: object | None) -> str | None: + """Collapse tool output into a short, stable single-line summary.""" + if value is None: + return None + + text = " ".join(str(value).split()).strip() + if not text: + return None + if len(text) <= _MAX_SUMMARY_CHARS: + return text + return f"{text[:_MAX_SUMMARY_CHARS - 3].rstrip()}..." + + +def _extract_assistant_text(event: GenerationStep) -> str | None: + """Return the last assistant text only when it is a plain text turn.""" + if not event.messages: + return None + + last_message = event.messages[-1] + if getattr(last_message, "role", None) != "assistant": + return None + if getattr(last_message, "tool_calls", None): + return None + + return _normalize_text(getattr(last_message, "content", None)) + + +def _is_interruption_sentinel(text: str | None) -> bool: + """Match the provider sentinel exactly to avoid false positives.""" + if text is None: + return False + return _INTERRUPTION_SENTINEL.fullmatch(text) is not None + + +def _tool_end_summary(event: ToolEnd) -> str: + """Describe the last completed tool call for recovery feedback.""" + if event.error: + detail = _normalize_text(event.error) + if detail: + return f"{event.tool_call.name} returned an error: {detail}" + return f"{event.tool_call.name} returned an error." + + detail = _normalize_text(event.result) + if detail: + return f"{event.tool_call.name} returned: {detail}" + return f"{event.tool_call.name} completed without output." + + +def _tool_error_summary(event: ToolError) -> str: + """Describe an uncaught tool exception for recovery feedback.""" + detail = _normalize_text(event.error) + if detail: + return f"{event.tool_call.name} raised an error: {detail}" + return f"{event.tool_call.name} raised an error." + + +def _recovery_feedback(state: _AgentState) -> str: + """Build the corrective prompt appended after the sentinel turn.""" + base = ( + "Your last response was a transport artifact " + "(`[Response interrupted by a tool call result.]`), not a valid assistant turn. " + "Ignore it." + ) + if state.last_tool_outcome is None: + return f"{base} Continue from the current conversation state and take the next best action." + return ( + f"{base} The last tool outcome was: {state.last_tool_outcome.summary} " + "Continue from that result and take the next best action." + ) + + +@hook(ToolEnd) +async def remember_tool_end(event: ToolEnd) -> None: + """Remember the most recent tool completion for later recovery.""" + async with _STATE_LOCK: + state = _AGENT_STATE.setdefault(event.agent_id, _AgentState()) + state.last_tool_outcome = _ToolOutcome( + tool_name=event.tool_call.name, + summary=_tool_end_summary(event), + ) + + +@hook(ToolError) +async def remember_tool_error(event: ToolError) -> None: + """Remember uncaught tool failures for later recovery.""" + async with _STATE_LOCK: + state = _AGENT_STATE.setdefault(event.agent_id, _AgentState()) + state.last_tool_outcome = _ToolOutcome( + tool_name=event.tool_call.name, + summary=_tool_error_summary(event), + ) + + +@hook(GenerationStep) +async def recover_interrupted_tool_result(event: GenerationStep) -> Continue | None: + """Continue the run when the model emits the interruption sentinel.""" + assistant_text = _extract_assistant_text(event) + + async with _STATE_LOCK: + state = _AGENT_STATE.setdefault(event.agent_id, _AgentState()) + + if not _is_interruption_sentinel(assistant_text): + if assistant_text: + state.recoveries = 0 + return None + + if state.recoveries >= _MAX_RECOVERIES_PER_AGENT: + return None + + state.recoveries += 1 + feedback = _recovery_feedback(state) + + return Continue(feedback=feedback) + + +@hook(AgentEnd) +async def clear_recovery_state(event: AgentEnd) -> None: + """Drop per-agent recovery state when the run ends.""" + async with _STATE_LOCK: + _AGENT_STATE.pop(event.agent_id, None) diff --git a/capabilities/web-security/tests/test_interrupted_tool_result_hook.py b/capabilities/web-security/tests/test_interrupted_tool_result_hook.py new file mode 100644 index 0000000..d244136 --- /dev/null +++ b/capabilities/web-security/tests/test_interrupted_tool_result_hook.py @@ -0,0 +1,259 @@ +from __future__ import annotations + +import importlib.util +import sys +import types +from dataclasses import dataclass, field +from pathlib import Path + +import pytest +import yaml + + +def _install_hook_stubs() -> None: + dreadnode = types.ModuleType("dreadnode") + agents = types.ModuleType("dreadnode.agents") + events = types.ModuleType("dreadnode.agents.events") + reactions = types.ModuleType("dreadnode.agents.reactions") + core = types.ModuleType("dreadnode.core") + hook_module = types.ModuleType("dreadnode.core.hook") + + @dataclass + class FunctionCall: + name: str + arguments: str = "{}" + + @dataclass + class ToolCall: + id: str + name: str + function: FunctionCall = field(init=False) + + def __post_init__(self) -> None: + self.function = FunctionCall(name=self.name) + + @dataclass + class Message: + role: str + content: str | None = None + tool_calls: list[object] | None = None + + @dataclass + class AgentEnd: + agent_id: str + + @dataclass + class ToolEnd: + agent_id: str + tool_call: ToolCall + result: str | None = None + error: str | None = None + error_type: str | None = None + + @dataclass + class ToolError: + agent_id: str + tool_call: ToolCall + error: Exception | str + + @dataclass + class GenerationStep: + agent_id: str + messages: list[Message] + step: int = 1 + + @dataclass + class Continue(Exception): + feedback: str | None = None + + class Hook: + def __init__(self, func, event_type) -> None: + self.func = func + self.event_type = event_type + self.__name__ = getattr(func, "__name__", "hook") + + def __call__(self, event): + if not isinstance(event, self.event_type): + return None + return self.func(event) + + def hook(event_type): + def decorator(fn): + return Hook(fn, event_type) + + return decorator + + events.AgentEnd = AgentEnd + events.GenerationStep = GenerationStep + events.ToolCall = ToolCall + events.ToolEnd = ToolEnd + events.ToolError = ToolError + reactions.Continue = Continue + hook_module.Hook = Hook + hook_module.hook = hook + + dreadnode.agents = agents + dreadnode.core = core + agents.events = events + reactions.Message = Message + core.hook = hook_module + + sys.modules["dreadnode"] = dreadnode + sys.modules["dreadnode.agents"] = agents + sys.modules["dreadnode.agents.events"] = events + sys.modules["dreadnode.agents.reactions"] = reactions + sys.modules["dreadnode.core"] = core + sys.modules["dreadnode.core.hook"] = hook_module + + +@pytest.fixture +def hook_module(): + _install_hook_stubs() + + module_path = ( + Path(__file__).resolve().parents[1] / "hooks" / "interrupted_tool_result.py" + ) + module_name = "test_web_security_interrupted_tool_result" + spec = importlib.util.spec_from_file_location(module_name, module_path) + assert spec is not None + assert spec.loader is not None + module = importlib.util.module_from_spec(spec) + sys.modules[module_name] = module + spec.loader.exec_module(module) + return module + + +@pytest.mark.asyncio +async def test_manifest_wires_hook_file() -> None: + manifest_path = Path(__file__).resolve().parents[1] / "capability.yaml" + manifest = yaml.safe_load(manifest_path.read_text(encoding="utf-8")) + + assert manifest["version"] == "1.0.4" + assert manifest["hooks"] == ["hooks/interrupted_tool_result.py"] + + +@pytest.mark.asyncio +async def test_recovers_from_interruption_marker_after_tool_end(hook_module) -> None: + tool_end = sys.modules["dreadnode.agents.events"].ToolEnd( + agent_id="agent-1", + tool_call=sys.modules["dreadnode.agents.events"].ToolCall("tc-1", "bash"), + error="Command failed (1): nmap target", + ) + await hook_module.remember_tool_end(tool_end) + + generation = sys.modules["dreadnode.agents.events"].GenerationStep( + agent_id="agent-1", + messages=[ + sys.modules["dreadnode.agents.reactions"].Message( + role="assistant", + content="[Response interrupted by a tool call result.]", + ) + ], + step=2, + ) + + reaction = await hook_module.recover_interrupted_tool_result(generation) + + assert reaction is not None + assert "transport artifact" in reaction.feedback + assert ( + "bash returned an error: Command failed (1): nmap target" in reaction.feedback + ) + + +@pytest.mark.asyncio +async def test_recovers_from_interruption_marker_after_tool_error(hook_module) -> None: + tool_error = sys.modules["dreadnode.agents.events"].ToolError( + agent_id="agent-2", + tool_call=sys.modules["dreadnode.agents.events"].ToolCall("tc-2", "bash"), + error=RuntimeError("socket hangup"), + ) + await hook_module.remember_tool_error(tool_error) + + generation = sys.modules["dreadnode.agents.events"].GenerationStep( + agent_id="agent-2", + messages=[ + sys.modules["dreadnode.agents.reactions"].Message( + role="assistant", + content="Response interrupted by a tool call result.", + ) + ], + step=3, + ) + + reaction = await hook_module.recover_interrupted_tool_result(generation) + + assert reaction is not None + assert "bash raised an error: socket hangup" in reaction.feedback + + +@pytest.mark.asyncio +async def test_does_not_fire_on_normal_text_or_embedded_phrase(hook_module) -> None: + normal = sys.modules["dreadnode.agents.events"].GenerationStep( + agent_id="agent-3", + messages=[ + sys.modules["dreadnode.agents.reactions"].Message( + role="assistant", + content="I found a login form and will test password reset next.", + ) + ], + step=1, + ) + embedded = sys.modules["dreadnode.agents.events"].GenerationStep( + agent_id="agent-3", + messages=[ + sys.modules["dreadnode.agents.reactions"].Message( + role="assistant", + content="The UI literally showed [Response interrupted by a tool call result.] once.", + ) + ], + step=2, + ) + + assert await hook_module.recover_interrupted_tool_result(normal) is None + assert await hook_module.recover_interrupted_tool_result(embedded) is None + + +@pytest.mark.asyncio +async def test_retry_budget_resets_after_valid_turn_and_state_cleans_up( + hook_module, +) -> None: + tool_end = sys.modules["dreadnode.agents.events"].ToolEnd( + agent_id="agent-4", + tool_call=sys.modules["dreadnode.agents.events"].ToolCall("tc-4", "bash"), + result="80/tcp open http", + ) + await hook_module.remember_tool_end(tool_end) + + sentinel = sys.modules["dreadnode.agents.events"].GenerationStep( + agent_id="agent-4", + messages=[ + sys.modules["dreadnode.agents.reactions"].Message( + role="assistant", + content="[Response interrupted by a tool call result.]", + ) + ], + step=1, + ) + + assert await hook_module.recover_interrupted_tool_result(sentinel) is not None + assert await hook_module.recover_interrupted_tool_result(sentinel) is not None + assert await hook_module.recover_interrupted_tool_result(sentinel) is None + + valid_turn = sys.modules["dreadnode.agents.events"].GenerationStep( + agent_id="agent-4", + messages=[ + sys.modules["dreadnode.agents.reactions"].Message( + role="assistant", + content="Port 80 is open. I will fetch the homepage next.", + ) + ], + step=2, + ) + assert await hook_module.recover_interrupted_tool_result(valid_turn) is None + assert await hook_module.recover_interrupted_tool_result(sentinel) is not None + + await hook_module.clear_recovery_state( + sys.modules["dreadnode.agents.events"].AgentEnd(agent_id="agent-4") + ) + assert "agent-4" not in hook_module._AGENT_STATE From c08572f10186c71e2bcbefb0cf92d49156aaa8f8 Mon Sep 17 00:00:00 2001 From: GangGreenTemperTatum <104169244+GangGreenTemperTatum@users.noreply.github.com> Date: Mon, 4 May 2026 14:51:42 -0400 Subject: [PATCH 2/2] Fix web-security test imports and asyncio marks --- .../web-security/tests/test_bbscope.py | 31 +++++++++--- .../web-security/tests/test_credence.py | 49 ++++++++++++++++--- .../test_interrupted_tool_result_hook.py | 2 +- 3 files changed, 68 insertions(+), 14 deletions(-) diff --git a/capabilities/web-security/tests/test_bbscope.py b/capabilities/web-security/tests/test_bbscope.py index 14a34b1..9132249 100644 --- a/capabilities/web-security/tests/test_bbscope.py +++ b/capabilities/web-security/tests/test_bbscope.py @@ -10,15 +10,13 @@ import httpx import pytest -pytestmark = pytest.mark.asyncio - # Add tools directory to path for import _REPO_ROOT = Path(__file__).resolve() while _REPO_ROOT != _REPO_ROOT.parent: - if (_REPO_ROOT / "dreadnode" / "web-security" / "tools").is_dir(): + if (_REPO_ROOT / "capabilities" / "web-security" / "tools").is_dir(): break _REPO_ROOT = _REPO_ROOT.parent -sys.path.insert(0, str(_REPO_ROOT / "dreadnode" / "web-security" / "tools")) +sys.path.insert(0, str(_REPO_ROOT / "capabilities" / "web-security" / "tools")) from bbscope import BBScope @@ -56,12 +54,21 @@ def test_all_tools_have_catch(self, toolset: BBScope) -> None: class TestFind: + @pytest.mark.asyncio async def test_find_with_results(self, toolset: BBScope) -> None: mock_data = { "query": "example.com", "programs": [ - {"platform": "h1", "handle": "example", "url": "https://hackerone.com/example"}, - {"platform": "bc", "handle": "example-bc", "url": "https://bugcrowd.com/example-bc"}, + { + "platform": "h1", + "handle": "example", + "url": "https://hackerone.com/example", + }, + { + "platform": "bc", + "handle": "example-bc", + "url": "https://bugcrowd.com/example-bc", + }, ], "total_count": 2, } @@ -76,6 +83,7 @@ async def test_find_with_results(self, toolset: BBScope) -> None: assert "example" in result assert "BC" in result + @pytest.mark.asyncio async def test_find_no_results(self, toolset: BBScope) -> None: mock_data = {"query": "nonexistent.invalid", "programs": [], "total_count": 0} with patch.object(toolset, "_get_client") as mock_client: @@ -86,6 +94,7 @@ async def test_find_no_results(self, toolset: BBScope) -> None: result = await toolset.find(query="nonexistent.invalid") assert "No bug bounty programs found" in result + @pytest.mark.asyncio async def test_find_api_error(self, toolset: BBScope) -> None: with patch.object(toolset, "_get_client") as mock_client: client = AsyncMock() @@ -98,6 +107,7 @@ async def test_find_api_error(self, toolset: BBScope) -> None: class TestProgram: + @pytest.mark.asyncio async def test_program_details(self, toolset: BBScope) -> None: mock_data = { "platform": "h1", @@ -119,6 +129,7 @@ async def test_program_details(self, toolset: BBScope) -> None: assert "*.example.com" in result assert "In-scope targets: 5" in result + @pytest.mark.asyncio async def test_program_vdp(self, toolset: BBScope) -> None: mock_data = { "platform": "bc", @@ -138,11 +149,13 @@ async def test_program_vdp(self, toolset: BBScope) -> None: result = await toolset.program(platform="bc", handle="test") assert "VDP" in result + @pytest.mark.asyncio async def test_program_invalid_platform(self, toolset: BBScope) -> None: result = await toolset.program(platform="invalid", handle="test") assert "Error" in result assert "Invalid platform" in result + @pytest.mark.asyncio async def test_program_not_found(self, toolset: BBScope) -> None: with patch.object(toolset, "_get_client") as mock_client: client = AsyncMock() @@ -154,6 +167,7 @@ async def test_program_not_found(self, toolset: BBScope) -> None: class TestTargets: + @pytest.mark.asyncio async def test_targets_wildcards(self, toolset: BBScope) -> None: mock_data = ["*.example.com", "*.test.org"] with patch.object(toolset, "_get_client") as mock_client: @@ -165,16 +179,19 @@ async def test_targets_wildcards(self, toolset: BBScope) -> None: assert "*.example.com" in result assert "2 wildcards" in result + @pytest.mark.asyncio async def test_targets_invalid_type(self, toolset: BBScope) -> None: result = await toolset.targets(target_type="invalid") assert "Error" in result assert "Invalid target_type" in result + @pytest.mark.asyncio async def test_targets_invalid_platform(self, toolset: BBScope) -> None: result = await toolset.targets(target_type="domains", platform="invalid") assert "Error" in result assert "Invalid platform" in result + @pytest.mark.asyncio async def test_targets_with_limit(self, toolset: BBScope) -> None: mock_data = [f"target{i}.com" for i in range(200)] with patch.object(toolset, "_get_client") as mock_client: @@ -188,6 +205,7 @@ async def test_targets_with_limit(self, toolset: BBScope) -> None: class TestUpdates: + @pytest.mark.asyncio async def test_updates_today(self, toolset: BBScope) -> None: mock_data = { "updates": [ @@ -215,6 +233,7 @@ async def test_updates_today(self, toolset: BBScope) -> None: assert "new.example.com" in result assert "added" in result + @pytest.mark.asyncio async def test_updates_no_results(self, toolset: BBScope) -> None: mock_data = {"updates": [], "total_count": 0} with patch.object(toolset, "_get_client") as mock_client: diff --git a/capabilities/web-security/tests/test_credence.py b/capabilities/web-security/tests/test_credence.py index bd997a0..5c23dd6 100644 --- a/capabilities/web-security/tests/test_credence.py +++ b/capabilities/web-security/tests/test_credence.py @@ -9,8 +9,6 @@ import pytest -pytestmark = pytest.mark.asyncio - # Add tools directory to path for import _REPO_ROOT = Path(__file__).resolve() while _REPO_ROOT != _REPO_ROOT.parent: @@ -64,6 +62,7 @@ def test_schema_does_not_accept_trace_id(self, toolset: CredenceTool) -> None: class TestHighConfidence: + @pytest.mark.asyncio async def test_high_with_poc_confirmed(self, toolset: CredenceTool) -> None: result = await toolset.assess_confidence( claim="SQLi in /api/users?id=1' OR 1=1--", @@ -73,6 +72,7 @@ async def test_high_with_poc_confirmed(self, toolset: CredenceTool) -> None: assert "CONFIRMED" in result extract_trace_id(result) + @pytest.mark.asyncio async def test_high_with_response_verified(self, toolset: CredenceTool) -> None: result = await toolset.assess_confidence( claim="XSS reflected unencoded in search param", @@ -81,6 +81,7 @@ async def test_high_with_response_verified(self, toolset: CredenceTool) -> None: ) assert "CONFIRMED" in result + @pytest.mark.asyncio async def test_high_with_data_flow_traced(self, toolset: CredenceTool) -> None: result = await toolset.assess_confidence( claim="user input reaches innerHTML in app.js:456", @@ -89,6 +90,7 @@ async def test_high_with_data_flow_traced(self, toolset: CredenceTool) -> None: ) assert "CONFIRMED" in result + @pytest.mark.asyncio async def test_high_with_pattern_only_is_overconfident( self, toolset: CredenceTool ) -> None: @@ -100,6 +102,7 @@ async def test_high_with_pattern_only_is_overconfident( assert "OVERCONFIDENT" in result assert "lead/gadget" in result.lower() + @pytest.mark.asyncio async def test_high_with_scanner_output_is_overconfident( self, toolset: CredenceTool ) -> None: @@ -110,6 +113,7 @@ async def test_high_with_scanner_output_is_overconfident( ) assert "OVERCONFIDENT" in result + @pytest.mark.asyncio async def test_high_with_assumed_is_overconfident( self, toolset: CredenceTool ) -> None: @@ -120,6 +124,7 @@ async def test_high_with_assumed_is_overconfident( ) assert "OVERCONFIDENT" in result + @pytest.mark.asyncio async def test_high_with_behavior_observed_is_overconfident( self, toolset: CredenceTool ) -> None: @@ -130,6 +135,7 @@ async def test_high_with_behavior_observed_is_overconfident( ) assert "OVERCONFIDENT" in result + @pytest.mark.asyncio async def test_high_with_code_pattern_is_overconfident( self, toolset: CredenceTool ) -> None: @@ -142,6 +148,7 @@ async def test_high_with_code_pattern_is_overconfident( class TestMediumConfidence: + @pytest.mark.asyncio async def test_medium_with_weak_evidence(self, toolset: CredenceTool) -> None: result = await toolset.assess_confidence( claim="possible IDOR on /api/orders/{id}", @@ -151,6 +158,7 @@ async def test_medium_with_weak_evidence(self, toolset: CredenceTool) -> None: assert "UNCONFIRMED LEAD" in result assert "report" not in result.lower() or "do not" in result.lower() + @pytest.mark.asyncio async def test_medium_with_behavior_observed(self, toolset: CredenceTool) -> None: result = await toolset.assess_confidence( claim="different response length for admin vs user", @@ -159,6 +167,7 @@ async def test_medium_with_behavior_observed(self, toolset: CredenceTool) -> Non ) assert "UNCONFIRMED LEAD" in result + @pytest.mark.asyncio async def test_medium_with_strong_evidence_suggests_upgrade( self, toolset: CredenceTool ) -> None: @@ -169,6 +178,7 @@ async def test_medium_with_strong_evidence_suggests_upgrade( ) assert "UPGRADE" in result + @pytest.mark.asyncio async def test_medium_with_response_verified_suggests_upgrade( self, toolset: CredenceTool ) -> None: @@ -181,6 +191,7 @@ async def test_medium_with_response_verified_suggests_upgrade( class TestLowConfidence: + @pytest.mark.asyncio async def test_low_confidence(self, toolset: CredenceTool) -> None: result = await toolset.assess_confidence( claim="might have command injection somewhere", @@ -190,6 +201,7 @@ async def test_low_confidence(self, toolset: CredenceTool) -> None: assert "INSUFFICIENT" in result assert "gadget" in result.lower() + @pytest.mark.asyncio async def test_uncertain_confidence(self, toolset: CredenceTool) -> None: result = await toolset.assess_confidence( claim="not sure what this endpoint does", @@ -198,6 +210,7 @@ async def test_uncertain_confidence(self, toolset: CredenceTool) -> None: ) assert "INSUFFICIENT" in result + @pytest.mark.asyncio async def test_low_with_strong_evidence_still_insufficient( self, toolset: CredenceTool ) -> None: @@ -211,6 +224,7 @@ async def test_low_with_strong_evidence_still_insufficient( class TestAgentString: + @pytest.mark.asyncio async def test_agent_string_in_output(self, toolset: CredenceTool) -> None: result = await toolset.assess_confidence( claim="XSS confirmed", @@ -221,6 +235,7 @@ async def test_agent_string_in_output(self, toolset: CredenceTool) -> None: assert result.startswith("[agent-opus] ") assert "CONFIRMED" in result + @pytest.mark.asyncio async def test_different_agent_strings(self, toolset: CredenceTool) -> None: for agent in ("dn-agent-kimi", "agent-codex", "agent-opus"): result = await toolset.assess_confidence( @@ -231,6 +246,7 @@ async def test_different_agent_strings(self, toolset: CredenceTool) -> None: ) assert result.startswith(f"[{agent}] ") + @pytest.mark.asyncio async def test_default_agent_string(self, toolset: CredenceTool) -> None: result = await toolset.assess_confidence( claim="test claim", @@ -239,6 +255,7 @@ async def test_default_agent_string(self, toolset: CredenceTool) -> None: ) assert result.startswith("[unknown] ") + @pytest.mark.asyncio async def test_agent_string_in_schema(self, toolset: CredenceTool) -> None: tool = toolset.get_tools()[0] props = tool.parameters_schema.get("properties", {}) @@ -246,6 +263,7 @@ async def test_agent_string_in_schema(self, toolset: CredenceTool) -> None: class TestTraceId: + @pytest.mark.asyncio async def test_trace_id_is_generated_for_each_assessment( self, toolset: CredenceTool ) -> None: @@ -264,6 +282,7 @@ async def test_trace_id_is_generated_for_each_assessment( class TestCvssScore: + @pytest.mark.asyncio async def test_cvss_tag_in_output(self, toolset: CredenceTool) -> None: result = await toolset.assess_confidence( claim="IDOR on /api/users/{id}", @@ -274,37 +293,52 @@ async def test_cvss_tag_in_output(self, toolset: CredenceTool) -> None: assert "[cvss:7.5]" in result assert "CONFIRMED" in result + @pytest.mark.asyncio async def test_no_cvss_tag_when_omitted(self, toolset: CredenceTool) -> None: result = await toolset.assess_confidence( - claim="test", confidence="high", evidence_basis="poc_confirmed", + claim="test", + confidence="high", + evidence_basis="poc_confirmed", ) assert "[cvss:" not in result + @pytest.mark.asyncio async def test_low_confidence_high_cvss_warns(self, toolset: CredenceTool) -> None: result = await toolset.assess_confidence( - claim="maybe RCE", confidence="low", evidence_basis="assumed", + claim="maybe RCE", + confidence="low", + evidence_basis="assumed", cvss_score=9.8, ) assert "CVSS WARNING" in result assert "inflated" in result - async def test_high_confidence_critical_cvss_warns(self, toolset: CredenceTool) -> None: + @pytest.mark.asyncio + async def test_high_confidence_critical_cvss_warns( + self, toolset: CredenceTool + ) -> None: result = await toolset.assess_confidence( - claim="full RCE", confidence="high", evidence_basis="poc_confirmed", + claim="full RCE", + confidence="high", + evidence_basis="poc_confirmed", cvss_score=9.8, ) assert "CVSS WARNING" in result assert "Critical" in result + @pytest.mark.asyncio async def test_matching_cvss_no_warning(self, toolset: CredenceTool) -> None: result = await toolset.assess_confidence( - claim="info disclosure", confidence="high", evidence_basis="poc_confirmed", + claim="info disclosure", + confidence="high", + evidence_basis="poc_confirmed", cvss_score=4.3, ) assert "CVSS WARNING" not in result class TestHandleToolCall: + @pytest.mark.asyncio async def test_via_handle_tool_call(self, toolset: CredenceTool) -> None: from dreadnode.agents.tools import FunctionCall, ToolCall @@ -320,6 +354,7 @@ async def test_via_handle_tool_call(self, toolset: CredenceTool) -> None: assert stop is False assert "CONFIRMED" in message.content + @pytest.mark.asyncio async def test_overconfident_via_handle_tool_call( self, toolset: CredenceTool ) -> None: diff --git a/capabilities/web-security/tests/test_interrupted_tool_result_hook.py b/capabilities/web-security/tests/test_interrupted_tool_result_hook.py index d244136..82cad39 100644 --- a/capabilities/web-security/tests/test_interrupted_tool_result_hook.py +++ b/capabilities/web-security/tests/test_interrupted_tool_result_hook.py @@ -128,7 +128,7 @@ async def test_manifest_wires_hook_file() -> None: manifest_path = Path(__file__).resolve().parents[1] / "capability.yaml" manifest = yaml.safe_load(manifest_path.read_text(encoding="utf-8")) - assert manifest["version"] == "1.0.4" + assert manifest["version"] == "1.1.5" assert manifest["hooks"] == ["hooks/interrupted_tool_result.py"]