Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion capabilities/ai-red-teaming/capability.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
schema: 1
name: ai-red-teaming
version: "1.4.0"
version: "1.4.1"
description: >
Probe the security and safety of AI applications, agents, and foundation models.
Orchestrates adversarial attack workflows to discover vulnerabilities in LLMs,
Expand Down
39 changes: 28 additions & 11 deletions capabilities/ai-red-teaming/scripts/attack_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,29 @@ def _get_workspace_path() -> Path:
Path(os.environ.get("AIRT_WORKFLOWS_DIR")) if os.environ.get("AIRT_WORKFLOWS_DIR") else _get_workspace_path()
)
METADATA_FILE = WORKFLOWS_DIR / ".workflow_metadata.json"
METADATA_FILE = WORKFLOWS_DIR / ".workflow_metadata.json"


def _unique_workflow_path(filename: str) -> tuple[Path, str]:
"""Return a collision-free path under WORKFLOWS_DIR.

Never overwrite an existing workflow file — the user may have hand-patched
it after generation (ENG-6813). If ``filename`` already exists, save as a
new versioned file (``name_v2.py``, ``name_v3.py``, ...) and return the
actual filename used so callers can report it.
"""
WORKFLOWS_DIR.mkdir(parents=True, exist_ok=True)
candidate = WORKFLOWS_DIR / filename
if not candidate.exists():
return candidate, filename

stem, suffix = candidate.stem, candidate.suffix
version = 2
while True:
new_name = "{}_v{}{}".format(stem, version, suffix)
new_path = WORKFLOWS_DIR / new_name
if not new_path.exists():
return new_path, new_name
version += 1


def _resolve_platform_env() -> dict[str, str]:
Expand Down Expand Up @@ -3882,8 +3904,7 @@ def generate_category_attack(params: dict) -> dict:
}

# Save the script
WORKFLOWS_DIR.mkdir(parents=True, exist_ok=True)
filepath = WORKFLOWS_DIR / filename
filepath, filename = _unique_workflow_path(filename)
filepath.write_text(script)

# Update metadata
Expand Down Expand Up @@ -4051,8 +4072,7 @@ def generate_attack(params: dict) -> dict:
}

# Save the script
WORKFLOWS_DIR.mkdir(parents=True, exist_ok=True)
filepath = WORKFLOWS_DIR / filename
filepath, filename = _unique_workflow_path(filename)
filepath.write_text(script)

# Update metadata
Expand Down Expand Up @@ -4486,8 +4506,7 @@ def generate_agentic_attack(params: dict) -> dict:
}

# Save the script
WORKFLOWS_DIR.mkdir(parents=True, exist_ok=True)
filepath = WORKFLOWS_DIR / filename
filepath, filename = _unique_workflow_path(filename)
filepath.write_text(script)

# Update metadata
Expand Down Expand Up @@ -4943,8 +4962,7 @@ def generate_image_attack(params: dict) -> dict:
}

# Save
WORKFLOWS_DIR.mkdir(parents=True, exist_ok=True)
filepath = WORKFLOWS_DIR / filename
filepath, filename = _unique_workflow_path(filename)
filepath.write_text(script)

# Update metadata
Expand Down Expand Up @@ -5257,8 +5275,7 @@ async def main():
}

# Save
WORKFLOWS_DIR.mkdir(parents=True, exist_ok=True)
filepath = WORKFLOWS_DIR / filename
filepath, filename = _unique_workflow_path(filename)
filepath.write_text(script)

# Update metadata
Expand Down
36 changes: 36 additions & 0 deletions capabilities/ai-red-teaming/tests/test_attack_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,3 +451,39 @@ def test_campaign_script_has_multiple_attacks(self) -> None:
)
assert "tap_attack(" in script
assert "goat_attack(" in script


class TestUniqueWorkflowPath:
"""Collision-safe workflow saving (ENG-6813).

Regeneration must never overwrite an existing workflow file — a user may
have hand-patched it after generation.
"""

def test_returns_original_when_no_collision(self, tmp_path, monkeypatch) -> None:
monkeypatch.setattr(runner, "WORKFLOWS_DIR", tmp_path)
path, name = runner._unique_workflow_path("attack.py")
assert name == "attack.py"
assert path == tmp_path / "attack.py"

def test_does_not_overwrite_hand_patched_file(self, tmp_path, monkeypatch) -> None:
monkeypatch.setattr(runner, "WORKFLOWS_DIR", tmp_path)
existing = tmp_path / "attack.py"
existing.write_text("# hand-patched, do not clobber")

path, name = runner._unique_workflow_path("attack.py")

assert name == "attack_v2.py"
assert path == tmp_path / "attack_v2.py"
# Original is left untouched.
assert existing.read_text() == "# hand-patched, do not clobber"

def test_increments_version_until_free(self, tmp_path, monkeypatch) -> None:
monkeypatch.setattr(runner, "WORKFLOWS_DIR", tmp_path)
(tmp_path / "a.py").write_text("x")
(tmp_path / "a_v2.py").write_text("x")

path, name = runner._unique_workflow_path("a.py")

assert name == "a_v3.py"
assert path == tmp_path / "a_v3.py"
Loading