Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ examples/benchmark_vs_competitors/hipporag_bench/
*.db
*.db-shm
*.db-wal
*.db.tier1.json
# Local Claude Code session transcripts synced to H100 etc.
.local-claude/
temp/
Expand Down
7 changes: 7 additions & 0 deletions examples/ablation/diagnostics/public_scale_20260702.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ PYTHONUNBUFFERED=1 uv run --extra sqlite python examples/ablation/run_tier1_benc

uv run --extra eval python examples/ablation/download_benchmarks.py --only msmarco_passage --large-corpus-limit 1000000
PYTHONUNBUFFERED=1 uv run --extra sqlite python examples/ablation/run_tier1_benchmarks.py --only msmarco --subset 50 --corpus-limit 1000000 --use-sqlite-graph

# Persistent 1M DB for repeat runs after the initial build:
PYTHONUNBUFFERED=1 uv run --extra sqlite python examples/ablation/run_tier1_benchmarks.py --only msmarco --subset 50 --corpus-limit 1000000 --use-sqlite-graph --sqlite-db-path tests/benchmark/data/msmarco_1m.db --overwrite-sqlite-db
PYTHONUNBUFFERED=1 uv run --extra sqlite python examples/ablation/run_tier1_benchmarks.py --only msmarco --subset 50 --corpus-limit 1000000 --use-sqlite-graph --sqlite-db-path tests/benchmark/data/msmarco_1m.db --reuse-sqlite-db
```

## FiQA Results
Expand Down Expand Up @@ -86,6 +90,9 @@ The local artifacts are gitignored:
- `--corpus-limit` provides practical staged scale gates while preserving selected query gold docs.
- Long 1M runs should use ingest progress output; without it the build phase is
too quiet for practical monitoring.
- Repeat 1M runs should use `--sqlite-db-path` + `--reuse-sqlite-db`; the first
run still pays the materialization cost, but follow-up searches can skip the
31.9 minute ingest/index phase after sidecar metadata validation.

## Guard Policy

Expand Down
240 changes: 223 additions & 17 deletions examples/ablation/run_tier1_benchmarks.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,38 @@
OUT_DIR = Path(__file__).parent / "diagnostics"

TOP_K = 10
REUSE_META_VERSION = 1


def _benchmark_node_id(doc_id: str) -> str:
digest = hashlib.blake2b(doc_id.encode("utf-8"), digest_size=16).hexdigest()
return f"bench_{digest}"


def _display_path(path: Path) -> str:
try:
return str(path.relative_to(REPO_ROOT))
except ValueError:
return str(path)


def _sqlite_reuse_meta_path(db_path: Path) -> Path:
return db_path.with_name(f"{db_path.name}.tier1.json")


def _remove_sqlite_artifacts(db_path: Path) -> None:
for path in (
db_path,
db_path.with_name(f"{db_path.name}-shm"),
db_path.with_name(f"{db_path.name}-wal"),
db_path.with_name(f"{db_path.name}.hnsw"),
db_path.with_name(f"{db_path.name}.hnsw.meta.json"),
_sqlite_reuse_meta_path(db_path),
):
if path.exists():
path.unlink()


@dataclass
class Dataset:
name: str
Expand Down Expand Up @@ -233,6 +258,83 @@ def _load_corpus_items(
return _load_inline_corpus_items(data["corpus"], qrels, query_items, corpus_limit)


def _object_signature(obj: object | None) -> str:
if obj is None:
return "none"
return f"{type(obj).__module__}.{type(obj).__qualname__}"


def _reuse_signature(
ds: Dataset,
data: dict,
*,
corpus_limit: int | None,
embedder: object | None,
reranker: object | None,
decomposer: object | None,
phrase_extractor: object | None,
entity_linker_cfg: tuple[int, float] | None,
) -> dict[str, object]:
if data.get("schema") == "beir_jsonl_v1":
corpus_size = int(data.get("corpus_size") or 0)
else:
corpus_size = len(data.get("corpus", {}))
return {
"version": REUSE_META_VERSION,
"dataset": ds.name,
"dataset_path": _display_path(ds.path),
"schema": str(data.get("schema") or "inline_json_v1"),
"source": str(data.get("source") or ""),
"corpus_path": str(data.get("corpus_path") or ""),
"manifest_corpus_size": corpus_size,
"corpus_limit": int(corpus_limit or 0),
"benchmark_node_id": "blake2b16:bench_",
"embedder": _object_signature(embedder),
"reranker": _object_signature(reranker),
"decomposer": _object_signature(decomposer),
"phrase_extractor": _object_signature(phrase_extractor),
"entity_linker_cfg": list(entity_linker_cfg) if entity_linker_cfg else [],
}


def _reuse_meta_mismatches(
existing: dict,
expected: dict[str, object],
) -> list[str]:
mismatches: list[str] = []
for key, expected_value in expected.items():
actual = existing.get(key)
if actual != expected_value:
mismatches.append(f"{key}: {actual!r} != {expected_value!r}")
return mismatches


def _write_reuse_meta(
db_path: Path,
expected: dict[str, object],
*,
node_count: int,
) -> None:
meta = {
**expected,
"node_count": node_count,
"created_at": time.strftime("%Y-%m-%d %H:%M:%S %Z"),
}
_sqlite_reuse_meta_path(db_path).write_text(
json.dumps(meta, ensure_ascii=False, indent=2, sort_keys=True),
encoding="utf-8",
)


def _read_reuse_meta(db_path: Path) -> dict:
path = _sqlite_reuse_meta_path(db_path)
if not path.exists():
raise FileNotFoundError(
f"{path} missing; build the benchmark DB once without --reuse-sqlite-db"
)
return json.loads(path.read_text(encoding="utf-8"))


def _reciprocal_rank(retrieved: list[str], relevant: set[str]) -> float:
for i, did in enumerate(retrieved):
if did in relevant:
Expand Down Expand Up @@ -275,7 +377,18 @@ async def run_one(
ingest_batch: int = 20000,
corpus_limit: int | None = None,
progress_every: int = 100000,
sqlite_db_path: Path | None = None,
reuse_sqlite_db: bool = False,
overwrite_sqlite_db: bool = False,
) -> Report:
if sqlite_db_path is not None and not use_sqlite_graph:
raise ValueError("--sqlite-db-path requires --use-sqlite-graph")
if reuse_sqlite_db and sqlite_db_path is None:
raise ValueError("--reuse-sqlite-db requires --sqlite-db-path")
if overwrite_sqlite_db and sqlite_db_path is None:
raise ValueError("--overwrite-sqlite-db requires --sqlite-db-path")
if reuse_sqlite_db and overwrite_sqlite_db:
raise ValueError("--reuse-sqlite-db and --overwrite-sqlite-db are mutually exclusive")
if not ds.path.exists():
raise FileNotFoundError(
f"{ds.path} missing. Run: python examples/ablation/download_benchmarks.py"
Expand All @@ -290,18 +403,38 @@ async def run_one(
if subset is not None and subset < len(query_items):
query_items = query_items[:subset]

reuse_signature = _reuse_signature(
ds,
data,
corpus_limit=corpus_limit,
embedder=embedder,
reranker=reranker,
decomposer=decomposer,
phrase_extractor=phrase_extractor,
entity_linker_cfg=entity_linker_cfg,
)

# Build the graph once for the whole dataset.
t_build = time.perf_counter()
tmp_db_path: str | None = None
if use_sqlite_graph:
tmp_db = tempfile.NamedTemporaryFile(
prefix=f"tier1_{ds.name.replace(' ', '_')}_",
suffix=".db",
delete=False,
)
tmp_db.close()
tmp_db_path = tmp_db.name
backend = SqliteGraphBackend(tmp_db_path)
if sqlite_db_path is not None:
if overwrite_sqlite_db:
_remove_sqlite_artifacts(sqlite_db_path)
elif sqlite_db_path.exists() and not reuse_sqlite_db:
raise FileExistsError(
f"{sqlite_db_path} exists; pass --reuse-sqlite-db to reuse it "
"or --overwrite-sqlite-db to rebuild it"
)
sqlite_db_path.parent.mkdir(parents=True, exist_ok=True)
backend = SqliteGraphBackend(str(sqlite_db_path))
else:
tmp_db = tempfile.NamedTemporaryFile(
prefix=f"tier1_{ds.name.replace(' ', '_')}_",
suffix=".db",
delete=False,
)
tmp_db.close()
backend = SqliteGraphBackend(tmp_db.name)
else:
backend = MemoryBackend()
await backend.connect()
Expand All @@ -313,10 +446,28 @@ async def run_one(
phrase_extractor=phrase_extractor,
)

# Pre-compute embeddings in large batches (GPU-friendly).
# ``graph.add()`` accepts an ``embedding`` arg; if we pass it we
# avoid the per-node single embed call that bottlenecks at batch=1.
items = _load_corpus_items(data, ds.path, qrels, query_items, corpus_limit)
reused_sqlite = False
items: list[CorpusItem] = []
if reuse_sqlite_db:
assert sqlite_db_path is not None
existing_meta = _read_reuse_meta(sqlite_db_path)
mismatches = _reuse_meta_mismatches(existing_meta, reuse_signature)
if mismatches:
joined = "; ".join(mismatches)
raise ValueError(f"{sqlite_db_path} reuse metadata mismatch: {joined}")
n_docs = int(existing_meta.get("node_count") or await backend.count_nodes())
print(
f" reuse sqlite db: {_display_path(sqlite_db_path)} ({n_docs:,} docs)",
flush=True,
)
reused_sqlite = True
else:
# Pre-compute embeddings in large batches (GPU-friendly).
# ``graph.add()`` accepts an ``embedding`` arg; if we pass it we
# avoid the per-node single embed call that bottlenecks at batch=1.
items = _load_corpus_items(data, ds.path, qrels, query_items, corpus_limit)
n_docs = len(items)

total_items = len(items)

def maybe_print_progress(done: int, start: float) -> None:
Expand All @@ -331,7 +482,7 @@ def maybe_print_progress(done: int, start: float) -> None:
)

embeddings: list[list[float] | None] = [None] * len(items)
if embedder is not None:
if not reused_sqlite and embedder is not None:
embed_inputs = [f"{title}\n{(text or '')[:1500]}" for _doc_id, title, text in items]
for i in range(0, len(embed_inputs), embed_batch):
chunk = embed_inputs[i : i + embed_batch]
Expand All @@ -341,7 +492,9 @@ def maybe_print_progress(done: int, start: float) -> None:

save_nodes_batch = getattr(backend, "save_nodes_batch", None)
t_ingest = time.perf_counter()
if phrase_extractor is None and callable(save_nodes_batch):
if reused_sqlite:
pass
elif phrase_extractor is None and callable(save_nodes_batch):
for i in range(0, len(items), ingest_batch):
done = min(i + ingest_batch, total_items)
batch = [
Expand Down Expand Up @@ -375,7 +528,7 @@ def maybe_print_progress(done: int, start: float) -> None:
# Runs AFTER ingest because the DF filter needs global corpus
# statistics. Typically 5-20× cheaper than inline phrase-extractor
# because it uses batch writes and skips per-node re-hash.
if entity_linker_cfg is not None:
if entity_linker_cfg is not None and not reused_sqlite:
from synaptic.extensions.domain_profile import DomainProfile
from synaptic.extensions.entity_linker import EntityLinker
from synaptic.extensions.phrase_extractor import PhraseExtractor
Expand Down Expand Up @@ -406,6 +559,8 @@ def maybe_print_progress(done: int, start: float) -> None:
print(f" top-DF: {top5}")

build_sec = time.perf_counter() - t_build
if sqlite_db_path is not None and not reused_sqlite:
_write_reuse_meta(sqlite_db_path, reuse_signature, node_count=n_docs)

mrr_total = 0.0
r5_total = 0.0
Expand Down Expand Up @@ -435,7 +590,7 @@ def maybe_print_progress(done: int, start: float) -> None:
n = max(len(query_items), 1)
report = Report(
name=ds.name,
n_docs=len(items),
n_docs=n_docs,
n_queries=len(query_items),
mrr=mrr_total / n,
recall_at_5=r5_total / n,
Expand Down Expand Up @@ -463,6 +618,8 @@ def _emit_markdown(
corpus_limit: int | None = None,
ingest_batch: int = 20000,
progress_every: int = 100000,
sqlite_db_path: Path | None = None,
reuse_sqlite_db: bool = False,
) -> Path:
OUT_DIR.mkdir(parents=True, exist_ok=True)
stamp = time.strftime("%Y%m%d_%H%M%S")
Expand All @@ -475,6 +632,8 @@ def _emit_markdown(
f"- Corpus limit: {corpus_limit if corpus_limit else 'full'}",
f"- Ingest batch: {ingest_batch}",
f"- Progress every: {progress_every if progress_every > 0 else 'disabled'}",
f"- SQLite DB path: {_display_path(sqlite_db_path) if sqlite_db_path else 'temporary'}",
f"- SQLite DB reuse: {'yes' if reuse_sqlite_db else 'no'}",
f"- Embedder: {embedder_label}",
f"- Reranker: {reranker_label}",
f"- Decomposer: {decomposer_label}",
Expand Down Expand Up @@ -580,6 +739,31 @@ async def amain(argv: list[str]) -> int:
help="Use SqliteGraphBackend (usearch HNSW) instead of MemoryBackend. "
"Required for fast vector search at corpus sizes > 5k.",
)
p.add_argument(
"--sqlite-db-path",
type=Path,
default=None,
help=(
"Persistent SQLite DB path for SqliteGraphBackend. Use with "
"--use-sqlite-graph to keep a built large-corpus index."
),
)
p.add_argument(
"--reuse-sqlite-db",
action="store_true",
help=(
"Reuse an existing --sqlite-db-path and skip corpus ingest after "
"validating its tier1 sidecar metadata."
),
)
p.add_argument(
"--overwrite-sqlite-db",
action="store_true",
help=(
"Delete an existing --sqlite-db-path and its tier1 sidecar before "
"building. Mutually exclusive with --reuse-sqlite-db."
),
)
p.add_argument(
"--embed-batch",
type=int,
Expand Down Expand Up @@ -692,6 +876,14 @@ async def amain(argv: list[str]) -> int:
"('American', 'French') don't form super-hubs.",
)
args = p.parse_args(argv)
if args.sqlite_db_path is not None and not args.use_sqlite_graph:
raise SystemExit("--sqlite-db-path requires --use-sqlite-graph")
if args.reuse_sqlite_db and args.sqlite_db_path is None:
raise SystemExit("--reuse-sqlite-db requires --sqlite-db-path")
if args.overwrite_sqlite_db and args.sqlite_db_path is None:
raise SystemExit("--overwrite-sqlite-db requires --sqlite-db-path")
if args.reuse_sqlite_db and args.overwrite_sqlite_db:
raise SystemExit("--reuse-sqlite-db and --overwrite-sqlite-db are mutually exclusive")

embedder: EmbeddingProvider | None = None
embedder_label = "none (FTS-only baseline)"
Expand Down Expand Up @@ -794,6 +986,15 @@ async def amain(argv: list[str]) -> int:
print(f" corpus limit: {args.corpus_limit if args.corpus_limit else 'full'}")
print(f" ingest batch: {args.ingest_batch}")
print(f" progress every: {args.progress_every if args.progress_every > 0 else 'disabled'}")
print(
f" sqlite db: {_display_path(args.sqlite_db_path) if args.sqlite_db_path else 'temporary'}"
)
if args.sqlite_db_path:
print(
" sqlite reuse: "
f"{'yes' if args.reuse_sqlite_db else 'no'}"
f"{' (overwrite)' if args.overwrite_sqlite_db else ''}"
)
if embedder is not None:
print(f" embed batch: {args.embed_batch}")
print()
Expand All @@ -817,6 +1018,9 @@ async def amain(argv: list[str]) -> int:
ingest_batch=args.ingest_batch,
corpus_limit=args.corpus_limit,
progress_every=args.progress_every,
sqlite_db_path=args.sqlite_db_path,
reuse_sqlite_db=args.reuse_sqlite_db,
overwrite_sqlite_db=args.overwrite_sqlite_db,
)
except FileNotFoundError as e:
print(f"{ds.name:<24} SKIP — {e}")
Expand All @@ -840,6 +1044,8 @@ async def amain(argv: list[str]) -> int:
corpus_limit=args.corpus_limit,
ingest_batch=args.ingest_batch,
progress_every=args.progress_every,
sqlite_db_path=args.sqlite_db_path,
reuse_sqlite_db=args.reuse_sqlite_db,
)
print()
print(f"Markdown report → {out.relative_to(REPO_ROOT)}")
Expand Down
Loading
Loading