From e1bc280c8deb409132a5be9b86508f991df40c57 Mon Sep 17 00:00:00 2001 From: Anna Tao Date: Fri, 5 Jun 2026 08:40:17 -0700 Subject: [PATCH 1/5] Add Valkey memory store scaffolding -- config, registry, tests * Add Valkey memory store scaffolding (config, registry, tests) * Address PR feedback: port/db bounds, Docker note, collapse tests * change required to True for port and host * combine tests into one * Revert host+port to required=False --- entity/configs/node/memory.py | 125 +++++++ pyproject.toml | 3 + runtime/node/agent/memory/builtin_stores.py | 14 + tests/test_valkey_memory.py | 374 ++++++++++++++++++++ yaml_instance/demo_valkey_memory.yaml | 47 +++ 5 files changed, 563 insertions(+) create mode 100644 tests/test_valkey_memory.py create mode 100644 yaml_instance/demo_valkey_memory.yaml diff --git a/entity/configs/node/memory.py b/entity/configs/node/memory.py index 970183e0c2..37104aa56f 100755 --- a/entity/configs/node/memory.py +++ b/entity/configs/node/memory.py @@ -189,6 +189,131 @@ def from_dict(cls, data: Mapping[str, Any], *, path: str) -> "SimpleMemoryConfig } +@dataclass +class ValkeyMemoryConfig(BaseConfig): + """Configuration for Valkey-backed memory store.""" + + host: str = "localhost" + port: int = 6379 + password: str | None = None + db: int = 0 + index_name: str = "memory_index" + key_prefix: str = "memory:" + ttl_seconds: int | None = None + embedding: EmbeddingConfig | None = None + + @classmethod + def from_dict(cls, data: Mapping[str, Any], *, path: str) -> "ValkeyMemoryConfig": + mapping = require_mapping(data, path) + host = optional_str(mapping, "host", path) or "localhost" + + port_value = mapping.get("port", 6379) + if not isinstance(port_value, int) or port_value < 1 or port_value > 65535: + raise ConfigError("port must be a valid port number (1-65535)", extend_path(path, "port")) + + password = optional_str(mapping, "password", path) + + db_value = mapping.get("db", 0) + if not isinstance(db_value, int) or db_value < 0 or db_value > 15: + raise ConfigError("db must be a valid database index (0-15)", extend_path(path, "db")) + + index_name = optional_str(mapping, "index_name", path) or "memory_index" + + key_prefix = optional_str(mapping, "key_prefix", path) or "memory:" + + ttl_seconds: int | None = None + ttl_raw = mapping.get("ttl_seconds") + if ttl_raw is not None: + if not isinstance(ttl_raw, int) or ttl_raw <= 0: + raise ConfigError("ttl_seconds must be a positive integer", extend_path(path, "ttl_seconds")) + ttl_seconds = ttl_raw + + embedding_cfg = None + if "embedding" in mapping and mapping["embedding"] is not None: + embedding_cfg = EmbeddingConfig.from_dict(mapping["embedding"], path=extend_path(path, "embedding")) + + return cls( + host=host, + port=port_value, + password=password, + db=db_value, + index_name=index_name, + key_prefix=key_prefix, + ttl_seconds=ttl_seconds, + embedding=embedding_cfg, + path=path, + ) + + FIELD_SPECS = { + "host": ConfigFieldSpec( + name="host", + display_name="Host", + type_hint="str", + required=False, + default="localhost", + description="Valkey server hostname or IP address", + ), + "port": ConfigFieldSpec( + name="port", + display_name="Port", + type_hint="int", + required=False, + default=6379, + description="Valkey server port", + ), + "password": ConfigFieldSpec( + name="password", + display_name="Password", + type_hint="str", + required=False, + description="Valkey server password", + default="${VALKEY_PASSWORD}", + advance=True, + ), + "db": ConfigFieldSpec( + name="db", + display_name="Database Index", + type_hint="int", + required=False, + default=0, + description="Valkey database index", + advance=True, + ), + "index_name": ConfigFieldSpec( + name="index_name", + display_name="Index Name", + type_hint="str", + required=False, + default="memory_index", + description="Name of the Valkey Search index for vector similarity queries", + ), + "key_prefix": ConfigFieldSpec( + name="key_prefix", + display_name="Key Prefix", + type_hint="str", + required=False, + default="memory:", + description="Prefix for all keys stored in Valkey", + advance=True, + ), + "ttl_seconds": ConfigFieldSpec( + name="ttl_seconds", + display_name="TTL (seconds)", + type_hint="int", + required=False, + description="Time-to-live for memory entries in seconds (no expiry if omitted)", + ), + "embedding": ConfigFieldSpec( + name="embedding", + display_name="Embedding Configuration", + type_hint="EmbeddingConfig", + required=False, + description="Optional embedding configuration for vector similarity search", + child=EmbeddingConfig, + ), + } + + @dataclass class FileMemoryConfig(BaseConfig): index_path: str | None = None diff --git a/pyproject.toml b/pyproject.toml index e1e9cebe20..aee6811de5 100755 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,6 +43,9 @@ dependencies = [ "mem0ai>=1.0.9", ] +[project.optional-dependencies] +valkey = ["valkey-glide>=2.4"] + [build-system] requires = ["hatchling"] build-backend = "hatchling.build" diff --git a/runtime/node/agent/memory/builtin_stores.py b/runtime/node/agent/memory/builtin_stores.py index bd10a62f79..701c4a63a8 100755 --- a/runtime/node/agent/memory/builtin_stores.py +++ b/runtime/node/agent/memory/builtin_stores.py @@ -5,6 +5,7 @@ FileMemoryConfig, Mem0MemoryConfig, SimpleMemoryConfig, + ValkeyMemoryConfig, MemoryStoreConfig, ) from runtime.node.agent.memory.blackboard_memory import BlackboardMemory @@ -48,6 +49,19 @@ def _create_mem0_memory(store): ) +def _create_valkey_memory(store): + from runtime.node.agent.memory.valkey_memory import ValkeyMemory + return ValkeyMemory(store) + + +register_memory_store( + "valkey", + config_cls=ValkeyMemoryConfig, + factory=_create_valkey_memory, + summary="Valkey-backed memory store with vector similarity search", +) + + class MemoryFactory: @staticmethod def create_memory(store: MemoryStoreConfig) -> MemoryBase: diff --git a/tests/test_valkey_memory.py b/tests/test_valkey_memory.py new file mode 100644 index 0000000000..3f73c9d943 --- /dev/null +++ b/tests/test_valkey_memory.py @@ -0,0 +1,374 @@ +"""Tests for Valkey memory store: config validation, registry, and integration skeleton.""" + +from unittest.mock import MagicMock, patch, AsyncMock +import pytest + +from entity.configs.base import ConfigError +from entity.configs.node.memory import ValkeyMemoryConfig, EmbeddingConfig + + +# ============================================================================= +# Unit Tests: ValkeyMemoryConfig +# ============================================================================= + + +class TestValkeyMemoryConfigFromDict: + """Test ValkeyMemoryConfig.from_dict parsing and validation.""" + + def test_minimal_config(self): + """Parses with all defaults when only empty dict provided.""" + cfg = ValkeyMemoryConfig.from_dict({}, path="test") + assert cfg.host == "localhost" + assert cfg.port == 6379 + assert cfg.password is None + assert cfg.db == 0 + assert cfg.index_name == "memory_index" + assert cfg.key_prefix == "memory:" + assert cfg.ttl_seconds is None + assert cfg.embedding is None + + def test_full_config(self): + """Parses all fields correctly.""" + data = { + "host": "valkey.internal", + "port": 6380, + "password": "secret", + "db": 2, + "index_name": "chatdev_memory", + "key_prefix": "agent:", + "ttl_seconds": 86400, + "embedding": { + "provider": "openai", + "model": "text-embedding-3-small", + }, + } + cfg = ValkeyMemoryConfig.from_dict(data, path="test") + assert cfg.host == "valkey.internal" + assert cfg.port == 6380 + assert cfg.password == "secret" + assert cfg.db == 2 + assert cfg.index_name == "chatdev_memory" + assert cfg.key_prefix == "agent:" + assert cfg.ttl_seconds == 86400 + assert cfg.embedding is not None + assert cfg.embedding.provider == "openai" + assert cfg.embedding.model == "text-embedding-3-small" + + def test_invalid_port_zero(self): + """Port must be at least 1.""" + with pytest.raises(ConfigError, match="port"): + ValkeyMemoryConfig.from_dict({"port": 0}, path="test") + + def test_invalid_port_negative(self): + """Negative port rejected.""" + with pytest.raises(ConfigError, match="port"): + ValkeyMemoryConfig.from_dict({"port": -1}, path="test") + + def test_invalid_port_string(self): + """Non-integer port rejected.""" + with pytest.raises(ConfigError, match="port"): + ValkeyMemoryConfig.from_dict({"port": "6379"}, path="test") + + def test_invalid_port_too_high(self): + """Port above 65535 rejected.""" + with pytest.raises(ConfigError, match="port"): + ValkeyMemoryConfig.from_dict({"port": 65536}, path="test") + + def test_invalid_db_negative(self): + """Negative db index rejected.""" + with pytest.raises(ConfigError, match="db"): + ValkeyMemoryConfig.from_dict({"db": -1}, path="test") + + def test_invalid_db_too_high(self): + """Database index above 15 rejected.""" + with pytest.raises(ConfigError, match="db"): + ValkeyMemoryConfig.from_dict({"db": 16}, path="test") + + def test_invalid_ttl_zero(self): + """TTL must be positive if specified.""" + with pytest.raises(ConfigError, match="ttl_seconds"): + ValkeyMemoryConfig.from_dict({"ttl_seconds": 0}, path="test") + + def test_invalid_ttl_negative(self): + """Negative TTL rejected.""" + with pytest.raises(ConfigError, match="ttl_seconds"): + ValkeyMemoryConfig.from_dict({"ttl_seconds": -100}, path="test") + + def test_invalid_ttl_string(self): + """Non-integer TTL rejected.""" + with pytest.raises(ConfigError, match="ttl_seconds"): + ValkeyMemoryConfig.from_dict({"ttl_seconds": "3600"}, path="test") + + def test_optional_fields_none_when_omitted(self): + """TTL and embedding are None when not specified.""" + cfg = ValkeyMemoryConfig.from_dict({"host": "localhost"}, path="test") + assert cfg.ttl_seconds is None + assert cfg.embedding is None + + def test_embedding_none_when_explicitly_none(self): + """Embedding config is None when explicitly set to None.""" + cfg = ValkeyMemoryConfig.from_dict({"embedding": None}, path="test") + assert cfg.embedding is None + + +class TestValkeyMemoryConfigFieldSpecs: + """Test FIELD_SPECS metadata for UI generation.""" + + def test_field_specs_defined(self): + """All expected fields have specs.""" + specs = ValkeyMemoryConfig.field_specs() + expected_fields = {"host", "port", "password", "db", "index_name", "key_prefix", "ttl_seconds", "embedding"} + assert expected_fields.issubset(set(specs.keys())) + + def test_host_not_required(self): + """Host has a default so it's not required.""" + specs = ValkeyMemoryConfig.field_specs() + assert specs["host"].required is False + + def test_index_name_not_required(self): + """index_name has a default.""" + specs = ValkeyMemoryConfig.field_specs() + assert specs["index_name"].required is False + assert specs["index_name"].default == "memory_index" + + def test_ttl_not_required(self): + """ttl_seconds is optional.""" + specs = ValkeyMemoryConfig.field_specs() + assert specs["ttl_seconds"].required is False + + def test_embedding_has_child(self): + """Embedding spec references EmbeddingConfig as child.""" + specs = ValkeyMemoryConfig.field_specs() + assert specs["embedding"].child is EmbeddingConfig + + +# ============================================================================= +# Unit Tests: Registry +# ============================================================================= + + +class TestValkeyRegistration: + """Test that 'valkey' is properly registered as a memory store type.""" + + def test_valkey_in_registry(self): + """'valkey' appears in memory store registrations.""" + from runtime.node.agent.memory.registry import iter_memory_store_registrations + + stores = iter_memory_store_registrations() + assert "valkey" in stores + + def test_valkey_config_cls(self): + """Registry entry points to ValkeyMemoryConfig.""" + from runtime.node.agent.memory.registry import get_memory_store_registration + + reg = get_memory_store_registration("valkey") + assert reg.config_cls is ValkeyMemoryConfig + + def test_valkey_has_summary(self): + """Registry entry has a non-empty summary.""" + from runtime.node.agent.memory.registry import get_memory_store_registration + + reg = get_memory_store_registration("valkey") + assert reg.summary and len(reg.summary) > 0 + + def test_schema_registry_has_valkey(self): + """schema_registry also knows about 'valkey' for config parsing.""" + from schema_registry import get_memory_store_schema + + schema = get_memory_store_schema("valkey") + assert schema.config_cls is ValkeyMemoryConfig + + +# ============================================================================= +# Unit Tests: MemoryStoreConfig can parse type=valkey +# ============================================================================= + + +class TestMemoryStoreConfigValkey: + """Test that MemoryStoreConfig.from_dict handles type='valkey'.""" + + def test_parses_valkey_store(self): + """Full memory store config with type=valkey parses successfully.""" + from entity.configs.node.memory import MemoryStoreConfig + + data = { + "name": "chatdev_memory", + "type": "valkey", + "config": { + "host": "localhost", + "port": 6379, + "index_name": "chatdev_memory", + "ttl_seconds": 86400, + }, + } + store = MemoryStoreConfig.from_dict(data, path="test") + assert store.name == "chatdev_memory" + assert store.type == "valkey" + assert isinstance(store.config, ValkeyMemoryConfig) + assert store.config.index_name == "chatdev_memory" + assert store.config.ttl_seconds == 86400 + + def test_rejects_missing_config_block(self): + """Config block is required.""" + from entity.configs.node.memory import MemoryStoreConfig + + data = { + "name": "bad", + "type": "valkey", + "config": None, + } + with pytest.raises(ConfigError): + MemoryStoreConfig.from_dict(data, path="test") + + +# ============================================================================= +# Integration Test Skeleton: ValkeyMemory (requires running Valkey server) +# ============================================================================= + + +@pytest.mark.skipif(True, reason="Integration test — requires running Valkey server with Search module") +class TestValkeyMemoryIntegration: + """ + Integration tests for ValkeyMemory. + + Prerequisites: + - Valkey server running on localhost:6379 + - Valkey Search module loaded (valkey-server --loadmodule valkeysearch.so) + - Or use the valkey/valkey-bundle Docker image which includes Search: + docker run -p 6379:6379 valkey/valkey-bundle:latest + - valkey-glide installed (pip install .[valkey]) + + Run with: pytest tests/test_valkey_memory.py::TestValkeyMemoryIntegration -v --no-header + """ + + @pytest.fixture + def valkey_store(self): + """Create a MemoryStoreConfig for integration testing.""" + from entity.configs.node.memory import MemoryStoreConfig + + data = { + "name": "integration_test", + "type": "valkey", + "config": { + "host": "localhost", + "port": 6379, + "index_name": "test_memory_idx", + "key_prefix": "test:memory:", + "ttl_seconds": 60, + "embedding": { + "provider": "openai", + "model": "text-embedding-3-small", + "api_key": "test-key", + }, + }, + } + return MemoryStoreConfig.from_dict(data, path="integration") + + @pytest.fixture + def memory(self, valkey_store): + """Create a ValkeyMemory instance and clean up after test.""" + from runtime.node.agent.memory.valkey_memory import ValkeyMemory + + mem = ValkeyMemory(valkey_store) + yield mem + # Cleanup: drop index and keys + # mem._cleanup_test_data() + + def test_load_is_noop(self, memory): + """load() completes without error (server handles persistence).""" + memory.load() + + def test_save_is_noop(self, memory): + """save() completes without error (server handles persistence).""" + memory.save() + + def test_update_stores_memory(self, memory): + """update() writes a memory item to Valkey as a hash.""" + from runtime.node.agent.memory.memory_base import ( + MemoryContentSnapshot, + MemoryWritePayload, + ) + + payload = MemoryWritePayload( + agent_role="writer", + inputs_text="The capital of France is Paris", + input_snapshot=MemoryContentSnapshot(text="The capital of France is Paris"), + output_snapshot=MemoryContentSnapshot(text="Noted."), + ) + memory.update(payload) + # Verify key exists in Valkey + # assert memory.count_memories() >= 1 + + def test_retrieve_returns_relevant_items(self, memory): + """retrieve() finds stored memories via KNN search.""" + from runtime.node.agent.memory.memory_base import ( + MemoryContentSnapshot, + MemoryWritePayload, + ) + + # Store a fact + payload = MemoryWritePayload( + agent_role="writer", + inputs_text="Python was created by Guido van Rossum", + input_snapshot=MemoryContentSnapshot(text="Python was created by Guido van Rossum"), + output_snapshot=None, + ) + memory.update(payload) + + # Query for it + query = MemoryContentSnapshot(text="Who created Python?") + results = memory.retrieve("writer", query, top_k=3, similarity_threshold=-1.0) + + assert len(results) >= 1 + assert "Python" in results[0].content_summary or "Guido" in results[0].content_summary + + def test_retrieve_empty_query_returns_empty(self, memory): + """Empty query returns empty list without searching.""" + from runtime.node.agent.memory.memory_base import MemoryContentSnapshot + + query = MemoryContentSnapshot(text=" ") + results = memory.retrieve("writer", query, top_k=3, similarity_threshold=-1.0) + assert results == [] + + def test_retrieve_respects_top_k(self, memory): + """Number of results does not exceed top_k.""" + from runtime.node.agent.memory.memory_base import ( + MemoryContentSnapshot, + MemoryWritePayload, + ) + + # Store multiple items + for i in range(5): + payload = MemoryWritePayload( + agent_role="writer", + inputs_text=f"Fact number {i} about testing", + input_snapshot=MemoryContentSnapshot(text=f"Fact number {i} about testing"), + output_snapshot=None, + ) + memory.update(payload) + + query = MemoryContentSnapshot(text="testing facts") + results = memory.retrieve("writer", query, top_k=2, similarity_threshold=-1.0) + assert len(results) <= 2 + + def test_ttl_expiry(self, memory): + """Items expire after ttl_seconds (would need time manipulation or short TTL).""" + # This test would require either: + # 1. Setting ttl_seconds=1 and sleeping + # 2. Using Valkey's DEBUG SLEEP or TIME commands + pass + + def test_index_created_lazily(self, memory): + """Index is created on first use, not at construction time.""" + # Verify FT.INFO raises (no index) before first operation + # Then after first update, FT.INFO succeeds + pass + + def test_import_error_without_glide(self): + """Clear ImportError when valkey-glide is not installed.""" + with patch.dict("sys.modules", {"glide": None}): + with pytest.raises(ImportError, match="valkey-glide"): + from runtime.node.agent.memory import valkey_memory # noqa: F401 + # Force re-import + import importlib + importlib.reload(valkey_memory) diff --git a/yaml_instance/demo_valkey_memory.yaml b/yaml_instance/demo_valkey_memory.yaml new file mode 100644 index 0000000000..c7ae7f22bc --- /dev/null +++ b/yaml_instance/demo_valkey_memory.yaml @@ -0,0 +1,47 @@ +version: 0.4.0 +vars: {} +graph: + id: '' + description: Memory-backed conversation using Valkey as the vector store. + is_majority_voting: false + nodes: + - id: assistant + type: agent + config: + base_url: ${BASE_URL} + api_key: ${API_KEY} + provider: openai + name: gpt-4o + role: | + You are a helpful assistant with long-term memory backed by Valkey. + If memory sections are provided (wrapped by ===== Begin of memory results ===== + and ===== End of memory results =====), use that context to give more relevant + and personalized responses. + params: + temperature: 0.7 + max_tokens: 2000 + memories: + - name: chatdev_memory + top_k: 3 + retrieve_stage: + - gen + read: true + write: true + edges: [] + memory: + - name: chatdev_memory + type: valkey + config: + host: localhost + port: 6379 + index_name: "chatdev_memory" + ttl_seconds: 86400 + embedding: + provider: openai + model: text-embedding-3-small + api_key: ${API_KEY} + base_url: ${BASE_URL} + start: + - assistant + end: [] + initial_instruction: '' From 25e82ae9b14480d67853b729d77f23a42e041022 Mon Sep 17 00:00:00 2001 From: Edward Liang <76571219+edlng@users.noreply.github.com> Date: Fri, 5 Jun 2026 15:59:39 -0700 Subject: [PATCH 2/5] feat: add valkey memory (#3) Signed-off-by: Edward Liang --- docs/user_guide/zh/modules/memory.md | 52 +++ entity/configs/node/memory.py | 24 + pyproject.toml | 2 +- runtime/node/agent/memory/valkey_memory.py | 255 +++++++++++ tests/test_valkey_memory.py | 502 ++++++++++++++++----- uv.lock | 24 + 6 files changed, 739 insertions(+), 120 deletions(-) create mode 100644 runtime/node/agent/memory/valkey_memory.py diff --git a/docs/user_guide/zh/modules/memory.md b/docs/user_guide/zh/modules/memory.md index 453e0d4d2e..baf9a9f69c 100755 --- a/docs/user_guide/zh/modules/memory.md +++ b/docs/user_guide/zh/modules/memory.md @@ -42,6 +42,22 @@ memory: agent_id: my-agent ``` +### Valkey Memory 配置 +```yaml +memory: + - name: chatdev_memory + type: valkey + config: + host: localhost + port: 6379 + index_name: chatdev_memory + ttl_seconds: 86400 + embedding: + provider: openai + model: text-embedding-3-small + api_key: ${API_KEY} +``` + ## 3. 内置 Memory Store 对比 | 类型 | 路径 | 特点 | 适用场景 | | --- | --- | --- | --- | @@ -49,6 +65,7 @@ memory: | `file` | `node/agent/memory/file_memory.py` | 将指定文件/目录切片为向量索引,只读;自动检测文件变更并更新索引 | 知识库、文档问答 | | `blackboard` | `node/agent/memory/blackboard_memory.py` | 轻量附加日志,按时间/条数裁剪;不依赖向量检索 | 简易广播板、流水线调试 | | `mem0` | `node/agent/memory/mem0_memory.py` | 由 Mem0 云端托管;支持语义搜索 + 图关系;无需本地 embedding 或持久化。需安装 `mem0ai` 包。 | 生产级记忆、跨会话持久化、多 Agent 记忆共享 | +| `valkey` | `node/agent/memory/valkey_memory.py` | 使用 Valkey Search 的 HNSW 向量索引;服务端持久化、跨进程共享、支持 TTL 自动过期。需安装 `valkey-glide-sync` 包。 | 自托管持久记忆、多进程部署、隐私敏感环境 | > 所有内置 store 都会在 `register_memory_store()` 中注册,摘要可通过 `MemoryStoreConfig.field_specs()` 在 UI 中展示。 @@ -119,6 +136,41 @@ nodes: - **持久化**:完全由云端托管。`load()` 和 `save()` 为空操作(no-op)。记忆在不同运行和会话间自动持久化。 - **依赖**:需安装 `mem0ai` 包(`pip install mem0ai`)。 +### 5.5 ValkeyMemory +- **配置**:通过 `ValkeyMemoryConfig` 指定连接参数和索引设置。 + ```yaml + memory: + - name: chatdev_memory + type: valkey + config: + host: localhost + port: 6379 + index_name: chatdev_memory + ttl_seconds: 86400 + embedding: + provider: openai + model: text-embedding-3-small + api_key: ${API_KEY} + ``` + | 字段 | 说明 | 默认值 | + | --- | --- | --- | + | `host` | Valkey 服务器地址 | `localhost` | + | `port` | 端口(1-65535) | `6379` | + | `username` | ACL 用户名 | `None` | + | `password` | 认证密码 | `None` | + | `db` | 数据库索引(0-15) | `0` | + | `index_name` | FT 索引名称 | `memory_index` | + | `key_prefix` | Hash key 前缀 | `memory:` | + | `ttl_seconds` | 记忆过期时间(秒),`None` 表示永不过期 | `None` | + | `embedding` | `EmbeddingConfig` 嵌套配置(必须提供) | — | +- **索引创建**:首次实例化时自动调用 `FT.CREATE` 创建 HNSW 向量索引(COSINE 距离),包含 `content_summary`(TEXT)、`agent_role`(TAG)、`timestamp`(NUMERIC)、`embedding`(VECTOR)字段。若索引已存在则静默跳过。 +- **检索**:使用 `FT.SEARCH` 执行 KNN 查询,返回 top-k 结果并按余弦相似度排序。通过 `similarity_threshold` 过滤低相关结果。 +- **写入**:`update()` 将输入文本编码为 float32 向量,存储为 Valkey Hash(`HSET`)。若配置了 `ttl_seconds`,同时调用 `EXPIRE` 设置过期时间。 +- **持久化**:由 Valkey 服务端管理。`load()` 和 `save()` 为空操作(no-op)。数据在进程重启后自动保留,多进程可并发读写同一索引。 +- **错误处理**:若 Valkey 未加载 Search 模块,初始化时抛出 `RuntimeError` 并给出安装提示。 +- **依赖**:需安装 `valkey-glide-sync` 包(`pip install 'chatdev[valkey]'`)。 +- **适用场景**:需要持久化、跨进程共享、自动过期的自托管部署;隐私敏感环境中 Mem0 云服务不可用时的替代方案。 + ## 6. EmbeddingConfig 提示 - 字段:`provider`, `model`, `api_key`, `base_url`, `params`。 - `provider=openai` 时使用 `openai.OpenAI` 客户端,可配置 `base_url` 以兼容兼容层。 diff --git a/entity/configs/node/memory.py b/entity/configs/node/memory.py index 37104aa56f..67c39e04e7 100755 --- a/entity/configs/node/memory.py +++ b/entity/configs/node/memory.py @@ -195,8 +195,10 @@ class ValkeyMemoryConfig(BaseConfig): host: str = "localhost" port: int = 6379 + username: str | None = None password: str | None = None db: int = 0 + use_tls: bool = False index_name: str = "memory_index" key_prefix: str = "memory:" ttl_seconds: int | None = None @@ -211,12 +213,15 @@ def from_dict(cls, data: Mapping[str, Any], *, path: str) -> "ValkeyMemoryConfig if not isinstance(port_value, int) or port_value < 1 or port_value > 65535: raise ConfigError("port must be a valid port number (1-65535)", extend_path(path, "port")) + username = optional_str(mapping, "username", path) password = optional_str(mapping, "password", path) db_value = mapping.get("db", 0) if not isinstance(db_value, int) or db_value < 0 or db_value > 15: raise ConfigError("db must be a valid database index (0-15)", extend_path(path, "db")) + use_tls = bool(mapping.get("use_tls", False)) + index_name = optional_str(mapping, "index_name", path) or "memory_index" key_prefix = optional_str(mapping, "key_prefix", path) or "memory:" @@ -235,8 +240,10 @@ def from_dict(cls, data: Mapping[str, Any], *, path: str) -> "ValkeyMemoryConfig return cls( host=host, port=port_value, + username=username, password=password, db=db_value, + use_tls=use_tls, index_name=index_name, key_prefix=key_prefix, ttl_seconds=ttl_seconds, @@ -261,6 +268,14 @@ def from_dict(cls, data: Mapping[str, Any], *, path: str) -> "ValkeyMemoryConfig default=6379, description="Valkey server port", ), + "username": ConfigFieldSpec( + name="username", + display_name="Username", + type_hint="str", + required=False, + description="Valkey ACL username (required for ACL-based auth)", + advance=True, + ), "password": ConfigFieldSpec( name="password", display_name="Password", @@ -279,6 +294,15 @@ def from_dict(cls, data: Mapping[str, Any], *, path: str) -> "ValkeyMemoryConfig description="Valkey database index", advance=True, ), + "use_tls": ConfigFieldSpec( + name="use_tls", + display_name="Use TLS", + type_hint="bool", + required=False, + default=False, + description="Enable TLS encryption for the Valkey connection", + advance=True, + ), "index_name": ConfigFieldSpec( name="index_name", display_name="Index Name", diff --git a/pyproject.toml b/pyproject.toml index aee6811de5..207ee930cc 100755 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,7 +44,7 @@ dependencies = [ ] [project.optional-dependencies] -valkey = ["valkey-glide>=2.4"] +valkey = ["valkey-glide-sync>=2.4"] [build-system] requires = ["hatchling"] diff --git a/runtime/node/agent/memory/valkey_memory.py b/runtime/node/agent/memory/valkey_memory.py new file mode 100644 index 0000000000..2ab6eed641 --- /dev/null +++ b/runtime/node/agent/memory/valkey_memory.py @@ -0,0 +1,255 @@ +"""Valkey Search persistent vector memory store implementation.""" + +import logging +import struct +import time +import uuid +from typing import List + +from entity.configs import MemoryStoreConfig +from entity.configs.node.memory import ValkeyMemoryConfig +from runtime.node.agent.memory.embedding import EmbeddingBase, EmbeddingFactory +from runtime.node.agent.memory.memory_base import ( + MemoryBase, + MemoryContentSnapshot, + MemoryItem, + MemoryWritePayload, +) + +logger = logging.getLogger(__name__) + + +def _get_glide_sync(): + try: + import glide_sync + return glide_sync + except ImportError: + raise ImportError( + "valkey-glide-sync is required for ValkeyMemory. " + "Install it with: pip install 'chatdev[valkey]' " + "(https://pypi.org/project/valkey-glide-sync/)" + ) + + +def _make_client(host: str, port: int, username: str | None = None, password: str | None = None, db: int = 0, use_tls: bool = False): + glide_sync = _get_glide_sync() + credentials = None + if password: + credentials = glide_sync.ServerCredentials(username=username or "default", password=password) + config = glide_sync.GlideClientConfiguration( + addresses=[glide_sync.NodeAddress(host, port)], + use_tls=use_tls, + credentials=credentials, + database_id=db if db != 0 else None, + ) + return glide_sync.GlideClient.create(config) + + +class ValkeyMemory(MemoryBase): + """Memory store backed by Valkey Search with persistent HNSW vector index. + + Each memory item is stored as a Valkey Hash at `memory:{uuid}` with fields: + content_summary, embedding (float32 bytes), agent_role, and timestamp. + A FT index with HNSW COSINE metric enables KNN retrieval. + """ + + def __init__(self, store: MemoryStoreConfig): + config = store.as_config(ValkeyMemoryConfig) + if not config: + raise ValueError("ValkeyMemory requires a ValkeyMemoryConfig") + # Skip MemoryBase.__init__ embedding logic — we handle it via config.embedding directly + self.store = store + self.name = store.name + self.contents: list = [] # satisfy MemoryBase contract (unused; state lives in Valkey) + self.config = config + + if config.embedding: + self.embedding: EmbeddingBase | None = EmbeddingFactory.create_embedding(config.embedding) + else: + self.embedding = None + + self._glide = _get_glide_sync() + self._client = _make_client(config.host, config.port, config.username, config.password, config.db, config.use_tls) + self._ensure_index() + + # -------- Index lifecycle -------- + + def _ensure_index(self) -> None: + """Create the HNSW FT index if absent, probing dimension from a test embedding.""" + if self.embedding is None: + raise ValueError( + "ValkeyMemory requires an embedding configuration to determine vector dimension" + ) + + glide_sync = self._glide + + test_vec = self.embedding.get_embedding("test") + dim = len(test_vec) + + schema = [ + glide_sync.TextField("content_summary"), + glide_sync.TagField("agent_role"), + glide_sync.NumericField("timestamp"), + glide_sync.VectorField( + "embedding", + glide_sync.VectorAlgorithm.HNSW, + glide_sync.VectorFieldAttributesHnsw( + dimensions=dim, + distance_metric=glide_sync.DistanceMetricType.COSINE, + type=glide_sync.VectorType.FLOAT32, + ), + ), + ] + options = glide_sync.FtCreateOptions(prefixes=[self.config.key_prefix]) + + try: + glide_sync.ft.create(self._client, self.config.index_name, schema, options) + logger.info("Created Valkey FT index '%s' (dim=%d)", self.config.index_name, dim) + except Exception as exc: + msg = str(exc).lower() + if "index already exists" in msg or "already exists" in msg: + logger.debug("Valkey FT index '%s' already exists", self.config.index_name) + elif "unknown command" in msg or "module" in msg: + raise RuntimeError( + f"Valkey server at {self.config.host}:{self.config.port} does not have the " + "Search module loaded. Install valkey-search or use the valkey/valkey-bundle " + "Docker image that includes it." + ) from exc + else: + raise + + @staticmethod + def _sanitize_tag(value: str) -> str: + """Sanitize a string for use as a Valkey TAG field value.""" + if not value: + return "" + # TAG syntax breaks on commas, braces, spaces, pipes + for ch in ",{}|<> \t\n\r": + value = value.replace(ch, "_") + return value + + # -------- Persistence (no-ops — server-side) -------- + + def load(self) -> None: + pass + + def save(self) -> None: + pass + + # -------- Update -------- + + def update(self, payload: MemoryWritePayload) -> None: + text = (payload.inputs_text or "").strip() + if not text: + return + if self.embedding is None: + logger.warning("ValkeyMemory: no embedding configured, skipping update") + return + + embedding_vec = self.embedding.get_embedding(text) + embedding_bytes = struct.pack(f"{len(embedding_vec)}f", *embedding_vec) + + key = f"{self.config.key_prefix}{uuid.uuid4().hex}" + ts = time.time() + + try: + self._client.hset(key, { + "content_summary": text, + "embedding": embedding_bytes, + "agent_role": self._sanitize_tag(payload.agent_role or ""), + "timestamp": str(ts), + }) + + if self.config.ttl_seconds is not None: + self._client.expire(key, self.config.ttl_seconds) + except Exception as exc: + logger.error("ValkeyMemory update failed: %s", exc) + return + + logger.debug("Stored memory at %s (agent_role=%s)", key, payload.agent_role) + + # -------- Retrieval -------- + + def retrieve( + self, + agent_role: str, + query: MemoryContentSnapshot, + top_k: int, + similarity_threshold: float, + ) -> List[MemoryItem]: + text = query.text.strip() + if not text: + return [] + if self.embedding is None: + return [] + + glide_sync = self._glide + + query_vec = self.embedding.get_embedding(text) + query_bytes = struct.pack(f"{len(query_vec)}f", *query_vec) + + top_k = max(1, int(top_k)) + + # Filter by agent_role when available + safe_role = self._sanitize_tag(agent_role) + if safe_role: + ft_query = f"(@agent_role:{{{safe_role}}})=>[KNN {top_k} @embedding $vec]" + else: + ft_query = f"*=>[KNN {top_k} @embedding $vec]" + options = glide_sync.FtSearchOptions( + params={"vec": query_bytes}, + dialect=2, + ) + + try: + results = glide_sync.ft.search(self._client, self.config.index_name, ft_query, options) + except Exception as exc: + logger.error("ValkeyMemory search failed: %s", exc) + return [] + + # results is [total_count, {key: {field: value, ...}}, ...] + # Skip the first element (count), then iterate doc dicts + items: List[MemoryItem] = [] + doc_entries = results[1:] if results else [] + for entry in doc_entries: + if not isinstance(entry, dict): + continue + for key, fields in entry.items(): + score_raw = fields.get(b"__embedding_score") or fields.get("__embedding_score") or b"1.0" + try: + distance = float(score_raw) + except (ValueError, TypeError): + distance = 1.0 + similarity = 1.0 - distance + + if similarity_threshold >= 0 and similarity < similarity_threshold: + continue + + content = fields.get(b"content_summary") or fields.get("content_summary", b"") + role = fields.get(b"agent_role") or fields.get("agent_role", b"") + ts_raw = fields.get(b"timestamp") or fields.get("timestamp", b"0") + + items.append(MemoryItem( + id=key.decode() if isinstance(key, bytes) else key, + content_summary=content.decode() if isinstance(content, bytes) else content, + metadata={ + "agent_role": role.decode() if isinstance(role, bytes) else role, + "score": similarity, + "source": "valkey", + }, + timestamp=float(ts_raw) if ts_raw else time.time(), + )) + + items.sort(key=lambda item: item.metadata.get("score", 0.0), reverse=True) + return items + + # -------- Count -------- + + def count_memories(self) -> int: + try: + info = self._glide.ft.info(self._client, self.config.index_name) + num_docs = info.get(b"num_docs") or info.get("num_docs", 0) + return int(num_docs) + except Exception as exc: + logger.error("ValkeyMemory count_memories failed: %s", exc) + return 0 diff --git a/tests/test_valkey_memory.py b/tests/test_valkey_memory.py index 3f73c9d943..106c5b2d2c 100644 --- a/tests/test_valkey_memory.py +++ b/tests/test_valkey_memory.py @@ -1,10 +1,15 @@ -"""Tests for Valkey memory store: config validation, registry, and integration skeleton.""" +"""Tests for Valkey memory store: config validation, registry, and runtime behavior.""" -from unittest.mock import MagicMock, patch, AsyncMock +import struct +from unittest.mock import MagicMock, patch import pytest from entity.configs.base import ConfigError from entity.configs.node.memory import ValkeyMemoryConfig, EmbeddingConfig +from runtime.node.agent.memory.memory_base import ( + MemoryContentSnapshot, + MemoryWritePayload, +) # ============================================================================= @@ -222,153 +227,412 @@ def test_rejects_missing_config_block(self): # ============================================================================= -# Integration Test Skeleton: ValkeyMemory (requires running Valkey server) +# Unit Tests: ValkeyMemory Runtime Behavior # ============================================================================= -@pytest.mark.skipif(True, reason="Integration test — requires running Valkey server with Search module") -class TestValkeyMemoryIntegration: - """ - Integration tests for ValkeyMemory. +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_store( + host="localhost", + port=6379, + index_name="chatdev_memory", + ttl_seconds=None, +): + """Build a minimal MemoryStoreConfig mock for ValkeyMemory.""" + valkey_cfg = MagicMock(spec=ValkeyMemoryConfig) + valkey_cfg.host = host + valkey_cfg.port = port + valkey_cfg.index_name = index_name + valkey_cfg.ttl_seconds = ttl_seconds + valkey_cfg.key_prefix = "memory:" + valkey_cfg.username = None + valkey_cfg.password = None + valkey_cfg.db = 0 + valkey_cfg.embedding = MagicMock() # non-None so embedding branch is taken + + store = MagicMock() + store.name = "test_valkey" + + def _as_config_side_effect(expected_type, **kwargs): + if expected_type is ValkeyMemoryConfig: + return valkey_cfg + return None + + store.as_config.side_effect = _as_config_side_effect + return store, valkey_cfg + + +def _make_valkey_memory(host="localhost", port=6379, ttl_seconds=None): + """Create a ValkeyMemory with mocked glide_sync client and embedding.""" + store, valkey_cfg = _make_store(host=host, port=port, ttl_seconds=ttl_seconds) + + mock_client = MagicMock() + mock_embedding = MagicMock() + mock_embedding.get_embedding.return_value = [0.1, 0.2, 0.3] # dim=3 + + mock_glide_module = MagicMock() + mock_glide_module.ft.create.return_value = "OK" + mock_glide_module.ft.search.return_value = [] + + with patch("runtime.node.agent.memory.valkey_memory._get_glide_sync") as mock_get_glide, \ + patch("runtime.node.agent.memory.valkey_memory._make_client") as mock_make_client, \ + patch("runtime.node.agent.memory.valkey_memory.EmbeddingFactory") as mock_factory: + mock_get_glide.return_value = mock_glide_module + mock_make_client.return_value = mock_client + mock_factory.create_embedding.return_value = mock_embedding - Prerequisites: - - Valkey server running on localhost:6379 - - Valkey Search module loaded (valkey-server --loadmodule valkeysearch.so) - - Or use the valkey/valkey-bundle Docker image which includes Search: - docker run -p 6379:6379 valkey/valkey-bundle:latest - - valkey-glide installed (pip install .[valkey]) + from runtime.node.agent.memory.valkey_memory import ValkeyMemory + memory = ValkeyMemory(store) + # Attach the glide module mock so tests can inspect ft.create / ft.search calls + memory._glide = mock_glide_module + return memory, mock_client, mock_embedding + + +# --------------------------------------------------------------------------- +# Instantiation +# --------------------------------------------------------------------------- + +class TestValkeyMemoryInstantiation: + + def test_instantiation_creates_ft_index(self): + """ValkeyMemory creates FT index at __init__ time.""" + memory, client, _ = _make_valkey_memory() + memory._glide.ft.create.assert_called_once() + args = memory._glide.ft.create.call_args[0] + assert args[1] == "chatdev_memory" # index_name is second positional arg + + def test_instantiation_idempotent_when_index_exists(self): + """ValkeyMemory silently ignores 'index already exists' error.""" + store, _ = _make_store() + mock_client = MagicMock() + mock_embedding = MagicMock() + mock_embedding.get_embedding.return_value = [0.1, 0.2, 0.3] + mock_glide_module = MagicMock() + mock_glide_module.ft.create.side_effect = Exception("Index already exists") + + with patch("runtime.node.agent.memory.valkey_memory._get_glide_sync") as mock_get_glide, \ + patch("runtime.node.agent.memory.valkey_memory._make_client") as mock_make_client, \ + patch("runtime.node.agent.memory.valkey_memory.EmbeddingFactory") as mock_factory: + mock_get_glide.return_value = mock_glide_module + mock_make_client.return_value = mock_client + mock_factory.create_embedding.return_value = mock_embedding + + from runtime.node.agent.memory.valkey_memory import ValkeyMemory + ValkeyMemory(store) # Should not raise + + def test_instantiation_raises_on_missing_search_module(self): + """ValkeyMemory raises RuntimeError when Search module is absent.""" + store, _ = _make_store() + mock_client = MagicMock() + mock_embedding = MagicMock() + mock_embedding.get_embedding.return_value = [0.1, 0.2, 0.3] + mock_glide_module = MagicMock() + mock_glide_module.ft.create.side_effect = Exception("unknown command ft.create") + + with patch("runtime.node.agent.memory.valkey_memory._get_glide_sync") as mock_get_glide, \ + patch("runtime.node.agent.memory.valkey_memory._make_client") as mock_make_client, \ + patch("runtime.node.agent.memory.valkey_memory.EmbeddingFactory") as mock_factory: + mock_get_glide.return_value = mock_glide_module + mock_make_client.return_value = mock_client + mock_factory.create_embedding.return_value = mock_embedding + + from runtime.node.agent.memory.valkey_memory import ValkeyMemory + with pytest.raises(RuntimeError, match="Search module"): + ValkeyMemory(store) + + def test_raises_on_wrong_config_type(self): + """ValkeyMemory raises ValueError when store has wrong config type.""" + from runtime.node.agent.memory.valkey_memory import ValkeyMemory - Run with: pytest tests/test_valkey_memory.py::TestValkeyMemoryIntegration -v --no-header - """ + store = MagicMock() + store.name = "bad" + store.as_config.return_value = None - @pytest.fixture - def valkey_store(self): - """Create a MemoryStoreConfig for integration testing.""" - from entity.configs.node.memory import MemoryStoreConfig + with patch("runtime.node.agent.memory.valkey_memory._get_glide_sync"), \ + patch("runtime.node.agent.memory.valkey_memory._make_client"): + with pytest.raises(ValueError, match="ValkeyMemoryConfig"): + ValkeyMemory(store) - data = { - "name": "integration_test", - "type": "valkey", - "config": { - "host": "localhost", - "port": 6379, - "index_name": "test_memory_idx", - "key_prefix": "test:memory:", - "ttl_seconds": 60, - "embedding": { - "provider": "openai", - "model": "text-embedding-3-small", - "api_key": "test-key", - }, - }, - } - return MemoryStoreConfig.from_dict(data, path="integration") - @pytest.fixture - def memory(self, valkey_store): - """Create a ValkeyMemory instance and clean up after test.""" - from runtime.node.agent.memory.valkey_memory import ValkeyMemory +# --------------------------------------------------------------------------- +# Update +# --------------------------------------------------------------------------- + +class TestValkeyMemoryUpdate: - mem = ValkeyMemory(valkey_store) - yield mem - # Cleanup: drop index and keys - # mem._cleanup_test_data() + def test_update_stores_hash(self): + """update() calls hset with expected fields.""" + memory, client, embedding = _make_valkey_memory() + embedding.get_embedding.return_value = [0.5, 0.6, 0.7] - def test_load_is_noop(self, memory): - """load() completes without error (server handles persistence).""" - memory.load() + payload = MemoryWritePayload( + agent_role="coder", + inputs_text="I prefer Python", + input_snapshot=None, + output_snapshot=None, + ) + memory.update(payload) - def test_save_is_noop(self, memory): - """save() completes without error (server handles persistence).""" - memory.save() + client.hset.assert_called_once() + args = client.hset.call_args + key = args[0][0] + fields = args[0][1] + assert key.startswith("memory:") + assert fields["content_summary"] == "I prefer Python" + assert fields["agent_role"] == "coder" + assert "embedding" in fields + assert "timestamp" in fields + + def test_update_embedding_bytes_are_float32(self): + """update() encodes embedding as packed float32 bytes.""" + memory, client, embedding = _make_valkey_memory() + vec = [0.1, 0.2, 0.3] + embedding.get_embedding.return_value = vec - def test_update_stores_memory(self, memory): - """update() writes a memory item to Valkey as a hash.""" - from runtime.node.agent.memory.memory_base import ( - MemoryContentSnapshot, - MemoryWritePayload, + payload = MemoryWritePayload( + agent_role="writer", + inputs_text="test", + input_snapshot=None, + output_snapshot=None, ) + memory.update(payload) + + fields = client.hset.call_args[0][1] + expected_bytes = struct.pack("3f", *vec) + assert fields["embedding"] == expected_bytes + + def test_update_with_ttl_calls_expire(self): + """update() calls expire on the key when ttl_seconds is configured.""" + memory, client, _ = _make_valkey_memory(ttl_seconds=3600) payload = MemoryWritePayload( agent_role="writer", - inputs_text="The capital of France is Paris", - input_snapshot=MemoryContentSnapshot(text="The capital of France is Paris"), - output_snapshot=MemoryContentSnapshot(text="Noted."), + inputs_text="test input", + input_snapshot=None, + output_snapshot=None, ) memory.update(payload) - # Verify key exists in Valkey - # assert memory.count_memories() >= 1 - - def test_retrieve_returns_relevant_items(self, memory): - """retrieve() finds stored memories via KNN search.""" - from runtime.node.agent.memory.memory_base import ( - MemoryContentSnapshot, - MemoryWritePayload, + + client.expire.assert_called_once() + expire_args = client.expire.call_args[0] + assert expire_args[1] == 3600 + + def test_update_without_ttl_does_not_call_expire(self): + """update() does not call expire when ttl_seconds is None.""" + memory, client, _ = _make_valkey_memory(ttl_seconds=None) + + payload = MemoryWritePayload( + agent_role="writer", + inputs_text="test input", + input_snapshot=None, + output_snapshot=None, ) + memory.update(payload) + + client.expire.assert_not_called() + + def test_update_empty_input_is_noop(self): + """update() with empty inputs_text skips hset.""" + memory, client, _ = _make_valkey_memory() - # Store a fact payload = MemoryWritePayload( agent_role="writer", - inputs_text="Python was created by Guido van Rossum", - input_snapshot=MemoryContentSnapshot(text="Python was created by Guido van Rossum"), + inputs_text=" ", + input_snapshot=None, output_snapshot=None, ) memory.update(payload) - # Query for it - query = MemoryContentSnapshot(text="Who created Python?") - results = memory.retrieve("writer", query, top_k=3, similarity_threshold=-1.0) + client.hset.assert_not_called() + + +# --------------------------------------------------------------------------- +# Retrieve +# --------------------------------------------------------------------------- + +class TestValkeyMemoryRetrieve: + + def _ft_result(self, docs): + """Build a ft.search return value: [count, {key: {fields}}, ...] + docs: list of (key, content, agent_role, distance) tuples. + """ + result = [len(docs)] + for key, content, role, distance in docs: + result.append({ + key.encode(): { + b"content_summary": content.encode(), + b"agent_role": role.encode(), + b"timestamp": b"1700000000.0", + b"__embedding_score": str(distance).encode(), + } + }) + return result + + def test_retrieve_returns_memory_items(self): + """retrieve() returns MemoryItem list from ft.search results.""" + memory, client, embedding = _make_valkey_memory() + embedding.get_embedding.return_value = [0.1, 0.2, 0.3] + memory._glide.ft.search.return_value = self._ft_result([ + ("memory:abc", "Python is great", "coder", 0.05), + ]) + + query = MemoryContentSnapshot(text="Python") + results = memory.retrieve("coder", query, top_k=3, similarity_threshold=-1.0) + + assert len(results) == 1 + assert results[0].content_summary == "Python is great" + assert results[0].metadata["source"] == "valkey" + + def test_retrieve_filters_by_agent_role(self): + """retrieve() filters KNN search by agent_role when provided (spec: AEA-500).""" + memory, client, embedding = _make_valkey_memory() + embedding.get_embedding.return_value = [0.1, 0.2, 0.3] + memory._glide.ft.search.return_value = [] + + query = MemoryContentSnapshot(text="test") + memory.retrieve("designer", query, top_k=5, similarity_threshold=-1.0) + + ft_query_arg = memory._glide.ft.search.call_args[0][2] + assert "(@agent_role:{designer})=>[KNN" in ft_query_arg + + def test_retrieve_threshold_filtering(self): + """retrieve() excludes results below similarity_threshold.""" + memory, client, embedding = _make_valkey_memory() + embedding.get_embedding.return_value = [0.1, 0.2, 0.3] + # distance=0.8 -> similarity=0.2, below threshold of 0.5 + memory._glide.ft.search.return_value = self._ft_result([ + ("memory:a", "low relevance", "coder", 0.8), + ]) + + query = MemoryContentSnapshot(text="test") + results = memory.retrieve("coder", query, top_k=5, similarity_threshold=0.5) - assert len(results) >= 1 - assert "Python" in results[0].content_summary or "Guido" in results[0].content_summary + assert results == [] - def test_retrieve_empty_query_returns_empty(self, memory): - """Empty query returns empty list without searching.""" - from runtime.node.agent.memory.memory_base import MemoryContentSnapshot + def test_retrieve_threshold_passes_high_similarity(self): + """retrieve() includes results at or above similarity_threshold.""" + memory, client, embedding = _make_valkey_memory() + embedding.get_embedding.return_value = [0.1, 0.2, 0.3] + # distance=0.1 -> similarity=0.9 + memory._glide.ft.search.return_value = self._ft_result([ + ("memory:a", "high relevance", "coder", 0.1), + ]) + + query = MemoryContentSnapshot(text="test") + results = memory.retrieve("coder", query, top_k=5, similarity_threshold=0.5) + + assert len(results) == 1 + + def test_retrieve_ordered_by_similarity_descending(self): + """retrieve() returns results ordered by descending cosine similarity.""" + memory, client, embedding = _make_valkey_memory() + embedding.get_embedding.return_value = [0.1, 0.2, 0.3] + memory._glide.ft.search.return_value = self._ft_result([ + ("memory:a", "less relevant", "coder", 0.4), # similarity=0.6 + ("memory:b", "most relevant", "coder", 0.1), # similarity=0.9 + ("memory:c", "middle", "coder", 0.2), # similarity=0.8 + ]) + + query = MemoryContentSnapshot(text="test") + results = memory.retrieve("coder", query, top_k=5, similarity_threshold=-1.0) + + assert results[0].content_summary == "most relevant" + assert results[1].content_summary == "middle" + assert results[2].content_summary == "less relevant" + + def test_retrieve_empty_query_returns_empty(self): + """retrieve() returns empty list for blank query without calling ft.search.""" + memory, client, _ = _make_valkey_memory() query = MemoryContentSnapshot(text=" ") - results = memory.retrieve("writer", query, top_k=3, similarity_threshold=-1.0) + results = memory.retrieve("coder", query, top_k=3, similarity_threshold=-1.0) + assert results == [] + memory._glide.ft.search.assert_not_called() - def test_retrieve_respects_top_k(self, memory): - """Number of results does not exceed top_k.""" - from runtime.node.agent.memory.memory_base import ( - MemoryContentSnapshot, - MemoryWritePayload, - ) + def test_retrieve_search_error_returns_empty(self): + """retrieve() returns empty list when ft.search raises.""" + memory, client, embedding = _make_valkey_memory() + embedding.get_embedding.return_value = [0.1, 0.2, 0.3] + memory._glide.ft.search.side_effect = Exception("connection error") + + query = MemoryContentSnapshot(text="test") + results = memory.retrieve("coder", query, top_k=3, similarity_threshold=-1.0) + + assert results == [] + + def test_retrieve_no_threshold_passes_all(self): + """retrieve() with threshold=-1.0 returns all results regardless of similarity.""" + memory, client, embedding = _make_valkey_memory() + embedding.get_embedding.return_value = [0.1, 0.2, 0.3] + memory._glide.ft.search.return_value = self._ft_result([ + ("memory:a", "very low", "coder", 0.99), # similarity=0.01 + ]) + + query = MemoryContentSnapshot(text="test") + results = memory.retrieve("coder", query, top_k=5, similarity_threshold=-1.0) + + assert len(results) == 1 + + +# --------------------------------------------------------------------------- +# Count memories +# --------------------------------------------------------------------------- + +class TestValkeyMemoryCount: + + def test_count_memories_returns_num_docs(self): + """count_memories() returns num_docs from ft.info.""" + memory, client, _ = _make_valkey_memory() + memory._glide.ft.info.return_value = {b"num_docs": 3} + + assert memory.count_memories() == 3 + + def test_count_memories_empty(self): + """count_memories() returns 0 when index has no docs.""" + memory, client, _ = _make_valkey_memory() + memory._glide.ft.info.return_value = {b"num_docs": 0} + + assert memory.count_memories() == 0 + + def test_count_memories_error_returns_zero(self): + """count_memories() returns 0 on error.""" + memory, client, _ = _make_valkey_memory() + memory._glide.ft.info.side_effect = Exception("connection error") + + assert memory.count_memories() == 0 + + +# --------------------------------------------------------------------------- +# Load / Save no-ops +# --------------------------------------------------------------------------- + +class TestValkeyMemoryLoadSave: + + def test_load_is_noop(self): + """load() does nothing for server-managed store.""" + memory, _, _ = _make_valkey_memory() + memory.load() # Should not raise + + def test_save_is_noop(self): + """save() does nothing for server-managed store.""" + memory, _, _ = _make_valkey_memory() + memory.save() # Should not raise + + +# --------------------------------------------------------------------------- +# Lazy import error +# --------------------------------------------------------------------------- + +class TestValkeyMemoryLazyImport: + + def test_import_error_when_valkey_glide_missing(self): + """Helpful ImportError when valkey-glide is not installed.""" + from runtime.node.agent.memory.valkey_memory import _get_glide_sync - # Store multiple items - for i in range(5): - payload = MemoryWritePayload( - agent_role="writer", - inputs_text=f"Fact number {i} about testing", - input_snapshot=MemoryContentSnapshot(text=f"Fact number {i} about testing"), - output_snapshot=None, - ) - memory.update(payload) - - query = MemoryContentSnapshot(text="testing facts") - results = memory.retrieve("writer", query, top_k=2, similarity_threshold=-1.0) - assert len(results) <= 2 - - def test_ttl_expiry(self, memory): - """Items expire after ttl_seconds (would need time manipulation or short TTL).""" - # This test would require either: - # 1. Setting ttl_seconds=1 and sleeping - # 2. Using Valkey's DEBUG SLEEP or TIME commands - pass - - def test_index_created_lazily(self, memory): - """Index is created on first use, not at construction time.""" - # Verify FT.INFO raises (no index) before first operation - # Then after first update, FT.INFO succeeds - pass - - def test_import_error_without_glide(self): - """Clear ImportError when valkey-glide is not installed.""" - with patch.dict("sys.modules", {"glide": None}): - with pytest.raises(ImportError, match="valkey-glide"): - from runtime.node.agent.memory import valkey_memory # noqa: F401 - # Force re-import - import importlib - importlib.reload(valkey_memory) + with patch.dict("sys.modules", {"glide_sync": None}): + with pytest.raises(ImportError, match="valkey-glide-sync is required"): + _get_glide_sync() diff --git a/uv.lock b/uv.lock index 7a96cf5f85..06790fbec1 100755 --- a/uv.lock +++ b/uv.lock @@ -404,6 +404,11 @@ dependencies = [ { name = "xhtml2pdf" }, ] +[package.optional-dependencies] +valkey = [ + { name = "valkey-glide-sync" }, +] + [package.metadata] requires-dist = [ { name = "beautifulsoup4" }, @@ -433,11 +438,13 @@ requires-dist = [ { name = "seaborn", specifier = ">=0.13.2" }, { name = "tenacity" }, { name = "uvicorn" }, + { name = "valkey-glide-sync", marker = "extra == 'valkey'", specifier = ">=2.4" }, { name = "watchfiles" }, { name = "websockets" }, { name = "wsproto" }, { name = "xhtml2pdf", specifier = ">=0.2.17" }, ] +provides-extras = ["valkey"] [[package]] name = "distro" @@ -2105,6 +2112,23 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/83/e4/d04a086285c20886c0daad0e026f250869201013d18f81d9ff5eada73a88/uvicorn-0.41.0-py3-none-any.whl", hash = "sha256:29e35b1d2c36a04b9e180d4007ede3bcb32a85fbdfd6c6aeb3f26839de088187", size = 68783, upload-time = "2026-02-16T23:07:22.357Z" }, ] +[[package]] +name = "valkey-glide-sync" +version = "2.4.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "cffi" }, + { name = "protobuf" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/e0/12/4ce252ae24fcd906eb18da4ef2c95b6d1ba8438b1a83674f2d3ea240a009/valkey_glide_sync-2.4.1.tar.gz", hash = "sha256:a579f0565ffa5a7479cb1a71d03a2deab43d6658ce4b255cd543d023ade3bce8", size = 749458, upload-time = "2026-05-28T21:43:09.764Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/de/63/b5e6730d27a1729153b54c758c83f7e8415c87dd4eb9c642e1d02234c8ec/valkey_glide_sync-2.4.1-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:239b3995fcac6d1f8a3a37566ffd921fa6cae9b90599d04084016765a2de4c90", size = 5677734, upload-time = "2026-05-28T21:42:28.398Z" }, + { url = "https://files.pythonhosted.org/packages/d1/d7/c9432abe4b890620414f406868cd11f0ff4c60f981744afbe8e0f4fcea99/valkey_glide_sync-2.4.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:ddaf805c09213a2ae3024195282a2661629d3a9672b1aee0bf1b7a5f273a3c82", size = 5250464, upload-time = "2026-05-28T21:42:29.894Z" }, + { url = "https://files.pythonhosted.org/packages/ba/12/419f910dabb9de68b9f7e674f847446d933e7da8aa3ee5c15f6e14ed2962/valkey_glide_sync-2.4.1-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:51e7320bd24bf9a22b1b24e6b0ead0c2ad6948462b5e3758c73ca6a5b2098912", size = 5804212, upload-time = "2026-05-28T21:42:31.212Z" }, + { url = "https://files.pythonhosted.org/packages/00/cd/e2125ada427ab297ea6e2845aaaba1ac07fc452a8ec83beeba0227f1cfb3/valkey_glide_sync-2.4.1-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:3a469c5aea965f7135ecd3a66ab0af601b965475f668ea362a7caa2c2c52364f", size = 6079069, upload-time = "2026-05-28T21:42:32.696Z" }, +] + [[package]] name = "watchfiles" version = "1.1.1" From b510d70f741c213ebdea2637bc65953b62bd0212 Mon Sep 17 00:00:00 2001 From: Anna Tao Date: Mon, 8 Jun 2026 08:37:31 -0700 Subject: [PATCH 3/5] Adds missing test coverage for fields and logic introduced in PR #3 (ValkeyMemory implementation). (#4) --- tests/test_valkey_memory.py | 110 ++++++++++++++++++++++++++++++++++-- 1 file changed, 105 insertions(+), 5 deletions(-) diff --git a/tests/test_valkey_memory.py b/tests/test_valkey_memory.py index 106c5b2d2c..9dcf27cf7f 100644 --- a/tests/test_valkey_memory.py +++ b/tests/test_valkey_memory.py @@ -25,8 +25,10 @@ def test_minimal_config(self): cfg = ValkeyMemoryConfig.from_dict({}, path="test") assert cfg.host == "localhost" assert cfg.port == 6379 + assert cfg.username is None assert cfg.password is None assert cfg.db == 0 + assert cfg.use_tls is False assert cfg.index_name == "memory_index" assert cfg.key_prefix == "memory:" assert cfg.ttl_seconds is None @@ -37,8 +39,10 @@ def test_full_config(self): data = { "host": "valkey.internal", "port": 6380, + "username": "admin", "password": "secret", "db": 2, + "use_tls": True, "index_name": "chatdev_memory", "key_prefix": "agent:", "ttl_seconds": 86400, @@ -50,8 +54,10 @@ def test_full_config(self): cfg = ValkeyMemoryConfig.from_dict(data, path="test") assert cfg.host == "valkey.internal" assert cfg.port == 6380 + assert cfg.username == "admin" assert cfg.password == "secret" assert cfg.db == 2 + assert cfg.use_tls is True assert cfg.index_name == "chatdev_memory" assert cfg.key_prefix == "agent:" assert cfg.ttl_seconds == 86400 @@ -122,7 +128,7 @@ class TestValkeyMemoryConfigFieldSpecs: def test_field_specs_defined(self): """All expected fields have specs.""" specs = ValkeyMemoryConfig.field_specs() - expected_fields = {"host", "port", "password", "db", "index_name", "key_prefix", "ttl_seconds", "embedding"} + expected_fields = {"host", "port", "username", "password", "db", "use_tls", "index_name", "key_prefix", "ttl_seconds", "embedding"} assert expected_fields.issubset(set(specs.keys())) def test_host_not_required(self): @@ -240,6 +246,9 @@ def _make_store( port=6379, index_name="chatdev_memory", ttl_seconds=None, + username=None, + password=None, + use_tls=False, ): """Build a minimal MemoryStoreConfig mock for ValkeyMemory.""" valkey_cfg = MagicMock(spec=ValkeyMemoryConfig) @@ -248,9 +257,10 @@ def _make_store( valkey_cfg.index_name = index_name valkey_cfg.ttl_seconds = ttl_seconds valkey_cfg.key_prefix = "memory:" - valkey_cfg.username = None - valkey_cfg.password = None + valkey_cfg.username = username + valkey_cfg.password = password valkey_cfg.db = 0 + valkey_cfg.use_tls = use_tls valkey_cfg.embedding = MagicMock() # non-None so embedding branch is taken store = MagicMock() @@ -265,9 +275,9 @@ def _as_config_side_effect(expected_type, **kwargs): return store, valkey_cfg -def _make_valkey_memory(host="localhost", port=6379, ttl_seconds=None): +def _make_valkey_memory(host="localhost", port=6379, ttl_seconds=None, username=None, password=None, use_tls=False): """Create a ValkeyMemory with mocked glide_sync client and embedding.""" - store, valkey_cfg = _make_store(host=host, port=port, ttl_seconds=ttl_seconds) + store, valkey_cfg = _make_store(host=host, port=port, ttl_seconds=ttl_seconds, username=username, password=password, use_tls=use_tls) mock_client = MagicMock() mock_embedding = MagicMock() @@ -357,6 +367,96 @@ def test_raises_on_wrong_config_type(self): ValkeyMemory(store) +# --------------------------------------------------------------------------- +# Connection options (TLS, username) +# --------------------------------------------------------------------------- + +class TestValkeyMemoryConnectionOptions: + + def test_use_tls_passed_to_make_client(self): + """use_tls=True is forwarded to _make_client.""" + store, _ = _make_store(use_tls=True, password="secret", username="admin") + mock_client = MagicMock() + mock_embedding = MagicMock() + mock_embedding.get_embedding.return_value = [0.1, 0.2, 0.3] + mock_glide_module = MagicMock() + mock_glide_module.ft.create.return_value = "OK" + + with patch("runtime.node.agent.memory.valkey_memory._get_glide_sync") as mock_get_glide, \ + patch("runtime.node.agent.memory.valkey_memory._make_client") as mock_make_client, \ + patch("runtime.node.agent.memory.valkey_memory.EmbeddingFactory") as mock_factory: + mock_get_glide.return_value = mock_glide_module + mock_make_client.return_value = mock_client + mock_factory.create_embedding.return_value = mock_embedding + + from runtime.node.agent.memory.valkey_memory import ValkeyMemory + ValkeyMemory(store) + + mock_make_client.assert_called_once_with( + "localhost", 6379, "admin", "secret", 0, True + ) + + def test_username_passed_to_make_client(self): + """username is forwarded to _make_client when provided.""" + store, _ = _make_store(username="myuser", password="mypass") + mock_client = MagicMock() + mock_embedding = MagicMock() + mock_embedding.get_embedding.return_value = [0.1, 0.2, 0.3] + mock_glide_module = MagicMock() + mock_glide_module.ft.create.return_value = "OK" + + with patch("runtime.node.agent.memory.valkey_memory._get_glide_sync") as mock_get_glide, \ + patch("runtime.node.agent.memory.valkey_memory._make_client") as mock_make_client, \ + patch("runtime.node.agent.memory.valkey_memory.EmbeddingFactory") as mock_factory: + mock_get_glide.return_value = mock_glide_module + mock_make_client.return_value = mock_client + mock_factory.create_embedding.return_value = mock_embedding + + from runtime.node.agent.memory.valkey_memory import ValkeyMemory + ValkeyMemory(store) + + call_args = mock_make_client.call_args[0] + assert call_args[2] == "myuser" # username + assert call_args[3] == "mypass" # password + + +# --------------------------------------------------------------------------- +# Tag sanitization +# --------------------------------------------------------------------------- + +class TestValkeyMemorySanitizeTag: + + def test_sanitize_tag_removes_commas(self): + """Commas in agent_role are replaced with underscores.""" + from runtime.node.agent.memory.valkey_memory import ValkeyMemory + assert ValkeyMemory._sanitize_tag("role,with,commas") == "role_with_commas" + + def test_sanitize_tag_removes_braces(self): + """Braces are replaced.""" + from runtime.node.agent.memory.valkey_memory import ValkeyMemory + assert ValkeyMemory._sanitize_tag("{admin}") == "_admin_" + + def test_sanitize_tag_removes_spaces(self): + """Spaces are replaced.""" + from runtime.node.agent.memory.valkey_memory import ValkeyMemory + assert ValkeyMemory._sanitize_tag("my role") == "my_role" + + def test_sanitize_tag_removes_pipes(self): + """Pipes are replaced.""" + from runtime.node.agent.memory.valkey_memory import ValkeyMemory + assert ValkeyMemory._sanitize_tag("a|b") == "a_b" + + def test_sanitize_tag_empty_string(self): + """Empty string returns empty.""" + from runtime.node.agent.memory.valkey_memory import ValkeyMemory + assert ValkeyMemory._sanitize_tag("") == "" + + def test_sanitize_tag_clean_input_unchanged(self): + """Clean input passes through unchanged.""" + from runtime.node.agent.memory.valkey_memory import ValkeyMemory + assert ValkeyMemory._sanitize_tag("coder") == "coder" + + # --------------------------------------------------------------------------- # Update # --------------------------------------------------------------------------- From ea2d1c685f8026fe25854b90ef48b05aa20d6141 Mon Sep 17 00:00:00 2001 From: Edward Liang Date: Mon, 8 Jun 2026 08:53:01 -0700 Subject: [PATCH 4/5] fix: docs and missing field Signed-off-by: Edward Liang --- docs/user_guide/en/modules/memory.md | 53 ++++++++++++++++++++++++++++ docs/user_guide/zh/modules/memory.md | 1 + 2 files changed, 54 insertions(+) diff --git a/docs/user_guide/en/modules/memory.md b/docs/user_guide/en/modules/memory.md index d0e66ab788..f1f892e452 100755 --- a/docs/user_guide/en/modules/memory.md +++ b/docs/user_guide/en/modules/memory.md @@ -42,6 +42,22 @@ memory: agent_id: my-agent ``` +### Valkey Memory Config +```yaml +memory: + - name: chatdev_memory + type: valkey + config: + host: localhost + port: 6379 + index_name: chatdev_memory + ttl_seconds: 86400 + embedding: + provider: openai + model: text-embedding-3-small + api_key: ${API_KEY} +``` + ## 3. Built-in Store Comparison | Type | Path | Highlights | Best for | | --- | --- | --- | --- | @@ -49,6 +65,7 @@ memory: | `file` | `node/agent/memory/file_memory.py` | Chunks files/dirs into a vector index, read-only, auto rebuilds when files change. | Knowledge bases, doc QA. | | `blackboard` | `node/agent/memory/blackboard_memory.py` | Lightweight append-only log trimmed by time/count; no vector search. | Broadcast boards, pipeline debugging. | | `mem0` | `node/agent/memory/mem0_memory.py` | Cloud-managed by Mem0; semantic search + graph relationships; no local embeddings or persistence needed. Requires `mem0ai` package. | Production memory, cross-session persistence, multi-agent memory sharing. | +| `valkey` | `node/agent/memory/valkey_memory.py` | HNSW vector index via Valkey Search; server-side persistence, cross-process sharing, TTL auto-expiry. Requires `valkey-glide-sync` package. | Self-hosted persistent memory, multi-process deployments, privacy-sensitive environments. | All stores register through `register_memory_store()` so summaries show up in UI via `MemoryStoreConfig.field_specs()`. @@ -117,6 +134,42 @@ This schema lets multimodal outputs flow into Memory/Thinking modules without ex - **Persistence** – Fully cloud-managed. `load()` and `save()` are no-ops. Memories persist across runs and sessions automatically. - **Dependencies** – Requires `mem0ai` package (`pip install mem0ai`). +### 5.5 ValkeyMemory +- **Config** – Specify connection parameters and index settings via `ValkeyMemoryConfig`. + ```yaml + memory: + - name: chatdev_memory + type: valkey + config: + host: localhost + port: 6379 + index_name: chatdev_memory + ttl_seconds: 86400 + embedding: + provider: openai + model: text-embedding-3-small + api_key: ${API_KEY} + ``` + | Field | Description | Default | + | --- | --- | --- | + | `host` | Valkey server address | `localhost` | + | `port` | Port (1-65535) | `6379` | + | `username` | ACL username | `None` | + | `password` | Auth password | `None` | + | `db` | Database index (0-15) | `0` | + | `use_tls` | Enable TLS encryption for the connection | `false` | + | `index_name` | FT index name | `memory_index` | + | `key_prefix` | Hash key prefix | `memory:` | + | `ttl_seconds` | Memory expiry in seconds; `None` means never expire | `None` | + | `embedding` | Nested `EmbeddingConfig` (required) | — | +- **Index creation** – On first instantiation, automatically calls `FT.CREATE` to build an HNSW vector index (COSINE distance) with `content_summary` (TEXT), `agent_role` (TAG), `timestamp` (NUMERIC), and `embedding` (VECTOR) fields. Silently skips if the index already exists. +- **Retrieval** – Executes a KNN query via `FT.SEARCH`, returns top-k results sorted by cosine similarity. Low-relevance results are filtered by `similarity_threshold`. +- **Write** – `update()` encodes input text into a float32 vector and stores it as a Valkey Hash (`HSET`). When `ttl_seconds` is configured, `EXPIRE` is called to set the TTL. +- **Persistence** – Managed by the Valkey server. `load()` and `save()` are no-ops. Data survives process restarts, and multiple processes can concurrently read/write the same index. +- **Error handling** – If Valkey does not have the Search module loaded, initialization raises a `RuntimeError` with installation guidance. +- **Dependencies** – Requires `valkey-glide-sync` package (`pip install 'chatdev[valkey]'`). +- **Best for** – Self-hosted deployments needing persistence, cross-process sharing, and automatic expiry; a drop-in alternative to Mem0 when cloud services are not acceptable for privacy reasons. + ## 6. EmbeddingConfig Notes - Fields: `provider`, `model`, `api_key`, `base_url`, `params`. - `provider=openai` uses the official client; override `base_url` for compatibility layers. diff --git a/docs/user_guide/zh/modules/memory.md b/docs/user_guide/zh/modules/memory.md index baf9a9f69c..f9a6250e77 100755 --- a/docs/user_guide/zh/modules/memory.md +++ b/docs/user_guide/zh/modules/memory.md @@ -159,6 +159,7 @@ nodes: | `username` | ACL 用户名 | `None` | | `password` | 认证密码 | `None` | | `db` | 数据库索引(0-15) | `0` | + | `use_tls` | 启用 TLS 加密连接 | `false` | | `index_name` | FT 索引名称 | `memory_index` | | `key_prefix` | Hash key 前缀 | `memory:` | | `ttl_seconds` | 记忆过期时间(秒),`None` 表示永不过期 | `None` | From 151d2eae5ef7e089c84a2b3a6ec96e6f4810eea5 Mon Sep 17 00:00:00 2001 From: Anna Tao Date: Mon, 8 Jun 2026 13:05:04 -0700 Subject: [PATCH 5/5] feat: set CLIENT SETNAME to chatdev_memory_client for server-side identification (#5) --- runtime/node/agent/memory/valkey_memory.py | 1 + tests/test_valkey_memory.py | 14 ++++++++++++++ 2 files changed, 15 insertions(+) diff --git a/runtime/node/agent/memory/valkey_memory.py b/runtime/node/agent/memory/valkey_memory.py index 2ab6eed641..1267b2921e 100644 --- a/runtime/node/agent/memory/valkey_memory.py +++ b/runtime/node/agent/memory/valkey_memory.py @@ -40,6 +40,7 @@ def _make_client(host: str, port: int, username: str | None = None, password: st addresses=[glide_sync.NodeAddress(host, port)], use_tls=use_tls, credentials=credentials, + client_name="chatdev_memory_client", database_id=db if db != 0 else None, ) return glide_sync.GlideClient.create(config) diff --git a/tests/test_valkey_memory.py b/tests/test_valkey_memory.py index 9dcf27cf7f..5e813b70ef 100644 --- a/tests/test_valkey_memory.py +++ b/tests/test_valkey_memory.py @@ -419,6 +419,20 @@ def test_username_passed_to_make_client(self): assert call_args[2] == "myuser" # username assert call_args[3] == "mypass" # password + def test_client_name_set_in_configuration(self): + """_make_client sets client_name='chatdev_memory_client' for server-side identification.""" + from runtime.node.agent.memory.valkey_memory import _make_client + + mock_glide_module = MagicMock() + + with patch("runtime.node.agent.memory.valkey_memory._get_glide_sync") as mock_get_glide: + mock_get_glide.return_value = mock_glide_module + _make_client("localhost", 6379) + + # Verify GlideClientConfiguration was called with client_name + config_call = mock_glide_module.GlideClientConfiguration.call_args + assert config_call[1]["client_name"] == "chatdev_memory_client" + # --------------------------------------------------------------------------- # Tag sanitization