diff --git a/cycode/cli/apps/ai_guardrails/codex_config.py b/cycode/cli/apps/ai_guardrails/codex_config.py new file mode 100644 index 00000000..f83c6cb0 --- /dev/null +++ b/cycode/cli/apps/ai_guardrails/codex_config.py @@ -0,0 +1,67 @@ +"""Codex CLI config.toml management for AI guardrails. + +Codex requires `[features] codex_hooks = true` in its `config.toml` for hook +scripts to be invoked. This module merges that flag into the user-scope +(`~/.codex/config.toml`) or repo-scope (`/.codex/config.toml`) file while +preserving any existing keys. +""" + +import sys +from pathlib import Path +from typing import Optional + +import tomli_w + +if sys.version_info >= (3, 11): + import tomllib +else: # pragma: no cover - py<3.11 fallback + import tomli as tomllib + +from cycode.logger import get_logger + +logger = get_logger('AI Guardrails Codex Config') + +CODEX_CONFIG_FILE_NAME = 'config.toml' +CODEX_CONFIG_DIR_NAME = '.codex' + + +def get_codex_config_path(scope: str, repo_path: Optional[Path] = None) -> Path: + """Get the Codex config.toml path for the given scope.""" + if scope == 'repo' and repo_path: + return repo_path / CODEX_CONFIG_DIR_NAME / CODEX_CONFIG_FILE_NAME + return Path.home() / CODEX_CONFIG_DIR_NAME / CODEX_CONFIG_FILE_NAME + + +def enable_codex_hooks_feature(scope: str = 'user', repo_path: Optional[Path] = None) -> tuple[bool, str]: + """Ensure `[features] codex_hooks = true` is set in Codex's config.toml. + + Preserves existing keys. Creates the file (and parent dir) if missing. + + Returns: + Tuple of (success, message). + """ + config_path = get_codex_config_path(scope, repo_path) + + config: dict = {} + if config_path.exists(): + try: + with config_path.open('rb') as f: + config = tomllib.load(f) + except Exception as e: + logger.error('Failed to parse Codex config.toml', exc_info=e) + return False, f'Failed to parse existing Codex config at {config_path}' + + features = config.get('features') + if not isinstance(features, dict): + features = {} + features['codex_hooks'] = True + config['features'] = features + + try: + config_path.parent.mkdir(parents=True, exist_ok=True) + with config_path.open('wb') as f: + tomli_w.dump(config, f) + return True, f'Enabled codex_hooks in {config_path}' + except Exception as e: + logger.error('Failed to write Codex config.toml', exc_info=e) + return False, f'Failed to write Codex config at {config_path}' diff --git a/cycode/cli/apps/ai_guardrails/consts.py b/cycode/cli/apps/ai_guardrails/consts.py index 837096c8..0c811d43 100644 --- a/cycode/cli/apps/ai_guardrails/consts.py +++ b/cycode/cli/apps/ai_guardrails/consts.py @@ -3,6 +3,7 @@ Currently supports: - Cursor - Claude Code +- Codex """ import platform @@ -17,6 +18,7 @@ class AIIDEType(str, Enum): CURSOR = 'cursor' CLAUDE_CODE = 'claude-code' + CODEX = 'codex' class PolicyMode(str, Enum): @@ -61,6 +63,14 @@ def _get_claude_code_hooks_dir() -> Path: return Path.home() / '.claude' +def _get_codex_hooks_dir() -> Path: + """Get Codex hooks directory. + + Codex uses ~/.codex on all platforms. + """ + return Path.home() / '.codex' + + # IDE-specific configurations IDE_CONFIGS: dict[AIIDEType, IDEConfig] = { AIIDEType.CURSOR: IDEConfig( @@ -77,6 +87,13 @@ def _get_claude_code_hooks_dir() -> Path: hooks_file_name='settings.json', hook_events=['UserPromptSubmit', 'PreToolUse:Read', 'PreToolUse:mcp'], ), + AIIDEType.CODEX: IDEConfig( + name='Codex', + hooks_dir=_get_codex_hooks_dir(), + repo_hooks_subdir='.codex', + hooks_file_name='hooks.json', + hook_events=['UserPromptSubmit', 'PreToolUse:Bash'], + ), } # Default IDE @@ -141,6 +158,47 @@ def _get_claude_code_hooks_config(async_mode: bool = False) -> dict: } +def _get_codex_hooks_config(async_mode: bool = False) -> dict: + """Get Codex-specific hooks configuration. + + Codex uses the same nested hook-entry format as Claude Code: + - events are keyed by name + - each entry has an optional 'matcher' (regex on tool name / start source) + and a 'hooks' array with {type, command, ...} + + Codex only supports intercepting Bash for PreToolUse today; MCP and file + reads are not exposed to hooks. + """ + command = f'{CYCODE_SCAN_PROMPT_COMMAND} --ide {AIIDEType.CODEX.value}' + + hook_entry = {'type': 'command', 'command': command} + if async_mode: + hook_entry['async'] = True + hook_entry['timeout'] = 20 + + return { + 'hooks': { + 'SessionStart': [ + { + 'matcher': 'startup', + 'hooks': [{'type': 'command', 'command': CYCODE_ENSURE_AUTH_COMMAND}], + } + ], + 'UserPromptSubmit': [ + { + 'hooks': [deepcopy(hook_entry)], + } + ], + 'PreToolUse': [ + { + 'matcher': 'Bash', + 'hooks': [deepcopy(hook_entry)], + }, + ], + }, + } + + def get_hooks_config(ide: AIIDEType, async_mode: bool = False) -> dict: """Get the hooks configuration for a specific IDE. @@ -153,4 +211,6 @@ def get_hooks_config(ide: AIIDEType, async_mode: bool = False) -> dict: """ if ide == AIIDEType.CLAUDE_CODE: return _get_claude_code_hooks_config(async_mode=async_mode) + if ide == AIIDEType.CODEX: + return _get_codex_hooks_config(async_mode=async_mode) return _get_cursor_hooks_config(async_mode=async_mode) diff --git a/cycode/cli/apps/ai_guardrails/install_command.py b/cycode/cli/apps/ai_guardrails/install_command.py index a92a978f..61988618 100644 --- a/cycode/cli/apps/ai_guardrails/install_command.py +++ b/cycode/cli/apps/ai_guardrails/install_command.py @@ -5,6 +5,7 @@ import typer +from cycode.cli.apps.ai_guardrails.codex_config import enable_codex_hooks_feature from cycode.cli.apps.ai_guardrails.command_utils import ( console, resolve_repo_path, @@ -29,7 +30,7 @@ def install_command( str, typer.Option( '--ide', - help='IDE to install hooks for (e.g., "cursor", "claude-code", or "all" for all IDEs). Defaults to cursor.', + help='IDE to install hooks for ("cursor", "claude-code", "codex", or "all"). Defaults to cursor.', ), ] = AIIDEType.CURSOR.value, repo_path: Annotated[ @@ -74,11 +75,14 @@ def install_command( ides_to_install: list[AIIDEType] = list(AIIDEType) if ide_type is None else [ide_type] results: list[tuple[str, bool, str]] = [] + extra_messages: list[tuple[bool, str]] = [] for current_ide in ides_to_install: ide_name = IDE_CONFIGS[current_ide].name report_mode = mode == InstallMode.REPORT success, message = install_hooks(scope, repo_path, ide=current_ide, report_mode=report_mode) results.append((ide_name, success, message)) + if success and current_ide == AIIDEType.CODEX: + extra_messages.append(enable_codex_hooks_feature(scope, repo_path)) # Report results for each IDE any_success = False @@ -91,6 +95,13 @@ def install_command( console.print(f'[red]✗[/] {message}', style='bold red') all_success = False + for success, message in extra_messages: + if success: + console.print(f'[green]✓[/] {message}') + else: + console.print(f'[red]✗[/] {message}', style='bold red') + all_success = False + if any_success: policy_mode = PolicyMode.WARN if mode == InstallMode.REPORT else PolicyMode.BLOCK _install_policy(scope, repo_path, policy_mode) diff --git a/cycode/cli/apps/ai_guardrails/scan/consts.py b/cycode/cli/apps/ai_guardrails/scan/consts.py index 007892a8..4e29c932 100644 --- a/cycode/cli/apps/ai_guardrails/scan/consts.py +++ b/cycode/cli/apps/ai_guardrails/scan/consts.py @@ -45,4 +45,9 @@ 'action': 'block', 'scan_arguments': True, }, + 'command_exec': { + 'enabled': True, + 'action': 'block', + 'scan_arguments': True, + }, } diff --git a/cycode/cli/apps/ai_guardrails/scan/handlers.py b/cycode/cli/apps/ai_guardrails/scan/handlers.py index fa0bddee..c9c356d1 100644 --- a/cycode/cli/apps/ai_guardrails/scan/handlers.py +++ b/cycode/cli/apps/ai_guardrails/scan/handlers.py @@ -256,6 +256,73 @@ def handle_before_mcp_execution(ctx: typer.Context, payload: AIHookPayload, poli ) +def handle_before_command_exec(ctx: typer.Context, payload: AIHookPayload, policy: dict) -> dict: + """ + Handle PreToolUse:Bash (CommandExec) hook. + + Scans the shell command the agent is about to run for secrets before + execution. Returns deny_permission to block, ask_permission to warn, + allow_permission to allow. + """ + ai_client = ctx.obj['ai_security_client'] + ide = payload.ide_provider + response_builder = get_response_builder(ide) + + command_config = get_policy_value(policy, 'command_exec', default={}) + ai_client.create_conversation(payload) + if not get_policy_value(command_config, 'enabled', default=True): + ai_client.create_event(payload, AiHookEventType.COMMAND_EXEC, AIHookOutcome.ALLOWED) + return response_builder.allow_permission() + + mode = get_policy_value(policy, 'mode', default=PolicyMode.BLOCK) + command = payload.command or '' + max_bytes = get_policy_value(policy, 'secrets', 'max_bytes', default=200000) + timeout_ms = get_policy_value(policy, 'secrets', 'timeout_ms', default=30000) + clipped = truncate_utf8(command, max_bytes) + action = get_policy_value(command_config, 'action', default=PolicyMode.BLOCK) + + scan_id = None + block_reason = None + outcome = AIHookOutcome.ALLOWED + error_message = None + + try: + if get_policy_value(command_config, 'scan_arguments', default=True): + violation_summary, scan_id = _scan_text_for_secrets(ctx, clipped, timeout_ms) + if violation_summary: + block_reason = BlockReason.SECRETS_IN_COMMAND + if mode == PolicyMode.BLOCK and action == PolicyMode.BLOCK: + outcome = AIHookOutcome.BLOCKED + user_message = f'Cycode blocked shell command execution. {violation_summary}' + return response_builder.deny_permission( + user_message, + 'Do not embed secrets in shell commands. Use environment variables or secret references.', + ) + outcome = AIHookOutcome.WARNED + return response_builder.ask_permission( + f'{violation_summary} in shell command. Allow execution?', + 'Possible secrets detected in command; proceed with caution.', + ) + + return response_builder.allow_permission() + except Exception as e: + outcome = ( + AIHookOutcome.ALLOWED if get_policy_value(policy, 'fail_open', default=True) else AIHookOutcome.BLOCKED + ) + block_reason = BlockReason.SCAN_FAILURE + error_message = str(e) + raise e + finally: + ai_client.create_event( + payload, + AiHookEventType.COMMAND_EXEC, + outcome, + scan_id=scan_id, + block_reason=block_reason, + error_message=error_message, + ) + + def get_handler_for_event(event_type: str) -> Optional[Callable[[typer.Context, AIHookPayload, dict], dict]]: """Get the appropriate handler function for a canonical event type. @@ -269,6 +336,7 @@ def get_handler_for_event(event_type: str) -> Optional[Callable[[typer.Context, AiHookEventType.PROMPT.value: handle_before_submit_prompt, AiHookEventType.FILE_READ.value: handle_before_read_file, AiHookEventType.MCP_EXECUTION.value: handle_before_mcp_execution, + AiHookEventType.COMMAND_EXEC.value: handle_before_command_exec, } return handlers.get(event_type) diff --git a/cycode/cli/apps/ai_guardrails/scan/payload.py b/cycode/cli/apps/ai_guardrails/scan/payload.py index ada40a3c..0bdb79b8 100644 --- a/cycode/cli/apps/ai_guardrails/scan/payload.py +++ b/cycode/cli/apps/ai_guardrails/scan/payload.py @@ -11,6 +11,8 @@ from cycode.cli.apps.ai_guardrails.scan.types import ( CLAUDE_CODE_EVENT_MAPPING, CLAUDE_CODE_EVENT_NAMES, + CODEX_EVENT_MAPPING, + CODEX_EVENT_NAMES, CURSOR_EVENT_MAPPING, CURSOR_EVENT_NAMES, AiHookEventType, @@ -139,6 +141,7 @@ class AIHookPayload: mcp_server_name: Optional[str] = None # For mcp_execution events mcp_tool_name: Optional[str] = None # For mcp_execution events mcp_arguments: Optional[dict] = None # For mcp_execution events + command: Optional[str] = None # For command_exec events (e.g., Codex PreToolUse:Bash) @classmethod def from_cursor_payload(cls, payload: dict) -> 'AIHookPayload': @@ -227,6 +230,44 @@ def from_claude_code_payload(cls, payload: dict) -> 'AIHookPayload': mcp_arguments=mcp_arguments, ) + @classmethod + def from_codex_payload(cls, payload: dict) -> 'AIHookPayload': + """Create AIHookPayload from Codex CLI payload. + + Codex payload shape (per https://developers.openai.com/codex/hooks): + - hook_event_name: 'UserPromptSubmit' | 'PreToolUse' | 'PostToolUse' | 'SessionStart' | 'Stop' + - session_id, turn_id, cwd, model delivered at top level + - For UserPromptSubmit: prompt field + - For PreToolUse: tool_name (currently only 'Bash') and tool_input.command + """ + hook_event_name = payload.get('hook_event_name', '') + tool_name = payload.get('tool_name', '') + tool_input = payload.get('tool_input') + + if hook_event_name == 'UserPromptSubmit': + canonical_event = AiHookEventType.PROMPT + elif hook_event_name == 'PreToolUse' and tool_name == 'Bash': + canonical_event = AiHookEventType.COMMAND_EXEC + else: + # Unknown or unsupported event combination; fall back to raw event name + canonical_event = CODEX_EVENT_MAPPING.get(hook_event_name, hook_event_name) + + command = None + if tool_name == 'Bash' and isinstance(tool_input, dict): + command = tool_input.get('command') + + return cls( + event_name=canonical_event, + conversation_id=payload.get('session_id'), + generation_id=payload.get('turn_id'), + ide_user_email=None, + model=payload.get('model'), + ide_provider=AIIDEType.CODEX.value, + ide_version=payload.get('codex_version'), + prompt=payload.get('prompt', ''), + command=command, + ) + @staticmethod def is_payload_for_ide(payload: dict, ide: str) -> bool: """Check if the payload's event name matches the expected IDE. @@ -248,6 +289,8 @@ def is_payload_for_ide(payload: dict, ide: str) -> bool: return hook_event_name in CLAUDE_CODE_EVENT_NAMES if ide == AIIDEType.CURSOR: return hook_event_name in CURSOR_EVENT_NAMES + if ide == AIIDEType.CODEX: + return hook_event_name in CODEX_EVENT_NAMES # Unknown IDE, allow processing return True @@ -270,4 +313,6 @@ def from_payload(cls, payload: dict, tool: str = AIIDEType.CURSOR.value) -> 'AIH return cls.from_cursor_payload(payload) if tool == AIIDEType.CLAUDE_CODE: return cls.from_claude_code_payload(payload) + if tool == AIIDEType.CODEX: + return cls.from_codex_payload(payload) raise ValueError(f'Unsupported IDE/tool: {tool}') diff --git a/cycode/cli/apps/ai_guardrails/scan/response_builders.py b/cycode/cli/apps/ai_guardrails/scan/response_builders.py index ff0a6aa4..6964c046 100644 --- a/cycode/cli/apps/ai_guardrails/scan/response_builders.py +++ b/cycode/cli/apps/ai_guardrails/scan/response_builders.py @@ -110,10 +110,20 @@ def deny_prompt(self, user_message: str) -> dict: return {'decision': 'block', 'reason': user_message} +class CodexResponseBuilder(ClaudeCodeResponseBuilder): + """Response builder for Codex CLI hooks. + + Codex accepts the same hook response shapes as Claude Code + (decision/reason for prompts, hookSpecificOutput.permissionDecision for + PreToolUse), so we reuse the Claude Code builder verbatim. + """ + + # Registry of response builders by IDE type _RESPONSE_BUILDERS: dict[str, IDEResponseBuilder] = { AIIDEType.CURSOR: CursorResponseBuilder(), AIIDEType.CLAUDE_CODE: ClaudeCodeResponseBuilder(), + AIIDEType.CODEX: CodexResponseBuilder(), } diff --git a/cycode/cli/apps/ai_guardrails/scan/types.py b/cycode/cli/apps/ai_guardrails/scan/types.py index 585c7820..bd67d618 100644 --- a/cycode/cli/apps/ai_guardrails/scan/types.py +++ b/cycode/cli/apps/ai_guardrails/scan/types.py @@ -22,6 +22,7 @@ class AiHookEventType(StrEnum): PROMPT = 'Prompt' FILE_READ = 'FileRead' MCP_EXECUTION = 'McpExecution' + COMMAND_EXEC = 'CommandExec' # IDE-specific event name mappings to canonical types @@ -38,9 +39,16 @@ class AiHookEventType(StrEnum): 'PreToolUse': None, # Requires tool_name inspection to determine actual type } +# Codex event mapping - PreToolUse currently only fires for Bash (maps to CommandExec) +CODEX_EVENT_MAPPING = { + 'UserPromptSubmit': AiHookEventType.PROMPT, + 'PreToolUse': None, # Requires tool_name inspection (Bash -> CommandExec) +} + # Set of known event names per IDE (for IDE detection) CURSOR_EVENT_NAMES = set(CURSOR_EVENT_MAPPING.keys()) CLAUDE_CODE_EVENT_NAMES = set(CLAUDE_CODE_EVENT_MAPPING.keys()) +CODEX_EVENT_NAMES = set(CODEX_EVENT_MAPPING.keys()) class AIHookOutcome(StrEnum): @@ -61,5 +69,6 @@ class BlockReason(StrEnum): SECRETS_IN_PROMPT = 'secrets_in_prompt' SECRETS_IN_FILE = 'secrets_in_file' SECRETS_IN_MCP_ARGS = 'secrets_in_mcp_args' + SECRETS_IN_COMMAND = 'secrets_in_command' SENSITIVE_PATH = 'sensitive_path' SCAN_FAILURE = 'scan_failure' diff --git a/poetry.lock b/poetry.lock index f810887f..22baa4a8 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1874,7 +1874,7 @@ version = "2.3.0" description = "A lil' TOML parser" optional = false python-versions = ">=3.8" -groups = ["test"] +groups = ["main", "test"] markers = "python_version < \"3.11\"" files = [ {file = "tomli-2.3.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:88bd15eb972f3664f5ed4b57c1634a97153b4bac4479dcb6a495f41921eb7f45"}, @@ -1921,6 +1921,18 @@ files = [ {file = "tomli-2.3.0.tar.gz", hash = "sha256:64be704a875d2a59753d80ee8a533c3fe183e3f06807ff7dc2232938ccb01549"}, ] +[[package]] +name = "tomli-w" +version = "1.2.0" +description = "A lil' TOML writer" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "tomli_w-1.2.0-py3-none-any.whl", hash = "sha256:188306098d013b691fcadc011abd66727d3c414c571bb01b1a174ba8c983cf90"}, + {file = "tomli_w-1.2.0.tar.gz", hash = "sha256:2dd14fac5a47c27be9cd4c976af5a12d87fb1f0b4512f81d69cce3b35ae25021"}, +] + [[package]] name = "typer" version = "0.15.4" @@ -2042,4 +2054,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.1" python-versions = ">=3.9" -content-hash = "25dc6986a2a4572b689edb26f9184faab8bee8e3a569f8e0cdc8ac35ded0b9fc" +content-hash = "5b9f9c773c58714052b6a2794bf18999fa29a3f1737bbf63129998b7d34e6699" diff --git a/pyproject.toml b/pyproject.toml index 0ed8d8c9..461710cf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,6 +49,8 @@ tenacity = ">=9.0.0,<9.1.0" mcp = { version = ">=1.9.3,<2.0.0", markers = "python_version >= '3.10'" } pydantic = ">=2.11.5,<3.0.0" pathvalidate = ">=3.3.1,<4.0.0" +tomli-w = ">=1.0.0,<2.0.0" +tomli = {version = ">=2.0.0,<3.0.0", python = "<3.11"} [tool.poetry.group.test.dependencies] mock = ">=4.0.3,<4.1.0" diff --git a/tests/cli/commands/ai_guardrails/scan/test_handlers.py b/tests/cli/commands/ai_guardrails/scan/test_handlers.py index 57c25b92..08b5dd86 100644 --- a/tests/cli/commands/ai_guardrails/scan/test_handlers.py +++ b/tests/cli/commands/ai_guardrails/scan/test_handlers.py @@ -7,6 +7,7 @@ import typer from cycode.cli.apps.ai_guardrails.scan.handlers import ( + handle_before_command_exec, handle_before_mcp_execution, handle_before_read_file, handle_before_submit_prompt, @@ -467,3 +468,95 @@ def test_handle_before_mcp_execution_scan_disabled( assert result == {'permission': 'allow'} mock_scan.assert_not_called() + + +# Tests for handle_before_command_exec (Codex PreToolUse:Bash) + + +@pytest.fixture +def codex_policy() -> dict[str, Any]: + return { + 'mode': 'block', + 'fail_open': True, + 'secrets': {'max_bytes': 200000, 'timeout_ms': 30000}, + 'command_exec': {'enabled': True, 'action': 'block', 'scan_arguments': True}, + } + + +def _codex_command_payload(command: str = 'ls -la') -> AIHookPayload: + return AIHookPayload( + event_name='CommandExec', + ide_provider='codex', + command=command, + ) + + +def test_handle_before_command_exec_disabled(mock_ctx: MagicMock, codex_policy: dict[str, Any]) -> None: + """Disabled command_exec allows without scanning.""" + codex_policy['command_exec']['enabled'] = False + result = handle_before_command_exec(mock_ctx, _codex_command_payload(), codex_policy) + + assert result == {'hookSpecificOutput': {'hookEventName': 'PreToolUse', 'permissionDecision': 'allow'}} + mock_ctx.obj['ai_security_client'].create_event.assert_called_once() + + +@patch('cycode.cli.apps.ai_guardrails.scan.handlers._scan_text_for_secrets') +def test_handle_before_command_exec_no_secrets( + mock_scan: MagicMock, mock_ctx: MagicMock, codex_policy: dict[str, Any] +) -> None: + """Clean command is allowed.""" + mock_scan.return_value = (None, 'scan-ok') + result = handle_before_command_exec(mock_ctx, _codex_command_payload(), codex_policy) + + assert result['hookSpecificOutput']['permissionDecision'] == 'allow' + call_args = mock_ctx.obj['ai_security_client'].create_event.call_args + assert call_args.args[2] == AIHookOutcome.ALLOWED + + +@patch('cycode.cli.apps.ai_guardrails.scan.handlers._scan_text_for_secrets') +def test_handle_before_command_exec_with_secrets_blocked( + mock_scan: MagicMock, mock_ctx: MagicMock, codex_policy: dict[str, Any] +) -> None: + """Command with secrets is blocked in block mode.""" + mock_scan.return_value = ('Found 1 secret: AWS key', 'scan-blocked') + result = handle_before_command_exec( + mock_ctx, + _codex_command_payload('curl -H "Authorization: Bearer AKIA..."'), + codex_policy, + ) + + assert result['hookSpecificOutput']['permissionDecision'] == 'deny' + assert 'Found 1 secret' in result['hookSpecificOutput']['permissionDecisionReason'] + call_args = mock_ctx.obj['ai_security_client'].create_event.call_args + assert call_args.args[2] == AIHookOutcome.BLOCKED + assert call_args.kwargs['block_reason'] == BlockReason.SECRETS_IN_COMMAND + + +@patch('cycode.cli.apps.ai_guardrails.scan.handlers._scan_text_for_secrets') +def test_handle_before_command_exec_with_secrets_warned( + mock_scan: MagicMock, mock_ctx: MagicMock, codex_policy: dict[str, Any] +) -> None: + """Command with secrets in warn mode returns ask.""" + codex_policy['command_exec']['action'] = 'warn' + mock_scan.return_value = ('Found 1 secret: token', 'scan-warn') + result = handle_before_command_exec(mock_ctx, _codex_command_payload(), codex_policy) + + assert result['hookSpecificOutput']['permissionDecision'] == 'ask' + call_args = mock_ctx.obj['ai_security_client'].create_event.call_args + assert call_args.args[2] == AIHookOutcome.WARNED + + +@patch('cycode.cli.apps.ai_guardrails.scan.handlers._scan_text_for_secrets') +def test_handle_before_command_exec_scan_failure_fail_open( + mock_scan: MagicMock, mock_ctx: MagicMock, codex_policy: dict[str, Any] +) -> None: + """Scan failure with fail_open=True records ALLOWED outcome.""" + mock_scan.side_effect = RuntimeError('boom') + codex_policy['fail_open'] = True + + with pytest.raises(RuntimeError): + handle_before_command_exec(mock_ctx, _codex_command_payload(), codex_policy) + + call_args = mock_ctx.obj['ai_security_client'].create_event.call_args + assert call_args.args[2] == AIHookOutcome.ALLOWED + assert call_args.kwargs['block_reason'] == BlockReason.SCAN_FAILURE diff --git a/tests/cli/commands/ai_guardrails/scan/test_payload.py b/tests/cli/commands/ai_guardrails/scan/test_payload.py index 1ef5fad0..2eec9bf3 100644 --- a/tests/cli/commands/ai_guardrails/scan/test_payload.py +++ b/tests/cli/commands/ai_guardrails/scan/test_payload.py @@ -430,3 +430,86 @@ def test_is_payload_for_ide_empty_event_name() -> None: payload = {} assert AIHookPayload.is_payload_for_ide(payload, 'cursor') is False assert AIHookPayload.is_payload_for_ide(payload, 'claude-code') is False + + +# Codex payload tests + + +def test_from_codex_payload_prompt_event() -> None: + """Test conversion of Codex UserPromptSubmit payload.""" + codex_payload = { + 'hook_event_name': 'UserPromptSubmit', + 'session_id': 'session-abc', + 'turn_id': 'turn-1', + 'cwd': '/workspace', + 'model': 'gpt-5', + 'prompt': 'Hi Codex', + } + + unified = AIHookPayload.from_codex_payload(codex_payload) + + assert unified.event_name == AiHookEventType.PROMPT + assert unified.conversation_id == 'session-abc' + assert unified.generation_id == 'turn-1' + assert unified.model == 'gpt-5' + assert unified.ide_provider == 'codex' + assert unified.prompt == 'Hi Codex' + assert unified.command is None + + +def test_from_codex_payload_bash_command_event() -> None: + """Test conversion of Codex PreToolUse:Bash payload.""" + codex_payload = { + 'hook_event_name': 'PreToolUse', + 'session_id': 'session-abc', + 'turn_id': 'turn-2', + 'cwd': '/workspace', + 'model': 'gpt-5', + 'tool_name': 'Bash', + 'tool_use_id': 'u1', + 'tool_input': {'command': 'curl https://evil.example'}, + } + + unified = AIHookPayload.from_codex_payload(codex_payload) + + assert unified.event_name == AiHookEventType.COMMAND_EXEC + assert unified.command == 'curl https://evil.example' + assert unified.ide_provider == 'codex' + assert unified.file_path is None + assert unified.mcp_tool_name is None + + +def test_from_codex_payload_unknown_event() -> None: + """Test that unknown Codex events fall back to raw event name.""" + codex_payload = {'hook_event_name': 'PostToolUse'} + unified = AIHookPayload.from_codex_payload(codex_payload) + # PostToolUse isn't in our canonical mapping; preserved as-is + assert unified.event_name == 'PostToolUse' + + +def test_is_payload_for_ide_codex_matches() -> None: + """Test that Codex events match the Codex IDE.""" + payload = {'hook_event_name': 'UserPromptSubmit'} + assert AIHookPayload.is_payload_for_ide(payload, 'codex') is True + + payload = {'hook_event_name': 'PreToolUse'} + assert AIHookPayload.is_payload_for_ide(payload, 'codex') is True + + +def test_is_payload_for_ide_cursor_does_not_match_codex() -> None: + """Test that Cursor events don't match when expected IDE is codex.""" + payload = {'hook_event_name': 'beforeSubmitPrompt'} + assert AIHookPayload.is_payload_for_ide(payload, 'codex') is False + + +def test_from_payload_dispatches_codex() -> None: + """Test that from_payload dispatcher routes Codex payloads correctly.""" + codex_payload = { + 'hook_event_name': 'UserPromptSubmit', + 'session_id': 's1', + 'prompt': 'test', + } + + unified = AIHookPayload.from_payload(codex_payload, tool='codex') + assert unified.event_name == AiHookEventType.PROMPT + assert unified.ide_provider == 'codex' diff --git a/tests/cli/commands/ai_guardrails/scan/test_response_builders.py b/tests/cli/commands/ai_guardrails/scan/test_response_builders.py index 45f80829..4cf0e2cc 100644 --- a/tests/cli/commands/ai_guardrails/scan/test_response_builders.py +++ b/tests/cli/commands/ai_guardrails/scan/test_response_builders.py @@ -4,6 +4,7 @@ from cycode.cli.apps.ai_guardrails.scan.response_builders import ( ClaudeCodeResponseBuilder, + CodexResponseBuilder, CursorResponseBuilder, IDEResponseBuilder, get_response_builder, @@ -146,3 +147,54 @@ def test_get_response_builder_claude_code() -> None: assert isinstance(builder, ClaudeCodeResponseBuilder) assert isinstance(builder, IDEResponseBuilder) + + +# Codex response builder tests (shapes match Claude Code) + + +def test_codex_response_builder_allow_permission() -> None: + builder = CodexResponseBuilder() + assert builder.allow_permission() == { + 'hookSpecificOutput': { + 'hookEventName': 'PreToolUse', + 'permissionDecision': 'allow', + } + } + + +def test_codex_response_builder_deny_permission() -> None: + builder = CodexResponseBuilder() + assert builder.deny_permission('User message', 'Agent message') == { + 'hookSpecificOutput': { + 'hookEventName': 'PreToolUse', + 'permissionDecision': 'deny', + 'permissionDecisionReason': 'User message', + } + } + + +def test_codex_response_builder_ask_permission() -> None: + builder = CodexResponseBuilder() + assert builder.ask_permission('Warning', 'Agent warning') == { + 'hookSpecificOutput': { + 'hookEventName': 'PreToolUse', + 'permissionDecision': 'ask', + 'permissionDecisionReason': 'Warning', + } + } + + +def test_codex_response_builder_allow_prompt() -> None: + builder = CodexResponseBuilder() + assert builder.allow_prompt() == {} + + +def test_codex_response_builder_deny_prompt() -> None: + builder = CodexResponseBuilder() + assert builder.deny_prompt('Secrets detected') == {'decision': 'block', 'reason': 'Secrets detected'} + + +def test_get_response_builder_codex() -> None: + builder = get_response_builder('codex') + assert isinstance(builder, CodexResponseBuilder) + assert isinstance(builder, IDEResponseBuilder) diff --git a/tests/cli/commands/ai_guardrails/test_codex_config.py b/tests/cli/commands/ai_guardrails/test_codex_config.py new file mode 100644 index 00000000..768ea858 --- /dev/null +++ b/tests/cli/commands/ai_guardrails/test_codex_config.py @@ -0,0 +1,96 @@ +"""Tests for Codex config.toml feature flag management.""" + +import sys +from pathlib import Path + +from pyfakefs.fake_filesystem import FakeFilesystem + +from cycode.cli.apps.ai_guardrails.codex_config import ( + enable_codex_hooks_feature, + get_codex_config_path, +) + +if sys.version_info >= (3, 11): + import tomllib +else: # pragma: no cover + import tomli as tomllib + + +def _read_toml(path: Path) -> dict: + with path.open('rb') as f: + return tomllib.load(f) + + +def test_get_codex_config_path_user_scope() -> None: + assert get_codex_config_path('user') == Path.home() / '.codex' / 'config.toml' + + +def test_get_codex_config_path_repo_scope() -> None: + repo = Path('/my-repo') + assert get_codex_config_path('repo', repo) == repo / '.codex' / 'config.toml' + + +def test_enable_codex_hooks_feature_creates_file(fs: FakeFilesystem) -> None: + """When config.toml is absent, the file is created with the feature flag.""" + fs.create_dir(Path.home()) + success, message = enable_codex_hooks_feature('user') + + assert success is True + assert 'codex_hooks' in message + + config_path = Path.home() / '.codex' / 'config.toml' + assert config_path.exists() + config = _read_toml(config_path) + assert config['features']['codex_hooks'] is True + + +def test_enable_codex_hooks_feature_merges_existing(fs: FakeFilesystem) -> None: + """Existing non-features keys are preserved.""" + config_dir = Path.home() / '.codex' + fs.create_dir(config_dir) + config_path = config_dir / 'config.toml' + config_path.write_text('model = "gpt-5"\n[features]\nother_flag = true\n') + + success, _ = enable_codex_hooks_feature('user') + + assert success is True + config = _read_toml(config_path) + assert config['model'] == 'gpt-5' + assert config['features']['codex_hooks'] is True + assert config['features']['other_flag'] is True + + +def test_enable_codex_hooks_feature_idempotent(fs: FakeFilesystem) -> None: + """Running the enabler twice leaves the flag in place.""" + fs.create_dir(Path.home()) + enable_codex_hooks_feature('user') + enable_codex_hooks_feature('user') + + config_path = Path.home() / '.codex' / 'config.toml' + config = _read_toml(config_path) + assert config['features']['codex_hooks'] is True + + +def test_enable_codex_hooks_feature_repo_scope(fs: FakeFilesystem) -> None: + repo = Path('/workdir/repo') + fs.create_dir(repo) + + success, _ = enable_codex_hooks_feature('repo', repo_path=repo) + + assert success is True + config_path = repo / '.codex' / 'config.toml' + assert config_path.exists() + config = _read_toml(config_path) + assert config['features']['codex_hooks'] is True + + +def test_enable_codex_hooks_feature_handles_malformed_toml(fs: FakeFilesystem) -> None: + """Malformed existing TOML is reported as failure rather than silently overwritten.""" + config_dir = Path.home() / '.codex' + fs.create_dir(config_dir) + (config_dir / 'config.toml').write_text('this is not = [valid toml') + + success, message = enable_codex_hooks_feature('user') + + assert success is False + assert 'Failed to parse' in message diff --git a/tests/cli/commands/ai_guardrails/test_hooks_manager.py b/tests/cli/commands/ai_guardrails/test_hooks_manager.py index a5732bca..1e2f2551 100644 --- a/tests/cli/commands/ai_guardrails/test_hooks_manager.py +++ b/tests/cli/commands/ai_guardrails/test_hooks_manager.py @@ -128,6 +128,45 @@ def test_get_hooks_config_claude_code_session_start() -> None: assert '--ide claude-code' in entries[0]['hooks'][0]['command'] +def test_get_hooks_config_codex_sync() -> None: + """Test Codex hooks config in default (sync) mode.""" + config = get_hooks_config(AIIDEType.CODEX) + scan_events = {k: v for k, v in config['hooks'].items() if k != 'SessionStart'} + for event_entries in scan_events.values(): + for event_entry in event_entries: + for hook in event_entry['hooks']: + assert 'async' not in hook + assert 'timeout' not in hook + assert '--ide codex' in hook['command'] + + +def test_get_hooks_config_codex_async() -> None: + """Test Codex hooks config in async mode adds async and timeout.""" + config = get_hooks_config(AIIDEType.CODEX, async_mode=True) + scan_events = {k: v for k, v in config['hooks'].items() if k != 'SessionStart'} + for event_entries in scan_events.values(): + for event_entry in event_entries: + for hook in event_entry['hooks']: + assert hook['async'] is True + + +def test_get_hooks_config_codex_session_start() -> None: + """Test Codex hooks config includes SessionStart auth check.""" + config = get_hooks_config(AIIDEType.CODEX) + assert 'SessionStart' in config['hooks'] + entries = config['hooks']['SessionStart'] + assert len(entries) == 1 + assert entries[0]['hooks'][0]['command'] == CYCODE_ENSURE_AUTH_COMMAND + + +def test_get_hooks_config_codex_pretooluse_bash_only() -> None: + """Test that Codex PreToolUse is scoped to Bash only.""" + config = get_hooks_config(AIIDEType.CODEX) + pretooluse_entries = config['hooks']['PreToolUse'] + assert len(pretooluse_entries) == 1 + assert pretooluse_entries[0]['matcher'] == 'Bash' + + def test_create_policy_file_warn(fs: FakeFilesystem) -> None: """Test creating warn-mode policy file.""" fs.create_dir(Path.home())