Add store-backed FlowStore to datasourcex-util#36
Open
rossdanderson wants to merge 8 commits into
Open
Conversation
A map whose entries are backed by an external store with a bounded in-memory hot set, read-through on miss, and changes exposed as a coroutine Flow of versioned deltas.
rossdanderson
commented
Jun 2, 2026
rossdanderson
commented
Jun 2, 2026
rossdanderson
commented
Jun 2, 2026
rossdanderson
commented
Jun 2, 2026
rossdanderson
commented
Jun 2, 2026
rossdanderson
commented
Jun 2, 2026
rossdanderson
commented
Jun 2, 2026
rossdanderson
commented
Jun 2, 2026
rossdanderson
commented
Jun 2, 2026
rossdanderson
commented
Jun 2, 2026
rossdanderson
commented
Jun 2, 2026
rossdanderson
commented
Jun 2, 2026
rossdanderson
commented
Jun 2, 2026
Source versions from the writer (DB), not an app-side VersionSource: the version is now commit-ordered and durable, fixing same-key write ordering and removing the in-memory counter that reset on restart. CacheWriter.write/delete return the assigned version; VersionSource and its implementations are deleted. - All cache writes are version-gated via Caffeine's per-key atomic compute (lock-free), so a read-through, owner commit, or out-of-order delta cannot clobber a newer entry. Owner commits emit the delta before refreshing the cache to close a cancellation gap. - Add tombstones (CacheEntry: Live/Tombstone) so a removal cannot be resurrected by a stale older read-through. - Bound the signal SharedFlow buffer (default 256, tunable) instead of Int.MAX_VALUE. - Reject null/non-integral version in both Jackson deserializers; document the String/JSON-native key/value round-trip limitation. - AutoCommitTxContext is one-shot (reuse throws; rollback-after-commit no-ops). - deleteAll versions per key; document loadAll/loadAllKeys defaults; delegate the CacheLoaderWriter factory overload. - De-flake FlowStoreTest (subscription synchronisation instead of delays); add tombstone, version-parsing, and tx-guard coverage.
- Centralize the VersionedMapEvent -> CacheEntry mapping in one toEntry() helper instead of restating it across the owner and consumer paths. - Add AbstractFlowStore.cacheReflectIfNewer and collapse the consumer's hand-rolled version-gated computeIfPresent onto it. - Drop the redundant loader param from MutableFlowStoreImpl (always the writer). - Rewrite KDoc/sample docs to describe behaviour without rejected-alternative framing, and drop stale references to the removed delta channel.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Adds
FlowStoretodatasourcex-util: a map whose entries are backed by an external store instead of being held entirely in memory.Flowof versioned deltas.MutableFlowStore(owner): each mutation takes the caller's transaction handle (a jOOQConfiguration, a JDBCConnection, …), adapted viaTxContext, and enlists the write on it. On commit the store refreshes its cache and emits the delta; on rollback nothing is published.FlowStore(consumer): follows a delta stream and reads through on a miss, staying coherent via per-key versions. Removals leave a tombstone so a stale read-through cannot resurrect a deleted key.CacheLoader/CacheWriter/CacheLoaderWriter. The store assigns and returns each write's version (a DB sequence, identity, or version column), so the version is the durable commit order and survives restarts.VersionedMapEventwith Jackson 2, Jackson 3, and Fory serializers for distribution.StoreSamples) showing aMutableFlowStoreenlisted on a blocking jOOQ transaction, with aTransactionListenerthat runs the buffered publish on commit/rollback so callers write plaindsl.transaction { … }.Intended for values that are aggregate roots — each key is written and published as a whole.