diff --git a/.gitignore b/.gitignore index 29ce6ce..dc8eed8 100644 --- a/.gitignore +++ b/.gitignore @@ -22,6 +22,7 @@ examples/benchmark_vs_competitors/hipporag_bench/ *.db *.db-shm *.db-wal +*.db.tier1.json # Local Claude Code session transcripts synced to H100 etc. .local-claude/ temp/ diff --git a/examples/ablation/diagnostics/public_scale_20260702.md b/examples/ablation/diagnostics/public_scale_20260702.md index 8da4151..f902863 100644 --- a/examples/ablation/diagnostics/public_scale_20260702.md +++ b/examples/ablation/diagnostics/public_scale_20260702.md @@ -26,6 +26,10 @@ PYTHONUNBUFFERED=1 uv run --extra sqlite python examples/ablation/run_tier1_benc uv run --extra eval python examples/ablation/download_benchmarks.py --only msmarco_passage --large-corpus-limit 1000000 PYTHONUNBUFFERED=1 uv run --extra sqlite python examples/ablation/run_tier1_benchmarks.py --only msmarco --subset 50 --corpus-limit 1000000 --use-sqlite-graph + +# Persistent 1M DB for repeat runs after the initial build: +PYTHONUNBUFFERED=1 uv run --extra sqlite python examples/ablation/run_tier1_benchmarks.py --only msmarco --subset 50 --corpus-limit 1000000 --use-sqlite-graph --sqlite-db-path tests/benchmark/data/msmarco_1m.db --overwrite-sqlite-db +PYTHONUNBUFFERED=1 uv run --extra sqlite python examples/ablation/run_tier1_benchmarks.py --only msmarco --subset 50 --corpus-limit 1000000 --use-sqlite-graph --sqlite-db-path tests/benchmark/data/msmarco_1m.db --reuse-sqlite-db ``` ## FiQA Results @@ -86,6 +90,9 @@ The local artifacts are gitignored: - `--corpus-limit` provides practical staged scale gates while preserving selected query gold docs. - Long 1M runs should use ingest progress output; without it the build phase is too quiet for practical monitoring. +- Repeat 1M runs should use `--sqlite-db-path` + `--reuse-sqlite-db`; the first + run still pays the materialization cost, but follow-up searches can skip the + 31.9 minute ingest/index phase after sidecar metadata validation. ## Guard Policy diff --git a/examples/ablation/run_tier1_benchmarks.py b/examples/ablation/run_tier1_benchmarks.py index 5708a99..3516e62 100644 --- a/examples/ablation/run_tier1_benchmarks.py +++ b/examples/ablation/run_tier1_benchmarks.py @@ -73,6 +73,7 @@ OUT_DIR = Path(__file__).parent / "diagnostics" TOP_K = 10 +REUSE_META_VERSION = 1 def _benchmark_node_id(doc_id: str) -> str: @@ -80,6 +81,30 @@ def _benchmark_node_id(doc_id: str) -> str: return f"bench_{digest}" +def _display_path(path: Path) -> str: + try: + return str(path.relative_to(REPO_ROOT)) + except ValueError: + return str(path) + + +def _sqlite_reuse_meta_path(db_path: Path) -> Path: + return db_path.with_name(f"{db_path.name}.tier1.json") + + +def _remove_sqlite_artifacts(db_path: Path) -> None: + for path in ( + db_path, + db_path.with_name(f"{db_path.name}-shm"), + db_path.with_name(f"{db_path.name}-wal"), + db_path.with_name(f"{db_path.name}.hnsw"), + db_path.with_name(f"{db_path.name}.hnsw.meta.json"), + _sqlite_reuse_meta_path(db_path), + ): + if path.exists(): + path.unlink() + + @dataclass class Dataset: name: str @@ -233,6 +258,83 @@ def _load_corpus_items( return _load_inline_corpus_items(data["corpus"], qrels, query_items, corpus_limit) +def _object_signature(obj: object | None) -> str: + if obj is None: + return "none" + return f"{type(obj).__module__}.{type(obj).__qualname__}" + + +def _reuse_signature( + ds: Dataset, + data: dict, + *, + corpus_limit: int | None, + embedder: object | None, + reranker: object | None, + decomposer: object | None, + phrase_extractor: object | None, + entity_linker_cfg: tuple[int, float] | None, +) -> dict[str, object]: + if data.get("schema") == "beir_jsonl_v1": + corpus_size = int(data.get("corpus_size") or 0) + else: + corpus_size = len(data.get("corpus", {})) + return { + "version": REUSE_META_VERSION, + "dataset": ds.name, + "dataset_path": _display_path(ds.path), + "schema": str(data.get("schema") or "inline_json_v1"), + "source": str(data.get("source") or ""), + "corpus_path": str(data.get("corpus_path") or ""), + "manifest_corpus_size": corpus_size, + "corpus_limit": int(corpus_limit or 0), + "benchmark_node_id": "blake2b16:bench_", + "embedder": _object_signature(embedder), + "reranker": _object_signature(reranker), + "decomposer": _object_signature(decomposer), + "phrase_extractor": _object_signature(phrase_extractor), + "entity_linker_cfg": list(entity_linker_cfg) if entity_linker_cfg else [], + } + + +def _reuse_meta_mismatches( + existing: dict, + expected: dict[str, object], +) -> list[str]: + mismatches: list[str] = [] + for key, expected_value in expected.items(): + actual = existing.get(key) + if actual != expected_value: + mismatches.append(f"{key}: {actual!r} != {expected_value!r}") + return mismatches + + +def _write_reuse_meta( + db_path: Path, + expected: dict[str, object], + *, + node_count: int, +) -> None: + meta = { + **expected, + "node_count": node_count, + "created_at": time.strftime("%Y-%m-%d %H:%M:%S %Z"), + } + _sqlite_reuse_meta_path(db_path).write_text( + json.dumps(meta, ensure_ascii=False, indent=2, sort_keys=True), + encoding="utf-8", + ) + + +def _read_reuse_meta(db_path: Path) -> dict: + path = _sqlite_reuse_meta_path(db_path) + if not path.exists(): + raise FileNotFoundError( + f"{path} missing; build the benchmark DB once without --reuse-sqlite-db" + ) + return json.loads(path.read_text(encoding="utf-8")) + + def _reciprocal_rank(retrieved: list[str], relevant: set[str]) -> float: for i, did in enumerate(retrieved): if did in relevant: @@ -275,7 +377,18 @@ async def run_one( ingest_batch: int = 20000, corpus_limit: int | None = None, progress_every: int = 100000, + sqlite_db_path: Path | None = None, + reuse_sqlite_db: bool = False, + overwrite_sqlite_db: bool = False, ) -> Report: + if sqlite_db_path is not None and not use_sqlite_graph: + raise ValueError("--sqlite-db-path requires --use-sqlite-graph") + if reuse_sqlite_db and sqlite_db_path is None: + raise ValueError("--reuse-sqlite-db requires --sqlite-db-path") + if overwrite_sqlite_db and sqlite_db_path is None: + raise ValueError("--overwrite-sqlite-db requires --sqlite-db-path") + if reuse_sqlite_db and overwrite_sqlite_db: + raise ValueError("--reuse-sqlite-db and --overwrite-sqlite-db are mutually exclusive") if not ds.path.exists(): raise FileNotFoundError( f"{ds.path} missing. Run: python examples/ablation/download_benchmarks.py" @@ -290,18 +403,38 @@ async def run_one( if subset is not None and subset < len(query_items): query_items = query_items[:subset] + reuse_signature = _reuse_signature( + ds, + data, + corpus_limit=corpus_limit, + embedder=embedder, + reranker=reranker, + decomposer=decomposer, + phrase_extractor=phrase_extractor, + entity_linker_cfg=entity_linker_cfg, + ) + # Build the graph once for the whole dataset. t_build = time.perf_counter() - tmp_db_path: str | None = None if use_sqlite_graph: - tmp_db = tempfile.NamedTemporaryFile( - prefix=f"tier1_{ds.name.replace(' ', '_')}_", - suffix=".db", - delete=False, - ) - tmp_db.close() - tmp_db_path = tmp_db.name - backend = SqliteGraphBackend(tmp_db_path) + if sqlite_db_path is not None: + if overwrite_sqlite_db: + _remove_sqlite_artifacts(sqlite_db_path) + elif sqlite_db_path.exists() and not reuse_sqlite_db: + raise FileExistsError( + f"{sqlite_db_path} exists; pass --reuse-sqlite-db to reuse it " + "or --overwrite-sqlite-db to rebuild it" + ) + sqlite_db_path.parent.mkdir(parents=True, exist_ok=True) + backend = SqliteGraphBackend(str(sqlite_db_path)) + else: + tmp_db = tempfile.NamedTemporaryFile( + prefix=f"tier1_{ds.name.replace(' ', '_')}_", + suffix=".db", + delete=False, + ) + tmp_db.close() + backend = SqliteGraphBackend(tmp_db.name) else: backend = MemoryBackend() await backend.connect() @@ -313,10 +446,28 @@ async def run_one( phrase_extractor=phrase_extractor, ) - # Pre-compute embeddings in large batches (GPU-friendly). - # ``graph.add()`` accepts an ``embedding`` arg; if we pass it we - # avoid the per-node single embed call that bottlenecks at batch=1. - items = _load_corpus_items(data, ds.path, qrels, query_items, corpus_limit) + reused_sqlite = False + items: list[CorpusItem] = [] + if reuse_sqlite_db: + assert sqlite_db_path is not None + existing_meta = _read_reuse_meta(sqlite_db_path) + mismatches = _reuse_meta_mismatches(existing_meta, reuse_signature) + if mismatches: + joined = "; ".join(mismatches) + raise ValueError(f"{sqlite_db_path} reuse metadata mismatch: {joined}") + n_docs = int(existing_meta.get("node_count") or await backend.count_nodes()) + print( + f" reuse sqlite db: {_display_path(sqlite_db_path)} ({n_docs:,} docs)", + flush=True, + ) + reused_sqlite = True + else: + # Pre-compute embeddings in large batches (GPU-friendly). + # ``graph.add()`` accepts an ``embedding`` arg; if we pass it we + # avoid the per-node single embed call that bottlenecks at batch=1. + items = _load_corpus_items(data, ds.path, qrels, query_items, corpus_limit) + n_docs = len(items) + total_items = len(items) def maybe_print_progress(done: int, start: float) -> None: @@ -331,7 +482,7 @@ def maybe_print_progress(done: int, start: float) -> None: ) embeddings: list[list[float] | None] = [None] * len(items) - if embedder is not None: + if not reused_sqlite and embedder is not None: embed_inputs = [f"{title}\n{(text or '')[:1500]}" for _doc_id, title, text in items] for i in range(0, len(embed_inputs), embed_batch): chunk = embed_inputs[i : i + embed_batch] @@ -341,7 +492,9 @@ def maybe_print_progress(done: int, start: float) -> None: save_nodes_batch = getattr(backend, "save_nodes_batch", None) t_ingest = time.perf_counter() - if phrase_extractor is None and callable(save_nodes_batch): + if reused_sqlite: + pass + elif phrase_extractor is None and callable(save_nodes_batch): for i in range(0, len(items), ingest_batch): done = min(i + ingest_batch, total_items) batch = [ @@ -375,7 +528,7 @@ def maybe_print_progress(done: int, start: float) -> None: # Runs AFTER ingest because the DF filter needs global corpus # statistics. Typically 5-20× cheaper than inline phrase-extractor # because it uses batch writes and skips per-node re-hash. - if entity_linker_cfg is not None: + if entity_linker_cfg is not None and not reused_sqlite: from synaptic.extensions.domain_profile import DomainProfile from synaptic.extensions.entity_linker import EntityLinker from synaptic.extensions.phrase_extractor import PhraseExtractor @@ -406,6 +559,8 @@ def maybe_print_progress(done: int, start: float) -> None: print(f" top-DF: {top5}") build_sec = time.perf_counter() - t_build + if sqlite_db_path is not None and not reused_sqlite: + _write_reuse_meta(sqlite_db_path, reuse_signature, node_count=n_docs) mrr_total = 0.0 r5_total = 0.0 @@ -435,7 +590,7 @@ def maybe_print_progress(done: int, start: float) -> None: n = max(len(query_items), 1) report = Report( name=ds.name, - n_docs=len(items), + n_docs=n_docs, n_queries=len(query_items), mrr=mrr_total / n, recall_at_5=r5_total / n, @@ -463,6 +618,8 @@ def _emit_markdown( corpus_limit: int | None = None, ingest_batch: int = 20000, progress_every: int = 100000, + sqlite_db_path: Path | None = None, + reuse_sqlite_db: bool = False, ) -> Path: OUT_DIR.mkdir(parents=True, exist_ok=True) stamp = time.strftime("%Y%m%d_%H%M%S") @@ -475,6 +632,8 @@ def _emit_markdown( f"- Corpus limit: {corpus_limit if corpus_limit else 'full'}", f"- Ingest batch: {ingest_batch}", f"- Progress every: {progress_every if progress_every > 0 else 'disabled'}", + f"- SQLite DB path: {_display_path(sqlite_db_path) if sqlite_db_path else 'temporary'}", + f"- SQLite DB reuse: {'yes' if reuse_sqlite_db else 'no'}", f"- Embedder: {embedder_label}", f"- Reranker: {reranker_label}", f"- Decomposer: {decomposer_label}", @@ -580,6 +739,31 @@ async def amain(argv: list[str]) -> int: help="Use SqliteGraphBackend (usearch HNSW) instead of MemoryBackend. " "Required for fast vector search at corpus sizes > 5k.", ) + p.add_argument( + "--sqlite-db-path", + type=Path, + default=None, + help=( + "Persistent SQLite DB path for SqliteGraphBackend. Use with " + "--use-sqlite-graph to keep a built large-corpus index." + ), + ) + p.add_argument( + "--reuse-sqlite-db", + action="store_true", + help=( + "Reuse an existing --sqlite-db-path and skip corpus ingest after " + "validating its tier1 sidecar metadata." + ), + ) + p.add_argument( + "--overwrite-sqlite-db", + action="store_true", + help=( + "Delete an existing --sqlite-db-path and its tier1 sidecar before " + "building. Mutually exclusive with --reuse-sqlite-db." + ), + ) p.add_argument( "--embed-batch", type=int, @@ -692,6 +876,14 @@ async def amain(argv: list[str]) -> int: "('American', 'French') don't form super-hubs.", ) args = p.parse_args(argv) + if args.sqlite_db_path is not None and not args.use_sqlite_graph: + raise SystemExit("--sqlite-db-path requires --use-sqlite-graph") + if args.reuse_sqlite_db and args.sqlite_db_path is None: + raise SystemExit("--reuse-sqlite-db requires --sqlite-db-path") + if args.overwrite_sqlite_db and args.sqlite_db_path is None: + raise SystemExit("--overwrite-sqlite-db requires --sqlite-db-path") + if args.reuse_sqlite_db and args.overwrite_sqlite_db: + raise SystemExit("--reuse-sqlite-db and --overwrite-sqlite-db are mutually exclusive") embedder: EmbeddingProvider | None = None embedder_label = "none (FTS-only baseline)" @@ -794,6 +986,15 @@ async def amain(argv: list[str]) -> int: print(f" corpus limit: {args.corpus_limit if args.corpus_limit else 'full'}") print(f" ingest batch: {args.ingest_batch}") print(f" progress every: {args.progress_every if args.progress_every > 0 else 'disabled'}") + print( + f" sqlite db: {_display_path(args.sqlite_db_path) if args.sqlite_db_path else 'temporary'}" + ) + if args.sqlite_db_path: + print( + " sqlite reuse: " + f"{'yes' if args.reuse_sqlite_db else 'no'}" + f"{' (overwrite)' if args.overwrite_sqlite_db else ''}" + ) if embedder is not None: print(f" embed batch: {args.embed_batch}") print() @@ -817,6 +1018,9 @@ async def amain(argv: list[str]) -> int: ingest_batch=args.ingest_batch, corpus_limit=args.corpus_limit, progress_every=args.progress_every, + sqlite_db_path=args.sqlite_db_path, + reuse_sqlite_db=args.reuse_sqlite_db, + overwrite_sqlite_db=args.overwrite_sqlite_db, ) except FileNotFoundError as e: print(f"{ds.name:<24} SKIP — {e}") @@ -840,6 +1044,8 @@ async def amain(argv: list[str]) -> int: corpus_limit=args.corpus_limit, ingest_batch=args.ingest_batch, progress_every=args.progress_every, + sqlite_db_path=args.sqlite_db_path, + reuse_sqlite_db=args.reuse_sqlite_db, ) print() print(f"Markdown report → {out.relative_to(REPO_ROOT)}") diff --git a/tests/test_tier1_benchmarks.py b/tests/test_tier1_benchmarks.py index d3959bc..5c1b0db 100644 --- a/tests/test_tier1_benchmarks.py +++ b/tests/test_tier1_benchmarks.py @@ -137,6 +137,78 @@ async def test_progress_every_reports_ingest_progress(tmp_path, capsys): assert "ingest: 2/2 docs" in output +@pytest.mark.asyncio +async def test_sqlite_db_reuse_skips_reingest(tmp_path, capsys): + path = tmp_path / "tiny_bench.json" + db_path = tmp_path / "tiny.db" + path.write_text( + json.dumps( + { + "corpus": { + "gold_doc": {"title": "Gold", "text": "needle targetterm"}, + "filler": {"title": "Filler", "text": "unrelated alpha"}, + }, + "queries": {"q1": "targetterm"}, + "qrels": {"q1": {"gold_doc": 1}}, + } + ), + encoding="utf-8", + ) + + first = await runner.run_one( + runner.Dataset(name="Tiny", path=path, reference="unit"), + subset=1, + corpus_limit=2, + use_sqlite_graph=True, + sqlite_db_path=db_path, + progress_every=0, + ) + capsys.readouterr() + + # If the second run ingested from the source file again, the gold + # document would no longer match the query. Reuse should keep the + # already-built SQLite index intact. + path.write_text( + json.dumps( + { + "corpus": { + "gold_doc": {"title": "Gold", "text": "changed text"}, + "filler": {"title": "Filler", "text": "unrelated alpha"}, + }, + "queries": {"q1": "targetterm"}, + "qrels": {"q1": {"gold_doc": 1}}, + } + ), + encoding="utf-8", + ) + + second = await runner.run_one( + runner.Dataset(name="Tiny", path=path, reference="unit"), + subset=1, + corpus_limit=2, + use_sqlite_graph=True, + sqlite_db_path=db_path, + reuse_sqlite_db=True, + progress_every=0, + ) + output = capsys.readouterr().out + + assert first.hit_at_10 == 1 + assert second.hit_at_10 == 1 + assert second.n_docs == 2 + assert "reuse sqlite db:" in output + assert db_path.with_name(f"{db_path.name}.tier1.json").exists() + + +def test_reuse_meta_mismatches_report_changed_signature(): + mismatches = runner._reuse_meta_mismatches( + {"version": 1, "dataset": "Tiny", "corpus_limit": 2}, + {"version": 1, "dataset": "Tiny", "corpus_limit": 1}, + ) + + assert mismatches == ["corpus_limit: 2 != 1"] + + def test_threshold_violations_report_scale_regressions(): report = runner.Report( name="Tiny",