[python] Reject non-checkpoint-stable Python memory values at set() #839
Merged
Python memory values cross the Pemja boundary into Flink state. Values Pemja cannot materialize into native JVM types (Pydantic models, UUID, str/int Enums, custom classes, tuple/set, non-str dict keys) are stored as stale PyObject wrappers and SIGSEGV in JcpPyObject_FromJObject on checkpoint restore. Add validate_memory_value() and call it from both LocalMemoryObject.set and FlinkMemoryObject.set, rejecting any value not recursively composed of None/bool/int/float/str/list/dict[str, ...] with a clear, actionable TypeError. Exact-type matching (not isinstance) is required so a str/int Enum cannot slip through. Conform the existing call sites that stored non-stable values: drop the set/custom-class stores in the memory unit tests, and materialize the Pydantic payloads in the workflow and flink-integration e2e agents (model_dump on write, model_validate on read). This follows apache#828 (built-in tool-context normalization) and is forward-looking: it does not migrate pre-fix checkpoints. This closes part of apache#723.
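The recursive, exact-typed check described above might look roughly like the sketch below. This is not the merged implementation: only the name and signature validate_memory_value(path, value) come from the PR text, while the error wording, the path formatting, and the is_checkpoint_stable helper are assumptions made for illustration.

```python
from enum import Enum
from typing import Any

# Leaf types accepted by exact type, following the contract stated in the PR
# (None is handled separately). The constant name is an assumption.
_STABLE_SCALARS = (bool, int, float, str)


def validate_memory_value(path: str, value: Any) -> None:
    """Raise TypeError unless value is recursively checkpoint-stable (sketch)."""
    # type(...) is used instead of isinstance so subclasses such as a
    # str/int Enum are rejected rather than accepted as plain str/int.
    if value is None or type(value) in _STABLE_SCALARS:
        return
    if type(value) is list:
        for index, item in enumerate(value):
            validate_memory_value(f"{path}[{index}]", item)
        return
    if type(value) is dict:
        for key, item in value.items():
            if type(key) is not str:
                raise TypeError(
                    f"{path}: dict key {key!r} has type {type(key).__name__}; "
                    "only str keys are checkpoint-stable (convert with str())"
                )
            validate_memory_value(f"{path}[{key!r}]", item)
        return
    raise TypeError(
        f"{path}: value of type {type(value).__name__} is not checkpoint-stable; "
        "convert it to None/bool/int/float/str/list/dict[str, ...] before set()"
    )


def is_checkpoint_stable(value: Any) -> bool:
    """Hypothetical convenience wrapper: True if the value passes the guard."""
    try:
        validate_memory_value("value", value)
    except TypeError:
        return False
    return True


class Color(str, Enum):
    RED = "red"


# A str Enum passes isinstance(x, str), which is why exact-type matching matters.
print(isinstance(Color.RED, str))             # True
print(is_checkpoint_stable(Color.RED))        # False
print(is_checkpoint_stable(Color.RED.value))  # True
print(is_checkpoint_stable({"tags": ["a", "b"], "count": 2, "note": None}))  # True
```

Raising at the first offending location keeps the message specific; carrying the path through the recursion is what lets the error name a nested element such as an item inside a list inside a dict.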
This was referenced Jun 13, 2026
Linked issue: #723
Purpose of change
Python memory values cross the Pemja boundary into Flink state when written via MemoryObject.set(). Values Pemja cannot materialize into native JVM types (Pydantic models, uuid.UUID, str/int Enums, custom classes, tuple/set, dicts with non-str keys) are stored as stale PyObject wrappers and SIGSEGV in JcpPyObject_FromJObject on state restore.
This adds a set()-time guard that rejects such values early with a clear TypeError (naming the offending location and a conversion), instead of letting them fail on restore. It follows #828 (built-in tool-context normalization) and covers arbitrary user values. Forward-looking: it does not migrate pre-fix checkpoints.
The accepted contract is recursive and exact-typed: None | bool | int | float | str | list[...] | dict[str, ...]. Exact-type matching (not isinstance) is required so a str/int Enum, which passes isinstance(x, str) yet is still Pemja-wrapped, cannot slip through. The same guard runs in both LocalMemoryObject.set and FlinkMemoryObject.set, so local execution fails the same way production would.
The memory unit tests no longer store a set/custom class, and the workflow/flink-integration e2e agents now materialize their Pydantic payloads (model_dump(mode="json") on write, model_validate on read); they stored models on the real Flink path and were latent instances of this bug.
bytes is intentionally not yet accepted: the Python→Java bytes conversion through Pemja is unverified, and wrongly accepting an unsafe value would defeat the guard. A follow-up will verify it. The Python value-contract docs are a separate follow-up PR (disjoint file).
Tests
New runtime/tests/test_memory_value_validation.py covers the accepted types (incl. nested list/dict) and every rejection, plus that FlinkMemoryObject.set raises a raw TypeError (not MemoryObjectError). A true checkpoint-restore test can't run on the MiniCluster (in-place recovery doesn't recreate the JVM, so the Pemja path isn't crossed); the unit tests assert the accept/reject classification as the proxy, same as #828.
uv run --no-sync pytest flink_agents/runtime/tests flink_agents/plan/tests/actions -k "not e2e" passes; ruff check is clean.
API
Adds validate_memory_value(path, value) in flink_agents.api.memory_object. MemoryObject.set() now raises TypeError for values that are not recursively checkpoint-stable; previously they were accepted and failed only on restore.
Documentation
doc-needed
doc-not-needed
doc-included
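For call sites that need to conform, the write/read conversion applied to the e2e agents (model_dump(mode="json") on write, model_validate on read) can be illustrated with a standard-library analogue. The sketch below deliberately uses a dataclass instead of Pydantic so it runs without extra dependencies; Ticket, Status, to_memory_value and from_memory_value are hypothetical names, not part of flink_agents.

```python
from dataclasses import dataclass
from enum import Enum
from uuid import UUID


class Status(str, Enum):
    OPEN = "open"
    DONE = "done"


@dataclass
class Ticket:
    """Hypothetical payload holding several non-checkpoint-stable types."""
    id: UUID
    status: Status
    tags: tuple


def to_memory_value(ticket: Ticket) -> dict:
    # Materialize into plain str/list/dict[str, ...] before calling set():
    # UUID -> str, Enum -> its underlying str value, tuple -> list.
    return {
        "id": str(ticket.id),
        "status": ticket.status.value,
        "tags": list(ticket.tags),
    }


def from_memory_value(raw: dict) -> Ticket:
    # Rebuild the rich object after reading the plain value back.
    return Ticket(
        id=UUID(raw["id"]),
        status=Status(raw["status"]),
        tags=tuple(raw["tags"]),
    )


ticket = Ticket(UUID("12345678-1234-5678-1234-567812345678"), Status.OPEN, ("a", "b"))
raw = to_memory_value(ticket)
print(raw)
print(from_memory_value(raw) == ticket)  # True
```

With Pydantic models, the PR's own approach of model_dump(mode="json") and model_validate plays the role of these two helpers, so hand-written converters like the ones above are only needed for non-Pydantic types.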