From 31299c0c6cfc4de5d83d99d86fab804d898c7127 Mon Sep 17 00:00:00 2001 From: Liu Yiqun Date: Thu, 14 May 2026 23:15:02 +0800 Subject: [PATCH 1/5] Support extract use CPU and optimize some codes. --- graph_net/agent/parallel_extract.py | 196 ++++++++++++++++------------ 1 file changed, 115 insertions(+), 81 deletions(-) diff --git a/graph_net/agent/parallel_extract.py b/graph_net/agent/parallel_extract.py index 957c25b31..f96927012 100644 --- a/graph_net/agent/parallel_extract.py +++ b/graph_net/agent/parallel_extract.py @@ -39,6 +39,8 @@ from graph_net.agent import GraphNetAgent # noqa: E402 +import torch # noqa: E402 + try: from huggingface_hub import list_models as _hf_list_models @@ -64,6 +66,11 @@ def get_models_from_hf(task: Optional[str] = None, limit: int = 100) -> List[str ] +def get_device_type() -> str: + """Return 'cuda' if torch CUDA is available, otherwise 'cpu'.""" + return "cuda" if torch.cuda.is_available() else "cpu" + + def _get_default_gpus() -> List[int]: """Detect available GPU indices from environment or nvidia-smi.""" cvd = os.getenv("CUDA_VISIBLE_DEVICES", "") @@ -90,10 +97,26 @@ def _get_default_gpus() -> List[int]: return [0] -DEFAULT_GPUS = _get_default_gpus() +def get_gpu_ids(args) -> List[int]: + """Resolve GPU indices from args or fall back to nvidia-smi. + + Returns: + List of GPU indices if specified or detected. + Falls back to _get_default_gpus() if args.gpus is None, empty, or invalid. + """ + if args.gpus is not None: + gpus_str = args.gpus.strip() + if gpus_str: + try: + return [int(g.strip()) for g in gpus_str.split(",") if g.strip()] + except ValueError: + print(f"[WARN] Invalid --gpus value: {args.gpus}, using default GPUs") + + return _get_default_gpus() + + DEFAULT_WORKSPACE = os.environ.get( - "GRAPH_NET_EXTRACT_WORKSPACE", - os.path.expanduser("~/graphnet_workspace"), + "GRAPH_NET_EXTRACT_WORKSPACE", "/tmp/graphnet_workspace" ) @@ -102,8 +125,9 @@ def _get_default_gpus() -> List[int]: # --------------------------------------------------------------------------- -def _worker( - gpu_id: int, +def worker_fn( + worker_id: int, + gpu_id: Optional[int], task_queue: multiprocessing.Queue, result_queue: multiprocessing.Queue, workspace: str, @@ -111,35 +135,40 @@ def _worker( total: int, ) -> None: """ - Worker function, runs in a dedicated subprocess bound to a single GPU. + Worker function, runs in a dedicated subprocess bound to a single GPU or CPU. Dynamically pulls tasks from task_queue and exits when the queue is empty. Args: - gpu_id: CUDA device index (e.g. 2) + worker_id: Worker process index (e.g. 0) + gpu_id: CUDA device index (e.g. 2), None for CPU mode task_queue: Shared task queue; each item is a model_id string result_queue: Queue for reporting results back to the main process workspace: Root workspace directory path hf_token: HuggingFace token (optional) total: Total task count (used for logging only) """ - # Bind GPU: subprocess only sees this card, internal code can use cuda:0 - os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id) + if gpu_id is not None: + os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id) + prefix = f"[Worker-{worker_id} GPU:{gpu_id}]" + else: + os.environ["CUDA_VISIBLE_DEVICES"] = "" + prefix = f"[Worker-{worker_id} CPU]" # Pass workspace to the environment variable used by SubprocessGraphExtractor os.environ["GRAPH_NET_EXTRACT_WORKSPACE"] = workspace - print(f"[GPU {gpu_id}] Worker started", flush=True) + print(f"{prefix} Started", flush=True) try: agent = GraphNetAgent(workspace=workspace, hf_token=hf_token, llm_retry=False) except Exception as e: - print(f"[GPU {gpu_id}] Failed to initialize agent: {e}", flush=True) + print(f"{prefix} Failed to initialize agent: {e}", flush=True) # Drain queue and mark remaining tasks as failed to avoid blocking the main process while True: try: mid = task_queue.get_nowait() result_queue.put( { - "gpu": gpu_id, + "gpu": gpu_id if gpu_id is not None else worker_id, "model_id": mid, "success": False, "error": str(e), @@ -156,37 +185,29 @@ def _worker( except queue.Empty: break - print(f"[GPU {gpu_id}] Extracting: {model_id}", flush=True) + print(f"{prefix} Extracting: {model_id}", flush=True) t0 = time.time() + + result_dict = { + "gpu": gpu_id if gpu_id is not None else worker_id, + "model_id": model_id, + } try: success = agent.extract_sample(model_id) elapsed = time.time() - t0 status = "OK" if success else "FAIL" - print(f"[GPU {gpu_id}] {status} {model_id} ({elapsed:.1f}s)", flush=True) - result_queue.put( - { - "gpu": gpu_id, - "model_id": model_id, - "success": success, - "elapsed": round(elapsed, 2), - "timestamp": datetime.now().isoformat(), - } - ) + print(f"{prefix} {status} {model_id} ({elapsed:.1f}s)", flush=True) + result_dict["success"] = success except Exception as e: elapsed = time.time() - t0 - print(f"[GPU {gpu_id}] ERROR {model_id}: {e} ({elapsed:.1f}s)", flush=True) - result_queue.put( - { - "gpu": gpu_id, - "model_id": model_id, - "success": False, - "error": str(e), - "elapsed": round(elapsed, 2), - "timestamp": datetime.now().isoformat(), - } - ) + print(f"{prefix} ERROR {model_id}: {e} ({elapsed:.1f}s)", flush=True) + result_dict["success"] = False + result_dict["error"] = str(e) - print(f"[GPU {gpu_id}] Worker finished (queue empty)", flush=True) + result_dict["elapsed"] = round(elapsed, 2) + result_dict["timestamp"] = datetime.now().isoformat() + result_queue.put(result_dict) + print(f"{prefix} Worker finished (queue empty)", flush=True) # --------------------------------------------------------------------------- @@ -224,11 +245,13 @@ def _print_summary(results: Dict) -> None: gpu_stats[g]["total"] += 1 if d.get("success"): gpu_stats[g]["success"] += 1 - print("\n Per-GPU:") + + label = "GPU" if results.get("gpus") else "Worker" + print(f"\n Per-{label}:") for g in sorted(gpu_stats): gs = gpu_stats[g] gr = (gs["success"] / gs["total"] * 100) if gs["total"] else 0.0 - print(f" GPU {g}: {gs['success']}/{gs['total']} ({gr:.1f}%)") + print(f" {label} {g}: {gs['success']}/{gs['total']} ({gr:.1f}%)") print("=" * 60) @@ -237,9 +260,9 @@ def _print_summary(results: Dict) -> None: # --------------------------------------------------------------------------- -def main() -> int: +def _parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( - description="Parallel computation graph extraction from HuggingFace; one agent process per GPU" + description="Parallel computation graph extraction from HuggingFace; one agent process per GPU or CPU" ) parser.add_argument( "--model-list", @@ -274,8 +297,14 @@ def main() -> int: parser.add_argument( "--gpus", type=str, - default=",".join(str(g) for g in DEFAULT_GPUS), - help=f"Comma-separated GPU indices to use (default: {','.join(str(g) for g in DEFAULT_GPUS)})", + default=None, + help="Comma-separated GPU indices to use (GPU mode; if set, ignores --num-workers)", + ) + parser.add_argument( + "--num-workers", + type=int, + default=None, + help="Number of worker processes in CPU mode (default: CPU count)", ) parser.add_argument( "--output", @@ -283,43 +312,49 @@ def main() -> int: default=None, help="Output JSON file path (default: auto-generated filename with timestamp)", ) + return parser.parse_args() + + +def _load_model_ids(args: argparse.Namespace) -> List[str]: + if args.model_list: + return load_models_from_file(args.model_list) + if HUGGINGFACE_HUB_AVAILABLE: + print( + f"[INFO] Fetching {args.count} models from HuggingFace Hub (task={args.task})..." + ) + return get_models_from_hf(task=args.task, limit=args.count) + return [] - args = parser.parse_args() - # --- Resolve workspace --- +def _resolve_config(args: argparse.Namespace): workspace = ( args.workspace or os.getenv("GRAPH_NET_EXTRACT_WORKSPACE") or DEFAULT_WORKSPACE ) print(f"[INFO] Workspace: {workspace}") - # --- Parse GPU list --- - try: - gpus = [int(g.strip()) for g in args.gpus.split(",") if g.strip()] - except ValueError: - print(f"[ERROR] Invalid --gpus value: {args.gpus}") - return 1 - if not gpus: - print("[ERROR] No GPUs specified") - return 1 - print(f"[INFO] GPUs: {gpus}") - - # --- Load model list --- - if args.model_list: - model_ids = load_models_from_file(args.model_list) - elif HUGGINGFACE_HUB_AVAILABLE: - print( - f"[INFO] Fetching {args.count} models from HuggingFace Hub (task={args.task})..." - ) - model_ids = get_models_from_hf(task=args.task, limit=args.count) + if get_device_type() == "cuda": + gpus = get_gpu_ids(args) + num_workers = len(gpus) + print(f"[INFO] GPU mode (torch fallback): {gpus}") else: - print("[ERROR] No model list provided and huggingface_hub not available") - return 1 + gpus = [] + num_workers = args.num_workers if args.num_workers else 1 + print(f"[INFO] CPU mode: {num_workers} workers") + + return workspace, gpus, num_workers + + +def main() -> int: + args = _parse_args() + workspace, gpus, num_workers = _resolve_config(args) + + model_ids = _load_model_ids(args) if not model_ids: print("[ERROR] Empty model list, nothing to do") return 1 - print(f"[INFO] Total models: {len(model_ids)}, workers: {len(gpus)}") + print(f"[INFO] Total models: {len(model_ids)}, workers: {num_workers}") # --- Populate shared task queue --- task_queue: multiprocessing.Queue = multiprocessing.Queue() @@ -328,17 +363,19 @@ def main() -> int: # --- Launch workers --- result_queue: multiprocessing.Queue = multiprocessing.Queue() - processes = [] start_time = datetime.now() print( - f"\n[START] {start_time.strftime('%Y-%m-%d %H:%M:%S')} — launching {len(gpus)} workers\n" + f"\n[START] {start_time.strftime('%Y-%m-%d %H:%M:%S')} — launching {num_workers} workers\n" ) - for gpu_id in gpus: + processes = [] + for worker_id in range(num_workers): + gpu_id = gpus[worker_id] if gpus else None p = multiprocessing.Process( - target=_worker, + target=worker_fn, args=( + worker_id, gpu_id, task_queue, result_queue, @@ -346,7 +383,8 @@ def main() -> int: args.hf_token, len(model_ids), ), - name=f"worker-gpu{gpu_id}", + name=f"worker-{worker_id}" + + (f"-gpu{gpu_id}" if gpu_id is not None else "-cpu"), daemon=True, ) p.start() @@ -354,16 +392,14 @@ def main() -> int: # --- Collect results --- details = [] - total_expected = len(model_ids) - - while len(details) < total_expected: + while len(details) < len(model_ids): try: entry = result_queue.get(timeout=5) details.append(entry) done = len(details) success_so_far = sum(1 for d in details if d.get("success")) print( - f"[PROGRESS] {done}/{total_expected} done, " + f"[PROGRESS] {done}/{len(model_ids)} done, " f"success rate so far: {success_so_far/done*100:.1f}%", flush=True, ) @@ -378,18 +414,16 @@ def main() -> int: p.join(timeout=10) end_time = datetime.now() - elapsed_total = (end_time - start_time).total_seconds() - - # --- Build result summary --- + success_count = sum(1 for d in details if d.get("success")) results = { "start_time": start_time.isoformat(), "end_time": end_time.isoformat(), - "elapsed_seconds": round(elapsed_total, 1), + "elapsed_seconds": round((end_time - start_time).total_seconds(), 1), "gpus": gpus, "workspace": workspace, "total": len(details), - "success": sum(1 for d in details if d.get("success")), - "failed": sum(1 for d in details if not d.get("success")), + "success": success_count, + "failed": len(details) - success_count, "success_rate": 0.0, "details": details, } @@ -404,7 +438,7 @@ def main() -> int: # --- Print summary --- _print_summary(results) - print(f"\n[DONE] Total elapsed: {elapsed_total:.0f}s") + print(f"\n[DONE] Total elapsed: {results['elapsed_seconds']:.0f}s") return 0 if results["success_rate"] > 0 else 1 From aad09c26d487a964c7ecb7a8c5fed643ef342f2a Mon Sep 17 00:00:00 2001 From: Liu Yiqun Date: Thu, 14 May 2026 23:50:15 +0800 Subject: [PATCH 2/5] Fix workspace path. --- graph_net/agent/parallel_extract.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/graph_net/agent/parallel_extract.py b/graph_net/agent/parallel_extract.py index f96927012..7e79f3714 100644 --- a/graph_net/agent/parallel_extract.py +++ b/graph_net/agent/parallel_extract.py @@ -154,7 +154,10 @@ def worker_fn( os.environ["CUDA_VISIBLE_DEVICES"] = "" prefix = f"[Worker-{worker_id} CPU]" # Pass workspace to the environment variable used by SubprocessGraphExtractor - os.environ["GRAPH_NET_EXTRACT_WORKSPACE"] = workspace + if "GRAPH_NET_EXTRACT_WORKSPACE" not in os.environ: + os.envion[ + "GRAPH_NET_EXTRACT_WORKSPACE" + ] = f"{workspace}/samples/transformers-auto-model" print(f"{prefix} Started", flush=True) From daad1c6de28d3f2ba262036bcc5d474381313c9c Mon Sep 17 00:00:00 2001 From: Liu Yiqun Date: Fri, 15 May 2026 10:01:15 +0800 Subject: [PATCH 3/5] Support configurable extract and verify timeouts with CPU/GPU defaults. - GraphNetAgent: add extract_timeout and verify_timeout parameters - parallel_extract: add --extract-timeout and --verify-timeout CLI args - Default timeouts differ by device: GPU: extract=1000s, verify=300s CPU: extract=2000s, verify=600s - Fix typo: os.envion -> os.environ Co-Authored-By: Claude Opus 4.6 --- .../subprocess_graph_extractor.py | 6 +- graph_net/agent/graph_net_agent.py | 21 ++++--- graph_net/agent/parallel_extract.py | 63 +++++++++++++++---- .../agent/sample_verifier/forward_verifier.py | 7 ++- 4 files changed, 72 insertions(+), 25 deletions(-) diff --git a/graph_net/agent/graph_extractor/subprocess_graph_extractor.py b/graph_net/agent/graph_extractor/subprocess_graph_extractor.py index 3f0a13f48..3b001693b 100644 --- a/graph_net/agent/graph_extractor/subprocess_graph_extractor.py +++ b/graph_net/agent/graph_extractor/subprocess_graph_extractor.py @@ -22,14 +22,14 @@ class SubprocessGraphExtractor(BaseGraphExtractor): """Extractor that runs script in subprocess""" - def __init__(self, workspace: str, timeout: int = DEFAULT_TIMEOUT): + def __init__(self, workspace: str, timeout: int | None = None): """ Args: workspace: Workspace root directory - timeout: Timeout in seconds for script execution + timeout: Timeout in seconds for script execution (default 1000s) """ self.workspace = Path(workspace) - self.timeout = timeout + self.timeout = timeout if timeout is not None else DEFAULT_TIMEOUT self.logger = logging.getLogger(self.__class__.__name__) def extract(self, code_path: Path, model_id: str) -> Path: diff --git a/graph_net/agent/graph_net_agent.py b/graph_net/agent/graph_net_agent.py index e51c1a2e3..1cdfdf16f 100644 --- a/graph_net/agent/graph_net_agent.py +++ b/graph_net/agent/graph_net_agent.py @@ -31,16 +31,22 @@ def __init__( workspace: Optional[str] = None, hf_token: Optional[str] = None, llm_retry: bool = True, + extract_timeout: Optional[int] = None, + verify_timeout: Optional[int] = None, ): """ Initialize GraphNet Agent Args: - workspace: Workspace root directory. Defaults to - $GRAPH_NET_EXTRACT_WORKSPACE or ~/graphnet_workspace. - hf_token: HuggingFace API token (optional) - llm_retry: If True and ducc/claude CLI is available, retry failed - extractions up to 2 times with LLM-fixed scripts. + workspace: Workspace root directory. Defaults to + $GRAPH_NET_EXTRACT_WORKSPACE or ~/graphnet_workspace. + hf_token: HuggingFace API token (optional) + llm_retry: If True and ducc/claude CLI is available, retry failed + extractions up to 2 times with LLM-fixed scripts. + extract_timeout: Timeout in seconds for graph extraction subprocess + (default None -> 1000s). + verify_timeout: Timeout in seconds for forward verification subprocess + (default None -> 300s). """ if workspace is None: workspace = os.environ.get( @@ -63,9 +69,10 @@ def __init__( self.metadata_analyzer = ConfigMetadataAnalyzer() self.code_generator = TemplateCodeGenerator() self.graph_extractor = SubprocessGraphExtractor( - workspace=str(self.workspace.workspace_root) + workspace=str(self.workspace.workspace_root), + timeout=extract_timeout, ) - self.sample_verifier = ForwardVerifier() + self.sample_verifier = ForwardVerifier(timeout=verify_timeout) # LLM fixer — only created when llm_retry is requested self.llm_fixer: Optional[LLMCodeFixer] = LLMCodeFixer() if llm_retry else None diff --git a/graph_net/agent/parallel_extract.py b/graph_net/agent/parallel_extract.py index 7e79f3714..9c5f8fd6c 100644 --- a/graph_net/agent/parallel_extract.py +++ b/graph_net/agent/parallel_extract.py @@ -133,19 +133,23 @@ def worker_fn( workspace: str, hf_token: Optional[str], total: int, + extract_timeout: int, + verify_timeout: int, ) -> None: """ Worker function, runs in a dedicated subprocess bound to a single GPU or CPU. Dynamically pulls tasks from task_queue and exits when the queue is empty. Args: - worker_id: Worker process index (e.g. 0) - gpu_id: CUDA device index (e.g. 2), None for CPU mode - task_queue: Shared task queue; each item is a model_id string - result_queue: Queue for reporting results back to the main process - workspace: Root workspace directory path - hf_token: HuggingFace token (optional) - total: Total task count (used for logging only) + worker_id: Worker process index (e.g. 0) + gpu_id: CUDA device index (e.g. 2), None for CPU mode + task_queue: Shared task queue; each item is a model_id string + result_queue: Queue for reporting results back to the main process + workspace: Root workspace directory path + hf_token: HuggingFace token (optional) + total: Total task count (used for logging only) + extract_timeout: Timeout in seconds for graph extraction subprocess + verify_timeout: Timeout in seconds for forward verification subprocess """ if gpu_id is not None: os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id) @@ -155,14 +159,24 @@ def worker_fn( prefix = f"[Worker-{worker_id} CPU]" # Pass workspace to the environment variable used by SubprocessGraphExtractor if "GRAPH_NET_EXTRACT_WORKSPACE" not in os.environ: - os.envion[ + os.environ[ "GRAPH_NET_EXTRACT_WORKSPACE" ] = f"{workspace}/samples/transformers-auto-model" - print(f"{prefix} Started", flush=True) + print( + f"{prefix} Started (extract_timeout={extract_timeout}s, " + f"verify_timeout={verify_timeout}s)", + flush=True, + ) try: - agent = GraphNetAgent(workspace=workspace, hf_token=hf_token, llm_retry=False) + agent = GraphNetAgent( + workspace=workspace, + hf_token=hf_token, + llm_retry=False, + extract_timeout=extract_timeout, + verify_timeout=verify_timeout, + ) except Exception as e: print(f"{prefix} Failed to initialize agent: {e}", flush=True) # Drain queue and mark remaining tasks as failed to avoid blocking the main process @@ -315,6 +329,18 @@ def _parse_args() -> argparse.Namespace: default=None, help="Output JSON file path (default: auto-generated filename with timestamp)", ) + parser.add_argument( + "--extract-timeout", + type=int, + default=None, + help="Timeout in seconds for graph extraction (default: 1000 on GPU, 2000 on CPU)", + ) + parser.add_argument( + "--verify-timeout", + type=int, + default=None, + help="Timeout in seconds for forward verification (default: 300 on GPU, 600 on CPU)", + ) return parser.parse_args() @@ -339,18 +365,29 @@ def _resolve_config(args: argparse.Namespace): gpus = get_gpu_ids(args) num_workers = len(gpus) print(f"[INFO] GPU mode (torch fallback): {gpus}") + extract_timeout = ( + args.extract_timeout if args.extract_timeout is not None else 1000 + ) + verify_timeout = args.verify_timeout if args.verify_timeout is not None else 300 else: gpus = [] num_workers = args.num_workers if args.num_workers else 1 print(f"[INFO] CPU mode: {num_workers} workers") + extract_timeout = ( + args.extract_timeout if args.extract_timeout is not None else 2000 + ) + verify_timeout = args.verify_timeout if args.verify_timeout is not None else 600 - return workspace, gpus, num_workers + print(f"[INFO] Timeouts: extract={extract_timeout}s, verify={verify_timeout}s") + return workspace, gpus, num_workers, extract_timeout, verify_timeout def main() -> int: args = _parse_args() - workspace, gpus, num_workers = _resolve_config(args) + workspace, gpus, num_workers, extract_timeout, verify_timeout = _resolve_config( + args + ) model_ids = _load_model_ids(args) if not model_ids: @@ -385,6 +422,8 @@ def main() -> int: workspace, args.hf_token, len(model_ids), + extract_timeout, + verify_timeout, ), name=f"worker-{worker_id}" + (f"-gpu{gpu_id}" if gpu_id is not None else "-cpu"), diff --git a/graph_net/agent/sample_verifier/forward_verifier.py b/graph_net/agent/sample_verifier/forward_verifier.py index 5f8ba0323..8423baeb1 100644 --- a/graph_net/agent/sample_verifier/forward_verifier.py +++ b/graph_net/agent/sample_verifier/forward_verifier.py @@ -41,13 +41,14 @@ class ForwardVerifier(BaseSampleVerifier): is verified independently; all must pass. """ - def __init__(self, timeout: int = 300): + def __init__(self, timeout: int | None = 300): """ Args: - timeout: seconds to wait for each forward-pass subprocess (default 5 min) + timeout: seconds to wait for each forward-pass subprocess + (default 300s, can be overridden) """ self._basic = BasicSampleVerifier() - self.timeout = timeout + self.timeout = timeout if timeout is not None else 300 self.logger = logging.getLogger(self.__class__.__name__) def verify(self, sample_dir: Path) -> bool: From 0bdf25b6ef151e94decb9b6e355d2e6b5fbe8483 Mon Sep 17 00:00:00 2001 From: Liu Yiqun Date: Fri, 15 May 2026 10:07:31 +0800 Subject: [PATCH 4/5] Add --use-llm flag to parallel_extract for configurable LLM retry. - New CLI arg --use-llm (default: true) controls llm_retry in GraphNetAgent - Pass llm_retry through worker_fn and _resolve_config Co-Authored-By: Claude Opus 4.6 --- graph_net/agent/parallel_extract.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/graph_net/agent/parallel_extract.py b/graph_net/agent/parallel_extract.py index 9c5f8fd6c..f81c9ee21 100644 --- a/graph_net/agent/parallel_extract.py +++ b/graph_net/agent/parallel_extract.py @@ -135,6 +135,7 @@ def worker_fn( total: int, extract_timeout: int, verify_timeout: int, + llm_retry: bool, ) -> None: """ Worker function, runs in a dedicated subprocess bound to a single GPU or CPU. @@ -150,6 +151,7 @@ def worker_fn( total: Total task count (used for logging only) extract_timeout: Timeout in seconds for graph extraction subprocess verify_timeout: Timeout in seconds for forward verification subprocess + llm_retry: Whether to enable LLM retry for failed extractions """ if gpu_id is not None: os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id) @@ -173,7 +175,7 @@ def worker_fn( agent = GraphNetAgent( workspace=workspace, hf_token=hf_token, - llm_retry=False, + llm_retry=llm_retry, extract_timeout=extract_timeout, verify_timeout=verify_timeout, ) @@ -341,6 +343,12 @@ def _parse_args() -> argparse.Namespace: default=None, help="Timeout in seconds for forward verification (default: 300 on GPU, 600 on CPU)", ) + parser.add_argument( + "--use-llm", + type=lambda x: x.lower() in ("true", "1", "yes"), + default="true", + help="Enable LLM retry for failed extractions (default: true)", + ) return parser.parse_args() @@ -378,7 +386,6 @@ def _resolve_config(args: argparse.Namespace): ) verify_timeout = args.verify_timeout if args.verify_timeout is not None else 600 - print(f"[INFO] Timeouts: extract={extract_timeout}s, verify={verify_timeout}s") return workspace, gpus, num_workers, extract_timeout, verify_timeout @@ -388,6 +395,7 @@ def main() -> int: workspace, gpus, num_workers, extract_timeout, verify_timeout = _resolve_config( args ) + llm_retry = args.use_llm model_ids = _load_model_ids(args) if not model_ids: @@ -424,6 +432,7 @@ def main() -> int: len(model_ids), extract_timeout, verify_timeout, + llm_retry, ), name=f"worker-{worker_id}" + (f"-gpu{gpu_id}" if gpu_id is not None else "-cpu"), From 437a689d8fab57028425cd749512c9ba2f890eb6 Mon Sep 17 00:00:00 2001 From: Liu Yiqun Date: Fri, 15 May 2026 10:20:59 +0800 Subject: [PATCH 5/5] Refactor extract_sample return to ExtractionStatus enum and split success rates. - Introduce ExtractionStatus(str, Enum): OK, VERIFY_FAILED, EXTRACT_FAILED, ERROR - GraphNetAgent.extract_sample() now returns ExtractionStatus instead of bool - parallel_extract tracks and prints both overall and extraction-only success rates - Per-GPU/Worker summary also shows both rates Co-Authored-By: Claude Opus 4.6 --- graph_net/agent/__init__.py | 4 +- graph_net/agent/graph_net_agent.py | 32 ++++++--- graph_net/agent/parallel_extract.py | 71 ++++++++++++++----- .../agent/sample_verifier/forward_verifier.py | 2 +- 4 files changed, 82 insertions(+), 27 deletions(-) diff --git a/graph_net/agent/__init__.py b/graph_net/agent/__init__.py index b9a38b220..56608c876 100644 --- a/graph_net/agent/__init__.py +++ b/graph_net/agent/__init__.py @@ -5,6 +5,6 @@ from Hugging Face models. """ -from graph_net.agent.graph_net_agent import GraphNetAgent +from graph_net.agent.graph_net_agent import ExtractionStatus, GraphNetAgent -__all__ = ["GraphNetAgent"] +__all__ = ["GraphNetAgent", "ExtractionStatus"] diff --git a/graph_net/agent/graph_net_agent.py b/graph_net/agent/graph_net_agent.py index 1cdfdf16f..4339bc65d 100644 --- a/graph_net/agent/graph_net_agent.py +++ b/graph_net/agent/graph_net_agent.py @@ -2,6 +2,7 @@ import json import os +from enum import Enum from pathlib import Path from typing import Optional @@ -23,6 +24,15 @@ from graph_net.agent.sample_verifier import ForwardVerifier +class ExtractionStatus(str, Enum): + """Extraction result status for a single model.""" + + OK = "ok" + VERIFY_FAILED = "verify_failed" + EXTRACT_FAILED = "extract_failed" + ERROR = "error" + + class GraphNetAgent: """GraphNet automatic sample extraction agent""" @@ -77,7 +87,7 @@ def __init__( # LLM fixer — only created when llm_retry is requested self.llm_fixer: Optional[LLMCodeFixer] = LLMCodeFixer() if llm_retry else None - def extract_sample(self, model_id: str) -> bool: + def extract_sample(self, model_id: str) -> ExtractionStatus: """ Execute complete sample extraction pipeline from HuggingFace model ID. @@ -89,7 +99,10 @@ def extract_sample(self, model_id: str) -> bool: model_id: HuggingFace model ID (e.g., "bert-base-uncased") Returns: - True if sample extraction succeeded, False otherwise + ExtractionStatus.OK – extraction and verification both passed + ExtractionStatus.VERIFY_FAILED – extraction succeeded but verification failed + ExtractionStatus.EXTRACT_FAILED – extraction (or pre-extraction) failed + ExtractionStatus.ERROR – unexpected error """ try: self.logger.info(f"Starting extraction for model: {model_id}") @@ -111,21 +124,24 @@ def extract_sample(self, model_id: str) -> bool: if self.is_duplicate_sample(sample_dir): self.logger.info("Duplicate sample detected, skipping verification") - return True + return ExtractionStatus.OK if not self.sample_verifier.verify(sample_dir): self.logger.error("Sample verification failed") - return False + return ExtractionStatus.VERIFY_FAILED self.logger.info(f"Successfully extracted sample for {model_id}") - return True + return ExtractionStatus.OK - except (AnalysisError, CodeGenError, ExtractionError, VerificationError) as e: + except VerificationError as e: self.logger.error(f"Extraction failed for {model_id}: {e}") - return False + return ExtractionStatus.VERIFY_FAILED + except (AnalysisError, CodeGenError, ExtractionError) as e: + self.logger.error(f"Extraction failed for {model_id}: {e}") + return ExtractionStatus.EXTRACT_FAILED except Exception as e: self.logger.error(f"Unexpected error for {model_id}: {e}", exc_info=True) - return False + return ExtractionStatus.ERROR def _llm_retry( self, diff --git a/graph_net/agent/parallel_extract.py b/graph_net/agent/parallel_extract.py index f81c9ee21..834e68cde 100644 --- a/graph_net/agent/parallel_extract.py +++ b/graph_net/agent/parallel_extract.py @@ -37,7 +37,7 @@ if str(_GRAPHNET_ROOT) not in sys.path: sys.path.insert(0, str(_GRAPHNET_ROOT)) -from graph_net.agent import GraphNetAgent # noqa: E402 +from graph_net.agent import ExtractionStatus, GraphNetAgent # noqa: E402 import torch # noqa: E402 @@ -212,15 +212,18 @@ def worker_fn( "model_id": model_id, } try: - success = agent.extract_sample(model_id) + status = agent.extract_sample(model_id) elapsed = time.time() - t0 - status = "OK" if success else "FAIL" - print(f"{prefix} {status} {model_id} ({elapsed:.1f}s)", flush=True) - result_dict["success"] = success + ok = status == ExtractionStatus.OK + label = "OK" if ok else status.name.replace("_", " ") + print(f"{prefix} {label} {model_id} ({elapsed:.1f}s)", flush=True) + result_dict["success"] = ok + result_dict["status"] = status.value except Exception as e: elapsed = time.time() - t0 print(f"{prefix} ERROR {model_id}: {e} ({elapsed:.1f}s)", flush=True) result_dict["success"] = False + result_dict["status"] = ExtractionStatus.ERROR.value result_dict["error"] = str(e) result_dict["elapsed"] = round(elapsed, 2) @@ -246,31 +249,49 @@ def _print_summary(results: Dict) -> None: details = results.get("details", []) total = len(details) success = sum(1 for d in details if d.get("success")) + extract_success = sum( + 1 + for d in details + if d.get("status") + in (ExtractionStatus.OK.value, ExtractionStatus.VERIFY_FAILED.value) + ) failed = total - success rate = (success / total * 100) if total else 0.0 + extract_rate = (extract_success / total * 100) if total else 0.0 print("\n" + "=" * 60) print("[SUMMARY] Parallel Extraction Summary") print("=" * 60) - print(f" Total : {total}") - print(f" Success: {success}") - print(f" Failed : {failed}") - print(f" Rate : {rate:.2f}%") + print(f" Total : {total}") + print(f" Success : {success} (verify ok)") + print(f" Extract : {extract_success} (graph extracted)") + print(f" Failed : {failed}") + print(f" Rate : {rate:.2f}% (overall)") + print(f" Extract : {extract_rate:.2f}% (extraction only)") # Per-GPU breakdown gpu_stats: Dict[int, Dict] = {} for d in details: g = d.get("gpu", -1) if g not in gpu_stats: - gpu_stats[g] = {"total": 0, "success": 0} + gpu_stats[g] = {"total": 0, "success": 0, "extract": 0} gpu_stats[g]["total"] += 1 if d.get("success"): gpu_stats[g]["success"] += 1 + if d.get("status") in ( + ExtractionStatus.OK.value, + ExtractionStatus.VERIFY_FAILED.value, + ): + gpu_stats[g]["extract"] += 1 label = "GPU" if results.get("gpus") else "Worker" print(f"\n Per-{label}:") for g in sorted(gpu_stats): gs = gpu_stats[g] gr = (gs["success"] / gs["total"] * 100) if gs["total"] else 0.0 - print(f" {label} {g}: {gs['success']}/{gs['total']} ({gr:.1f}%)") + er = (gs["extract"] / gs["total"] * 100) if gs["total"] else 0.0 + print( + f" {label} {g}: success={gs['success']}/{gs['total']} ({gr:.1f}%), " + f"extract={gs['extract']}/{gs['total']} ({er:.1f}%)" + ) print("=" * 60) @@ -345,9 +366,9 @@ def _parse_args() -> argparse.Namespace: ) parser.add_argument( "--use-llm", - type=lambda x: x.lower() in ("true", "1", "yes"), - default="true", - help="Enable LLM retry for failed extractions (default: true)", + action="store_true", + default=False, + help="Enable LLM retry for failed extractions", ) return parser.parse_args() @@ -448,10 +469,17 @@ def main() -> int: entry = result_queue.get(timeout=5) details.append(entry) done = len(details) - success_so_far = sum(1 for d in details if d.get("success")) + ok_so_far = sum(1 for d in details if d.get("success")) + extract_ok_so_far = sum( + 1 + for d in details + if d.get("status") + in (ExtractionStatus.OK.value, ExtractionStatus.VERIFY_FAILED.value) + ) print( f"[PROGRESS] {done}/{len(model_ids)} done, " - f"success rate so far: {success_so_far/done*100:.1f}%", + f"success={ok_so_far/done*100:.1f}%, " + f"extract={extract_ok_so_far/done*100:.1f}%", flush=True, ) except Exception: @@ -466,6 +494,12 @@ def main() -> int: end_time = datetime.now() success_count = sum(1 for d in details if d.get("success")) + extract_success_count = sum( + 1 + for d in details + if d.get("status") + in (ExtractionStatus.OK.value, ExtractionStatus.VERIFY_FAILED.value) + ) results = { "start_time": start_time.isoformat(), "end_time": end_time.isoformat(), @@ -474,12 +508,17 @@ def main() -> int: "workspace": workspace, "total": len(details), "success": success_count, + "extract_success": extract_success_count, "failed": len(details) - success_count, "success_rate": 0.0, + "extract_success_rate": 0.0, "details": details, } if results["total"] > 0: results["success_rate"] = round(results["success"] / results["total"] * 100, 2) + results["extract_success_rate"] = round( + results["extract_success"] / results["total"] * 100, 2 + ) # --- Save results --- output_file = ( diff --git a/graph_net/agent/sample_verifier/forward_verifier.py b/graph_net/agent/sample_verifier/forward_verifier.py index 8423baeb1..c7849eac7 100644 --- a/graph_net/agent/sample_verifier/forward_verifier.py +++ b/graph_net/agent/sample_verifier/forward_verifier.py @@ -41,7 +41,7 @@ class ForwardVerifier(BaseSampleVerifier): is verified independently; all must pass. """ - def __init__(self, timeout: int | None = 300): + def __init__(self, timeout: int = 300): """ Args: timeout: seconds to wait for each forward-pass subprocess