feat(graph_db): introduce LanceDB graph backend with native compaction and FTS auto-indexing #1457
Jay-ju wants to merge 5 commits into MemTensor:main
Conversation
Commits:

- chore: update poetry mirror to volces
- chore: update poetry mirror to byted and simplify embedding dim env var
- chore: add byted mirror and remove unused env var fallback
- chore: replace tsinghua pypi mirror with volces mirror
- style: fix formatting in api/config.py
- feat: add LanceDB graph plugin with RRF hybrid search and standalone test (see merge request: !5)
- fix: ensure node_not_exist handles empty where clauses correctly in LanceDB
- refactor: simplify GraphDB interface to only require node_not_exist method
- fix: add missing node_not_exist and count methods to LanceGraphDB (see merge request: !10)
- fix: Reorganizer daemon fails to retrieve candidate nodes for multi-tenant LanceDB users
- fix: handle missing use_multi_db attr in LanceGraphDBConfig
- chore: add LanceDB local data and inspect scripts to .gitignore
- fix: correct search conditions logic in LanceGraphDB to properly handle missing params (see merge request: !14)
- fix(lancedb): fix node update missing top-level fields by merging them into metadata
- fix: resolve LanceDB edge query issue in handler and add debug logging for reranker scores (see merge request: !17)

Highlights:

- Refactored LanceGraphDB to use native ds.optimize() for compaction and snapshot pruning.
- Automated FTS index creation in _ensure_schema.
- Added configurable cleanup_older_than_days property.
- Added pytest case to verify FTS index effectiveness and optimization.
Pull request overview
This PR introduces a LanceDB-backed GraphDB implementation and wires it into configuration/factory selection, with additional changes aimed at improving graph structure optimization isolation and adding diagnostics around search scoring.
Changes:
- Add LanceGraphDB backend with background compaction/optimization and FTS/hybrid search support.
- Extend GraphDB configuration/factory wiring to support the new lance backend; adjust tree reorganizer/handler logic.
- Add LanceDB-focused tests and update existing Qdrant vec-db tests.
Reviewed changes
Copilot reviewed 13 out of 14 changed files in this pull request and generated 13 comments.
| File | Description |
|---|---|
| src/memos/graph_dbs/lance.py | New LanceDB-based GraphDB backend (nodes/edges storage, FTS/hybrid search, optimizer thread). |
| src/memos/configs/graph_db.py | Adds LanceGraphDBConfig and registers it in GraphDBConfigFactory. |
| src/memos/graph_dbs/factory.py | Registers "lance" backend in GraphStoreFactory. |
| src/memos/graph_dbs/base.py | Adds new abstract method node_not_exist. |
| src/memos/memories/textual/tree_text_memory/organize/reorganizer.py | Attempts to prevent cross-user clustering by grouping candidates by user. |
| src/memos/memories/textual/tree_text_memory/organize/handler.py | Adds fallback edge-collection logic using get_neighbors when get_edges is unavailable. |
| src/memos/api/handlers/search_handler.py | Adds DEBUG-only logging of reranker scores (and memory snippets) before thresholding. |
| src/memos/api/handlers/config_builders.py | Adds env-based vec-db backend selection (milvus/lance/qdrant) and wires graph config for lance. |
| src/memos/api/config.py | Adds get_lance_graph_config and includes it in user/default config maps. |
| tests/graph_dbs/test_lance.py | New tests for LanceGraphDB features (FTS/hybrid/compaction). |
| tests/vec_dbs/test_qdrant.py | Updates Qdrant tests to match updated API usage and expectations. |
| pyproject.toml | Adds LanceDB-related optional dependencies and updates Poetry source mirror. |
| .gitignore | Ignores local LanceDB data/scripts and server log file. |
| src/memos/mem_feedback/feedback.py | Minor cleanup to dict.get usage. |
```python
@abstractmethod
def node_not_exist(self, scope: str, user_name: str | None = None) -> bool:
    pass
```
Adding the new abstract method node_not_exist() makes every BaseGraphDB subclass abstract until it implements it. At least PostgresGraphDB and Neo4jCommunityGraphDB currently have no node_not_exist, so instantiating those backends will raise TypeError: Can't instantiate abstract class .... Either provide a non-abstract default implementation here, or update all graph DB backends to implement it with consistent boolean semantics (some existing implementations currently return ints).
Suggested change:

```python
def node_not_exist(self, scope: str, user_name: str | None = None) -> bool:
    """
    Return True when the given scope has no memory items, otherwise False.

    This default implementation derives the result from ``get_all_memory_items``.
    The ``user_name`` argument is accepted for API compatibility but is not used
    here because the base retrieval API is scoped only by ``scope``. Backends
    that need user-specific existence checks can override this method.
    """
    return len(self.get_all_memory_items(scope)) == 0
```
```python
elif hasattr(self.graph_store, "get_neighbors"):
    neighbor_nodes_in = self.graph_store.get_neighbors(
        conflict_a.id, edge_type="ANY", direction="IN", user_name=user_name
    )
    neighbor_nodes_out = self.graph_store.get_neighbors(
        conflict_a.id, edge_type="ANY", direction="OUT", user_name=user_name
    )
```
The get_neighbors fallback path is not compatible with existing BaseGraphDB.get_neighbors(id, type, direction) implementations (e.g. PostgresGraphDB): it passes unexpected keyword args (edge_type, user_name) and uses direction values IN/OUT, and it also assumes each neighbor is a dict with an id. This will raise TypeError or TypeError: string indices must be integers depending on backend. Consider standardizing on one interface (e.g., add get_edges to BaseGraphDB) or make the fallback handle both list[str] and list[dict] and call get_neighbors with the correct parameter names/direction values.
```python
# Group nodes by user_name to prevent cross-user clustering
from collections import defaultdict

nodes_by_user = defaultdict(list)
for n in nodes:
    # LanceDB may not have user_name in node object directly, but let's try properties or user_name
    u_name = getattr(n, "user_name", "default")
    nodes_by_user[u_name].append(n)
```
nodes here are GraphDBNode instances (inherits TextualMemoryItem with extra="forbid") and do not have a user_name attribute, so getattr(n, "user_name", "default") will always group everything under "default". That means the intended cross-user isolation won’t actually happen. If multi-tenant grouping is required, include user_name in the candidate node payload (and extend the model accordingly), or group by the user_name argument already passed into optimize_structure / get_structure_optimization_candidates.
Suggested change:

```python
# Group nodes by the explicit user_name argument to prevent cross-user clustering.
# GraphDBNode instances do not reliably expose a user_name attribute.
nodes_by_user = defaultdict(list)
effective_user_name = user_name or "default"
for n in nodes:
    nodes_by_user[effective_user_name].append(n)
```
```python
ids = [n["id"] for n in nodes if "id" in n]
if ids:
    self.delete_node_by_prams(ids, user_name=target_user)
```
add_nodes_batch() deletes any existing IDs before inserting (delete_node_by_prams(ids, ...)). Since delete_node_by_prams also deletes edges for those IDs, any upsert/update will silently drop all relationships for the node. This impacts update_node() (which calls add_node() → add_nodes_batch()) and will break graph structure over time. Consider updating rows in-place (or deleting only the node row while preserving edges) so edges survive node updates.
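The edge-preserving upsert semantics can be sketched abstractly as below. Plain dicts stand in for the Lance tables here; the names are illustrative. In the real backend the same effect could likely be achieved either by scoping the delete to the nodes table only, or via LanceDB's merge/upsert facilities (e.g. `merge_insert`, if available in the pinned version), so that re-inserting an existing node ID never cascades into the edges table.

```python
def upsert_nodes(nodes_table, edges_table, new_nodes):
    """Replace node rows in place without touching edges (illustrative).

    nodes_table: dict mapping node id -> row dict
    edges_table: list of {"source_id": ..., "target_id": ...} rows
    """
    for row in new_nodes:
        # Replace-in-place: the existing row (if any) is overwritten,
        # and no delete is issued against edges_table.
        nodes_table[row["id"]] = row
    return nodes_table, edges_table


# Usage: updating a node keeps its relationships intact.
nodes = {"n1": {"id": "n1", "memory": "old text"}}
edges = [{"source_id": "n1", "target_id": "n2"}]
nodes, edges = upsert_nodes(nodes, edges, [{"id": "n1", "memory": "new text"}])
print(nodes["n1"]["memory"], len(edges))  # → new text 1
```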
```python
id_list = ", ".join([f"'{i}'" for i in ids])
try:
    ds.delete(f"id IN ({id_list}) AND user_name = '{target_user}'")
    edges_ds = self._get_edges_table()
    edges_ds.delete(
        f"(source_id IN ({id_list}) OR target_id IN ({id_list})) AND user_name = '{target_user}'"
    )
```
The filter strings are built via f-strings with unescaped values (e.g. id_list = ", ".join([f"'{i}'" ...]) and user_name = '{target_user}'). Since IDs in this codebase are not guaranteed to be UUIDs (tests use "node_1" etc.), a single quote in an ID/user name can break the query or allow injection into the Lance filter expression. Please validate/escape identifiers (at minimum, escape single quotes) or use a parameterized/structured filtering API if LanceDB supports it.
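A minimal escaping helper (illustrative, not in the PR) that doubles single quotes before interpolating values into the SQL-style filter string. A structured or parameterized filter API, if LanceDB provides one, would still be preferable.

```python
def escape_literal(value: str) -> str:
    # Double any single quote so it cannot terminate the quoted string
    # inside the filter expression.
    return value.replace("'", "''")


def build_delete_filter(ids: list[str], user: str) -> str:
    """Build a Lance-style delete filter with escaped identifiers (illustrative)."""
    id_list = ", ".join(f"'{escape_literal(i)}'" for i in ids)
    return f"id IN ({id_list}) AND user_name = '{escape_literal(user)}'"


print(build_delete_filter(["node_1"], "alice"))
# → id IN ('node_1') AND user_name = 'alice'
```

An ID like `o'brien` then produces `'o''brien'` instead of breaking out of the quoted literal.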
```python
print("\n--- 5. Testing Full-Text Search (FTS) ---")
try:
    res_fts = db.search_by_fulltext(["Forbidden", "City"], top_k=2)
    assert len(res_fts) > 0
    assert res_fts[0]["id"] == "node_2"
    print("FTS verified.")
except Exception as e:
    print(f"FTS failed: {e}")

print("\n--- 6. Testing Hybrid Search (Multi-way Recall + Reranker) ---")
try:
    from lancedb.rerankers import LinearCombinationReranker, RRFReranker

    res_hybrid_default = db.search_by_hybrid(
        query_text="Forbidden", vector=[0.1, 0.2, 0.3], top_k=2
    )
    assert len(res_hybrid_default) > 0

    ratio_reranker = LinearCombinationReranker(weight=0.8)
    res_hybrid_ratio = db.search_by_hybrid(
        query_text="Forbidden", vector=[0.1, 0.2, 0.3], top_k=2, reranker=ratio_reranker
    )
    assert len(res_hybrid_ratio) > 0

    rrf_reranker = RRFReranker()
    res_hybrid_rrf = db.search_by_hybrid(
        query_text="Forbidden", vector=[0.1, 0.2, 0.3], top_k=2, reranker=rrf_reranker
    )
    assert len(res_hybrid_rrf) > 0
    print("Hybrid Search (Default/Ratio/RRF) verified.")
except Exception as e:
    print(f"Hybrid search failed: {e}")
```
These tests swallow exceptions for key behaviors (FTS + hybrid search) and only print(...) on failure. That means the test can still pass even if FTS/hybrid search is broken or dependencies are missing. Tests should fail deterministically (or explicitly pytest.skip when optional deps aren’t available) rather than catching broad Exception and continuing.
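One possible shape for a deterministic version, sketched below. The `db` fixture and the `search_by_fulltext` call mirror the PR's test file, but the fixture wiring is assumed here: assertions fail loudly, and a skip marker (rather than a swallowed exception) covers the missing-extras case.

```python
import importlib.util

import pytest

# Skip cleanly (instead of silently passing) when LanceDB extras are absent.
requires_lance = pytest.mark.skipif(
    importlib.util.find_spec("lancedb") is None
    or importlib.util.find_spec("tantivy") is None,
    reason="LanceDB extras (lancedb/tantivy) not installed",
)


@requires_lance
def test_fulltext_search(db):
    res_fts = db.search_by_fulltext(["Forbidden", "City"], top_k=2)
    # Plain asserts let pytest report a real failure.
    assert res_fts, "FTS returned no results"
    assert res_fts[0]["id"] == "node_2"
```

The same pattern applies to the hybrid-search cases; broad `except Exception: print(...)` blocks should be removed throughout.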
```python
import os
import tempfile

from memos.configs.graph_db import LanceGraphDBConfig
from memos.graph_dbs.lance import LanceGraphDB
```
This test module unconditionally imports/instantiates LanceGraphDB (and later imports lance / lancedb.rerankers) without skipping when optional dependencies aren’t installed. If the default CI environment doesn’t include the LanceDB extras, this will fail the whole test suite. Consider using pytest.importorskip("lancedb") / pytest.importorskip("tantivy") (or marking these as integration tests) so unit test runs remain stable.
```toml
tree-mem = [
    "neo4j (>=5.28.1,<6.0.0)",   # Graph database
    "schedule (>=1.2.2,<2.0.0)", # Task scheduling
    "lancedb (>=0.30.1,<1.0.0)", # LanceDB
```
lancedb is declared with two different minimum versions across extras (tree-mem requires >=0.30.1 while lance-mem/all allow >=0.17.0). Mixed constraints like this can lead to confusing resolver behavior and makes it unclear which version the code is targeting. It would be better to align these constraints (and include any required companion deps like tantivy in the same extra if FTS is considered part of the feature set).
Suggested change:

```toml
    "lancedb (>=0.17.0,<1.0.0)", # LanceDB
```
```python
# Extract and log scores for visibility before filtering
if self.logger.isEnabledFor(logging.DEBUG):
    score_details = []
    for key in ("text_mem", "pref_mem"):
        buckets = results.get(key)
        if not isinstance(buckets, list):
            continue
        for bucket in buckets:
            memories = bucket.get("memories")
            if not isinstance(memories, list):
                continue
            for mem in memories:
                if not isinstance(mem, dict):
                    continue
                mem_text = mem.get("memory", "").replace("\n", " ")
                # Truncate to 100 chars to avoid log flooding
                if len(mem_text) > 100:
                    mem_text = mem_text[:100] + "..."
                meta = mem.get("metadata", {})
                score = meta.get("relativity", 1.0) if isinstance(meta, dict) else 1.0
                try:
                    score_val = float(score) if score is not None else 1.0
                except (TypeError, ValueError):
                    score_val = 1.0
                score_details.append(f"[{score_val:.4f}] {mem_text}")

    if score_details:
        self.logger.debug(
            f"[SearchHandler] Reranker scores before threshold ({search_req_local.relativity}): \n"
```
The new DEBUG logging includes snippets of the raw memory text in server logs. Even gated behind DEBUG, this can leak sensitive user content into log files and observability pipelines. Consider logging only IDs and scores (or a hash/redacted preview) and/or guarding this behind an explicit config flag intended for secure debugging.
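If a content preview is still wanted for debugging, a hashed stand-in keeps the log line correlatable without exposing user text. This helper is illustrative (not in the PR); the hash length is an assumption.

```python
import hashlib


def redacted_preview(text: str, length: int = 12) -> str:
    """Return a short, stable content hash instead of raw memory text."""
    digest = hashlib.sha256(text.encode("utf-8")).hexdigest()
    return f"sha256:{digest[:length]}"


print(redacted_preview("hello"))  # → sha256:2cf24dba5fb0
```

The debug line could then log `f"[{score_val:.4f}] {redacted_preview(mem_text)}"`, letting operators match entries across log statements without ever writing the memory content itself.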
Suggested change:

```python
# Extract and log scores for visibility before filtering without logging raw memory text
if self.logger.isEnabledFor(logging.DEBUG):
    score_details = []
    for key in ("text_mem", "pref_mem"):
        buckets = results.get(key)
        if not isinstance(buckets, list):
            continue
        for bucket_index, bucket in enumerate(buckets):
            memories = bucket.get("memories")
            if not isinstance(memories, list):
                continue
            for mem_index, mem in enumerate(memories):
                if not isinstance(mem, dict):
                    continue
                meta = mem.get("metadata", {})
                score = meta.get("relativity", 1.0) if isinstance(meta, dict) else 1.0
                try:
                    score_val = float(score) if score is not None else 1.0
                except (TypeError, ValueError):
                    score_val = 1.0
                mem_id = mem.get("id") or mem.get("memory_id")
                if mem_id is None and isinstance(meta, dict):
                    mem_id = meta.get("id") or meta.get("memory_id")
                mem_ref = (
                    f"id={mem_id}"
                    if mem_id is not None
                    else f"bucket={bucket_index},index={mem_index}"
                )
                score_details.append(f"[{score_val:.4f}] {key} {mem_ref}")
    if score_details:
        self.logger.debug(
            f"[SearchHandler] Reranker scores before threshold ({search_req_local.relativity}):\n"
```
```python
def get_neighbors(
    self,
    node_id: str,
    edge_type: str | None = None,
    direction: str = "OUT",
    user_name: str | None = None,
) -> list[dict[str, Any]]:
    target_user = user_name or self.user_name
```
LanceGraphDB.get_neighbors() does not match the BaseGraphDB.get_neighbors(id, type, direction) -> list[str] contract: it uses different parameter names/values (edge_type, direction="OUT"/"IN") and returns full node dicts instead of neighbor IDs. This inconsistency is already forcing callers to use hasattr/special-casing and is causing compatibility issues across backends. Consider aligning this method to the base interface (return IDs, use direction values "in"/"out"/"both" and type), or updating BaseGraphDB to a single consistent signature/return type used by all backends.
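A minimal sketch of two normalization helpers that would let the LanceDB backend honor the base contract (lowercase direction values, list[str] return) while keeping its dict-returning query internally. Function names here are illustrative, not part of the PR.

```python
def normalize_direction(direction: str) -> str:
    """Map any-case direction values onto the base contract's lowercase set."""
    d = direction.lower()
    if d not in {"in", "out", "both"}:
        raise ValueError(f"unsupported direction: {direction}")
    return d


def neighbors_as_ids(rows: list[dict]) -> list[str]:
    """Collapse full node rows down to the neighbor IDs the base contract promises."""
    return [row["id"] for row in rows]


print(normalize_direction("OUT"))                      # → out
print(neighbors_as_ids([{"id": "a"}, {"id": "b"}]))    # → ['a', 'b']
```

With these in place, `get_neighbors(id, type, direction)` can accept the base parameter names and still delegate to the existing Lance query, removing the hasattr special-casing in callers.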
Description
📝 Summary
This PR introduces the LanceDB Graph Backend to MemOS, offering a high-performance, lightweight, local-first alternative to Neo4j. It carries over the core capabilities merged from the ve-main branch, together with production-ready optimizations for data compaction (countering write amplification) and index lifecycle management.
🚀 Key Features & Changes