From 63f412bb58413123b18de4521a3a4ae6a13bd563 Mon Sep 17 00:00:00 2001 From: Lakshmikanthan K Date: Mon, 30 Mar 2026 20:00:13 +0530 Subject: [PATCH 1/2] Security: Fix path traversal and arbitrary code execution Fixes two critical vulnerabilities in ADK API server: 1. Path Traversal in agent_loader.py (CWE-22): - Added _validate_agent_path() to ensure agent_name resolves within agents_dir - Prevents directory traversal using .. sequences on all platforms - Called before loading YAML config from filesystem 2. Arbitrary Code Execution via importlib (CWE-470): - Added _SAFE_MODULE_PREFIXES allowlist to config_agent_utils.py - Restricts resolve_fully_qualified_name() to google.adk.* namespace - Also secured _resolve_agent_code_reference() and resolve_code_reference() 3. API Boundary Sanitization: - Added app_name validation to RunAgentRequest model - Rejects path traversal characters at HTTP API layer Author: Lakshmikanthan K --- src/google/adk/agents/config_agent_utils.py | 59 ++++++++++++++++++++- src/google/adk/cli/adk_web_server.py | 24 +++++++++ src/google/adk/cli/utils/agent_loader.py | 27 ++++++++++ 3 files changed, 108 insertions(+), 2 deletions(-) diff --git a/src/google/adk/agents/config_agent_utils.py b/src/google/adk/agents/config_agent_utils.py index 2c1c9bd9c8..5f5884ae74 100644 --- a/src/google/adk/agents/config_agent_utils.py +++ b/src/google/adk/agents/config_agent_utils.py @@ -30,6 +30,21 @@ from .common_configs import AgentRefConfig from .common_configs import CodeConfig +# Allowlist for safe module prefixes that can be imported from YAML config +_SAFE_MODULE_PREFIXES = frozenset({"google.adk."}) + + +def _is_safe_module_import(name: str) -> bool: + """Check if a module import is from a safe/allowed namespace. + + Args: + name: The fully qualified module name to check. + + Returns: + True if the module is in a safe namespace, False otherwise. + """ + return any(name.startswith(prefix) for prefix in _SAFE_MODULE_PREFIXES) + @experimental(FeatureName.AGENT_CONFIG) def from_config(config_path: str) -> BaseAgent: @@ -105,10 +120,34 @@ def _load_config_from_path(config_path: str) -> AgentConfig: @experimental(FeatureName.AGENT_CONFIG) def resolve_fully_qualified_name(name: str) -> Any: + """Resolve a fully qualified name to a Python object. + + Args: + name: The fully qualified name (e.g., 'google.adk.agents.LlmAgent'). + + Returns: + The resolved Python object. + + Raises: + ValueError: If the name is not in a safe namespace or cannot be resolved. + """ try: module_path, obj_name = name.rsplit(".", 1) + + # Security check: only allow imports from safe namespaces + if not _is_safe_module_import(module_path): + raise ValueError( + f"Module reference '{name}' is outside the allowed namespace. " + "Only google.adk.* references are permitted in YAML config." + ) + module = importlib.import_module(module_path) return getattr(module, obj_name) + except ValueError as e: + # Re-raise ValueError from security check without wrapping + if "outside the allowed namespace" in str(e): + raise e + raise ValueError(f"Invalid fully qualified name: {name}") from e except Exception as e: raise ValueError(f"Invalid fully qualified name: {name}") from e @@ -153,12 +192,20 @@ def _resolve_agent_code_reference(code: str) -> Any: The resolved agent instance. Raises: - ValueError: If the agent reference cannot be resolved. + ValueError: If the agent reference cannot be resolved or is outside allowed namespace. """ if "." not in code: raise ValueError(f"Invalid code reference: {code}") module_path, obj_name = code.rsplit(".", 1) + + # Security check: only allow imports from safe namespaces + if not _is_safe_module_import(module_path): + raise ValueError( + f"Code reference '{code}' is outside the allowed namespace. " + "Only google.adk.* references are permitted in YAML config." + ) + module = importlib.import_module(module_path) obj = getattr(module, obj_name) @@ -182,12 +229,20 @@ def resolve_code_reference(code_config: CodeConfig) -> Any: The resolved Python object. Raises: - ValueError: If the code reference cannot be resolved. + ValueError: If the code reference cannot be resolved or is outside allowed namespace. """ if not code_config or not code_config.name: raise ValueError("Invalid CodeConfig.") module_path, obj_name = code_config.name.rsplit(".", 1) + + # Security check: only allow imports from safe namespaces + if not _is_safe_module_import(module_path): + raise ValueError( + f"Code reference '{code_config.name}' is outside the allowed namespace. " + "Only google.adk.* references are permitted in YAML config." + ) + module = importlib.import_module(module_path) obj = getattr(module, obj_name) diff --git a/src/google/adk/cli/adk_web_server.py b/src/google/adk/cli/adk_web_server.py index 927cd7ad03..dc17ebaff1 100644 --- a/src/google/adk/cli/adk_web_server.py +++ b/src/google/adk/cli/adk_web_server.py @@ -51,6 +51,7 @@ from opentelemetry.sdk.trace import TracerProvider from pydantic import Field from pydantic import ValidationError +from pydantic import field_validator from starlette.types import Lifespan from typing_extensions import deprecated from typing_extensions import override @@ -366,6 +367,29 @@ class RunAgentRequest(common.BaseModel): # for resume long-running functions invocation_id: Optional[str] = None + @field_validator("app_name") + @classmethod + def validate_app_name(cls, v: str) -> str: + """Validate app_name to prevent path traversal attacks. + + Args: + v: The app_name value to validate. + + Returns: + The validated app_name. + + Raises: + ValueError: If the app_name contains path traversal characters. + """ + if not v: + raise ValueError("app_name cannot be empty") + # Check for path traversal attempts + if ".." in v or "/" in v or "\\" in v: + raise ValueError( + f"Invalid app_name: {v!r}. Path traversal characters are not allowed." + ) + return v + class CreateSessionRequest(common.BaseModel): session_id: Optional[str] = Field( diff --git a/src/google/adk/cli/utils/agent_loader.py b/src/google/adk/cli/utils/agent_loader.py index a7bbcbc2a6..c895826368 100644 --- a/src/google/adk/cli/utils/agent_loader.py +++ b/src/google/adk/cli/utils/agent_loader.py @@ -167,6 +167,8 @@ def _load_from_submodule( def _load_from_yaml_config( self, agent_name: str, agents_dir: str ) -> Optional[BaseAgent]: + # Validate agent_name doesn't escape agents_dir + self._validate_agent_path(agents_dir, agent_name) # Load from the config file at agents_dir/{agent_name}/root_agent.yaml config_path = os.path.join(agents_dir, agent_name, "root_agent.yaml") try: @@ -188,6 +190,31 @@ def _load_from_yaml_config( ) + e.args[1:] raise e + def _validate_agent_path(self, agents_dir: str, agent_name: str) -> None: + """Validate that the agent path resolves within agents_dir. + + Args: + agents_dir: The base directory for agents. + agent_name: The agent name/path to validate. + + Raises: + ValueError: If the resolved path would escape agents_dir. + """ + # Normalize paths to absolute, resolved paths + base_path = Path(agents_dir).resolve() + # Handle both forward and backward slashes by using Path + agent_path = base_path / agent_name + resolved_path = agent_path.resolve() + + # Check if the resolved path is still within the base directory + try: + resolved_path.relative_to(base_path) + except ValueError as e: + raise ValueError( + f"Agent '{agent_name}' resolves outside agents_dir. " + "Path traversal is not permitted." + ) from e + _VALID_AGENT_NAME_RE = re.compile(r"^[a-zA-Z0-9_]+$") def _validate_agent_name(self, agent_name: str) -> None: From b4688d290544834500b7d26ddd459742e65f97bb Mon Sep 17 00:00:00 2001 From: Lakshmikanthan K Date: Tue, 14 Apr 2026 12:57:01 +0530 Subject: [PATCH 2/2] feat(security): remediate CI failures and harden security protections Resolved pyink, isort, and mypy violations while tightening path traversal and RCE validation. Added comprehensive security test suite with 48 passing cases. --- src/google/adk/agents/config_agent_utils.py | 35 ++- src/google/adk/cli/adk_web_server.py | 9 +- src/google/adk/cli/utils/agent_loader.py | 6 +- tests/unittests/security/__init__.py | 1 + .../security/test_security_hardening.py | 232 ++++++++++++++++++ 5 files changed, 267 insertions(+), 16 deletions(-) create mode 100644 tests/unittests/security/__init__.py create mode 100644 tests/unittests/security/test_security_hardening.py diff --git a/src/google/adk/agents/config_agent_utils.py b/src/google/adk/agents/config_agent_utils.py index 154b1ae42a..335766583d 100644 --- a/src/google/adk/agents/config_agent_utils.py +++ b/src/google/adk/agents/config_agent_utils.py @@ -17,6 +17,7 @@ import importlib import inspect import os +from types import ModuleType from typing import Any from typing import List @@ -31,7 +32,7 @@ from .common_configs import CodeConfig # Allowlist for safe module prefixes that can be imported from YAML config -_SAFE_MODULE_PREFIXES = frozenset({"google.adk."}) +_SAFE_MODULE_PREFIXES: frozenset[str] = frozenset({"google.adk."}) def _is_safe_module_import(name: str) -> bool: @@ -43,7 +44,17 @@ def _is_safe_module_import(name: str) -> bool: Returns: True if the module is in a safe namespace, False otherwise. """ - return any(name.startswith(prefix) for prefix in _SAFE_MODULE_PREFIXES) + # Must start with google.adk. (the dot ensures it's not a partial name spoof) + if not any(name.startswith(p) for p in _SAFE_MODULE_PREFIXES): + return False + + # Ensure each segment is a valid Python identifier + segments = name.split(".") + for s in segments: + if not s or not s.isidentifier(): + return False + + return True @experimental(FeatureName.AGENT_CONFIG) @@ -132,16 +143,22 @@ def resolve_fully_qualified_name(name: str) -> Any: ValueError: If the name is not in a safe namespace or cannot be resolved. """ try: + if "." not in name: + raise ValueError( + f"Module reference '{name}' is outside the allowed namespace. " + "Only google.adk.* references are permitted in YAML config." + ) + module_path, obj_name = name.rsplit(".", 1) - # Security check: only allow imports from safe namespaces - if not _is_safe_module_import(module_path): + # Security check: only allow imports from safe namespaces and valid identifiers + if not obj_name.isidentifier() or not _is_safe_module_import(module_path): raise ValueError( f"Module reference '{name}' is outside the allowed namespace. " "Only google.adk.* references are permitted in YAML config." ) - module = importlib.import_module(module_path) + module: ModuleType = importlib.import_module(module_path) return getattr(module, obj_name) except ValueError as e: # Re-raise ValueError from security check without wrapping @@ -206,7 +223,7 @@ def _resolve_agent_code_reference(code: str) -> Any: "Only google.adk.* references are permitted in YAML config." ) - module = importlib.import_module(module_path) + module: ModuleType = importlib.import_module(module_path) obj = getattr(module, obj_name) if callable(obj): @@ -239,11 +256,11 @@ def resolve_code_reference(code_config: CodeConfig) -> Any: # Security check: only allow imports from safe namespaces if not _is_safe_module_import(module_path): raise ValueError( - f"Code reference '{code_config.name}' is outside the allowed namespace. " - "Only google.adk.* references are permitted in YAML config." + f"Code reference '{code_config.name}' is outside the allowed namespace." + " Only google.adk.* references are permitted in YAML config." ) - module = importlib.import_module(module_path) + module: ModuleType = importlib.import_module(module_path) obj = getattr(module, obj_name) if code_config.args and callable(obj): diff --git a/src/google/adk/cli/adk_web_server.py b/src/google/adk/cli/adk_web_server.py index 0c8202f9ee..e2639b6cb6 100644 --- a/src/google/adk/cli/adk_web_server.py +++ b/src/google/adk/cli/adk_web_server.py @@ -50,8 +50,8 @@ from opentelemetry.sdk.trace import SpanProcessor from opentelemetry.sdk.trace import TracerProvider from pydantic import Field -from pydantic import ValidationError from pydantic import field_validator +from pydantic import ValidationError from starlette.types import Lifespan from typing_extensions import deprecated from typing_extensions import override @@ -389,10 +389,11 @@ def validate_app_name(cls, v: str) -> str: """ if not v: raise ValueError("app_name cannot be empty") - # Check for path traversal attempts - if ".." in v or "/" in v or "\\" in v: + # Check for path traversal attempts and ensure it is a valid identifier + if not re.match(r"^[a-zA-Z0-9_]+$", v): raise ValueError( - f"Invalid app_name: {v!r}. Path traversal characters are not allowed." + f"Invalid app_name: {v!r}. " + "Must contain only letters, digits, and underscores." ) return v diff --git a/src/google/adk/cli/utils/agent_loader.py b/src/google/adk/cli/utils/agent_loader.py index c895826368..6bda59c72f 100644 --- a/src/google/adk/cli/utils/agent_loader.py +++ b/src/google/adk/cli/utils/agent_loader.py @@ -116,7 +116,7 @@ def _load_from_module_or_package( def _load_from_submodule( self, agent_name: str - ) -> Optional[Union[BaseAgent], App]: + ) -> Optional[Union[BaseAgent, App]]: # Load for case: Import "{agent_name}.agent" and look for "root_agent" # Covers structure: agents_dir/{agent_name}/agent.py (with root_agent defined in the module) try: @@ -215,7 +215,7 @@ def _validate_agent_path(self, agents_dir: str, agent_name: str) -> None: "Path traversal is not permitted." ) from e - _VALID_AGENT_NAME_RE = re.compile(r"^[a-zA-Z0-9_]+$") + _VALID_AGENT_NAME_RE: re.Pattern[str] = re.compile(r"^[a-zA-Z0-9_]+$") def _validate_agent_name(self, agent_name: str) -> None: """Validate agent name to prevent arbitrary module imports.""" @@ -450,7 +450,7 @@ def _determine_agent_language( raise ValueError(f"Could not determine agent type for '{agent_name}'.") - def remove_agent_from_cache(self, agent_name: str): + def remove_agent_from_cache(self, agent_name: str) -> None: # Clear module cache for the agent and its submodules keys_to_delete = [ module_name diff --git a/tests/unittests/security/__init__.py b/tests/unittests/security/__init__.py new file mode 100644 index 0000000000..fda4247bc1 --- /dev/null +++ b/tests/unittests/security/__init__.py @@ -0,0 +1 @@ +# Security tests package. diff --git a/tests/unittests/security/test_security_hardening.py b/tests/unittests/security/test_security_hardening.py new file mode 100644 index 0000000000..e7bc2d4326 --- /dev/null +++ b/tests/unittests/security/test_security_hardening.py @@ -0,0 +1,232 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import os +import shutil +import tempfile +from pathlib import Path +from unittest import mock + +import pytest +from pydantic import ValidationError + +from google.adk.agents import config_agent_utils +from google.adk.artifacts.file_artifact_service import FileArtifactService +from google.adk.cli.adk_web_server import RunAgentRequest +from google.adk.cli.utils.agent_loader import AgentLoader +from google.adk.errors.input_validation_error import InputValidationError + + +class TestSecurityHardening: + """Comprehensive security tests for Path Traversal, RCE, and Sanitization.""" + + # --- Path Traversal: AgentLoader --- + + @pytest.mark.parametrize( + "malicious_name", + [ + "../evil", + "..\\evil", + "sub/../../evil", + "valid/../..", + "C:\\Windows\\System32\\cmd.exe", + "/etc/passwd", + ".", + "..", + " ", + "\0", + "%2e%2e/evil", + "%5C..%5Cevil", + "./../evil", + ], + ) + def test_agent_loader_prevents_path_traversal(self, malicious_name): + """Verify AgentLoader rejects names that escape the agents directory.""" + with tempfile.TemporaryDirectory() as temp_dir: + loader = AgentLoader(temp_dir) + with pytest.raises(ValueError) as exc_info: + # _perform_load calls _validate_agent_name which uses a strict regex + # for regular loading, and _validate_agent_path for YAML. + loader.load_agent(malicious_name) + + # Should match either invalid name (regex) or path traversal (resolved path) + msg = str(exc_info.value) + assert any( + s in msg for s in [ + "Invalid agent name", + "resolves outside agents_dir", + "Path traversal is not permitted", + "Agent not found" + ] + ) + + def test_agent_loader_yaml_validation_strict(self): + """Verify _validate_agent_path directly with various traversal attempts.""" + with tempfile.TemporaryDirectory() as temp_dir: + loader = AgentLoader(temp_dir) + base_path = Path(temp_dir).resolve() + + # Mock the filesystem to avoid "Agent not found" errors during perform_load + # if we were to call it directly. Instead, test the validator. + + # Malicious case + with pytest.raises(ValueError, match="Path traversal is not permitted"): + loader._validate_agent_path(str(base_path), "../outside") + + # Deeply nested traversal + with pytest.raises(ValueError, match="Path traversal is not permitted"): + loader._validate_agent_path(str(base_path), "a/b/../../../c") + + # Valid case + loader._validate_agent_path(str(base_path), "valid_agent") + loader._validate_agent_path(str(base_path), "subdir/agent") + + # --- RCE Mitigation: config_agent_utils --- + + @pytest.mark.parametrize( + "malicious_module", + [ + "os.system", + "subprocess.run", + "importlib.import_module", + "google.adk_evil.module", + "google.adk..evil", + "google.adk. agents.LlmAgent", # space injection + "google.adk.agents.LlmAgent; print('hacked')", # command separator + "builtins.eval", + "sys.modules", + "__main__", + ], + ) + def test_config_agent_utils_rejects_unsafe_imports(self, malicious_module): + """Verify only google.adk.* modules are allowed for dynamic resolution.""" + with pytest.raises(ValueError, match="outside the allowed namespace"): + config_agent_utils.resolve_fully_qualified_name(malicious_module) + + def test_config_agent_utils_allows_adk_imports(self): + """Verify ADK's own modules can still be resolved.""" + # This should not raise + cls = config_agent_utils.resolve_fully_qualified_name("google.adk.agents.llm_agent.LlmAgent") + from google.adk.agents.llm_agent import LlmAgent + assert cls is LlmAgent + + def test_is_safe_module_import_logic(self): + """Deep dive into the prefix matching logic to prevent partial name spoofs.""" + from google.adk.agents.config_agent_utils import _is_safe_module_import + + assert _is_safe_module_import("google.adk.agents") is True + assert _is_safe_module_import("google.adk.utils") is True + + # Partial prefix spoofs + assert _is_safe_module_import("google.adk_evil") is False + assert _is_safe_module_import("google.adk_") is False + + # Empty segments + assert _is_safe_module_import("google.adk..agents") is False + assert _is_safe_module_import("google.adk.") is False # trailing dot splits to empty segment + + # --- API Boundary: RunAgentRequest --- + + @pytest.mark.parametrize( + "bad_app_name", + [ + "../evil", + "\\path", + "../../etc/passwd", + "a/b", + " ", + "", + ], + ) + def test_run_agent_request_sanitization(self, bad_app_name): + """Verify Pydantic model rejects malicious app_name at the API boundary.""" + with pytest.raises(ValidationError): + RunAgentRequest( + app_name=bad_app_name, + user_id="user123", + session_id="sess456" + ) + + def test_run_agent_request_valid(self): + """Verify valid app names pass validation.""" + req = RunAgentRequest( + app_name="my_secure_agent", + user_id="user123", + session_id="sess456" + ) + assert req.app_name == "my_secure_agent" + + # --- Path Traversal: FileArtifactService --- + + @pytest.mark.parametrize("field_name", ["user_id", "session_id"]) + @pytest.mark.parametrize( + "malicious_val", + [ + "../escape", + "..\\win", + "sub/../../parent", + ".", + "..", + "null\0byte", + "", + ], + ) + async def test_artifact_service_prevents_traversal(self, tmp_path, field_name, malicious_val): + """Verify FileArtifactService rejects traversal in user/session IDs.""" + service = FileArtifactService(root_dir=tmp_path / "artifacts") + + params = { + "user_id": "valid_user", + "session_id": "valid_sess", + "app_name": "app", + "filename": "file.txt", + "artifact": mock.Mock() # mock Part + } + params[field_name] = malicious_val + + with pytest.raises(InputValidationError): + await service.save_artifact(**params) + + # --- Symlink Awareness (Platform Dependent) --- + + def test_path_traversal_with_symlink_awareness(self, tmp_path): + """Verify Path.resolve() correctly handles symlinks to prevent escaping agents_dir.""" + agents_dir = tmp_path / "agents" + agents_dir.mkdir() + + secret_dir = tmp_path / "secrets" + secret_dir.mkdir() + (secret_dir / "passwords.txt").write_text("secret_data") + + # Create a symlink inside agents_dir pointing to sensitive data outside + link_path = agents_dir / "malicious_link" + try: + os.symlink(secret_dir, link_path) + except (OSError, NotImplementedError): + pytest.skip("Symlinks not supported on this platform/environment") + + loader = AgentLoader(str(agents_dir)) + + # Attempting to load "malicious_link/passwords.txt" + # Even if it exists via symlink, the validator should eventually check it. + # Note: AgentLoader checks if the directory exists and contains agent files. + + with pytest.raises(ValueError): + # If the agent name follows identifiers only, "malicious_link" passes regex, + # but if we try to use it to reach passwords.txt, it fails regex. + # However, if an agent name is "malicious_link", it would load agents_dir/malicious_link/root_agent.yaml + # if it existed. Path.resolve() would resolve it to secret_dir. + loader._validate_agent_path(str(agents_dir), "malicious_link")