[FLINK-39949][state/forst] Implement full snapshot restore for ForSt state backend#28472
Open
jubins wants to merge 2 commits into
Open
[FLINK-39949][state/forst] Implement full snapshot restore for ForSt state backend#28472jubins wants to merge 2 commits into
jubins wants to merge 2 commits into
Conversation
Replace the previous UnsupportedOperationException in ForStSyncKeyedStateBackendBuilder with a concrete ForStFullRestoreOperation construction (add import). This enables restoring From KeyGroupsStateHandle (canonical/full snapshots) into the ForSt sync backend. Add ForStFullRestoreOperationTest with two tests that write state using HashMapStateBackend (producing KeyGroupsStateHandle) and verify end-to-end restore into ForStStateBackend configured with ForStDB timers, including multi-key-group distribution checks.
Collaborator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What is the purpose of the change
Fixes FLINK-39949 — The ForSt state backend's
getForStRestoreOperation()method throwsUnsupportedOperationException("Not support restoring yet for ForStStateBackend")when restoring from a canonical savepoint withPriorityQueueStateType.ForStDB(native timers), making it impossible to restore jobs that use the ForSt backend with native priority queues from full/canonical savepoints.The
getForStRestoreOperation()method inForStKeyedStateBackendBuilderhandles three restore paths: incremental handles, full handles with heap timers, and full handles with native ForSt timers. The third branch was left as an unimplemented stub. This PR implements it by introducingForStFullRestoreOperation, which uses the existingFullSnapshotRestoreOperation+ForStDBWriteBatchWrapperpattern already established byForStHeapTimersFullRestoreOperation, mirroring the equivalentRocksDBFullRestoreOperationin the RocksDB state backend.Brief change log
ForStFullRestoreOperation— a new restore operation class that reads key-group data from a full/canonical savepoint and replays it into ForSt column families viaForStDBWriteBatchWrapperForStFullRestoreOperationintoForStKeyedStateBackendBuilder.getForStRestoreOperation()as the fallback branch (full handle +PriorityQueueStateType.ForStDB), replacing the previousUnsupportedOperationExceptionForStFullRestoreOperationTestcovering: successful restore of state values from a full snapshot, and correct handling of multiple key groupsVerifying this change
This change is covered by new unit tests in
ForStFullRestoreOperationTest:testRestoreValueStateFromFullSnapshot()— Verifies that state written to a ForSt backend and snapshotted as a canonical savepoint (KeyGroupsStateHandle) can be fully restored into a new backend instance with correct valuestestRestoreAcrossMultipleKeyGroups()— Validates that key-group data spanning multiple groups is correctly distributed and readable after restoreThe implementation follows the same pattern established by
ForStHeapTimersFullRestoreOperation(for heap timers) andRocksDBFullRestoreOperation(the RocksDB equivalent), ensuring consistency across the codebase.Does this pull request potentially affect one of the following parts
@Public(Evolving): noDocumentation
Was generative AI tooling used to co-author this PR?
Generated-by: Claude Opus 4.8