From f0892874e39bc04907b6299d30efc14feed25e9e Mon Sep 17 00:00:00 2001 From: abhinav-galileo Date: Fri, 12 Jun 2026 18:05:17 +0530 Subject: [PATCH 1/6] feat(evaluator-galileo): add Luna HTTP connection tuning --- .../luna/client.py | 69 ++++++++++++++- .../galileo/tests/test_luna_evaluator.py | 84 ++++++++++++++++++- 2 files changed, 149 insertions(+), 4 deletions(-) diff --git a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py index 252e519b..e43cfb1f 100644 --- a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py +++ b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py @@ -27,6 +27,9 @@ DEFAULT_KEEPALIVE_EXPIRY_SECS = 1.0 DEFAULT_MAX_CONNECTIONS = 100 DEFAULT_MAX_KEEPALIVE_CONNECTIONS = 20 +LUNA_KEEPALIVE_EXPIRY_ENV = "GALILEO_LUNA_KEEPALIVE_EXPIRY_SECONDS" +LUNA_MAX_CONNECTIONS_ENV = "GALILEO_LUNA_MAX_CONNECTIONS" +LUNA_MAX_KEEPALIVE_CONNECTIONS_ENV = "GALILEO_LUNA_MAX_KEEPALIVE_CONNECTIONS" PUBLIC_SCORER_INVOKE_PATH = "/scorers/invoke" INTERNAL_SCORER_INVOKE_PATH = "/internal/scorers/invoke" AuthMode = Literal["public", "internal"] @@ -78,6 +81,51 @@ def _env_auth_mode() -> AuthMode | None: raise ValueError("GALILEO_LUNA_AUTH_MODE must be either 'public' or 'internal'.") +def _load_float_env(env_name: str, default: float) -> float: + raw = os.getenv(env_name) + if raw is None: + return default + try: + return float(raw) + except ValueError as exc: + raise ValueError(f"{env_name}={raw!r} is not a number.") from exc + + +def _load_int_env(env_name: str, default: int) -> int: + raw = os.getenv(env_name) + if raw is None: + return default + try: + return int(raw) + except ValueError as exc: + raise ValueError(f"{env_name}={raw!r} is not an integer.") from exc + + +def _validate_connection_config( + *, + keepalive_expiry_seconds: float, + max_connections: int, + max_keepalive_connections: int, +) -> None: + if keepalive_expiry_seconds < 0: + raise ValueError( + f"{LUNA_KEEPALIVE_EXPIRY_ENV}={keepalive_expiry_seconds} " + "must be greater than or equal to 0." + ) + if max_connections <= 0: + raise ValueError(f"{LUNA_MAX_CONNECTIONS_ENV}={max_connections} must be greater than 0.") + if max_keepalive_connections < 0: + raise ValueError( + f"{LUNA_MAX_KEEPALIVE_CONNECTIONS_ENV}={max_keepalive_connections} " + "must be greater than or equal to 0." + ) + if max_keepalive_connections > max_connections: + raise ValueError( + f"{LUNA_MAX_KEEPALIVE_CONNECTIONS_ENV}={max_keepalive_connections} " + f"must be less than or equal to {LUNA_MAX_CONNECTIONS_ENV}={max_connections}." + ) + + def _as_float_or_none(value: JSONValue) -> float | None: if isinstance(value, bool) or value is None: return None @@ -184,6 +232,9 @@ class GalileoLunaClient: GALILEO_API_URL: Galileo API URL fallback. GALILEO_LUNA_CA_FILE: CA bundle used to verify the scorer API endpoint, for deployments whose API serves an internally-issued TLS certificate. + GALILEO_LUNA_KEEPALIVE_EXPIRY_SECONDS: HTTP pooled connection expiry. + GALILEO_LUNA_MAX_CONNECTIONS: Maximum outbound HTTP connections. + GALILEO_LUNA_MAX_KEEPALIVE_CONNECTIONS: Maximum idle pooled HTTP connections. GALILEO_CONSOLE_URL: Galileo Console URL (optional, defaults to production). """ @@ -235,6 +286,18 @@ def __init__( self.api_base = self._resolve_api_base(api_url) self.ca_file = (ca_file or os.getenv("GALILEO_LUNA_CA_FILE") or "").strip() or None self._ssl_context = self._load_ssl_context(self.ca_file) + self.keepalive_expiry_seconds = _load_float_env( + LUNA_KEEPALIVE_EXPIRY_ENV, DEFAULT_KEEPALIVE_EXPIRY_SECS + ) + self.max_connections = _load_int_env(LUNA_MAX_CONNECTIONS_ENV, DEFAULT_MAX_CONNECTIONS) + self.max_keepalive_connections = _load_int_env( + LUNA_MAX_KEEPALIVE_CONNECTIONS_ENV, DEFAULT_MAX_KEEPALIVE_CONNECTIONS + ) + _validate_connection_config( + keepalive_expiry_seconds=self.keepalive_expiry_seconds, + max_connections=self.max_connections, + max_keepalive_connections=self.max_keepalive_connections, + ) self._client: httpx.AsyncClient | None = None logger.info("[GalileoLunaClient] Auth mode selected: %s", self.auth_mode) @@ -329,9 +392,9 @@ async def _get_client(self) -> httpx.AsyncClient: headers=headers, timeout=httpx.Timeout(DEFAULT_TIMEOUT_SECS), limits=httpx.Limits( - max_connections=DEFAULT_MAX_CONNECTIONS, - max_keepalive_connections=DEFAULT_MAX_KEEPALIVE_CONNECTIONS, - keepalive_expiry=DEFAULT_KEEPALIVE_EXPIRY_SECS, + max_connections=self.max_connections, + max_keepalive_connections=self.max_keepalive_connections, + keepalive_expiry=self.keepalive_expiry_seconds, ), verify=verify, ) diff --git a/evaluators/contrib/galileo/tests/test_luna_evaluator.py b/evaluators/contrib/galileo/tests/test_luna_evaluator.py index 7332d982..192ec4d7 100644 --- a/evaluators/contrib/galileo/tests/test_luna_evaluator.py +++ b/evaluators/contrib/galileo/tests/test_luna_evaluator.py @@ -261,7 +261,11 @@ def test_client_rejects_unreadable_ca_bundle(self) -> None: async def test_client_applies_ca_bundle_and_connection_limits(self) -> None: import certifi from agent_control_evaluator_galileo.luna import GalileoLunaClient - from agent_control_evaluator_galileo.luna.client import DEFAULT_KEEPALIVE_EXPIRY_SECS + from agent_control_evaluator_galileo.luna.client import ( + DEFAULT_KEEPALIVE_EXPIRY_SECS, + DEFAULT_MAX_CONNECTIONS, + DEFAULT_MAX_KEEPALIVE_CONNECTIONS, + ) captured: dict[str, object] = {} real_async_client = httpx.AsyncClient @@ -290,6 +294,84 @@ def recording_client(**kwargs: object) -> httpx.AsyncClient: limits = captured["limits"] assert isinstance(limits, httpx.Limits) assert limits.keepalive_expiry == DEFAULT_KEEPALIVE_EXPIRY_SECS + assert limits.max_connections == DEFAULT_MAX_CONNECTIONS + assert limits.max_keepalive_connections == DEFAULT_MAX_KEEPALIVE_CONNECTIONS + + @pytest.mark.asyncio + async def test_client_applies_connection_tuning_env(self) -> None: + from agent_control_evaluator_galileo.luna import GalileoLunaClient + + captured: dict[str, object] = {} + real_async_client = httpx.AsyncClient + + def recording_client(**kwargs: object) -> httpx.AsyncClient: + captured.update(kwargs) + return real_async_client(**kwargs) + + with patch.dict( + os.environ, + { + "GALILEO_API_SECRET_KEY": "test-secret", + "GALILEO_LUNA_KEEPALIVE_EXPIRY_SECONDS": "0.25", + "GALILEO_LUNA_MAX_CONNECTIONS": "17", + "GALILEO_LUNA_MAX_KEEPALIVE_CONNECTIONS": "4", + }, + clear=True, + ): + client = GalileoLunaClient(console_url="https://console.example.com") + + with patch( + "agent_control_evaluator_galileo.luna.client.httpx.AsyncClient", recording_client + ): + try: + await client._get_client() + finally: + await client.close() + + assert client.keepalive_expiry_seconds == 0.25 + assert client.max_connections == 17 + assert client.max_keepalive_connections == 4 + limits = captured["limits"] + assert isinstance(limits, httpx.Limits) + assert limits.keepalive_expiry == 0.25 + assert limits.max_connections == 17 + assert limits.max_keepalive_connections == 4 + + @pytest.mark.parametrize( + "env_values, expected", + [ + ({"GALILEO_LUNA_KEEPALIVE_EXPIRY_SECONDS": "soon"}, "not a number"), + ({"GALILEO_LUNA_MAX_CONNECTIONS": "many"}, "not an integer"), + ({"GALILEO_LUNA_MAX_KEEPALIVE_CONNECTIONS": "some"}, "not an integer"), + ( + {"GALILEO_LUNA_KEEPALIVE_EXPIRY_SECONDS": "-0.1"}, + "greater than or equal to 0", + ), + ({"GALILEO_LUNA_MAX_CONNECTIONS": "0"}, "greater than 0"), + ( + {"GALILEO_LUNA_MAX_KEEPALIVE_CONNECTIONS": "-1"}, + "greater than or equal to 0", + ), + ( + { + "GALILEO_LUNA_MAX_CONNECTIONS": "2", + "GALILEO_LUNA_MAX_KEEPALIVE_CONNECTIONS": "3", + }, + "less than or equal", + ), + ], + ) + def test_client_reports_invalid_connection_tuning_env( + self, env_values: dict[str, str], expected: str + ) -> None: + from agent_control_evaluator_galileo.luna import GalileoLunaClient + + env = {"GALILEO_API_SECRET_KEY": "test-secret"} | env_values + with patch.dict(os.environ, env, clear=True): + with pytest.raises(ValueError) as exc_info: + GalileoLunaClient(console_url="https://console.example.com") + + assert expected in str(exc_info.value) @pytest.mark.asyncio async def test_client_posts_to_scorers_invoke_without_protect_fields(self) -> None: From 98d79d09a878de41a3a13d07b390fee232da4212 Mon Sep 17 00:00:00 2001 From: abhinav-galileo Date: Sat, 13 Jun 2026 15:44:03 +0530 Subject: [PATCH 2/6] feat(engine): expose evaluation concurrency setting --- engine/src/agent_control_engine/core.py | 23 ++++++++++- engine/tests/test_core.py | 51 +++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 1 deletion(-) diff --git a/engine/src/agent_control_engine/core.py b/engine/src/agent_control_engine/core.py index e2ae8b6e..9bff6680 100644 --- a/engine/src/agent_control_engine/core.py +++ b/engine/src/agent_control_engine/core.py @@ -27,11 +27,32 @@ logger = logging.getLogger(__name__) + +def _env_positive_int(*names: str, default: int) -> int: + """Read a positive integer from the first configured environment variable.""" + for name in names: + value = os.environ.get(name) + if value is None or value.strip() == "": + continue + try: + parsed = int(value) + except ValueError as exc: + raise RuntimeError(f"{name}={value!r} must be an integer.") from exc + if parsed < 1: + raise RuntimeError(f"{name}={value!r} must be greater than or equal to 1.") + return parsed + return default + + # Default timeout for evaluator execution (seconds) DEFAULT_EVALUATOR_TIMEOUT = float(os.environ.get("EVALUATOR_TIMEOUT_SECONDS", "30")) # Max concurrent evaluations (limits task spawning overhead for large policies) -MAX_CONCURRENT_EVALUATIONS = int(os.environ.get("MAX_CONCURRENT_EVALUATIONS", "3")) +MAX_CONCURRENT_EVALUATIONS = _env_positive_int( + "AGENT_CONTROL_MAX_CONCURRENT_EVALUATIONS", + "MAX_CONCURRENT_EVALUATIONS", + default=3, +) SELECTED_DATA_PREVIEW_MAX_CHARS = int( os.environ.get("AGENT_CONTROL_SELECTED_DATA_PREVIEW_MAX_CHARS", "500") diff --git a/engine/tests/test_core.py b/engine/tests/test_core.py index ed4e6e00..baa46bab 100644 --- a/engine/tests/test_core.py +++ b/engine/tests/test_core.py @@ -1280,6 +1280,57 @@ async def test_timeout_does_not_affect_fast_evaluators(self): class TestConcurrencyLimit: """Tests for semaphore-based concurrency limiting.""" + def test_max_concurrency_env_prefers_agent_control_name( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + """The canonical Agent Control env var overrides the legacy short name.""" + import agent_control_engine.core as core_module + + monkeypatch.setenv("AGENT_CONTROL_MAX_CONCURRENT_EVALUATIONS", "7") + monkeypatch.setenv("MAX_CONCURRENT_EVALUATIONS", "2") + + assert ( + core_module._env_positive_int( + "AGENT_CONTROL_MAX_CONCURRENT_EVALUATIONS", + "MAX_CONCURRENT_EVALUATIONS", + default=3, + ) + == 7 + ) + + def test_max_concurrency_env_reads_legacy_name( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + """The existing env var remains supported for compatibility.""" + import agent_control_engine.core as core_module + + monkeypatch.delenv("AGENT_CONTROL_MAX_CONCURRENT_EVALUATIONS", raising=False) + monkeypatch.setenv("MAX_CONCURRENT_EVALUATIONS", "5") + + assert ( + core_module._env_positive_int( + "AGENT_CONTROL_MAX_CONCURRENT_EVALUATIONS", + "MAX_CONCURRENT_EVALUATIONS", + default=3, + ) + == 5 + ) + + def test_max_concurrency_env_rejects_non_positive_values( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + """The concurrency cap must always allow at least one evaluator.""" + import agent_control_engine.core as core_module + + monkeypatch.setenv("AGENT_CONTROL_MAX_CONCURRENT_EVALUATIONS", "0") + + with pytest.raises(RuntimeError, match="greater than or equal to 1"): + core_module._env_positive_int( + "AGENT_CONTROL_MAX_CONCURRENT_EVALUATIONS", + "MAX_CONCURRENT_EVALUATIONS", + default=3, + ) + @pytest.mark.asyncio async def test_concurrency_limited_to_max(self, monkeypatch: pytest.MonkeyPatch): """Test that concurrent evaluations are limited by semaphore. From 44b95fd2a09bd6ccb6a03367e278128291d96177 Mon Sep 17 00:00:00 2001 From: abhinav-galileo Date: Sun, 14 Jun 2026 07:35:27 +0530 Subject: [PATCH 3/6] feat: add Luna HTTP client pool setting --- .../luna/client.py | 89 +++++++++++++++---- .../galileo/tests/test_luna_evaluator.py | 43 +++++++++ 2 files changed, 113 insertions(+), 19 deletions(-) diff --git a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py index e43cfb1f..cc2cc1cb 100644 --- a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py +++ b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py @@ -6,6 +6,7 @@ import os import ssl import warnings +from asyncio import Lock from base64 import urlsafe_b64encode from hashlib import sha256 from hmac import new as hmac_new @@ -27,9 +28,11 @@ DEFAULT_KEEPALIVE_EXPIRY_SECS = 1.0 DEFAULT_MAX_CONNECTIONS = 100 DEFAULT_MAX_KEEPALIVE_CONNECTIONS = 20 +DEFAULT_CLIENT_POOL_SIZE = 1 LUNA_KEEPALIVE_EXPIRY_ENV = "GALILEO_LUNA_KEEPALIVE_EXPIRY_SECONDS" LUNA_MAX_CONNECTIONS_ENV = "GALILEO_LUNA_MAX_CONNECTIONS" LUNA_MAX_KEEPALIVE_CONNECTIONS_ENV = "GALILEO_LUNA_MAX_KEEPALIVE_CONNECTIONS" +LUNA_CLIENT_POOL_SIZE_ENV = "GALILEO_LUNA_CLIENT_POOL_SIZE" PUBLIC_SCORER_INVOKE_PATH = "/scorers/invoke" INTERNAL_SCORER_INVOKE_PATH = "/internal/scorers/invoke" AuthMode = Literal["public", "internal"] @@ -106,6 +109,7 @@ def _validate_connection_config( keepalive_expiry_seconds: float, max_connections: int, max_keepalive_connections: int, + client_pool_size: int, ) -> None: if keepalive_expiry_seconds < 0: raise ValueError( @@ -124,6 +128,8 @@ def _validate_connection_config( f"{LUNA_MAX_KEEPALIVE_CONNECTIONS_ENV}={max_keepalive_connections} " f"must be less than or equal to {LUNA_MAX_CONNECTIONS_ENV}={max_connections}." ) + if client_pool_size <= 0: + raise ValueError(f"{LUNA_CLIENT_POOL_SIZE_ENV}={client_pool_size} must be greater than 0.") def _as_float_or_none(value: JSONValue) -> float | None: @@ -235,6 +241,7 @@ class GalileoLunaClient: GALILEO_LUNA_KEEPALIVE_EXPIRY_SECONDS: HTTP pooled connection expiry. GALILEO_LUNA_MAX_CONNECTIONS: Maximum outbound HTTP connections. GALILEO_LUNA_MAX_KEEPALIVE_CONNECTIONS: Maximum idle pooled HTTP connections. + GALILEO_LUNA_CLIENT_POOL_SIZE: Number of outbound HTTP clients to rotate across. GALILEO_CONSOLE_URL: Galileo Console URL (optional, defaults to production). """ @@ -293,12 +300,19 @@ def __init__( self.max_keepalive_connections = _load_int_env( LUNA_MAX_KEEPALIVE_CONNECTIONS_ENV, DEFAULT_MAX_KEEPALIVE_CONNECTIONS ) + self.client_pool_size = _load_int_env( + LUNA_CLIENT_POOL_SIZE_ENV, DEFAULT_CLIENT_POOL_SIZE + ) _validate_connection_config( keepalive_expiry_seconds=self.keepalive_expiry_seconds, max_connections=self.max_connections, max_keepalive_connections=self.max_keepalive_connections, + client_pool_size=self.client_pool_size, ) self._client: httpx.AsyncClient | None = None + self._clients: list[httpx.AsyncClient] = [] + self._next_client_index = 0 + self._client_lock = Lock() logger.info("[GalileoLunaClient] Auth mode selected: %s", self.auth_mode) def _resolve_api_base(self, api_url: str | None) -> str: @@ -379,26 +393,58 @@ def _derive_api_url(self, console_url: str) -> str: parts._replace(netloc=parts.netloc.replace(host, new_host, 1)) ) + def _create_client(self) -> httpx.AsyncClient: + """Create an HTTP client with the configured auth, TLS, and connection limits.""" + headers = {"Content-Type": "application/json"} + if self.auth_mode == "public" and self.api_key is not None: + headers["Galileo-API-Key"] = self.api_key + verify: ssl.SSLContext | bool = ( + self._ssl_context if self._ssl_context is not None else True + ) + return httpx.AsyncClient( + headers=headers, + timeout=httpx.Timeout(DEFAULT_TIMEOUT_SECS), + limits=httpx.Limits( + max_connections=self.max_connections, + max_keepalive_connections=self.max_keepalive_connections, + keepalive_expiry=self.keepalive_expiry_seconds, + ), + verify=verify, + ) + + def _select_pooled_client(self) -> httpx.AsyncClient: + """Select the next pooled client without awaiting on the hot path.""" + client = self._clients[self._next_client_index % len(self._clients)] + self._next_client_index = (self._next_client_index + 1) % len(self._clients) + return client + async def _get_client(self) -> httpx.AsyncClient: - """Get or create the HTTP client.""" - if self._client is None or self._client.is_closed: - headers = {"Content-Type": "application/json"} - if self.auth_mode == "public" and self.api_key is not None: - headers["Galileo-API-Key"] = self.api_key - verify: ssl.SSLContext | bool = ( - self._ssl_context if self._ssl_context is not None else True - ) - self._client = httpx.AsyncClient( - headers=headers, - timeout=httpx.Timeout(DEFAULT_TIMEOUT_SECS), - limits=httpx.Limits( - max_connections=self.max_connections, - max_keepalive_connections=self.max_keepalive_connections, - keepalive_expiry=self.keepalive_expiry_seconds, - ), - verify=verify, - ) - return self._client + """Get or create the next HTTP client.""" + if self._client is not None and not self._client.is_closed: + return self._client + if self._client is not None and self._client.is_closed: + self._client = None + + if ( + self.client_pool_size > 1 + and len(self._clients) == self.client_pool_size + and all(not client.is_closed for client in self._clients) + ): + return self._select_pooled_client() + + async with self._client_lock: + if self._client is not None and not self._client.is_closed: + return self._client + + self._clients = [client for client in self._clients if not client.is_closed] + while len(self._clients) < self.client_pool_size: + self._clients.append(self._create_client()) + + if self.client_pool_size == 1: + self._client = self._clients[0] + return self._client + + return self._select_pooled_client() def _endpoint_and_headers( self, @@ -497,6 +543,11 @@ async def close(self) -> None: if self._client is not None: await self._client.aclose() self._client = None + for client in self._clients: + if not client.is_closed: + await client.aclose() + self._clients = [] + self._next_client_index = 0 async def __aenter__(self) -> GalileoLunaClient: """Async context manager entry.""" diff --git a/evaluators/contrib/galileo/tests/test_luna_evaluator.py b/evaluators/contrib/galileo/tests/test_luna_evaluator.py index 192ec4d7..bbad0f85 100644 --- a/evaluators/contrib/galileo/tests/test_luna_evaluator.py +++ b/evaluators/contrib/galileo/tests/test_luna_evaluator.py @@ -337,6 +337,47 @@ def recording_client(**kwargs: object) -> httpx.AsyncClient: assert limits.max_connections == 17 assert limits.max_keepalive_connections == 4 + @pytest.mark.asyncio + async def test_client_pool_size_rotates_across_http_clients(self) -> None: + import agent_control_evaluator_galileo.luna.client as luna_client_module + from agent_control_evaluator_galileo.luna import GalileoLunaClient + + class FakeAsyncClient: + def __init__(self, **kwargs: object) -> None: + self.kwargs = kwargs + self.is_closed = False + + async def aclose(self) -> None: + self.is_closed = True + + created: list[FakeAsyncClient] = [] + + def recording_client(**kwargs: object) -> FakeAsyncClient: + client = FakeAsyncClient(**kwargs) + created.append(client) + return client + + with patch.dict( + os.environ, + { + "GALILEO_API_SECRET_KEY": "test-secret", + "GALILEO_LUNA_CLIENT_POOL_SIZE": "3", + }, + clear=True, + ): + client = GalileoLunaClient(console_url="https://console.example.com") + + with patch.object(luna_client_module.httpx, "AsyncClient", recording_client): + try: + selected = [await client._get_client() for _ in range(5)] + finally: + await client.close() + + assert client.client_pool_size == 3 + assert len(created) == 3 + assert selected == [created[0], created[1], created[2], created[0], created[1]] + assert all(created_client.is_closed for created_client in created) + @pytest.mark.parametrize( "env_values, expected", [ @@ -359,6 +400,8 @@ def recording_client(**kwargs: object) -> httpx.AsyncClient: }, "less than or equal", ), + ({"GALILEO_LUNA_CLIENT_POOL_SIZE": "many"}, "not an integer"), + ({"GALILEO_LUNA_CLIENT_POOL_SIZE": "0"}, "greater than 0"), ], ) def test_client_reports_invalid_connection_tuning_env( From a3847ea9d1a2f7fb04391e4775554c42bfd6a3de Mon Sep 17 00:00:00 2001 From: abhinav-galileo Date: Sun, 14 Jun 2026 17:24:20 +0530 Subject: [PATCH 4/6] fix: expose luna http error metadata --- .../luna/evaluator.py | 44 ++++++++++++++++--- .../galileo/tests/test_luna_evaluator.py | 38 ++++++++++++++++ 2 files changed, 76 insertions(+), 6 deletions(-) diff --git a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/evaluator.py b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/evaluator.py index eff92f2a..1221cedb 100644 --- a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/evaluator.py +++ b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/evaluator.py @@ -8,6 +8,7 @@ from importlib.metadata import PackageNotFoundError, version from typing import Any +import httpx from agent_control_evaluators import Evaluator, EvaluatorMetadata, register_evaluator from agent_control_models import EvaluatorResult, JSONValue @@ -27,6 +28,7 @@ def _resolve_package_version() -> str: _PACKAGE_VERSION = _resolve_package_version() LUNA_AVAILABLE = True +_HTTP_ERROR_BODY_LIMIT = 500 def _coerce_payload_text(value: Any) -> str | None: @@ -74,6 +76,32 @@ def _confidence_from_score(score: JSONValue) -> float: return 1.0 +def _truncated_http_response_body(body: str) -> tuple[str, bool]: + if len(body) <= _HTTP_ERROR_BODY_LIMIT: + return body, False + return body[:_HTTP_ERROR_BODY_LIMIT], True + + +def _http_status_error_metadata(error: httpx.HTTPStatusError) -> dict[str, Any]: + metadata: dict[str, Any] = {} + + request = error.request + metadata["http_method"] = request.method + metadata["http_endpoint_path"] = request.url.path + + response = error.response + metadata["http_status_code"] = response.status_code + metadata["http_response_content_type"] = response.headers.get("content-type") + + body = response.text + if body: + metadata["http_response_body"], metadata["http_response_body_truncated"] = ( + _truncated_http_response_body(body) + ) + + return {key: value for key, value in metadata.items() if value is not None} + + @register_evaluator class LunaEvaluator(Evaluator[LunaEvaluatorConfig]): """Galileo Luna evaluator using the direct scorer invocation API.""" @@ -252,16 +280,20 @@ def _handle_error( error: Exception, ) -> EvaluatorResult: error_detail = str(error) + metadata: dict[str, Any] = { + "error_type": type(error).__name__, + "scorer_label": self.config.scorer_label, + "scorer_id": self.config.scorer_id, + "scorer_version_id": self.config.scorer_version_id, + } + if isinstance(error, httpx.HTTPStatusError): + metadata.update(_http_status_error_metadata(error)) + return EvaluatorResult( matched=False, confidence=0.0, message=f"Luna evaluation error: {error_detail}", - metadata={ - "error_type": type(error).__name__, - "scorer_label": self.config.scorer_label, - "scorer_id": self.config.scorer_id, - "scorer_version_id": self.config.scorer_version_id, - }, + metadata=metadata, error=error_detail, ) diff --git a/evaluators/contrib/galileo/tests/test_luna_evaluator.py b/evaluators/contrib/galileo/tests/test_luna_evaluator.py index bbad0f85..fe6c679f 100644 --- a/evaluators/contrib/galileo/tests/test_luna_evaluator.py +++ b/evaluators/contrib/galileo/tests/test_luna_evaluator.py @@ -729,3 +729,41 @@ async def test_evaluator_fail_open_sets_error(self) -> None: assert "error" not in result.metadata assert result.metadata["error_type"] == "RuntimeError" assert "fallback_action" not in result.metadata + + @patch.dict(os.environ, {"GALILEO_API_KEY": "test-key"}) + @pytest.mark.asyncio + async def test_evaluator_error_metadata_includes_http_status_context(self) -> None: + from agent_control_evaluator_galileo.luna import LunaEvaluator + from agent_control_evaluator_galileo.luna.client import GalileoLunaClient + + evaluator = LunaEvaluator.from_dict({"scorer_label": "toxicity", "threshold": 0.5}) + request = httpx.Request( + "POST", + "https://api.example.test/internal/scorers/invoke?token=secret", + ) + response = httpx.Response( + 503, + headers={"content-type": "application/json"}, + text='{"detail":"busy"}', + request=request, + ) + + with patch.object(GalileoLunaClient, "invoke", new_callable=AsyncMock) as mock_invoke: + mock_invoke.side_effect = httpx.HTTPStatusError( + "service unavailable", + request=request, + response=response, + ) + + result = await evaluator.evaluate("hello") + + assert result.matched is False + assert result.metadata is not None + assert result.metadata["error_type"] == "HTTPStatusError" + assert result.metadata["http_status_code"] == 503 + assert result.metadata["http_method"] == "POST" + assert result.metadata["http_endpoint_path"] == "/internal/scorers/invoke" + assert result.metadata["http_response_content_type"] == "application/json" + assert result.metadata["http_response_body"] == '{"detail":"busy"}' + assert result.metadata["http_response_body_truncated"] is False + assert "token=secret" not in str(result.metadata) From 83873bbbb01f97ad1819cd5d7d93ec152c9dd89e Mon Sep 17 00:00:00 2001 From: abhinav-galileo Date: Mon, 15 Jun 2026 20:47:34 +0530 Subject: [PATCH 5/6] docs(engine): clarify concurrency env fallback --- engine/src/agent_control_engine/core.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/engine/src/agent_control_engine/core.py b/engine/src/agent_control_engine/core.py index 9bff6680..b2cd81b3 100644 --- a/engine/src/agent_control_engine/core.py +++ b/engine/src/agent_control_engine/core.py @@ -47,7 +47,8 @@ def _env_positive_int(*names: str, default: int) -> int: # Default timeout for evaluator execution (seconds) DEFAULT_EVALUATOR_TIMEOUT = float(os.environ.get("EVALUATOR_TIMEOUT_SECONDS", "30")) -# Max concurrent evaluations (limits task spawning overhead for large policies) +# Max concurrent evaluations (limits task spawning overhead for large policies). +# Prefer the namespaced env var; MAX_CONCURRENT_EVALUATIONS is kept for compatibility. MAX_CONCURRENT_EVALUATIONS = _env_positive_int( "AGENT_CONTROL_MAX_CONCURRENT_EVALUATIONS", "MAX_CONCURRENT_EVALUATIONS", From 664a78f38de88587d1559fe2950a1570a4ea88a8 Mon Sep 17 00:00:00 2001 From: abhinav-galileo Date: Wed, 17 Jun 2026 13:29:16 +0530 Subject: [PATCH 6/6] fix(evaluators): synchronize Luna client pooling --- .../luna/client.py | 55 +++++----- .../galileo/tests/test_luna_evaluator.py | 101 ++++++++++++++++++ 2 files changed, 128 insertions(+), 28 deletions(-) diff --git a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py index cc2cc1cb..8c971cba 100644 --- a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py +++ b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py @@ -86,7 +86,7 @@ def _env_auth_mode() -> AuthMode | None: def _load_float_env(env_name: str, default: float) -> float: raw = os.getenv(env_name) - if raw is None: + if raw is None or raw.strip() == "": return default try: return float(raw) @@ -96,7 +96,7 @@ def _load_float_env(env_name: str, default: float) -> float: def _load_int_env(env_name: str, default: int) -> int: raw = os.getenv(env_name) - if raw is None: + if raw is None or raw.strip() == "": return default try: return int(raw) @@ -413,37 +413,27 @@ def _create_client(self) -> httpx.AsyncClient: ) def _select_pooled_client(self) -> httpx.AsyncClient: - """Select the next pooled client without awaiting on the hot path.""" + """Select the next pooled client while holding the client state lock.""" client = self._clients[self._next_client_index % len(self._clients)] self._next_client_index = (self._next_client_index + 1) % len(self._clients) return client async def _get_client(self) -> httpx.AsyncClient: """Get or create the next HTTP client.""" - if self._client is not None and not self._client.is_closed: - return self._client - if self._client is not None and self._client.is_closed: - self._client = None - - if ( - self.client_pool_size > 1 - and len(self._clients) == self.client_pool_size - and all(not client.is_closed for client in self._clients) - ): - return self._select_pooled_client() - async with self._client_lock: - if self._client is not None and not self._client.is_closed: - return self._client - self._clients = [client for client in self._clients if not client.is_closed] - while len(self._clients) < self.client_pool_size: - self._clients.append(self._create_client()) if self.client_pool_size == 1: - self._client = self._clients[0] + if self._client is not None and not self._client.is_closed: + return self._client + self._client = self._clients[0] if self._clients else self._create_client() + self._clients = [self._client] return self._client + self._client = None + while len(self._clients) < self.client_pool_size: + self._clients.append(self._create_client()) + return self._select_pooled_client() def _endpoint_and_headers( @@ -540,14 +530,23 @@ async def invoke( async def close(self) -> None: """Close the HTTP client and release resources.""" - if self._client is not None: - await self._client.aclose() + async with self._client_lock: + clients: list[httpx.AsyncClient] = [] + seen_client_ids: set[int] = set() + if self._client is not None: + clients.append(self._client) + seen_client_ids.add(id(self._client)) self._client = None - for client in self._clients: - if not client.is_closed: - await client.aclose() - self._clients = [] - self._next_client_index = 0 + for client in self._clients: + if id(client) not in seen_client_ids: + clients.append(client) + seen_client_ids.add(id(client)) + self._clients = [] + self._next_client_index = 0 + + for client in clients: + if not client.is_closed: + await client.aclose() async def __aenter__(self) -> GalileoLunaClient: """Async context manager entry.""" diff --git a/evaluators/contrib/galileo/tests/test_luna_evaluator.py b/evaluators/contrib/galileo/tests/test_luna_evaluator.py index fe6c679f..86328e1c 100644 --- a/evaluators/contrib/galileo/tests/test_luna_evaluator.py +++ b/evaluators/contrib/galileo/tests/test_luna_evaluator.py @@ -2,6 +2,7 @@ from __future__ import annotations +import asyncio import json import logging import os @@ -337,6 +338,33 @@ def recording_client(**kwargs: object) -> httpx.AsyncClient: assert limits.max_connections == 17 assert limits.max_keepalive_connections == 4 + def test_client_ignores_empty_connection_tuning_env(self) -> None: + from agent_control_evaluator_galileo.luna import GalileoLunaClient + from agent_control_evaluator_galileo.luna.client import ( + DEFAULT_CLIENT_POOL_SIZE, + DEFAULT_KEEPALIVE_EXPIRY_SECS, + DEFAULT_MAX_CONNECTIONS, + DEFAULT_MAX_KEEPALIVE_CONNECTIONS, + ) + + with patch.dict( + os.environ, + { + "GALILEO_API_SECRET_KEY": "test-secret", + "GALILEO_LUNA_KEEPALIVE_EXPIRY_SECONDS": "", + "GALILEO_LUNA_MAX_CONNECTIONS": " ", + "GALILEO_LUNA_MAX_KEEPALIVE_CONNECTIONS": "", + "GALILEO_LUNA_CLIENT_POOL_SIZE": " ", + }, + clear=True, + ): + client = GalileoLunaClient(console_url="https://console.example.com") + + assert client.keepalive_expiry_seconds == DEFAULT_KEEPALIVE_EXPIRY_SECS + assert client.max_connections == DEFAULT_MAX_CONNECTIONS + assert client.max_keepalive_connections == DEFAULT_MAX_KEEPALIVE_CONNECTIONS + assert client.client_pool_size == DEFAULT_CLIENT_POOL_SIZE + @pytest.mark.asyncio async def test_client_pool_size_rotates_across_http_clients(self) -> None: import agent_control_evaluator_galileo.luna.client as luna_client_module @@ -378,6 +406,79 @@ def recording_client(**kwargs: object) -> FakeAsyncClient: assert selected == [created[0], created[1], created[2], created[0], created[1]] assert all(created_client.is_closed for created_client in created) + @pytest.mark.asyncio + async def test_pooled_client_selection_waits_for_client_lock(self) -> None: + from agent_control_evaluator_galileo.luna import GalileoLunaClient + + class FakeAsyncClient: + is_closed = False + + async def aclose(self) -> None: + self.is_closed = True + + with patch.dict( + os.environ, + { + "GALILEO_API_SECRET_KEY": "test-secret", + "GALILEO_LUNA_CLIENT_POOL_SIZE": "2", + }, + clear=True, + ): + client = GalileoLunaClient(console_url="https://console.example.com") + + first_client = FakeAsyncClient() + second_client = FakeAsyncClient() + client._clients = [first_client, second_client] # type: ignore[list-item] + + await client._client_lock.acquire() + try: + pending_selection = asyncio.create_task(client._get_client()) + await asyncio.sleep(0) + + assert not pending_selection.done() + finally: + client._client_lock.release() + + try: + assert await pending_selection is first_client + finally: + await client.close() + + @pytest.mark.asyncio + async def test_close_waits_for_client_lock_before_resetting_state(self) -> None: + from agent_control_evaluator_galileo.luna import GalileoLunaClient + + class FakeAsyncClient: + is_closed = False + + async def aclose(self) -> None: + self.is_closed = True + + with patch.dict(os.environ, {"GALILEO_API_SECRET_KEY": "test-secret"}, clear=True): + client = GalileoLunaClient(console_url="https://console.example.com") + + http_client = FakeAsyncClient() + client._client = http_client # type: ignore[assignment] + client._clients = [http_client] # type: ignore[list-item] + + await client._client_lock.acquire() + try: + pending_close = asyncio.create_task(client.close()) + await asyncio.sleep(0) + + assert not pending_close.done() + assert client._client is http_client + assert not http_client.is_closed + finally: + client._client_lock.release() + + await pending_close + + assert client._client is None + assert client._clients == [] + assert client._next_client_index == 0 + assert http_client.is_closed + @pytest.mark.parametrize( "env_values, expected", [