Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion examples/ablation/diagnostics/public_scale_20260702.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,21 +66,26 @@ Manual large-tier shard from BEIR/MS MARCO passage validation:
| Docs | Queries | MRR@10 | R@5 | R@10 | Hit@10 | Build | Search |
|-----:|--------:|-------:|----:|-----:|-------:|------:|-------:|
| 100,000 | 50 | 0.673 | 0.740 | 0.770 | 39/50 | 81.9s | 5.4s |
| 1,000,000 | 50 | 0.462 | 0.543 | 0.580 | 30/50 | 1913.3s | 69.9s |

The local artifacts are gitignored:

- `tests/benchmark/data/msmarco_passage.json` - 511 KB manifest
- `tests/benchmark/data/msmarco_passage.corpus.jsonl` - 35 MB corpus shard
- `tests/benchmark/data/msmarco_passage.corpus.jsonl` - 35 MB at 100k, 361 MB at 1M

## Interpretation

- Search latency remains usable at 171k docs: 5.2s over 10 queries.
- MS MARCO confirms the large-tier path on a web passage corpus: 100k docs,
50 queries, 5.4s total search, and 0.673 MRR@10 without embeddings or reranking.
- MS MARCO 1M is now proven end-to-end, but it is a heavy manual tier:
31.9 minutes build time and 69.9s total search for 50 queries.
- The main large-corpus bottleneck is still initial FTS/index build, not retrieval.
- Avoiding unnecessary FTS deletes for newly inserted nodes reduced full FiQA build time by about 9.9x.
- Raising benchmark ingest batches to 20k reduced full TREC-COVID build time by about 2.7x.
- `--corpus-limit` provides practical staged scale gates while preserving selected query gold docs.
- Long 1M runs should keep ingest progress output enabled (`--progress-every`,
default 100000 docs); without it the build phase is too quiet for practical monitoring.

## Guard Policy

