diff --git a/pyoaev/apis/__init__.py b/pyoaev/apis/__init__.py index b21ef4a..67bea1d 100644 --- a/pyoaev/apis/__init__.py +++ b/pyoaev/apis/__init__.py @@ -1,3 +1,4 @@ +from .ai_target import * # noqa: F401,F403 from .attack_pattern import * # noqa: F401,F403 from .collector import * # noqa: F401,F403 from .cve import * # noqa: F401,F403 diff --git a/pyoaev/apis/ai_target.py b/pyoaev/apis/ai_target.py new file mode 100644 index 0000000..fb86676 --- /dev/null +++ b/pyoaev/apis/ai_target.py @@ -0,0 +1,30 @@ +from pyoaev.base import RESTManager, RESTObject +from pyoaev.mixins import CreateMixin, DeleteMixin, GetMixin, ListMixin, UpdateMixin +from pyoaev.utils import RequiredOptional + + +class AiTarget(RESTObject): + _id_attr = "asset_id" + + +class AiTargetManager( + GetMixin, ListMixin, CreateMixin, UpdateMixin, DeleteMixin, RESTManager +): + """Manage AI Target assets (LLM endpoints / AI agents under adversarial test).""" + + _path = "/ai_targets" + _obj_cls = AiTarget + _create_attrs = RequiredOptional( + required=("asset_name", "ai_target_provider"), + optional=( + "asset_description", + "asset_tags", + "asset_external_reference", + "ai_target_endpoint", + "ai_target_model", + "ai_target_modality", + "ai_target_system_prompt", + "ai_target_api_key_variable", + "ai_target_configuration", + ), + ) diff --git a/pyoaev/apis/inject_expectation/inject_expectation.py b/pyoaev/apis/inject_expectation/inject_expectation.py index a669502..be1c453 100644 --- a/pyoaev/apis/inject_expectation/inject_expectation.py +++ b/pyoaev/apis/inject_expectation/inject_expectation.py @@ -1,4 +1,4 @@ -from typing import Any, Dict +from typing import Any, Dict, List from pyoaev import exceptions as exc from pyoaev.apis.inject_expectation.model import ( @@ -40,6 +40,41 @@ def expectations_assets_for_source( ) return result + @exc.on_http_error(exc.OpenAEVListError) + def ai_expectations_for_source( + self, source_id: str, **kwargs: Any + ) -> List[Dict[str, Any]]: + """Returns agentless DETECTION/PREVENTION expectations (AI adversarial injects) not yet + filled for the given source. Used by AI defense collectors (LLM firewall / guardrail). + + :param source_id: the identifier of the collector requesting expectations + :type source_id: str + :raises OpenAEVParsingError: if the server does not return a JSON list of dicts + :return: a list of agentless detection/prevention expectation dicts + :rtype: list[dict] + """ + path = f"{self.path}/ai/{source_id}" + # http_get is annotated Union[Dict, requests.Response]; widen to Any so the list + # validation below stays meaningful to type checkers, then build the result list + # explicitly to satisfy the declared return type. + raw: Any = self.openaev.http_get(path, **kwargs) + if not isinstance(raw, list): + raise exc.OpenAEVParsingError( + error_message=( + f"Expected a list of AI expectations from {path}, " + f"got {type(raw).__name__}" + ) + ) + for item in raw: + if not isinstance(item, dict): + raise exc.OpenAEVParsingError( + error_message=( + f"Expected AI expectation objects (dicts) from {path}, " + f"got a list element of type {type(item).__name__}" + ) + ) + return [item for item in raw] + def expectations_models_for_source(self, source_id: str, **kwargs: Any): """Returns all expectations from OpenAEV that have had no result yet from the source_id (e.g. collector). diff --git a/pyoaev/client.py b/pyoaev/client.py index e478d43..f0f295f 100644 --- a/pyoaev/client.py +++ b/pyoaev/client.py @@ -74,6 +74,7 @@ def __init__( self.inject_expectation = apis.InjectExpectationManager(self) self.payload = apis.PayloadManager(self) self.security_platform = apis.SecurityPlatformManager(self) + self.ai_target = apis.AiTargetManager(self) self.inject_expectation_trace = apis.InjectExpectationTraceManager(self) self.signature = apis.SignatureApiManager(self) self.tag = apis.TagManager(self) diff --git a/pyoaev/exceptions.py b/pyoaev/exceptions.py index 046405a..cca8e84 100644 --- a/pyoaev/exceptions.py +++ b/pyoaev/exceptions.py @@ -180,6 +180,10 @@ class OpenAEVCreateError(OpenAEVError): pass +class OpenAEVDeleteError(OpenAEVError): + pass + + class SignatureTransmissionError(OpenAEVError): """Signatures didn't make it. Validation rejected them, 4xx slammed the door, or retries ran dry.""" @@ -222,5 +226,7 @@ def wrapped_f(*args: Any, **kwargs: Any) -> Any: "OpenAEVListError", "OpenAEVGetError", "OpenAEVUpdateError", + "OpenAEVCreateError", + "OpenAEVDeleteError", "SignatureTransmissionError", ] diff --git a/pyoaev/mixins.py b/pyoaev/mixins.py index daaef47..6309e31 100644 --- a/pyoaev/mixins.py +++ b/pyoaev/mixins.py @@ -227,7 +227,7 @@ class DeleteMixin(_RestManagerBase): _path: Optional[str] openaev: pyoaev.OpenAEV - @exc.on_http_error(exc.OpenAEVCreateError) + @exc.on_http_error(exc.OpenAEVDeleteError) def delete( self, id: Optional[Union[str, int]] = None, **kwargs: Any ) -> requests.Response: diff --git a/pyoaev/security_domain/types.py b/pyoaev/security_domain/types.py index d13aeb4..c48e48c 100644 --- a/pyoaev/security_domain/types.py +++ b/pyoaev/security_domain/types.py @@ -13,4 +13,8 @@ class SecurityDomains(Enum): URL_FILTERING = {"domain_name": "URL Filtering", "domain_color": "#66CCFF"} CLOUD = {"domain_name": "Cloud", "domain_color": "#9999CC"} TABLE_TOP = {"domain_name": "Tabletop", "domain_color": "#FFCC33"} + ARTIFICIAL_INTELLIGENCE = { + "domain_name": "Artificial Intelligence", + "domain_color": "#7C4DFF", + } TOCLASSIFY = {"domain_name": "To classify", "domain_color": "#FFFFFF"} diff --git a/pyoaev/signatures/ai_marker.py b/pyoaev/signatures/ai_marker.py new file mode 100644 index 0000000..5116506 --- /dev/null +++ b/pyoaev/signatures/ai_marker.py @@ -0,0 +1,15 @@ +"""Deterministic per-inject canary marker shared by the AI red-team injector and the AI defense +collectors. + +The marker is derived purely from the inject (and optional agent) id, so the injector that sends the +attack and the collector that validates an AI defense response compute the same value independently, +without the platform having to store it. It is emitted by the injector (request header + in-prompt +token) and matched by collectors against guardrail / firewall logs. +""" + +import hashlib + + +def build_marker(inject_id: str, agent_id: str = "") -> str: + seed = f"{inject_id}:{agent_id}".encode("utf-8") + return "oaev" + hashlib.sha256(seed).hexdigest()[:16] diff --git a/pyoaev/signatures/types.py b/pyoaev/signatures/types.py index 0c4f78b..ff92b1c 100644 --- a/pyoaev/signatures/types.py +++ b/pyoaev/signatures/types.py @@ -37,3 +37,7 @@ class SignatureTypes(str, Enum): SIG_TYPE_CLOUD_REGION = "cloud_region" SIG_TYPE_TARGET_SERVICE = "target_service" SIG_TYPE_QUERY = "query" + # AI adversarial validation: correlate AI defense (LLM firewall / guardrail) events back to a + # specific AI inject execution. + SIG_TYPE_AI_REQUEST_MARKER = "ai_request_marker" + SIG_TYPE_AI_TARGET_ENDPOINT = "ai_target_endpoint" diff --git a/test/apis/ai_target/test_ai_target.py b/test/apis/ai_target/test_ai_target.py new file mode 100644 index 0000000..61ffceb --- /dev/null +++ b/test/apis/ai_target/test_ai_target.py @@ -0,0 +1,89 @@ +from unittest import TestCase, main, mock + + +def mock_response(*args, **kwargs): + class MockResponse: + def __init__(self): + self.status_code = 200 + self.history = None + self.content = None + self.headers = {"Content-Type": "application/json"} + + def json(self): + return {} + + return MockResponse() + + +class TestAiTargetManager(TestCase): + @mock.patch("requests.Session.request", side_effect=mock_response) + def test_create_posts_to_ai_targets(self, mock_request): + from pyoaev import OpenAEV + + api_client = OpenAEV("url", "token") + data = { + "asset_name": "OpenAI guardrail", + "ai_target_provider": "openai", + "ai_target_endpoint": "https://api.openai.com/v1", + } + + api_client.ai_target.create(data=data) + + _, kwargs = mock_request.call_args + self.assertEqual(kwargs["method"], "post") + self.assertEqual(kwargs["url"], "url/api/ai_targets") + self.assertEqual(kwargs["json"], data) + + @mock.patch("requests.Session.request", side_effect=mock_response) + def test_get_requests_single_ai_target(self, mock_request): + from pyoaev import OpenAEV + + api_client = OpenAEV("url", "token") + + api_client.ai_target.get("asset-123") + + _, kwargs = mock_request.call_args + self.assertEqual(kwargs["method"], "get") + self.assertEqual(kwargs["url"], "url/api/ai_targets/asset-123") + + @mock.patch("requests.Session.request", side_effect=mock_response) + def test_update_puts_to_ai_target(self, mock_request): + from pyoaev import OpenAEV + + api_client = OpenAEV("url", "token") + new_data = {"asset_description": "updated"} + + api_client.ai_target.update("asset-123", new_data=new_data) + + _, kwargs = mock_request.call_args + self.assertEqual(kwargs["method"], "put") + self.assertEqual(kwargs["url"], "url/api/ai_targets/asset-123") + self.assertEqual(kwargs["json"], new_data) + + @mock.patch("requests.Session.request", side_effect=mock_response) + def test_delete_calls_delete_on_ai_target(self, mock_request): + from pyoaev import OpenAEV + + api_client = OpenAEV("url", "token") + + api_client.ai_target.delete("asset-123") + + _, kwargs = mock_request.call_args + self.assertEqual(kwargs["method"], "delete") + self.assertEqual(kwargs["url"], "url/api/ai_targets/asset-123") + + def test_delete_http_error_is_mapped_to_delete_error(self): + from pyoaev import OpenAEV + from pyoaev.exceptions import OpenAEVDeleteError, OpenAEVHttpError + + api_client = OpenAEV("url", "token") + + with mock.patch.object( + OpenAEV, "http_delete", side_effect=OpenAEVHttpError("boom") + ): + with self.assertRaises(OpenAEVDeleteError): + api_client.ai_target.delete("asset-123") + + +if __name__ == "__main__": + main() diff --git a/test/apis/inject_expectation/test_inject_expectation.py b/test/apis/inject_expectation/test_inject_expectation.py new file mode 100644 index 0000000..eb42ea6 --- /dev/null +++ b/test/apis/inject_expectation/test_inject_expectation.py @@ -0,0 +1,69 @@ +from unittest import TestCase, main, mock + +from pyoaev import OpenAEV +from pyoaev.exceptions import OpenAEVHttpError, OpenAEVListError, OpenAEVParsingError + + +def make_json_response(payload): + class MockResponse: + def __init__(self): + self.status_code = 200 + self.history = None + self.content = None + self.headers = {"Content-Type": "application/json"} + + def json(self): + return payload + + return MockResponse() + + +class TestAiExpectationsForSource(TestCase): + @mock.patch("requests.Session.request") + def test_returns_list_of_expectations(self, mock_request): + expectations = [ + {"inject_expectation_id": "exp-1"}, + {"inject_expectation_id": "exp-2"}, + ] + mock_request.return_value = make_json_response(expectations) + api_client = OpenAEV("url", "token") + + result = api_client.inject_expectation.ai_expectations_for_source("collector-1") + + mock_request.assert_called_once() + _, kwargs = mock_request.call_args + self.assertEqual(kwargs["method"], "get") + self.assertEqual(kwargs["url"], "url/api/injects/expectations/ai/collector-1") + self.assertEqual(result, expectations) + + @mock.patch("requests.Session.request") + def test_raises_parsing_error_when_not_a_list(self, mock_request): + mock_request.return_value = make_json_response({"unexpected": "shape"}) + api_client = OpenAEV("url", "token") + + with self.assertRaises(OpenAEVParsingError): + api_client.inject_expectation.ai_expectations_for_source("collector-1") + + @mock.patch("requests.Session.request") + def test_raises_parsing_error_when_elements_not_dicts(self, mock_request): + mock_request.return_value = make_json_response(["not", "a", "dict"]) + api_client = OpenAEV("url", "token") + + with self.assertRaises(OpenAEVParsingError) as ctx: + api_client.inject_expectation.ai_expectations_for_source("collector-1") + + # The message should call out the offending element type, not just "list". + self.assertIn("str", str(ctx.exception)) + + def test_http_error_is_mapped_to_list_error(self): + api_client = OpenAEV("url", "token") + + with mock.patch.object( + OpenAEV, "http_get", side_effect=OpenAEVHttpError("boom") + ): + with self.assertRaises(OpenAEVListError): + api_client.inject_expectation.ai_expectations_for_source("collector-1") + + +if __name__ == "__main__": + main() diff --git a/test/signatures/test_ai_marker.py b/test/signatures/test_ai_marker.py new file mode 100644 index 0000000..9d7151c --- /dev/null +++ b/test/signatures/test_ai_marker.py @@ -0,0 +1,41 @@ +import unittest + +from pyoaev.signatures.ai_marker import build_marker + + +class TestBuildMarker(unittest.TestCase): + def test_marker_has_expected_prefix_and_length(self): + marker = build_marker("inject-1", "agent-1") + + self.assertTrue(marker.startswith("oaev")) + # "oaev" prefix (4 chars) + 16 hex chars from the sha256 digest. + self.assertEqual(len(marker), 20) + self.assertTrue(all(c in "0123456789abcdef" for c in marker[4:])) + + def test_marker_is_deterministic_for_same_inputs(self): + self.assertEqual( + build_marker("inject-1", "agent-1"), + build_marker("inject-1", "agent-1"), + ) + + def test_marker_differs_for_different_inputs(self): + self.assertNotEqual( + build_marker("inject-1", "agent-1"), + build_marker("inject-2", "agent-1"), + ) + self.assertNotEqual( + build_marker("inject-1", "agent-1"), + build_marker("inject-1", "agent-2"), + ) + + def test_agent_id_defaults_to_empty(self): + self.assertEqual(build_marker("inject-1"), build_marker("inject-1", "")) + + def test_marker_value_is_stable_across_runs(self): + # Lock the exact value so the injector and collectors (potentially in + # other languages) stay byte-for-byte compatible. + self.assertEqual(build_marker("inject-1", "agent-1"), "oaev6457d87cba0698ab") + + +if __name__ == "__main__": + unittest.main() diff --git a/test/signatures/test_ai_signature_types.py b/test/signatures/test_ai_signature_types.py new file mode 100644 index 0000000..0253362 --- /dev/null +++ b/test/signatures/test_ai_signature_types.py @@ -0,0 +1,29 @@ +import unittest + +from pyoaev.signatures.signature_type import SignatureType +from pyoaev.signatures.types import MatchTypes, SignatureTypes + + +class TestAiSignatureTypes(unittest.TestCase): + def test_ai_signature_type_values(self): + self.assertEqual( + SignatureTypes.SIG_TYPE_AI_REQUEST_MARKER.value, "ai_request_marker" + ) + self.assertEqual( + SignatureTypes.SIG_TYPE_AI_TARGET_ENDPOINT.value, "ai_target_endpoint" + ) + + def test_ai_request_marker_usable_in_signature_type(self): + signature_type = SignatureType( + label=SignatureTypes.SIG_TYPE_AI_REQUEST_MARKER, + match_type=MatchTypes.MATCH_TYPE_SIMPLE, + ) + + struct = signature_type.make_struct_for_matching(data="oaevdeadbeef") + + self.assertEqual(struct.get("type"), MatchTypes.MATCH_TYPE_SIMPLE.value) + self.assertEqual(struct.get("data"), "oaevdeadbeef") + + +if __name__ == "__main__": + unittest.main()