From 94c262dcdf2cfb2589c6b89630a07632c25d5beb Mon Sep 17 00:00:00 2001 From: Samuel Hassine Date: Sat, 27 Jun 2026 00:05:31 +0200 Subject: [PATCH 1/8] feat(ai): add pyoaev support for AI adversarial exposure validation (#295) Add AI request marker / target endpoint signature types, a shared deterministic per-inject canary marker helper, ai_expectations_for_source to poll agentless detection/prevention expectations, and an AiTargetManager for AI Target assets. --- pyoaev/apis/__init__.py | 1 + pyoaev/apis/ai_target.py | 30 +++++++++++++++++++ .../inject_expectation/inject_expectation.py | 15 ++++++++++ pyoaev/client.py | 1 + pyoaev/signatures/ai_marker.py | 15 ++++++++++ pyoaev/signatures/types.py | 4 +++ 6 files changed, 66 insertions(+) create mode 100644 pyoaev/apis/ai_target.py create mode 100644 pyoaev/signatures/ai_marker.py diff --git a/pyoaev/apis/__init__.py b/pyoaev/apis/__init__.py index b21ef4a..67bea1d 100644 --- a/pyoaev/apis/__init__.py +++ b/pyoaev/apis/__init__.py @@ -1,3 +1,4 @@ +from .ai_target import * # noqa: F401,F403 from .attack_pattern import * # noqa: F401,F403 from .collector import * # noqa: F401,F403 from .cve import * # noqa: F401,F403 diff --git a/pyoaev/apis/ai_target.py b/pyoaev/apis/ai_target.py new file mode 100644 index 0000000..fb86676 --- /dev/null +++ b/pyoaev/apis/ai_target.py @@ -0,0 +1,30 @@ +from pyoaev.base import RESTManager, RESTObject +from pyoaev.mixins import CreateMixin, DeleteMixin, GetMixin, ListMixin, UpdateMixin +from pyoaev.utils import RequiredOptional + + +class AiTarget(RESTObject): + _id_attr = "asset_id" + + +class AiTargetManager( + GetMixin, ListMixin, CreateMixin, UpdateMixin, DeleteMixin, RESTManager +): + """Manage AI Target assets (LLM endpoints / AI agents under adversarial test).""" + + _path = "/ai_targets" + _obj_cls = AiTarget + _create_attrs = RequiredOptional( + required=("asset_name", "ai_target_provider"), + optional=( + "asset_description", + "asset_tags", + "asset_external_reference", + "ai_target_endpoint", + "ai_target_model", + "ai_target_modality", + "ai_target_system_prompt", + "ai_target_api_key_variable", + "ai_target_configuration", + ), + ) diff --git a/pyoaev/apis/inject_expectation/inject_expectation.py b/pyoaev/apis/inject_expectation/inject_expectation.py index a669502..c69db7d 100644 --- a/pyoaev/apis/inject_expectation/inject_expectation.py +++ b/pyoaev/apis/inject_expectation/inject_expectation.py @@ -40,6 +40,21 @@ def expectations_assets_for_source( ) return result + @exc.on_http_error(exc.OpenAEVUpdateError) + def ai_expectations_for_source( + self, source_id: str, **kwargs: Any + ) -> Dict[str, Any]: + """Returns agentless DETECTION/PREVENTION expectations (AI adversarial injects) not yet + filled for the given source. Used by AI defense collectors (LLM firewall / guardrail). + + :param source_id: the identifier of the collector requesting expectations + :type source_id: str + :return: a list of agentless detection/prevention expectation dicts + """ + path = f"{self.path}/ai/" + source_id + result = self.openaev.http_get(path, **kwargs) + return result + def expectations_models_for_source(self, source_id: str, **kwargs: Any): """Returns all expectations from OpenAEV that have had no result yet from the source_id (e.g. collector). diff --git a/pyoaev/client.py b/pyoaev/client.py index e478d43..f0f295f 100644 --- a/pyoaev/client.py +++ b/pyoaev/client.py @@ -74,6 +74,7 @@ def __init__( self.inject_expectation = apis.InjectExpectationManager(self) self.payload = apis.PayloadManager(self) self.security_platform = apis.SecurityPlatformManager(self) + self.ai_target = apis.AiTargetManager(self) self.inject_expectation_trace = apis.InjectExpectationTraceManager(self) self.signature = apis.SignatureApiManager(self) self.tag = apis.TagManager(self) diff --git a/pyoaev/signatures/ai_marker.py b/pyoaev/signatures/ai_marker.py new file mode 100644 index 0000000..5116506 --- /dev/null +++ b/pyoaev/signatures/ai_marker.py @@ -0,0 +1,15 @@ +"""Deterministic per-inject canary marker shared by the AI red-team injector and the AI defense +collectors. + +The marker is derived purely from the inject (and optional agent) id, so the injector that sends the +attack and the collector that validates an AI defense response compute the same value independently, +without the platform having to store it. It is emitted by the injector (request header + in-prompt +token) and matched by collectors against guardrail / firewall logs. +""" + +import hashlib + + +def build_marker(inject_id: str, agent_id: str = "") -> str: + seed = f"{inject_id}:{agent_id}".encode("utf-8") + return "oaev" + hashlib.sha256(seed).hexdigest()[:16] diff --git a/pyoaev/signatures/types.py b/pyoaev/signatures/types.py index 0c4f78b..ff92b1c 100644 --- a/pyoaev/signatures/types.py +++ b/pyoaev/signatures/types.py @@ -37,3 +37,7 @@ class SignatureTypes(str, Enum): SIG_TYPE_CLOUD_REGION = "cloud_region" SIG_TYPE_TARGET_SERVICE = "target_service" SIG_TYPE_QUERY = "query" + # AI adversarial validation: correlate AI defense (LLM firewall / guardrail) events back to a + # specific AI inject execution. + SIG_TYPE_AI_REQUEST_MARKER = "ai_request_marker" + SIG_TYPE_AI_TARGET_ENDPOINT = "ai_target_endpoint" From 3202c2126d7ad738ca2a02bd524b159803ebe9dc Mon Sep 17 00:00:00 2001 From: Samuel Hassine Date: Sat, 27 Jun 2026 11:38:10 +0200 Subject: [PATCH 2/8] feat(ai): add Artificial Intelligence security domain (#295) Adds ARTIFICIAL_INTELLIGENCE to SecurityDomains so AI red-team contracts can be bucketed under the AI security domain. --- pyoaev/security_domain/types.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pyoaev/security_domain/types.py b/pyoaev/security_domain/types.py index d13aeb4..c48e48c 100644 --- a/pyoaev/security_domain/types.py +++ b/pyoaev/security_domain/types.py @@ -13,4 +13,8 @@ class SecurityDomains(Enum): URL_FILTERING = {"domain_name": "URL Filtering", "domain_color": "#66CCFF"} CLOUD = {"domain_name": "Cloud", "domain_color": "#9999CC"} TABLE_TOP = {"domain_name": "Tabletop", "domain_color": "#FFCC33"} + ARTIFICIAL_INTELLIGENCE = { + "domain_name": "Artificial Intelligence", + "domain_color": "#7C4DFF", + } TOCLASSIFY = {"domain_name": "To classify", "domain_color": "#FFFFFF"} From 8ff0bf3ef4fa1d834274318c45114a7fb5585ff9 Mon Sep 17 00:00:00 2001 From: Samuel Hassine Date: Sat, 27 Jun 2026 12:38:19 +0200 Subject: [PATCH 3/8] test(ai): cover AI SDK primitives and fix expectations return type (#295) Add unit tests for the new AI SDK building blocks and tighten a type annotation flagged in review: - build_marker: lock the prefix, length, determinism and exact value so the injector and collectors stay byte-for-byte compatible. - AiTargetManager: validate request construction (method/path/payload) for create/get/update/delete against /ai_targets. - SignatureTypes: cover the new ai_request_marker / ai_target_endpoint values and confirm they are usable by SignatureType. - inject_expectation.ai_expectations_for_source: return List[Dict] to match the docstring (the endpoint returns a collection). --- .../inject_expectation/inject_expectation.py | 5 +- test/apis/ai_target/test_ai_target.py | 77 +++++++++++++++++++ test/signatures/test_ai_marker.py | 41 ++++++++++ test/signatures/test_ai_signature_types.py | 29 +++++++ 4 files changed, 150 insertions(+), 2 deletions(-) create mode 100644 test/apis/ai_target/test_ai_target.py create mode 100644 test/signatures/test_ai_marker.py create mode 100644 test/signatures/test_ai_signature_types.py diff --git a/pyoaev/apis/inject_expectation/inject_expectation.py b/pyoaev/apis/inject_expectation/inject_expectation.py index c69db7d..c17866d 100644 --- a/pyoaev/apis/inject_expectation/inject_expectation.py +++ b/pyoaev/apis/inject_expectation/inject_expectation.py @@ -1,4 +1,4 @@ -from typing import Any, Dict +from typing import Any, Dict, List from pyoaev import exceptions as exc from pyoaev.apis.inject_expectation.model import ( @@ -43,13 +43,14 @@ def expectations_assets_for_source( @exc.on_http_error(exc.OpenAEVUpdateError) def ai_expectations_for_source( self, source_id: str, **kwargs: Any - ) -> Dict[str, Any]: + ) -> List[Dict[str, Any]]: """Returns agentless DETECTION/PREVENTION expectations (AI adversarial injects) not yet filled for the given source. Used by AI defense collectors (LLM firewall / guardrail). :param source_id: the identifier of the collector requesting expectations :type source_id: str :return: a list of agentless detection/prevention expectation dicts + :rtype: list[dict] """ path = f"{self.path}/ai/" + source_id result = self.openaev.http_get(path, **kwargs) diff --git a/test/apis/ai_target/test_ai_target.py b/test/apis/ai_target/test_ai_target.py new file mode 100644 index 0000000..de62768 --- /dev/null +++ b/test/apis/ai_target/test_ai_target.py @@ -0,0 +1,77 @@ +from unittest import TestCase, main, mock + + +def mock_response(*args, **kwargs): + class MockResponse: + def __init__(self): + self.status_code = 200 + self.history = None + self.content = None + self.headers = {"Content-Type": "application/json"} + + def json(self): + return {} + + return MockResponse() + + +class TestAiTargetManager(TestCase): + @mock.patch("requests.Session.request", side_effect=mock_response) + def test_create_posts_to_ai_targets(self, mock_request): + from pyoaev import OpenAEV + + api_client = OpenAEV("url", "token") + data = { + "asset_name": "OpenAI guardrail", + "ai_target_provider": "openai", + "ai_target_endpoint": "https://api.openai.com/v1", + } + + api_client.ai_target.create(data=data) + + _, kwargs = mock_request.call_args + self.assertEqual(kwargs["method"], "post") + self.assertEqual(kwargs["url"], "url/api/ai_targets") + self.assertEqual(kwargs["json"], data) + + @mock.patch("requests.Session.request", side_effect=mock_response) + def test_get_requests_single_ai_target(self, mock_request): + from pyoaev import OpenAEV + + api_client = OpenAEV("url", "token") + + api_client.ai_target.get("asset-123") + + _, kwargs = mock_request.call_args + self.assertEqual(kwargs["method"], "get") + self.assertEqual(kwargs["url"], "url/api/ai_targets/asset-123") + + @mock.patch("requests.Session.request", side_effect=mock_response) + def test_update_puts_to_ai_target(self, mock_request): + from pyoaev import OpenAEV + + api_client = OpenAEV("url", "token") + new_data = {"asset_description": "updated"} + + api_client.ai_target.update("asset-123", new_data=new_data) + + _, kwargs = mock_request.call_args + self.assertEqual(kwargs["method"], "put") + self.assertEqual(kwargs["url"], "url/api/ai_targets/asset-123") + self.assertEqual(kwargs["json"], new_data) + + @mock.patch("requests.Session.request", side_effect=mock_response) + def test_delete_calls_delete_on_ai_target(self, mock_request): + from pyoaev import OpenAEV + + api_client = OpenAEV("url", "token") + + api_client.ai_target.delete("asset-123") + + _, kwargs = mock_request.call_args + self.assertEqual(kwargs["method"], "delete") + self.assertEqual(kwargs["url"], "url/api/ai_targets/asset-123") + + +if __name__ == "__main__": + main() diff --git a/test/signatures/test_ai_marker.py b/test/signatures/test_ai_marker.py new file mode 100644 index 0000000..9d7151c --- /dev/null +++ b/test/signatures/test_ai_marker.py @@ -0,0 +1,41 @@ +import unittest + +from pyoaev.signatures.ai_marker import build_marker + + +class TestBuildMarker(unittest.TestCase): + def test_marker_has_expected_prefix_and_length(self): + marker = build_marker("inject-1", "agent-1") + + self.assertTrue(marker.startswith("oaev")) + # "oaev" prefix (4 chars) + 16 hex chars from the sha256 digest. + self.assertEqual(len(marker), 20) + self.assertTrue(all(c in "0123456789abcdef" for c in marker[4:])) + + def test_marker_is_deterministic_for_same_inputs(self): + self.assertEqual( + build_marker("inject-1", "agent-1"), + build_marker("inject-1", "agent-1"), + ) + + def test_marker_differs_for_different_inputs(self): + self.assertNotEqual( + build_marker("inject-1", "agent-1"), + build_marker("inject-2", "agent-1"), + ) + self.assertNotEqual( + build_marker("inject-1", "agent-1"), + build_marker("inject-1", "agent-2"), + ) + + def test_agent_id_defaults_to_empty(self): + self.assertEqual(build_marker("inject-1"), build_marker("inject-1", "")) + + def test_marker_value_is_stable_across_runs(self): + # Lock the exact value so the injector and collectors (potentially in + # other languages) stay byte-for-byte compatible. + self.assertEqual(build_marker("inject-1", "agent-1"), "oaev6457d87cba0698ab") + + +if __name__ == "__main__": + unittest.main() diff --git a/test/signatures/test_ai_signature_types.py b/test/signatures/test_ai_signature_types.py new file mode 100644 index 0000000..0253362 --- /dev/null +++ b/test/signatures/test_ai_signature_types.py @@ -0,0 +1,29 @@ +import unittest + +from pyoaev.signatures.signature_type import SignatureType +from pyoaev.signatures.types import MatchTypes, SignatureTypes + + +class TestAiSignatureTypes(unittest.TestCase): + def test_ai_signature_type_values(self): + self.assertEqual( + SignatureTypes.SIG_TYPE_AI_REQUEST_MARKER.value, "ai_request_marker" + ) + self.assertEqual( + SignatureTypes.SIG_TYPE_AI_TARGET_ENDPOINT.value, "ai_target_endpoint" + ) + + def test_ai_request_marker_usable_in_signature_type(self): + signature_type = SignatureType( + label=SignatureTypes.SIG_TYPE_AI_REQUEST_MARKER, + match_type=MatchTypes.MATCH_TYPE_SIMPLE, + ) + + struct = signature_type.make_struct_for_matching(data="oaevdeadbeef") + + self.assertEqual(struct.get("type"), MatchTypes.MATCH_TYPE_SIMPLE.value) + self.assertEqual(struct.get("data"), "oaevdeadbeef") + + +if __name__ == "__main__": + unittest.main() From 86147e6fc2b5cb35cb8f427f7496c0feadbc2288 Mon Sep 17 00:00:00 2001 From: Samuel Hassine Date: Sat, 27 Jun 2026 12:41:45 +0200 Subject: [PATCH 4/8] refactor(ai): build AI expectations path with a single f-string (#295) --- pyoaev/apis/inject_expectation/inject_expectation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyoaev/apis/inject_expectation/inject_expectation.py b/pyoaev/apis/inject_expectation/inject_expectation.py index c17866d..6711f2c 100644 --- a/pyoaev/apis/inject_expectation/inject_expectation.py +++ b/pyoaev/apis/inject_expectation/inject_expectation.py @@ -52,7 +52,7 @@ def ai_expectations_for_source( :return: a list of agentless detection/prevention expectation dicts :rtype: list[dict] """ - path = f"{self.path}/ai/" + source_id + path = f"{self.path}/ai/{source_id}" result = self.openaev.http_get(path, **kwargs) return result From bf6871f12aed296f6bda4166d346dabf25e698af Mon Sep 17 00:00:00 2001 From: Samuel Hassine Date: Sat, 27 Jun 2026 20:19:59 +0200 Subject: [PATCH 5/8] fix(ai): validate ai_expectations_for_source returns a list (#295) http_get is typed as Union[Dict, requests.Response] and can return a non-list JSON shape. Guard the response in ai_expectations_for_source: raise OpenAEVParsingError when the payload is not a JSON list so the documented List[Dict] contract holds instead of silently returning the wrong shape. Add manager-level tests for the success and parsing-error paths. --- .../inject_expectation/inject_expectation.py | 8 +++ .../test_inject_expectation.py | 49 +++++++++++++++++++ 2 files changed, 57 insertions(+) create mode 100644 test/apis/inject_expectation/test_inject_expectation.py diff --git a/pyoaev/apis/inject_expectation/inject_expectation.py b/pyoaev/apis/inject_expectation/inject_expectation.py index 6711f2c..22f3788 100644 --- a/pyoaev/apis/inject_expectation/inject_expectation.py +++ b/pyoaev/apis/inject_expectation/inject_expectation.py @@ -49,11 +49,19 @@ def ai_expectations_for_source( :param source_id: the identifier of the collector requesting expectations :type source_id: str + :raises OpenAEVParsingError: if the server does not return a JSON list :return: a list of agentless detection/prevention expectation dicts :rtype: list[dict] """ path = f"{self.path}/ai/{source_id}" result = self.openaev.http_get(path, **kwargs) + if not isinstance(result, list): + raise exc.OpenAEVParsingError( + error_message=( + f"Expected a list of AI expectations from {path}, " + f"got {type(result).__name__}" + ) + ) return result def expectations_models_for_source(self, source_id: str, **kwargs: Any): diff --git a/test/apis/inject_expectation/test_inject_expectation.py b/test/apis/inject_expectation/test_inject_expectation.py new file mode 100644 index 0000000..c2fcae1 --- /dev/null +++ b/test/apis/inject_expectation/test_inject_expectation.py @@ -0,0 +1,49 @@ +from unittest import TestCase, main, mock + +from pyoaev import OpenAEV +from pyoaev.exceptions import OpenAEVParsingError + + +def make_json_response(payload): + class MockResponse: + def __init__(self): + self.status_code = 200 + self.history = None + self.content = None + self.headers = {"Content-Type": "application/json"} + + def json(self): + return payload + + return MockResponse() + + +class TestAiExpectationsForSource(TestCase): + @mock.patch("requests.Session.request") + def test_returns_list_of_expectations(self, mock_request): + expectations = [ + {"inject_expectation_id": "exp-1"}, + {"inject_expectation_id": "exp-2"}, + ] + mock_request.return_value = make_json_response(expectations) + api_client = OpenAEV("url", "token") + + result = api_client.inject_expectation.ai_expectations_for_source("collector-1") + + mock_request.assert_called_once() + _, kwargs = mock_request.call_args + self.assertEqual(kwargs["method"], "get") + self.assertEqual(kwargs["url"], "url/api/injects/expectations/ai/collector-1") + self.assertEqual(result, expectations) + + @mock.patch("requests.Session.request") + def test_raises_parsing_error_when_not_a_list(self, mock_request): + mock_request.return_value = make_json_response({"unexpected": "shape"}) + api_client = OpenAEV("url", "token") + + with self.assertRaises(OpenAEVParsingError): + api_client.inject_expectation.ai_expectations_for_source("collector-1") + + +if __name__ == "__main__": + main() From d19cd0bd0a794edf7dae3672e796d83686958d32 Mon Sep 17 00:00:00 2001 From: Samuel Hassine Date: Sat, 27 Jun 2026 21:13:19 +0200 Subject: [PATCH 6/8] fix(ai): sound typing for AI expectations and dedicated delete error (#295) Address the follow-up Copilot review: - ai_expectations_for_source: widen the http_get result to Any before the list validation so the isinstance check is not treated as unreachable by static type checkers, and build the returned List[Dict] explicitly to satisfy the declared return type. - mixins.DeleteMixin: stop mapping HTTP delete failures to OpenAEVCreateError. Introduce a dedicated OpenAEVDeleteError and raise it from delete(), so delete failures are distinguishable from create failures for every manager using the mixin (AiTargetManager, InjectorContractManager). Add a test that locks in the mapping. --- pyoaev/apis/inject_expectation/inject_expectation.py | 11 +++++++---- pyoaev/exceptions.py | 6 ++++++ pyoaev/mixins.py | 2 +- test/apis/ai_target/test_ai_target.py | 12 ++++++++++++ 4 files changed, 26 insertions(+), 5 deletions(-) diff --git a/pyoaev/apis/inject_expectation/inject_expectation.py b/pyoaev/apis/inject_expectation/inject_expectation.py index 22f3788..9033c5e 100644 --- a/pyoaev/apis/inject_expectation/inject_expectation.py +++ b/pyoaev/apis/inject_expectation/inject_expectation.py @@ -54,15 +54,18 @@ def ai_expectations_for_source( :rtype: list[dict] """ path = f"{self.path}/ai/{source_id}" - result = self.openaev.http_get(path, **kwargs) - if not isinstance(result, list): + # http_get is annotated Union[Dict, requests.Response]; widen to Any so the list + # validation below stays meaningful to type checkers, then build the result list + # explicitly to satisfy the declared return type. + raw: Any = self.openaev.http_get(path, **kwargs) + if not isinstance(raw, list): raise exc.OpenAEVParsingError( error_message=( f"Expected a list of AI expectations from {path}, " - f"got {type(result).__name__}" + f"got {type(raw).__name__}" ) ) - return result + return [item for item in raw] def expectations_models_for_source(self, source_id: str, **kwargs: Any): """Returns all expectations from OpenAEV that have had no result yet diff --git a/pyoaev/exceptions.py b/pyoaev/exceptions.py index 046405a..cca8e84 100644 --- a/pyoaev/exceptions.py +++ b/pyoaev/exceptions.py @@ -180,6 +180,10 @@ class OpenAEVCreateError(OpenAEVError): pass +class OpenAEVDeleteError(OpenAEVError): + pass + + class SignatureTransmissionError(OpenAEVError): """Signatures didn't make it. Validation rejected them, 4xx slammed the door, or retries ran dry.""" @@ -222,5 +226,7 @@ def wrapped_f(*args: Any, **kwargs: Any) -> Any: "OpenAEVListError", "OpenAEVGetError", "OpenAEVUpdateError", + "OpenAEVCreateError", + "OpenAEVDeleteError", "SignatureTransmissionError", ] diff --git a/pyoaev/mixins.py b/pyoaev/mixins.py index daaef47..6309e31 100644 --- a/pyoaev/mixins.py +++ b/pyoaev/mixins.py @@ -227,7 +227,7 @@ class DeleteMixin(_RestManagerBase): _path: Optional[str] openaev: pyoaev.OpenAEV - @exc.on_http_error(exc.OpenAEVCreateError) + @exc.on_http_error(exc.OpenAEVDeleteError) def delete( self, id: Optional[Union[str, int]] = None, **kwargs: Any ) -> requests.Response: diff --git a/test/apis/ai_target/test_ai_target.py b/test/apis/ai_target/test_ai_target.py index de62768..61ffceb 100644 --- a/test/apis/ai_target/test_ai_target.py +++ b/test/apis/ai_target/test_ai_target.py @@ -72,6 +72,18 @@ def test_delete_calls_delete_on_ai_target(self, mock_request): self.assertEqual(kwargs["method"], "delete") self.assertEqual(kwargs["url"], "url/api/ai_targets/asset-123") + def test_delete_http_error_is_mapped_to_delete_error(self): + from pyoaev import OpenAEV + from pyoaev.exceptions import OpenAEVDeleteError, OpenAEVHttpError + + api_client = OpenAEV("url", "token") + + with mock.patch.object( + OpenAEV, "http_delete", side_effect=OpenAEVHttpError("boom") + ): + with self.assertRaises(OpenAEVDeleteError): + api_client.ai_target.delete("asset-123") + if __name__ == "__main__": main() From 48cab94367c772fa1f745eb3d1c24c1df41574ec Mon Sep 17 00:00:00 2001 From: Samuel Hassine Date: Sat, 27 Jun 2026 21:24:30 +0200 Subject: [PATCH 7/8] fix(ai): align AI expectations error type and validate element shape (#295) Address the follow-up Copilot review on ai_expectations_for_source: - Map HTTP failures to OpenAEVListError instead of OpenAEVUpdateError: the method is a GET that returns a collection, so this matches the SDK convention used by ListMixin.list() and gives consumers an operation-appropriate error type. - Validate element shape: raise OpenAEVParsingError when the response is not a list of dicts (previously only the top-level list was checked), so the declared List[Dict[str, Any]] contract holds. - Add tests for the non-dict-elements and HTTP-error -> OpenAEVListError paths. --- .../inject_expectation/inject_expectation.py | 8 ++++---- .../test_inject_expectation.py | 19 ++++++++++++++++++- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/pyoaev/apis/inject_expectation/inject_expectation.py b/pyoaev/apis/inject_expectation/inject_expectation.py index 9033c5e..f528092 100644 --- a/pyoaev/apis/inject_expectation/inject_expectation.py +++ b/pyoaev/apis/inject_expectation/inject_expectation.py @@ -40,7 +40,7 @@ def expectations_assets_for_source( ) return result - @exc.on_http_error(exc.OpenAEVUpdateError) + @exc.on_http_error(exc.OpenAEVListError) def ai_expectations_for_source( self, source_id: str, **kwargs: Any ) -> List[Dict[str, Any]]: @@ -49,7 +49,7 @@ def ai_expectations_for_source( :param source_id: the identifier of the collector requesting expectations :type source_id: str - :raises OpenAEVParsingError: if the server does not return a JSON list + :raises OpenAEVParsingError: if the server does not return a JSON list of dicts :return: a list of agentless detection/prevention expectation dicts :rtype: list[dict] """ @@ -58,10 +58,10 @@ def ai_expectations_for_source( # validation below stays meaningful to type checkers, then build the result list # explicitly to satisfy the declared return type. raw: Any = self.openaev.http_get(path, **kwargs) - if not isinstance(raw, list): + if not isinstance(raw, list) or not all(isinstance(item, dict) for item in raw): raise exc.OpenAEVParsingError( error_message=( - f"Expected a list of AI expectations from {path}, " + f"Expected a list of AI expectation objects from {path}, " f"got {type(raw).__name__}" ) ) diff --git a/test/apis/inject_expectation/test_inject_expectation.py b/test/apis/inject_expectation/test_inject_expectation.py index c2fcae1..4a0d98f 100644 --- a/test/apis/inject_expectation/test_inject_expectation.py +++ b/test/apis/inject_expectation/test_inject_expectation.py @@ -1,7 +1,7 @@ from unittest import TestCase, main, mock from pyoaev import OpenAEV -from pyoaev.exceptions import OpenAEVParsingError +from pyoaev.exceptions import OpenAEVHttpError, OpenAEVListError, OpenAEVParsingError def make_json_response(payload): @@ -44,6 +44,23 @@ def test_raises_parsing_error_when_not_a_list(self, mock_request): with self.assertRaises(OpenAEVParsingError): api_client.inject_expectation.ai_expectations_for_source("collector-1") + @mock.patch("requests.Session.request") + def test_raises_parsing_error_when_elements_not_dicts(self, mock_request): + mock_request.return_value = make_json_response(["not", "a", "dict"]) + api_client = OpenAEV("url", "token") + + with self.assertRaises(OpenAEVParsingError): + api_client.inject_expectation.ai_expectations_for_source("collector-1") + + def test_http_error_is_mapped_to_list_error(self): + api_client = OpenAEV("url", "token") + + with mock.patch.object( + OpenAEV, "http_get", side_effect=OpenAEVHttpError("boom") + ): + with self.assertRaises(OpenAEVListError): + api_client.inject_expectation.ai_expectations_for_source("collector-1") + if __name__ == "__main__": main() From 8ee20e4a139091235b73c644c7ddf58fb1f83794 Mon Sep 17 00:00:00 2001 From: Samuel Hassine Date: Sat, 27 Jun 2026 21:38:48 +0200 Subject: [PATCH 8/8] fix(ai): distinguish non-list vs non-dict-element parsing errors (#295) --- pyoaev/apis/inject_expectation/inject_expectation.py | 12 ++++++++++-- .../inject_expectation/test_inject_expectation.py | 5 ++++- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/pyoaev/apis/inject_expectation/inject_expectation.py b/pyoaev/apis/inject_expectation/inject_expectation.py index f528092..be1c453 100644 --- a/pyoaev/apis/inject_expectation/inject_expectation.py +++ b/pyoaev/apis/inject_expectation/inject_expectation.py @@ -58,13 +58,21 @@ def ai_expectations_for_source( # validation below stays meaningful to type checkers, then build the result list # explicitly to satisfy the declared return type. raw: Any = self.openaev.http_get(path, **kwargs) - if not isinstance(raw, list) or not all(isinstance(item, dict) for item in raw): + if not isinstance(raw, list): raise exc.OpenAEVParsingError( error_message=( - f"Expected a list of AI expectation objects from {path}, " + f"Expected a list of AI expectations from {path}, " f"got {type(raw).__name__}" ) ) + for item in raw: + if not isinstance(item, dict): + raise exc.OpenAEVParsingError( + error_message=( + f"Expected AI expectation objects (dicts) from {path}, " + f"got a list element of type {type(item).__name__}" + ) + ) return [item for item in raw] def expectations_models_for_source(self, source_id: str, **kwargs: Any): diff --git a/test/apis/inject_expectation/test_inject_expectation.py b/test/apis/inject_expectation/test_inject_expectation.py index 4a0d98f..eb42ea6 100644 --- a/test/apis/inject_expectation/test_inject_expectation.py +++ b/test/apis/inject_expectation/test_inject_expectation.py @@ -49,9 +49,12 @@ def test_raises_parsing_error_when_elements_not_dicts(self, mock_request): mock_request.return_value = make_json_response(["not", "a", "dict"]) api_client = OpenAEV("url", "token") - with self.assertRaises(OpenAEVParsingError): + with self.assertRaises(OpenAEVParsingError) as ctx: api_client.inject_expectation.ai_expectations_for_source("collector-1") + # The message should call out the offending element type, not just "list". + self.assertIn("str", str(ctx.exception)) + def test_http_error_is_mapped_to_list_error(self): api_client = OpenAEV("url", "token")