Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions graph_net/agent/code_generator/llm_code_fixer.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,11 @@ def __init__(
):
"""
Args:
timeout: Max seconds to wait for ducc response.
timeout: Max seconds to wait for ducc response (default 360s).
model: Override the LLM model (e.g. 'sonnet', 'haiku').
If None, uses whatever ducc default is configured.
"""
self.timeout = timeout
self.timeout = timeout if timeout is not None else 360
self.model = model
self.logger = logging.getLogger(self.__class__.__name__)
self._ducc_bin = _find_ducc()
Expand Down
34 changes: 24 additions & 10 deletions graph_net/agent/graph_net_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,22 @@ def __init__(
llm_retry: bool = True,
extract_timeout: Optional[int] = None,
verify_timeout: Optional[int] = None,
llm_timeout: int = 900,
):
"""
Initialize GraphNet Agent

Args:
workspace: Workspace root directory. Defaults to
$GRAPH_NET_EXTRACT_WORKSPACE or ~/graphnet_workspace.
hf_token: HuggingFace API token (optional)
llm_retry: If True and ducc/claude CLI is available, retry failed
extractions up to 2 times with LLM-fixed scripts.
extract_timeout: Timeout in seconds for graph extraction subprocess
(default None -> 1000s).
verify_timeout: Timeout in seconds for forward verification subprocess
(default None -> 300s).
workspace: Workspace root directory. Defaults to
$GRAPH_NET_EXTRACT_WORKSPACE or ~/graphnet_workspace.
hf_token: HuggingFace API token (optional)
llm_retry: If True and ducc/claude CLI is available, retry failed
extractions up to 2 times with LLM-fixed scripts.
extract_timeout: Timeout in seconds for graph extraction subprocess
(default None -> 1000s).
verify_timeout: Timeout in seconds for forward verification subprocess
(default None -> 300s).
            llm_timeout: Timeout in seconds for LLM script fix (default: 900).
"""
if workspace is None:
workspace = os.environ.get(
Expand Down Expand Up @@ -85,7 +87,12 @@ def __init__(
self.sample_verifier = ForwardVerifier(timeout=verify_timeout)

# LLM fixer — only created when llm_retry is requested
self.llm_fixer: Optional[LLMCodeFixer] = LLMCodeFixer() if llm_retry else None
self.llm_fixer: Optional[LLMCodeFixer] = (
LLMCodeFixer(timeout=llm_timeout) if llm_retry else None
)

# Track whether the last verify succeeded only because of timeout skip
self.last_timeout_success = False

def extract_sample(self, model_id: str) -> ExtractionStatus:
"""
Expand All @@ -104,6 +111,7 @@ def extract_sample(self, model_id: str) -> ExtractionStatus:
ExtractionStatus.EXTRACT_FAILED – extraction (or pre-extraction) failed
ExtractionStatus.ERROR – unexpected error
"""
self.last_timeout_success = False
try:
self.logger.info(f"Starting extraction for model: {model_id}")

Expand Down Expand Up @@ -131,6 +139,12 @@ def extract_sample(self, model_id: str) -> ExtractionStatus:
self.logger.error("Sample verification failed")
return ExtractionStatus.VERIFY_FAILED

if getattr(self.sample_verifier, "last_timeout_success", False):
self.last_timeout_success = True
self.logger.info(
f"Sample verification for {model_id} passed via timeout skip"
)

self.logger.info(f"Successfully extracted sample for {model_id}")
return ExtractionStatus.OK

Expand Down
33 changes: 27 additions & 6 deletions graph_net/agent/parallel_extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,16 +215,21 @@ def worker_fn(
status = agent.extract_sample(model_id)
elapsed = time.time() - t0
ok = status == ExtractionStatus.OK
timeout_success = getattr(agent, "last_timeout_success", False)
label = "OK" if ok else status.name.replace("_", " ")
if ok and timeout_success:
label = "OK(timeout)"
print(f"{prefix} {label} {model_id} ({elapsed:.1f}s)", flush=True)
result_dict["success"] = ok
result_dict["status"] = status.value
result_dict["timeout_success"] = timeout_success
except Exception as e:
elapsed = time.time() - t0
print(f"{prefix} ERROR {model_id}: {e} ({elapsed:.1f}s)", flush=True)
result_dict["success"] = False
result_dict["status"] = ExtractionStatus.ERROR.value
result_dict["error"] = str(e)
result_dict["timeout_success"] = False

result_dict["elapsed"] = round(elapsed, 2)
result_dict["timestamp"] = datetime.now().isoformat()
Expand All @@ -249,6 +254,7 @@ def _print_summary(results: Dict) -> None:
details = results.get("details", [])
total = len(details)
success = sum(1 for d in details if d.get("success"))
timeout_success = sum(1 for d in details if d.get("timeout_success"))
extract_success = sum(
1
for d in details
Expand All @@ -257,25 +263,29 @@ def _print_summary(results: Dict) -> None:
)
failed = total - success
rate = (success / total * 100) if total else 0.0
timeout_rate = (timeout_success / total * 100) if total else 0.0
extract_rate = (extract_success / total * 100) if total else 0.0
print("\n" + "=" * 60)
print("[SUMMARY] Parallel Extraction Summary")
print("=" * 60)
print(f" Total : {total}")
print(f" Success : {success} (verify ok)")
print(f" Timeout : {timeout_success} (verify skipped by timeout)")
print(f" Extract : {extract_success} (graph extracted)")
print(f" Failed : {failed}")
print(f" Rate : {rate:.2f}% (overall)")
print(f" Rate : {rate:.2f}% (overall, timeout_success={timeout_rate:.2f}%)")
print(f" Extract : {extract_rate:.2f}% (extraction only)")
# Per-GPU breakdown
gpu_stats: Dict[int, Dict] = {}
for d in details:
g = d.get("gpu", -1)
if g not in gpu_stats:
gpu_stats[g] = {"total": 0, "success": 0, "extract": 0}
gpu_stats[g] = {"total": 0, "success": 0, "extract": 0, "timeout": 0}
gpu_stats[g]["total"] += 1
if d.get("success"):
gpu_stats[g]["success"] += 1
if d.get("timeout_success"):
gpu_stats[g]["timeout"] += 1
if d.get("status") in (
ExtractionStatus.OK.value,
ExtractionStatus.VERIFY_FAILED.value,
Expand All @@ -288,9 +298,11 @@ def _print_summary(results: Dict) -> None:
gs = gpu_stats[g]
gr = (gs["success"] / gs["total"] * 100) if gs["total"] else 0.0
er = (gs["extract"] / gs["total"] * 100) if gs["total"] else 0.0
tr = (gs["timeout"] / gs["total"] * 100) if gs["total"] else 0.0
print(
f" {label} {g}: success={gs['success']}/{gs['total']} ({gr:.1f}%), "
f"extract={gs['extract']}/{gs['total']} ({er:.1f}%)"
f"extract={gs['extract']}/{gs['total']} ({er:.1f}%), "
f"timeout={gs['timeout']}/{gs['total']} ({tr:.1f}%)"
)
print("=" * 60)

Expand Down Expand Up @@ -362,7 +374,7 @@ def _parse_args() -> argparse.Namespace:
"--verify-timeout",
type=int,
default=None,
help="Timeout in seconds for forward verification (default: 300 on GPU, 600 on CPU)",
help="Timeout in seconds for forward verification (default: 300 on GPU, 1200 on CPU)",
)
parser.add_argument(
"--use-llm",
Expand Down Expand Up @@ -405,7 +417,9 @@ def _resolve_config(args: argparse.Namespace):
extract_timeout = (
args.extract_timeout if args.extract_timeout is not None else 2000
)
verify_timeout = args.verify_timeout if args.verify_timeout is not None else 600
verify_timeout = (
args.verify_timeout if args.verify_timeout is not None else 1200
)

return workspace, gpus, num_workers, extract_timeout, verify_timeout

Expand Down Expand Up @@ -470,6 +484,7 @@ def main() -> int:
details.append(entry)
done = len(details)
ok_so_far = sum(1 for d in details if d.get("success"))
timeout_so_far = sum(1 for d in details if d.get("timeout_success"))
extract_ok_so_far = sum(
1
for d in details
Expand All @@ -478,7 +493,7 @@ def main() -> int:
)
print(
f"[PROGRESS] {done}/{len(model_ids)} done, "
f"success={ok_so_far/done*100:.1f}%, "
f"success={ok_so_far/done*100:.1f}%(timeout_success={timeout_so_far/done*100:.1f}%), "
f"extract={extract_ok_so_far/done*100:.1f}%",
flush=True,
)
Expand All @@ -494,6 +509,7 @@ def main() -> int:

end_time = datetime.now()
success_count = sum(1 for d in details if d.get("success"))
timeout_success_count = sum(1 for d in details if d.get("timeout_success"))
extract_success_count = sum(
1
for d in details
Expand All @@ -508,14 +524,19 @@ def main() -> int:
"workspace": workspace,
"total": len(details),
"success": success_count,
"timeout_success": timeout_success_count,
"extract_success": extract_success_count,
"failed": len(details) - success_count,
"success_rate": 0.0,
"timeout_success_rate": 0.0,
"extract_success_rate": 0.0,
"details": details,
}
if results["total"] > 0:
results["success_rate"] = round(results["success"] / results["total"] * 100, 2)
results["timeout_success_rate"] = round(
results["timeout_success"] / results["total"] * 100, 2
)
results["extract_success_rate"] = round(
results["extract_success"] / results["total"] * 100, 2
)
Expand Down
26 changes: 19 additions & 7 deletions graph_net/agent/sample_verifier/forward_verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def __init__(self, timeout: int = 300):
self._basic = BasicSampleVerifier()
self.timeout = timeout if timeout is not None else 300
self.logger = logging.getLogger(self.__class__.__name__)
self.last_timeout_success = False

def verify(self, sample_dir: Path) -> bool:
"""
Expand All @@ -61,6 +62,7 @@ def verify(self, sample_dir: Path) -> bool:
Returns:
True if all checks pass, False otherwise
"""
self.last_timeout_success = False
try:
# Stage 1: file structure check
if not self._basic.verify(sample_dir):
Expand All @@ -72,16 +74,25 @@ def verify(self, sample_dir: Path) -> bool:
targets = subgraph_dirs if subgraph_dirs else [sample_dir]

for target in targets:
if not self._run_forward(target):
ok, is_timeout = self._run_forward(target)
if not ok:
return False
if is_timeout:
self.last_timeout_success = True

return True

except Exception as e:
raise VerificationError(f"Forward verification failed: {e}") from e

def _run_forward(self, model_path: Path) -> bool:
"""Run an eager forward pass on one model directory in a subprocess."""
def _run_forward(self, model_path: Path) -> tuple[bool, bool]:
"""Run an eager forward pass on one model directory in a subprocess.

Returns:
(success, is_timeout): success=True means the check passed;
is_timeout=True means it passed only because
the subprocess timed out (treated as skip).
"""
self.logger.info(f"Forward verify (eager): {model_path.name}")
try:
result = subprocess.run(
Expand All @@ -92,14 +103,15 @@ def _run_forward(self, model_path: Path) -> bool:
)
if result.returncode == 0:
self.logger.info(f"Forward verify OK: {model_path.name}")
return True
return True, False
else:
self.logger.warning(
f"Forward verify FAIL: {model_path.name}\n{result.stderr[-2000:]}"
)
return False
return False, False
except subprocess.TimeoutExpired:
self.logger.warning(
f"Forward verify TIMEOUT ({self.timeout}s): {model_path.name}"
f"Forward verify TIMEOUT ({self.timeout}s): {model_path.name}, "
"treating as pass (skip verification for large models on CPU)"
)
return False
return True, True
Loading