diff --git a/examples/ablation/diagnostics/public_scale_20260702.md b/examples/ablation/diagnostics/public_scale_20260702.md index 6fb3200..cc28698 100644 --- a/examples/ablation/diagnostics/public_scale_20260702.md +++ b/examples/ablation/diagnostics/public_scale_20260702.md @@ -74,11 +74,14 @@ Manual large-tier shard from BEIR/MS MARCO passage validation: | persistent SQLite build | 1,000,000 | 50 | 0.462 | 0.543 | 0.580 | 30/50 | 2184.3s | 71.0s | | persistent SQLite reuse | 1,000,000 | 50 | 0.462 | 0.543 | 0.580 | 30/50 | 0.0s | 70.1s | | persistent SQLite reuse + English query filter | 1,000,000 | 50 | 0.479 | 0.553 | 0.600 | 31/50 | 0.0s | 9.1s | +| persistent SQLite reuse + tag-filtered anchors | 1,000,000 | 50 | 0.479 | 0.553 | 0.600 | 31/50 | 0.0s | 7.5s | The local artifacts are gitignored: - `tests/benchmark/data/msmarco_passage.json` - 511 KB manifest - `tests/benchmark/data/msmarco_passage.corpus.jsonl` - 35 MB at 100k, 361 MB at 1M +- `tests/benchmark/data/msmarco_passage_5m.json` - 511 KB 5M manifest +- `tests/benchmark/data/msmarco_passage_5m.corpus.jsonl` - 1.8 GB, 5,000,000 rows - `tests/benchmark/data/msmarco_1m.db` - 1.2 GB persistent SQLite DB - `tests/benchmark/data/msmarco_1m.db.tier1.json` - 535 byte reuse sidecar @@ -105,6 +108,16 @@ The local artifacts are gitignored: (`how/is/the/of/to` etc.) before FTS5 `OR` matching. On the persistent 1M DB this reduced 50-query search time from 70.1s to 9.1s while improving MRR@10 from 0.462 to 0.479. +- QueryAnchor category loading now asks backends for nodes tagged `category` + instead of materializing the first 500 `CONCEPT` rows and filtering in + Python. On the persistent 1M MS MARCO DB, first anchor extraction dropped + from roughly 1.7-2.0s to 0.218s, and the 50-query reuse smoke improved from + 9.1s to 7.5s with unchanged quality. +- 5M MS MARCO corpus data is now locally available as a side-by-side shard. + Generate it with: + `uv run --extra eval python examples/ablation/download_benchmarks.py --only msmarco_passage --large-corpus-limit 5000000 --large-output-suffix _5m` + and run it with: + `uv run python examples/ablation/run_tier1_benchmarks.py --only msmarco --msmarco-path tests/benchmark/data/msmarco_passage_5m.json --corpus-limit 5000000 --use-sqlite-graph --sqlite-db-path tests/benchmark/data/msmarco_5m.db --overwrite-sqlite-db`. ## Guard Policy diff --git a/examples/ablation/diagnostics/tier1_20260702_131442.md b/examples/ablation/diagnostics/tier1_20260702_131442.md new file mode 100644 index 0000000..22b2bb7 --- /dev/null +++ b/examples/ablation/diagnostics/tier1_20260702_131442.md @@ -0,0 +1,23 @@ +# Tier-1 English retrieval benchmark — Synaptic + +- Run at: 2026-07-02 13:14:42 KST +- Subset: 50 +- Corpus limit: 1000000 +- Ingest batch: 20000 +- Progress every: 100000 +- SQLite DB path: tests/benchmark/data/msmarco_1m.db +- SQLite DB reuse: yes +- Embedder: none (FTS-only baseline) +- Reranker: none +- Decomposer: none +- Phrase hub (inline): none +- Entity linker (post-hoc): none +- Engine: `graph.search()` default (EvidenceSearch) + +| Dataset | Docs | Queries | MRR@10 | R@5 | R@10 | Hit@10 | Build | Search | +|---------|-----:|--------:|-------:|----:|-----:|-------:|------:|-------:| +| MS MARCO passage dev | 1000000 | 50 | 0.479 | 0.553 | 0.600 | 31/50 | 0.0s | 7.5s | + +## Context + +- **MS MARCO passage dev** — published baseline: BEIR/MS MARCO passage: ~8.8M source passages; JSONL shard \ No newline at end of file diff --git a/examples/ablation/download_benchmarks.py b/examples/ablation/download_benchmarks.py index 0949838..053d217 100644 --- a/examples/ablation/download_benchmarks.py +++ b/examples/ablation/download_benchmarks.py @@ -508,6 +508,15 @@ def main() -> None: f"msmarco_passage (default: {MSMARCO_DEFAULT_CORPUS_LIMIT:,})." ), ) + p.add_argument( + "--large-output-suffix", + default="", + help=( + "Optional suffix for large JSONL-sharded output manifests, e.g. " + "'_5m' writes msmarco_passage_5m.json and " + "msmarco_passage_5m.corpus.jsonl without replacing the default 1M shard." + ), + ) args = p.parse_args() names = [n.strip() for n in args.only.split(",") if n.strip()] @@ -521,6 +530,10 @@ def main() -> None: if name in LARGE_BUILDERS: builder, filename = LARGE_BUILDERS[name] out_path = OUT_DIR / filename + if args.large_output_suffix: + out_path = out_path.with_name( + f"{out_path.stem}{args.large_output_suffix}{out_path.suffix}" + ) print(f"\n=== {name} ===") builder(out_path, corpus_limit=args.large_corpus_limit) continue diff --git a/examples/ablation/run_tier1_benchmarks.py b/examples/ablation/run_tier1_benchmarks.py index 3516e62..1365782 100644 --- a/examples/ablation/run_tier1_benchmarks.py +++ b/examples/ablation/run_tier1_benchmarks.py @@ -154,6 +154,26 @@ class Dataset: CorpusItem = tuple[str, str, str] +def _dataset_key_map(msmarco_path: Path | None = None) -> dict[str, Dataset]: + by_key = { + "hotpotqa": DATASETS[0], + "musique": DATASETS[1], + "2wiki": DATASETS[2], + "fiqa": DATASETS[3], + "trec_covid": DATASETS[4], + "scifact": DATASETS[5], + "msmarco": DATASETS[6], + } + if msmarco_path is not None: + base = by_key["msmarco"] + by_key["msmarco"] = Dataset( + name=base.name, + path=msmarco_path, + reference=base.reference, + ) + return by_key + + def _selected_gold_doc_ids( qrels: dict, query_items: list[tuple[str, str]], @@ -748,6 +768,15 @@ async def amain(argv: list[str]) -> int: "--use-sqlite-graph to keep a built large-corpus index." ), ) + p.add_argument( + "--msmarco-path", + type=Path, + default=None, + help=( + "Override the MS MARCO manifest path, e.g. " + "tests/benchmark/data/msmarco_passage_5m.json for side-by-side large tiers." + ), + ) p.add_argument( "--reuse-sqlite-db", action="store_true", @@ -956,15 +985,7 @@ async def amain(argv: list[str]) -> int: reranker = TEIReranker(base_url=args.reranker_url) reranker_label = f"TEI cross-encoder @ {args.reranker_url}" - by_key = { - "hotpotqa": DATASETS[0], - "musique": DATASETS[1], - "2wiki": DATASETS[2], - "fiqa": DATASETS[3], - "trec_covid": DATASETS[4], - "scifact": DATASETS[5], - "msmarco": DATASETS[6], - } + by_key = _dataset_key_map(args.msmarco_path) selected = [] for raw_key in args.only.split(","): key = raw_key.strip() diff --git a/src/synaptic/backends/composite.py b/src/synaptic/backends/composite.py index e598b18..ba3a065 100644 --- a/src/synaptic/backends/composite.py +++ b/src/synaptic/backends/composite.py @@ -154,6 +154,19 @@ async def list_nodes( ) -> list[Node]: return await self._graph.list_nodes(kind=kind, level=level, limit=limit) + async def list_nodes_by_tag( + self, + tag: str, + *, + kind: str | NodeKind | None = None, + limit: int = 100, + ) -> list[Node]: + list_by_tag = getattr(self._graph, "list_nodes_by_tag", None) + if callable(list_by_tag): + return await list_by_tag(tag, kind=kind, limit=limit) + nodes = await self._graph.list_nodes(kind=kind, limit=limit) + return [node for node in nodes if tag in (node.tags or [])] + # --- Edge CRUD (all to Neo4j) --- async def save_edge(self, edge: Edge) -> None: diff --git a/src/synaptic/backends/memory.py b/src/synaptic/backends/memory.py index edcbb41..c7d1c72 100644 --- a/src/synaptic/backends/memory.py +++ b/src/synaptic/backends/memory.py @@ -83,6 +83,24 @@ async def list_nodes( break return result + async def list_nodes_by_tag( + self, + tag: str, + *, + kind: str | NodeKind | None = None, + limit: int = 100, + ) -> list[Node]: + result: list[Node] = [] + for node in self._nodes.values(): + if kind is not None and node.kind != kind: + continue + if tag not in (node.tags or []): + continue + result.append(node) + if len(result) >= limit: + break + return result + async def get_nodes_batch(self, node_ids: list[str]) -> list[Node]: return [self._nodes[nid] for nid in node_ids if nid in self._nodes] diff --git a/src/synaptic/backends/sqlite.py b/src/synaptic/backends/sqlite.py index 2e5f20d..fee3df8 100644 --- a/src/synaptic/backends/sqlite.py +++ b/src/synaptic/backends/sqlite.py @@ -616,6 +616,26 @@ async def list_nodes( rows = await cur.fetchall() return [_row_to_node(r) for r in rows] + async def list_nodes_by_tag( + self, + tag: str, + *, + kind: str | NodeKind | None = None, + limit: int = 100, + ) -> list[Node]: + db = self._db() + conditions = ["EXISTS (SELECT 1 FROM json_each(syn_nodes.tags_json) WHERE value = ?)"] + params: list[str | int] = [tag] + if kind is not None: + conditions.append("kind = ?") + params.append(str(kind)) + params.append(limit) + where = " AND ".join(conditions) + sql = f"SELECT * FROM syn_nodes WHERE {where} ORDER BY updated_at DESC LIMIT ?" + async with db.execute(sql, params) as cur: + rows = await cur.fetchall() + return [_row_to_node(r) for r in rows] + async def find_nodes_by_property( self, key: str, value: str, *, limit: int = 1000 ) -> list[Node]: diff --git a/src/synaptic/extensions/query_anchor.py b/src/synaptic/extensions/query_anchor.py index d87b706..c682ec7 100644 --- a/src/synaptic/extensions/query_anchor.py +++ b/src/synaptic/extensions/query_anchor.py @@ -339,18 +339,25 @@ async def _load_categories(self) -> list[tuple[str, str]]: return self._category_cache try: - nodes = await self._backend.list_nodes( - kind=NodeKind.CONCEPT, - limit=self._category_cache_limit, - ) + list_by_tag = getattr(self._backend, "list_nodes_by_tag", None) + if callable(list_by_tag): + nodes = await list_by_tag( + "category", + kind=NodeKind.CONCEPT, + limit=self._category_cache_limit, + ) + else: + nodes = await self._backend.list_nodes( + kind=NodeKind.CONCEPT, + limit=self._category_cache_limit, + ) + nodes = [n for n in nodes if "category" in (n.tags or [])] except Exception as exc: logger.warning("query-anchor: failed to load categories — %s", exc) self._category_cache = [] return [] - pairs = [ - (_nfc(n.title or ""), n.id) for n in nodes if n.title and "category" in (n.tags or []) - ] + pairs = [(_nfc(n.title or ""), n.id) for n in nodes if n.title] self._category_cache = pairs logger.debug("query-anchor: cached %d category nodes", len(pairs)) return pairs diff --git a/tests/test_backend_memory.py b/tests/test_backend_memory.py index d813302..f65be93 100644 --- a/tests/test_backend_memory.py +++ b/tests/test_backend_memory.py @@ -65,6 +65,18 @@ async def test_list_with_limit(self, backend: MemoryBackend) -> None: limited = await backend.list_nodes(limit=3) assert len(limited) == 3 + async def test_list_nodes_by_tag_filters_kind_and_limit(self, backend: MemoryBackend) -> None: + await backend.save_node(Node(id="cat_a", title="A", tags=["category"])) + await backend.save_node(Node(id="cat_b", title="B", tags=["category"])) + await backend.save_node(Node(id="doc", title="Doc", tags=["document"])) + await backend.save_node( + Node(id="lesson", title="Lesson", kind=NodeKind.LESSON, tags=["category"]) + ) + + hits = await backend.list_nodes_by_tag("category", kind=NodeKind.CONCEPT, limit=1) + + assert [node.id for node in hits] == ["cat_a"] + class TestMemoryBackendEdges: async def test_save_and_get(self, backend: MemoryBackend) -> None: diff --git a/tests/test_backend_sqlite.py b/tests/test_backend_sqlite.py index 1157f2c..cf80442 100644 --- a/tests/test_backend_sqlite.py +++ b/tests/test_backend_sqlite.py @@ -525,3 +525,24 @@ async def test_respects_limit(self, sqlite: SQLiteBackend) -> None: for i in range(5): await sqlite.save_node(Node(id=f"n{i}", properties={"k": "v"})) assert len(await sqlite.find_nodes_by_property("k", "v", limit=3)) == 3 + + +class TestListNodesByTag: + async def test_finds_exact_tag_with_kind_filter(self, sqlite: SQLiteBackend) -> None: + await sqlite.save_node(Node(id="cat", title="Category", tags=["category"])) + await sqlite.save_node(Node(id="doc", title="Doc", tags=["document"])) + await sqlite.save_node( + Node(id="lesson", title="Lesson", kind=NodeKind.LESSON, tags=["category"]) + ) + + hits = await sqlite.list_nodes_by_tag("category", kind=NodeKind.CONCEPT) + + assert [node.id for node in hits] == ["cat"] + + async def test_respects_limit(self, sqlite: SQLiteBackend) -> None: + for i in range(5): + await sqlite.save_node(Node(id=f"cat_{i}", title=str(i), tags=["category"])) + + hits = await sqlite.list_nodes_by_tag("category", limit=3) + + assert len(hits) == 3 diff --git a/tests/test_download_benchmarks.py b/tests/test_download_benchmarks.py index 86726d6..06f4099 100644 --- a/tests/test_download_benchmarks.py +++ b/tests/test_download_benchmarks.py @@ -64,3 +64,31 @@ def fake_load_dataset(repo, config=None, *, split, streaming=False): assert manifest["corpus_size"] == 3 assert manifest["preserved_gold_docs"] == 1 assert [row["_id"] for row in rows] == ["3", "0", "1"] + + +def test_large_output_suffix_keeps_default_shard(monkeypatch, tmp_path): + calls: list[Path] = [] + monkeypatch.setattr(downloader, "OUT_DIR", tmp_path) + monkeypatch.setitem( + downloader.LARGE_BUILDERS, + "msmarco_passage", + (lambda out_path, *, corpus_limit: calls.append(out_path), "msmarco_passage.json"), + ) + + monkeypatch.setattr( + sys, + "argv", + [ + "download_benchmarks.py", + "--only", + "msmarco_passage", + "--large-corpus-limit", + "5000000", + "--large-output-suffix", + "_5m", + ], + ) + + downloader.main() + + assert calls == [tmp_path / "msmarco_passage_5m.json"] diff --git a/tests/test_query_anchor.py b/tests/test_query_anchor.py index dad1599..e5aad50 100644 --- a/tests/test_query_anchor.py +++ b/tests/test_query_anchor.py @@ -231,11 +231,16 @@ class _CountingBackend: def __init__(self, inner: MemoryBackend) -> None: self._inner = inner self.list_calls = 0 + self.list_by_tag_calls = 0 async def list_nodes(self, **kwargs): self.list_calls += 1 return await self._inner.list_nodes(**kwargs) + async def list_nodes_by_tag(self, *args, **kwargs): + self.list_by_tag_calls += 1 + return await self._inner.list_nodes_by_tag(*args, **kwargs) + # Pass-through for anything else the extractor might call def __getattr__(self, name: str): return getattr(self._inner, name) @@ -254,7 +259,8 @@ async def test_cache_reuse_across_calls(self): await extractor.extract("규정 및 지침") await extractor.extract("규정 및 지침") - assert backend.list_calls == 1 + assert backend.list_calls == 0 + assert backend.list_by_tag_calls == 1 async def test_invalidate_cache_forces_reload(self): inner = MemoryBackend() @@ -267,7 +273,8 @@ async def test_invalidate_cache_forces_reload(self): extractor.invalidate_cache() await extractor.extract("규정 및 지침") - assert backend.list_calls == 2 + assert backend.list_calls == 0 + assert backend.list_by_tag_calls == 2 # --- Integration: full anchor shape --- diff --git a/tests/test_tier1_benchmarks.py b/tests/test_tier1_benchmarks.py index 5c1b0db..30f5ecb 100644 --- a/tests/test_tier1_benchmarks.py +++ b/tests/test_tier1_benchmarks.py @@ -20,6 +20,14 @@ SPEC.loader.exec_module(runner) +def test_msmarco_path_override_retargets_dataset(tmp_path): + manifest = tmp_path / "msmarco_passage_5m.json" + by_key = runner._dataset_key_map(manifest) + + assert by_key["msmarco"].path == manifest + assert by_key["msmarco"].name == runner.DATASETS[6].name + + @pytest.mark.asyncio async def test_corpus_limit_keeps_selected_query_gold_docs(tmp_path): path = tmp_path / "tiny_bench.json"