Expand Down
32 changes: 31 additions & 1 deletion examples/ablation/run_tier1_benchmarks.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ async def run_one(
embed_batch: int = 256,
ingest_batch: int = 20000,
corpus_limit: int | None = None,
progress_every: int = 100000,
) -> Report:
if not ds.path.exists():
raise FileNotFoundError(
Expand Down Expand Up @@ -316,6 +317,18 @@ async def run_one(
# ``graph.add()`` accepts an ``embedding`` arg; if we pass it we
# avoid the per-node single embed call that bottlenecks at batch=1.
items = _load_corpus_items(data, ds.path, qrels, query_items, corpus_limit)
total_items = len(items)

def _make_progress_printer():
    """Build the ingest progress reporter used during corpus ingest.

    The returned callable remembers the last ``progress_every`` boundary it
    reported. It prints whenever a new boundary has been *crossed*, not only
    when ``done`` lands exactly on a multiple. This matters on the batched
    ingest path, where ``done`` advances in ``ingest_batch`` steps that need
    not divide ``progress_every`` evenly.
    """
    last_bucket = 0  # highest ``done // progress_every`` already reported

    def _report(done: int, start: float) -> None:
        """Print one progress line if a reporting boundary was reached.

        Args:
            done: Number of documents ingested so far.
            start: ``time.perf_counter()`` value taken when ingest began.
        """
        nonlocal last_bucket
        # A non-positive interval disables progress output entirely.
        if progress_every <= 0:
            return
        bucket = done // progress_every
        # Always report the final document; otherwise only report when a
        # new ``progress_every`` boundary was crossed since the last line.
        if done < total_items and bucket <= last_bucket:
            return
        last_bucket = bucket
        elapsed = time.perf_counter() - start
        print(
            f" ingest: {done:,}/{total_items:,} docs ({elapsed:.1f}s)",
            flush=True,
        )

    return _report


maybe_print_progress = _make_progress_printer()

embeddings: list[list[float] | None] = [None] * len(items)
if embedder is not None:
Expand All @@ -327,8 +340,10 @@ async def run_one(
embeddings[i + j] = v if v else None

save_nodes_batch = getattr(backend, "save_nodes_batch", None)
t_ingest = time.perf_counter()
if phrase_extractor is None and callable(save_nodes_batch):
for i in range(0, len(items), ingest_batch):
done = min(i + ingest_batch, total_items)
batch = [
Node(
id=_benchmark_node_id(doc_id),
Expand All @@ -344,15 +359,17 @@ async def run_one(
)
]
await save_nodes_batch(batch)
maybe_print_progress(done, t_ingest)
else:
for (doc_id, title, text), emb in zip(items, embeddings):
for idx, ((doc_id, title, text), emb) in enumerate(zip(items, embeddings), start=1):
await graph.add(
title=title,
content=text,
properties={"doc_id": doc_id},
embedding=emb,
record_memory_event=False,
)
maybe_print_progress(idx, t_ingest)

# Post-hoc DF-filtered entity linking (opt-in via --entity-linker).
# Runs AFTER ingest because the DF filter needs global corpus
Expand Down Expand Up @@ -445,6 +462,7 @@ def _emit_markdown(
entity_linker_label: str = "none",
corpus_limit: int | None = None,
ingest_batch: int = 20000,
progress_every: int = 100000,
) -> Path:
OUT_DIR.mkdir(parents=True, exist_ok=True)
stamp = time.strftime("%Y%m%d_%H%M%S")
Expand All @@ -456,6 +474,7 @@ def _emit_markdown(
f"- Subset: {subset if subset else 'full'}",
f"- Corpus limit: {corpus_limit if corpus_limit else 'full'}",
f"- Ingest batch: {ingest_batch}",
f"- Progress every: {progress_every if progress_every > 0 else 'disabled'}",
f"- Embedder: {embedder_label}",
f"- Reranker: {reranker_label}",
f"- Decomposer: {decomposer_label}",
Expand Down Expand Up @@ -575,6 +594,14 @@ async def amain(argv: list[str]) -> int:
default=20000,
help="Batch size for benchmark corpus node writes (default: 20000).",
)
p.add_argument(
"--progress-every",
type=int,
default=100000,
help=(
"Print ingest progress every N docs during build (default: 100000; set 0 to disable)."
),
)
p.add_argument(
"--corpus-limit",
type=int,
Expand Down Expand Up @@ -766,6 +793,7 @@ async def amain(argv: list[str]) -> int:
print(f" entity linker: {entity_linker_label}")
print(f" corpus limit: {args.corpus_limit if args.corpus_limit else 'full'}")
print(f" ingest batch: {args.ingest_batch}")
print(f" progress every: {args.progress_every if args.progress_every > 0 else 'disabled'}")
if embedder is not None:
print(f" embed batch: {args.embed_batch}")
print()
Expand All @@ -788,6 +816,7 @@ async def amain(argv: list[str]) -> int:
embed_batch=args.embed_batch,
ingest_batch=args.ingest_batch,
corpus_limit=args.corpus_limit,
progress_every=args.progress_every,
)
except FileNotFoundError as e:
print(f"{ds.name:<24} SKIP — {e}")
Expand All @@ -810,6 +839,7 @@ async def amain(argv: list[str]) -> int:
entity_linker_label=entity_linker_label,
corpus_limit=args.corpus_limit,
ingest_batch=args.ingest_batch,
progress_every=args.progress_every,
)
print()
print(f"Markdown report → {out.relative_to(REPO_ROOT)}")
Expand Down
31 changes: 31 additions & 0 deletions tests/test_tier1_benchmarks.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,37 @@ async def test_jsonl_corpus_limit_keeps_selected_query_gold_docs(tmp_path):
assert report.recall_at_10 == 1.0


@pytest.mark.asyncio
async def test_progress_every_reports_ingest_progress(tmp_path, capsys):
    """Ingest progress lines are printed when ``progress_every`` is set.

    Runs a two-document benchmark with ``ingest_batch=1`` and
    ``progress_every=1`` and expects one progress line per document on stdout.
    """
    fixture = {
        "corpus": {
            "gold_doc": {"title": "Gold", "text": "needle targetterm"},
            "filler": {"title": "Filler", "text": "unrelated alpha"},
        },
        "queries": {"q1": "targetterm"},
        "qrels": {"q1": {"gold_doc": 1}},
    }
    dataset_file = tmp_path / "tiny_bench.json"
    dataset_file.write_text(json.dumps(fixture), encoding="utf-8")

    dataset = runner.Dataset(name="Tiny", path=dataset_file, reference="unit")
    await runner.run_one(
        dataset,
        subset=1,
        corpus_limit=2,
        ingest_batch=1,
        progress_every=1,
    )

    captured = capsys.readouterr().out

    # One line after each of the two documents.
    for expected in ("ingest: 1/2 docs", "ingest: 2/2 docs"):
        assert expected in captured


def test_threshold_violations_report_scale_regressions():
report = runner.Report(
name="Tiny",
Expand Down
Loading