diff --git a/graph_net/agent/__init__.py b/graph_net/agent/__init__.py index b9a38b220d..56608c8761 100644 --- a/graph_net/agent/__init__.py +++ b/graph_net/agent/__init__.py @@ -5,6 +5,6 @@ from Hugging Face models. """ -from graph_net.agent.graph_net_agent import GraphNetAgent +from graph_net.agent.graph_net_agent import ExtractionStatus, GraphNetAgent -__all__ = ["GraphNetAgent"] +__all__ = ["GraphNetAgent", "ExtractionStatus"] diff --git a/graph_net/agent/graph_extractor/subprocess_graph_extractor.py b/graph_net/agent/graph_extractor/subprocess_graph_extractor.py index 3f0a13f48e..3b001693bb 100644 --- a/graph_net/agent/graph_extractor/subprocess_graph_extractor.py +++ b/graph_net/agent/graph_extractor/subprocess_graph_extractor.py @@ -22,14 +22,14 @@ class SubprocessGraphExtractor(BaseGraphExtractor): """Extractor that runs script in subprocess""" - def __init__(self, workspace: str, timeout: int = DEFAULT_TIMEOUT): + def __init__(self, workspace: str, timeout: int | None = None): """ Args: workspace: Workspace root directory - timeout: Timeout in seconds for script execution + timeout: Timeout in seconds for script execution (default 1000s) """ self.workspace = Path(workspace) - self.timeout = timeout + self.timeout = timeout if timeout is not None else DEFAULT_TIMEOUT self.logger = logging.getLogger(self.__class__.__name__) def extract(self, code_path: Path, model_id: str) -> Path: diff --git a/graph_net/agent/graph_net_agent.py b/graph_net/agent/graph_net_agent.py index e51c1a2e34..4339bc65d6 100644 --- a/graph_net/agent/graph_net_agent.py +++ b/graph_net/agent/graph_net_agent.py @@ -2,6 +2,7 @@ import json import os +from enum import Enum from pathlib import Path from typing import Optional @@ -23,6 +24,15 @@ from graph_net.agent.sample_verifier import ForwardVerifier +class ExtractionStatus(str, Enum): + """Extraction result status for a single model.""" + + OK = "ok" + VERIFY_FAILED = "verify_failed" + EXTRACT_FAILED = "extract_failed" + ERROR = "error" + + class GraphNetAgent: """GraphNet automatic sample extraction agent""" @@ -31,16 +41,22 @@ def __init__( workspace: Optional[str] = None, hf_token: Optional[str] = None, llm_retry: bool = True, + extract_timeout: Optional[int] = None, + verify_timeout: Optional[int] = None, ): """ Initialize GraphNet Agent Args: - workspace: Workspace root directory. Defaults to - $GRAPH_NET_EXTRACT_WORKSPACE or ~/graphnet_workspace. - hf_token: HuggingFace API token (optional) - llm_retry: If True and ducc/claude CLI is available, retry failed - extractions up to 2 times with LLM-fixed scripts. + workspace: Workspace root directory. Defaults to + $GRAPH_NET_EXTRACT_WORKSPACE or ~/graphnet_workspace. + hf_token: HuggingFace API token (optional) + llm_retry: If True and ducc/claude CLI is available, retry failed + extractions up to 2 times with LLM-fixed scripts. + extract_timeout: Timeout in seconds for graph extraction subprocess + (default None -> 1000s). + verify_timeout: Timeout in seconds for forward verification subprocess + (default None -> 300s). """ if workspace is None: workspace = os.environ.get( @@ -63,14 +79,15 @@ def __init__( self.metadata_analyzer = ConfigMetadataAnalyzer() self.code_generator = TemplateCodeGenerator() self.graph_extractor = SubprocessGraphExtractor( - workspace=str(self.workspace.workspace_root) + workspace=str(self.workspace.workspace_root), + timeout=extract_timeout, ) - self.sample_verifier = ForwardVerifier() + self.sample_verifier = ForwardVerifier(timeout=verify_timeout) # LLM fixer — only created when llm_retry is requested self.llm_fixer: Optional[LLMCodeFixer] = LLMCodeFixer() if llm_retry else None - def extract_sample(self, model_id: str) -> bool: + def extract_sample(self, model_id: str) -> ExtractionStatus: """ Execute complete sample extraction pipeline from HuggingFace model ID. @@ -82,7 +99,10 @@ def extract_sample(self, model_id: str) -> bool: model_id: HuggingFace model ID (e.g., "bert-base-uncased") Returns: - True if sample extraction succeeded, False otherwise + ExtractionStatus.OK – extraction and verification both passed + ExtractionStatus.VERIFY_FAILED – extraction succeeded but verification failed + ExtractionStatus.EXTRACT_FAILED – extraction (or pre-extraction) failed + ExtractionStatus.ERROR – unexpected error """ try: self.logger.info(f"Starting extraction for model: {model_id}") @@ -104,21 +124,24 @@ def extract_sample(self, model_id: str) -> bool: if self.is_duplicate_sample(sample_dir): self.logger.info("Duplicate sample detected, skipping verification") - return True + return ExtractionStatus.OK if not self.sample_verifier.verify(sample_dir): self.logger.error("Sample verification failed") - return False + return ExtractionStatus.VERIFY_FAILED self.logger.info(f"Successfully extracted sample for {model_id}") - return True + return ExtractionStatus.OK - except (AnalysisError, CodeGenError, ExtractionError, VerificationError) as e: + except VerificationError as e: self.logger.error(f"Extraction failed for {model_id}: {e}") - return False + return ExtractionStatus.VERIFY_FAILED + except (AnalysisError, CodeGenError, ExtractionError) as e: + self.logger.error(f"Extraction failed for {model_id}: {e}") + return ExtractionStatus.EXTRACT_FAILED except Exception as e: self.logger.error(f"Unexpected error for {model_id}: {e}", exc_info=True) - return False + return ExtractionStatus.ERROR def _llm_retry( self, diff --git a/graph_net/agent/parallel_extract.py b/graph_net/agent/parallel_extract.py index 957c25b313..834e68cde1 100644 --- a/graph_net/agent/parallel_extract.py +++ b/graph_net/agent/parallel_extract.py @@ -37,7 +37,9 @@ if str(_GRAPHNET_ROOT) not in sys.path: sys.path.insert(0, str(_GRAPHNET_ROOT)) -from graph_net.agent import GraphNetAgent # noqa: E402 +from graph_net.agent import ExtractionStatus, GraphNetAgent # noqa: E402 + +import torch # noqa: E402 try: from huggingface_hub import list_models as _hf_list_models @@ -64,6 +66,11 @@ def get_models_from_hf(task: Optional[str] = None, limit: int = 100) -> List[str ] +def get_device_type() -> str: + """Return 'cuda' if torch CUDA is available, otherwise 'cpu'.""" + return "cuda" if torch.cuda.is_available() else "cpu" + + def _get_default_gpus() -> List[int]: """Detect available GPU indices from environment or nvidia-smi.""" cvd = os.getenv("CUDA_VISIBLE_DEVICES", "") @@ -90,10 +97,26 @@ def _get_default_gpus() -> List[int]: return [0] -DEFAULT_GPUS = _get_default_gpus() +def get_gpu_ids(args) -> List[int]: + """Resolve GPU indices from args or fall back to nvidia-smi. + + Returns: + List of GPU indices if specified or detected. + Falls back to _get_default_gpus() if args.gpus is None, empty, or invalid. + """ + if args.gpus is not None: + gpus_str = args.gpus.strip() + if gpus_str: + try: + return [int(g.strip()) for g in gpus_str.split(",") if g.strip()] + except ValueError: + print(f"[WARN] Invalid --gpus value: {args.gpus}, using default GPUs") + + return _get_default_gpus() + + DEFAULT_WORKSPACE = os.environ.get( - "GRAPH_NET_EXTRACT_WORKSPACE", - os.path.expanduser("~/graphnet_workspace"), + "GRAPH_NET_EXTRACT_WORKSPACE", "/tmp/graphnet_workspace" ) @@ -102,44 +125,69 @@ def _get_default_gpus() -> List[int]: # --------------------------------------------------------------------------- -def _worker( - gpu_id: int, +def worker_fn( + worker_id: int, + gpu_id: Optional[int], task_queue: multiprocessing.Queue, result_queue: multiprocessing.Queue, workspace: str, hf_token: Optional[str], total: int, + extract_timeout: int, + verify_timeout: int, + llm_retry: bool, ) -> None: """ - Worker function, runs in a dedicated subprocess bound to a single GPU. + Worker function, runs in a dedicated subprocess bound to a single GPU or CPU. Dynamically pulls tasks from task_queue and exits when the queue is empty. Args: - gpu_id: CUDA device index (e.g. 2) - task_queue: Shared task queue; each item is a model_id string - result_queue: Queue for reporting results back to the main process - workspace: Root workspace directory path - hf_token: HuggingFace token (optional) - total: Total task count (used for logging only) + worker_id: Worker process index (e.g. 0) + gpu_id: CUDA device index (e.g. 2), None for CPU mode + task_queue: Shared task queue; each item is a model_id string + result_queue: Queue for reporting results back to the main process + workspace: Root workspace directory path + hf_token: HuggingFace token (optional) + total: Total task count (used for logging only) + extract_timeout: Timeout in seconds for graph extraction subprocess + verify_timeout: Timeout in seconds for forward verification subprocess + llm_retry: Whether to enable LLM retry for failed extractions """ - # Bind GPU: subprocess only sees this card, internal code can use cuda:0 - os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id) + if gpu_id is not None: + os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id) + prefix = f"[Worker-{worker_id} GPU:{gpu_id}]" + else: + os.environ["CUDA_VISIBLE_DEVICES"] = "" + prefix = f"[Worker-{worker_id} CPU]" # Pass workspace to the environment variable used by SubprocessGraphExtractor - os.environ["GRAPH_NET_EXTRACT_WORKSPACE"] = workspace + if "GRAPH_NET_EXTRACT_WORKSPACE" not in os.environ: + os.environ[ + "GRAPH_NET_EXTRACT_WORKSPACE" + ] = f"{workspace}/samples/transformers-auto-model" - print(f"[GPU {gpu_id}] Worker started", flush=True) + print( + f"{prefix} Started (extract_timeout={extract_timeout}s, " + f"verify_timeout={verify_timeout}s)", + flush=True, + ) try: - agent = GraphNetAgent(workspace=workspace, hf_token=hf_token, llm_retry=False) + agent = GraphNetAgent( + workspace=workspace, + hf_token=hf_token, + llm_retry=llm_retry, + extract_timeout=extract_timeout, + verify_timeout=verify_timeout, + ) except Exception as e: - print(f"[GPU {gpu_id}] Failed to initialize agent: {e}", flush=True) + print(f"{prefix} Failed to initialize agent: {e}", flush=True) # Drain queue and mark remaining tasks as failed to avoid blocking the main process while True: try: mid = task_queue.get_nowait() result_queue.put( { - "gpu": gpu_id, + "gpu": gpu_id if gpu_id is not None else worker_id, "model_id": mid, "success": False, "error": str(e), @@ -156,37 +204,32 @@ def _worker( except queue.Empty: break - print(f"[GPU {gpu_id}] Extracting: {model_id}", flush=True) + print(f"{prefix} Extracting: {model_id}", flush=True) t0 = time.time() + + result_dict = { + "gpu": gpu_id if gpu_id is not None else worker_id, + "model_id": model_id, + } try: - success = agent.extract_sample(model_id) + status = agent.extract_sample(model_id) elapsed = time.time() - t0 - status = "OK" if success else "FAIL" - print(f"[GPU {gpu_id}] {status} {model_id} ({elapsed:.1f}s)", flush=True) - result_queue.put( - { - "gpu": gpu_id, - "model_id": model_id, - "success": success, - "elapsed": round(elapsed, 2), - "timestamp": datetime.now().isoformat(), - } - ) + ok = status == ExtractionStatus.OK + label = "OK" if ok else status.name.replace("_", " ") + print(f"{prefix} {label} {model_id} ({elapsed:.1f}s)", flush=True) + result_dict["success"] = ok + result_dict["status"] = status.value except Exception as e: elapsed = time.time() - t0 - print(f"[GPU {gpu_id}] ERROR {model_id}: {e} ({elapsed:.1f}s)", flush=True) - result_queue.put( - { - "gpu": gpu_id, - "model_id": model_id, - "success": False, - "error": str(e), - "elapsed": round(elapsed, 2), - "timestamp": datetime.now().isoformat(), - } - ) + print(f"{prefix} ERROR {model_id}: {e} ({elapsed:.1f}s)", flush=True) + result_dict["success"] = False + result_dict["status"] = ExtractionStatus.ERROR.value + result_dict["error"] = str(e) - print(f"[GPU {gpu_id}] Worker finished (queue empty)", flush=True) + result_dict["elapsed"] = round(elapsed, 2) + result_dict["timestamp"] = datetime.now().isoformat() + result_queue.put(result_dict) + print(f"{prefix} Worker finished (queue empty)", flush=True) # --------------------------------------------------------------------------- @@ -206,29 +249,49 @@ def _print_summary(results: Dict) -> None: details = results.get("details", []) total = len(details) success = sum(1 for d in details if d.get("success")) + extract_success = sum( + 1 + for d in details + if d.get("status") + in (ExtractionStatus.OK.value, ExtractionStatus.VERIFY_FAILED.value) + ) failed = total - success rate = (success / total * 100) if total else 0.0 + extract_rate = (extract_success / total * 100) if total else 0.0 print("\n" + "=" * 60) print("[SUMMARY] Parallel Extraction Summary") print("=" * 60) - print(f" Total : {total}") - print(f" Success: {success}") - print(f" Failed : {failed}") - print(f" Rate : {rate:.2f}%") + print(f" Total : {total}") + print(f" Success : {success} (verify ok)") + print(f" Extract : {extract_success} (graph extracted)") + print(f" Failed : {failed}") + print(f" Rate : {rate:.2f}% (overall)") + print(f" Extract : {extract_rate:.2f}% (extraction only)") # Per-GPU breakdown gpu_stats: Dict[int, Dict] = {} for d in details: g = d.get("gpu", -1) if g not in gpu_stats: - gpu_stats[g] = {"total": 0, "success": 0} + gpu_stats[g] = {"total": 0, "success": 0, "extract": 0} gpu_stats[g]["total"] += 1 if d.get("success"): gpu_stats[g]["success"] += 1 - print("\n Per-GPU:") + if d.get("status") in ( + ExtractionStatus.OK.value, + ExtractionStatus.VERIFY_FAILED.value, + ): + gpu_stats[g]["extract"] += 1 + + label = "GPU" if results.get("gpus") else "Worker" + print(f"\n Per-{label}:") for g in sorted(gpu_stats): gs = gpu_stats[g] gr = (gs["success"] / gs["total"] * 100) if gs["total"] else 0.0 - print(f" GPU {g}: {gs['success']}/{gs['total']} ({gr:.1f}%)") + er = (gs["extract"] / gs["total"] * 100) if gs["total"] else 0.0 + print( + f" {label} {g}: success={gs['success']}/{gs['total']} ({gr:.1f}%), " + f"extract={gs['extract']}/{gs['total']} ({er:.1f}%)" + ) print("=" * 60) @@ -237,9 +300,9 @@ def _print_summary(results: Dict) -> None: # --------------------------------------------------------------------------- -def main() -> int: +def _parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( - description="Parallel computation graph extraction from HuggingFace; one agent process per GPU" + description="Parallel computation graph extraction from HuggingFace; one agent process per GPU or CPU" ) parser.add_argument( "--model-list", @@ -274,8 +337,14 @@ def main() -> int: parser.add_argument( "--gpus", type=str, - default=",".join(str(g) for g in DEFAULT_GPUS), - help=f"Comma-separated GPU indices to use (default: {','.join(str(g) for g in DEFAULT_GPUS)})", + default=None, + help="Comma-separated GPU indices to use (GPU mode; if set, ignores --num-workers)", + ) + parser.add_argument( + "--num-workers", + type=int, + default=None, + help="Number of worker processes in CPU mode (default: CPU count)", ) parser.add_argument( "--output", @@ -283,43 +352,78 @@ def main() -> int: default=None, help="Output JSON file path (default: auto-generated filename with timestamp)", ) + parser.add_argument( + "--extract-timeout", + type=int, + default=None, + help="Timeout in seconds for graph extraction (default: 1000 on GPU, 2000 on CPU)", + ) + parser.add_argument( + "--verify-timeout", + type=int, + default=None, + help="Timeout in seconds for forward verification (default: 300 on GPU, 600 on CPU)", + ) + parser.add_argument( + "--use-llm", + action="store_true", + default=False, + help="Enable LLM retry for failed extractions", + ) + return parser.parse_args() + + +def _load_model_ids(args: argparse.Namespace) -> List[str]: + if args.model_list: + return load_models_from_file(args.model_list) + if HUGGINGFACE_HUB_AVAILABLE: + print( + f"[INFO] Fetching {args.count} models from HuggingFace Hub (task={args.task})..." + ) + return get_models_from_hf(task=args.task, limit=args.count) + return [] - args = parser.parse_args() - # --- Resolve workspace --- +def _resolve_config(args: argparse.Namespace): workspace = ( args.workspace or os.getenv("GRAPH_NET_EXTRACT_WORKSPACE") or DEFAULT_WORKSPACE ) print(f"[INFO] Workspace: {workspace}") - # --- Parse GPU list --- - try: - gpus = [int(g.strip()) for g in args.gpus.split(",") if g.strip()] - except ValueError: - print(f"[ERROR] Invalid --gpus value: {args.gpus}") - return 1 - if not gpus: - print("[ERROR] No GPUs specified") - return 1 - print(f"[INFO] GPUs: {gpus}") - - # --- Load model list --- - if args.model_list: - model_ids = load_models_from_file(args.model_list) - elif HUGGINGFACE_HUB_AVAILABLE: - print( - f"[INFO] Fetching {args.count} models from HuggingFace Hub (task={args.task})..." + if get_device_type() == "cuda": + gpus = get_gpu_ids(args) + num_workers = len(gpus) + print(f"[INFO] GPU mode (torch fallback): {gpus}") + extract_timeout = ( + args.extract_timeout if args.extract_timeout is not None else 1000 ) - model_ids = get_models_from_hf(task=args.task, limit=args.count) + verify_timeout = args.verify_timeout if args.verify_timeout is not None else 300 else: - print("[ERROR] No model list provided and huggingface_hub not available") - return 1 + gpus = [] + num_workers = args.num_workers if args.num_workers else 1 + print(f"[INFO] CPU mode: {num_workers} workers") + extract_timeout = ( + args.extract_timeout if args.extract_timeout is not None else 2000 + ) + verify_timeout = args.verify_timeout if args.verify_timeout is not None else 600 + + return workspace, gpus, num_workers, extract_timeout, verify_timeout + + +def main() -> int: + args = _parse_args() + + workspace, gpus, num_workers, extract_timeout, verify_timeout = _resolve_config( + args + ) + llm_retry = args.use_llm + model_ids = _load_model_ids(args) if not model_ids: print("[ERROR] Empty model list, nothing to do") return 1 - print(f"[INFO] Total models: {len(model_ids)}, workers: {len(gpus)}") + print(f"[INFO] Total models: {len(model_ids)}, workers: {num_workers}") # --- Populate shared task queue --- task_queue: multiprocessing.Queue = multiprocessing.Queue() @@ -328,25 +432,31 @@ def main() -> int: # --- Launch workers --- result_queue: multiprocessing.Queue = multiprocessing.Queue() - processes = [] start_time = datetime.now() print( - f"\n[START] {start_time.strftime('%Y-%m-%d %H:%M:%S')} — launching {len(gpus)} workers\n" + f"\n[START] {start_time.strftime('%Y-%m-%d %H:%M:%S')} — launching {num_workers} workers\n" ) - for gpu_id in gpus: + processes = [] + for worker_id in range(num_workers): + gpu_id = gpus[worker_id] if gpus else None p = multiprocessing.Process( - target=_worker, + target=worker_fn, args=( + worker_id, gpu_id, task_queue, result_queue, workspace, args.hf_token, len(model_ids), + extract_timeout, + verify_timeout, + llm_retry, ), - name=f"worker-gpu{gpu_id}", + name=f"worker-{worker_id}" + + (f"-gpu{gpu_id}" if gpu_id is not None else "-cpu"), daemon=True, ) p.start() @@ -354,17 +464,22 @@ def main() -> int: # --- Collect results --- details = [] - total_expected = len(model_ids) - - while len(details) < total_expected: + while len(details) < len(model_ids): try: entry = result_queue.get(timeout=5) details.append(entry) done = len(details) - success_so_far = sum(1 for d in details if d.get("success")) + ok_so_far = sum(1 for d in details if d.get("success")) + extract_ok_so_far = sum( + 1 + for d in details + if d.get("status") + in (ExtractionStatus.OK.value, ExtractionStatus.VERIFY_FAILED.value) + ) print( - f"[PROGRESS] {done}/{total_expected} done, " - f"success rate so far: {success_so_far/done*100:.1f}%", + f"[PROGRESS] {done}/{len(model_ids)} done, " + f"success={ok_so_far/done*100:.1f}%, " + f"extract={extract_ok_so_far/done*100:.1f}%", flush=True, ) except Exception: @@ -378,23 +493,32 @@ def main() -> int: p.join(timeout=10) end_time = datetime.now() - elapsed_total = (end_time - start_time).total_seconds() - - # --- Build result summary --- + success_count = sum(1 for d in details if d.get("success")) + extract_success_count = sum( + 1 + for d in details + if d.get("status") + in (ExtractionStatus.OK.value, ExtractionStatus.VERIFY_FAILED.value) + ) results = { "start_time": start_time.isoformat(), "end_time": end_time.isoformat(), - "elapsed_seconds": round(elapsed_total, 1), + "elapsed_seconds": round((end_time - start_time).total_seconds(), 1), "gpus": gpus, "workspace": workspace, "total": len(details), - "success": sum(1 for d in details if d.get("success")), - "failed": sum(1 for d in details if not d.get("success")), + "success": success_count, + "extract_success": extract_success_count, + "failed": len(details) - success_count, "success_rate": 0.0, + "extract_success_rate": 0.0, "details": details, } if results["total"] > 0: results["success_rate"] = round(results["success"] / results["total"] * 100, 2) + results["extract_success_rate"] = round( + results["extract_success"] / results["total"] * 100, 2 + ) # --- Save results --- output_file = ( @@ -404,7 +528,7 @@ def main() -> int: # --- Print summary --- _print_summary(results) - print(f"\n[DONE] Total elapsed: {elapsed_total:.0f}s") + print(f"\n[DONE] Total elapsed: {results['elapsed_seconds']:.0f}s") return 0 if results["success_rate"] > 0 else 1 diff --git a/graph_net/agent/sample_verifier/forward_verifier.py b/graph_net/agent/sample_verifier/forward_verifier.py index 5f8ba03235..c7849eac76 100644 --- a/graph_net/agent/sample_verifier/forward_verifier.py +++ b/graph_net/agent/sample_verifier/forward_verifier.py @@ -44,10 +44,11 @@ class ForwardVerifier(BaseSampleVerifier): def __init__(self, timeout: int = 300): """ Args: - timeout: seconds to wait for each forward-pass subprocess (default 5 min) + timeout: seconds to wait for each forward-pass subprocess + (default 300s, can be overridden) """ self._basic = BasicSampleVerifier() - self.timeout = timeout + self.timeout = timeout if timeout is not None else 300 self.logger = logging.getLogger(self.__class__.__name__) def verify(self, sample_dir: Path) -> bool: