diff --git a/examples/ablation/diagnostics/public_scale_20260702.md b/examples/ablation/diagnostics/public_scale_20260702.md index 9ca4b94..8da4151 100644 --- a/examples/ablation/diagnostics/public_scale_20260702.md +++ b/examples/ablation/diagnostics/public_scale_20260702.md @@ -66,21 +66,26 @@ Manual large-tier shard from BEIR/MS MARCO passage validation: | Docs | Queries | MRR@10 | R@5 | R@10 | Hit@10 | Build | Search | |-----:|--------:|-------:|----:|-----:|-------:|------:|-------:| | 100,000 | 50 | 0.673 | 0.740 | 0.770 | 39/50 | 81.9s | 5.4s | +| 1,000,000 | 50 | 0.462 | 0.543 | 0.580 | 30/50 | 1913.3s | 69.9s | The local artifacts are gitignored: - `tests/benchmark/data/msmarco_passage.json` - 511 KB manifest -- `tests/benchmark/data/msmarco_passage.corpus.jsonl` - 35 MB corpus shard +- `tests/benchmark/data/msmarco_passage.corpus.jsonl` - 35 MB at 100k, 361 MB at 1M ## Interpretation - Search latency remains usable at 171k docs: 5.2s over 10 queries. - MS MARCO confirms the large-tier path on a web passage corpus: 100k docs, 50 queries, 5.4s total search, and 0.673 MRR@10 without embeddings or reranking. +- MS MARCO 1M is now proven end-to-end, but it is a heavy manual tier: + 31.9 minutes build time and 69.9s total search for 50 queries. - The main large-corpus bottleneck is still initial FTS/index build, not retrieval. - Avoiding unnecessary FTS deletes for newly inserted nodes reduced full FiQA build time by about 9.9x. - Raising benchmark ingest batches to 20k reduced full TREC-COVID build time by about 2.7x. - `--corpus-limit` provides practical staged scale gates while preserving selected query gold docs. +- Long 1M runs should use ingest progress output; without it the build phase is + too quiet for practical monitoring. ## Guard Policy diff --git a/examples/ablation/run_tier1_benchmarks.py b/examples/ablation/run_tier1_benchmarks.py index 2fdba61..5708a99 100644 --- a/examples/ablation/run_tier1_benchmarks.py +++ b/examples/ablation/run_tier1_benchmarks.py @@ -274,6 +274,7 @@ async def run_one( embed_batch: int = 256, ingest_batch: int = 20000, corpus_limit: int | None = None, + progress_every: int = 100000, ) -> Report: if not ds.path.exists(): raise FileNotFoundError( @@ -316,6 +317,18 @@ async def run_one( # ``graph.add()`` accepts an ``embedding`` arg; if we pass it we # avoid the per-node single embed call that bottlenecks at batch=1. items = _load_corpus_items(data, ds.path, qrels, query_items, corpus_limit) + total_items = len(items) + + def maybe_print_progress(done: int, start: float) -> None: + if progress_every <= 0: + return + if done < total_items and done % progress_every != 0: + return + elapsed = time.perf_counter() - start + print( + f" ingest: {done:,}/{total_items:,} docs ({elapsed:.1f}s)", + flush=True, + ) embeddings: list[list[float] | None] = [None] * len(items) if embedder is not None: @@ -327,8 +340,10 @@ async def run_one( embeddings[i + j] = v if v else None save_nodes_batch = getattr(backend, "save_nodes_batch", None) + t_ingest = time.perf_counter() if phrase_extractor is None and callable(save_nodes_batch): for i in range(0, len(items), ingest_batch): + done = min(i + ingest_batch, total_items) batch = [ Node( id=_benchmark_node_id(doc_id), @@ -344,8 +359,9 @@ async def run_one( ) ] await save_nodes_batch(batch) + maybe_print_progress(done, t_ingest) else: - for (doc_id, title, text), emb in zip(items, embeddings): + for idx, ((doc_id, title, text), emb) in enumerate(zip(items, embeddings), start=1): await graph.add( title=title, content=text, @@ -353,6 +369,7 @@ async def run_one( embedding=emb, record_memory_event=False, ) + maybe_print_progress(idx, t_ingest) # Post-hoc DF-filtered entity linking (opt-in via --entity-linker). # Runs AFTER ingest because the DF filter needs global corpus @@ -445,6 +462,7 @@ def _emit_markdown( entity_linker_label: str = "none", corpus_limit: int | None = None, ingest_batch: int = 20000, + progress_every: int = 100000, ) -> Path: OUT_DIR.mkdir(parents=True, exist_ok=True) stamp = time.strftime("%Y%m%d_%H%M%S") @@ -456,6 +474,7 @@ def _emit_markdown( f"- Subset: {subset if subset else 'full'}", f"- Corpus limit: {corpus_limit if corpus_limit else 'full'}", f"- Ingest batch: {ingest_batch}", + f"- Progress every: {progress_every if progress_every > 0 else 'disabled'}", f"- Embedder: {embedder_label}", f"- Reranker: {reranker_label}", f"- Decomposer: {decomposer_label}", @@ -575,6 +594,14 @@ async def amain(argv: list[str]) -> int: default=20000, help="Batch size for benchmark corpus node writes (default: 20000).", ) + p.add_argument( + "--progress-every", + type=int, + default=100000, + help=( + "Print ingest progress every N docs during build (default: 100000; set 0 to disable)." + ), + ) p.add_argument( "--corpus-limit", type=int, @@ -766,6 +793,7 @@ async def amain(argv: list[str]) -> int: print(f" entity linker: {entity_linker_label}") print(f" corpus limit: {args.corpus_limit if args.corpus_limit else 'full'}") print(f" ingest batch: {args.ingest_batch}") + print(f" progress every: {args.progress_every if args.progress_every > 0 else 'disabled'}") if embedder is not None: print(f" embed batch: {args.embed_batch}") print() @@ -788,6 +816,7 @@ async def amain(argv: list[str]) -> int: embed_batch=args.embed_batch, ingest_batch=args.ingest_batch, corpus_limit=args.corpus_limit, + progress_every=args.progress_every, ) except FileNotFoundError as e: print(f"{ds.name:<24} SKIP — {e}") @@ -810,6 +839,7 @@ async def amain(argv: list[str]) -> int: entity_linker_label=entity_linker_label, corpus_limit=args.corpus_limit, ingest_batch=args.ingest_batch, + progress_every=args.progress_every, ) print() print(f"Markdown report → {out.relative_to(REPO_ROOT)}") diff --git a/tests/test_tier1_benchmarks.py b/tests/test_tier1_benchmarks.py index 0f6d7e8..d3959bc 100644 --- a/tests/test_tier1_benchmarks.py +++ b/tests/test_tier1_benchmarks.py @@ -106,6 +106,37 @@ async def test_jsonl_corpus_limit_keeps_selected_query_gold_docs(tmp_path): assert report.recall_at_10 == 1.0 +@pytest.mark.asyncio +async def test_progress_every_reports_ingest_progress(tmp_path, capsys): + path = tmp_path / "tiny_bench.json" + path.write_text( + json.dumps( + { + "corpus": { + "gold_doc": {"title": "Gold", "text": "needle targetterm"}, + "filler": {"title": "Filler", "text": "unrelated alpha"}, + }, + "queries": {"q1": "targetterm"}, + "qrels": {"q1": {"gold_doc": 1}}, + } + ), + encoding="utf-8", + ) + + await runner.run_one( + runner.Dataset(name="Tiny", path=path, reference="unit"), + subset=1, + corpus_limit=2, + ingest_batch=1, + progress_every=1, + ) + + output = capsys.readouterr().out + + assert "ingest: 1/2 docs" in output + assert "ingest: 2/2 docs" in output + + def test_threshold_violations_report_scale_regressions(): report = runner.Report( name="Tiny",