diff --git a/scan_engine.py b/scan_engine.py index d36055c..cc10006 100644 --- a/scan_engine.py +++ b/scan_engine.py @@ -2,6 +2,7 @@ import json from pathlib import Path import os +import re import model # SAFE import, avoids circular import @@ -14,28 +15,44 @@ def run_semgrep_json(path, ruleset): Executes semgrep with JSON output. Returns: Dict with 'results' list and optional 'error' string. """ + try: + target_path = str(Path(path).resolve(strict=True)) + except Exception: + return {"results": [], "error": "Invalid target path"} + + BASE_DIR = Path(__file__).resolve().parent + rules_dir = (BASE_DIR / "rules").resolve() + # Resolve absolute rules directory if not a public registry (p/...) if ruleset.startswith("p/"): + # Semgrep registry rulesets like p/owasp-top-ten + if not re.fullmatch(r"p/[A-Za-z0-9._/\-]+", ruleset): + return {"results": [], "error": "Invalid ruleset format"} config_val = ruleset else: - BASE_DIR = os.path.dirname(os.path.abspath(__file__)) - config_val = os.path.join(BASE_DIR, "rules", ruleset) - - # Use list-based arguments for better path safety especially on Windows - # We still use shell=True on Windows because semgrep is often a script/shim - cmd = ["semgrep", "scan", "--json", "--config", config_val, str(path)] + # Local ruleset must be a simple filename to prevent traversal/injection + if not re.fullmatch(r"[A-Za-z0-9._\-]+", ruleset): + return {"results": [], "error": "Invalid local ruleset name"} + try: + candidate = (rules_dir / ruleset).resolve(strict=True) + except Exception: + return {"results": [], "error": "Invalid local ruleset path"} + try: + candidate.relative_to(rules_dir) + except ValueError: + return {"results": [], "error": "Invalid local ruleset path"} + config_val = str(candidate) + + cmd = ["semgrep", "scan", "--json", "--config", config_val, target_path] # Ensure UTF-8 for Semgrep subprocess on Windows env = os.environ.copy() env["PYTHONIOENCODING"] = "utf-8" try: - # Use shell=True for Windows compatibility with shims - # Use shell=False for Unix/Mac for better signal handling and security - is_windows = (os.name == "nt") result = subprocess.run( cmd, - shell=is_windows, + shell=False, capture_output=True, text=True, encoding="utf-8", diff --git a/secure_review.py b/secure_review.py index 13e0935..7da5ea1 100644 --- a/secure_review.py +++ b/secure_review.py @@ -16,10 +16,16 @@ from oss_engine import run_oss_scan def run_secure_review(target_path: str, skip_secrets: bool, ruleset: str, llm_provider: str = "gemini", llm_model: str = "", api_key: str = "", enable_oss: bool = False, oss_token: str = ""): - target = Path(target_path) - - if not target.exists(): - raise FileNotFoundError(f"Target path not found: {target}") + base_dir = Path(__file__).resolve().parent + try: + target = (base_dir / target_path).resolve(strict=True) + except Exception: + raise FileNotFoundError(f"Target path not found: {target_path}") + + try: + target.relative_to(base_dir) + except ValueError: + raise PermissionError("Target path is outside the allowed scan directory") print(f"[+] Running Semgrep on: {target} with ruleset={ruleset}") semgrep_results = run_semgrep_json(str(target), ruleset)