diff --git a/util/README.md b/util/README.md index 5748175..1039eaa 100644 --- a/util/README.md +++ b/util/README.md @@ -3,4 +3,69 @@ Utility classes and functions commonly used in DataSource integrations, such as efficiently observable Maps, and stream transformations. -@see com.caplin.integration.datasourcex.util.flow.FlowMap \ No newline at end of file +## Key components + +### Observable maps and shared flows + +| Component | Description | +| --- | --- | +| `FlowMap` / `MutableFlowMap` | A `Map` that is also observable via `asFlow`, `asFlowWithState`, and per-key `valueFlow`. Built with `mutableFlowMapOf` / `toMutableFlowMap`, or collected from an event stream with `flowMapIn`. | +| `CompletingSharedFlow` / `MutableCompletingSharedFlow` | A `SharedFlow` variant that also propagates completion and error events to subscribers. Built with `shareInCompleting`. | +| `SharedFlowCache` | Keyed cache of `SharedFlow`s, sharing one upstream collection per key and evicting a key once its upstream ends so the cache can't grow unbounded. Built with `sharedFlowCache(...)`, or `completingSharedFlowCache(...)` for a `CompletingSharedFlowCache` that propagates completion/errors. | +| `CompletingSharedFlowCache` / `LoadingCompletingSharedFlowCache` | Keyed cache of `CompletingSharedFlow`s, sharing one underlying collection per key. | + +### Stores + +| Component | Description | +| --- | --- | +| `FlowStore` / `MutableFlowStore` | A store-backed map exposing a delta-only stream plus a read-through, Caffeine-bounded cache. Built with `flowStore` / `flowStoreIn` / `mutableFlowStore`. | +| `AsyncFlowStore` / `AsyncMutableFlowStore` | Suspending views of the stores whose reads/writes dispatch the store I/O themselves. | +| `Store` / `StoreReader` / `StoreWriter` | The SPI you implement to back a `MutableFlowStore`; mutations enlist on the caller's transaction and publish on commit. | +| `TxContext` / `Versioned` | The transaction handle a write enlists on, and a value paired with the store-assigned version. | + +### Flow operators + +| Operator | Description | +| --- | --- | +| `bufferingDebounce` | Buffers elements until a quiet period elapses, then emits them as a `List`. | +| `throttleLatest` | Emits at most once per interval, keeping the latest value and dropping older ones. | +| `flatMapFirst` | Applies a function to the first element together with the entire upstream flow. | +| `demultiplexBy` | Groups elements by a key selector and processes each group's sub-flow. | +| `retryWithExponentialBackoff` | Retries the upstream on error with an exponential delay between attempts. | +| `cast` | Casts a `Flow<*>` to a `Flow`. | +| `timeoutFirst` / `timeoutFirstOrNull` / `timeoutFirstOrDefault` | Errors / emits null / emits a default if no first element arrives within a timeout. | +| `materialize` / `dematerialize` (and `*Unboxed`) | Convert between flow values and `ValueOrCompletion` events. | + +### Event models and folds + +| Component | Description | +| --- | --- | +| `MapEvent` / `SimpleMapEvent` / `SetEvent` / `VersionedMapEvent` | Sealed event types describing map and set mutations (with or without old values / versions). | +| `FlowMapStreamEvent` / `ValueOrCompletion` | Materialised stream events: initial-state-plus-deltas, and value-or-terminal-signal. | +| `runningFoldToMap*` / `runningFoldToSet` | Fold a delta stream into a live `Flow` of map / set snapshots. | +| `conflateKeys` / `toEvents` | Collapse per-key updates, and expand a collection stream into entry events. | + +### Serialization + +| Component | Description | +| --- | --- | +| `registerDataSourceSerializers` / `registerPersistentCollectionSerializers` | Register Fory serializers for the event types and persistent collections. | +| `DataSourceModule`, `registerDataSourceModule` / `addDataSourceModule` | Jackson 2 / Jackson 3 modules that serialize the event types without annotations. | +| `Jackson2JsonHandler` / `Jackson3JsonHandler` | `JsonHandler` implementations backed by a Jackson `ObjectMapper`. | + +### DataSource and general utilities + +| Component | Description | +| --- | --- | +| `AntPatternNamespace` | A `Namespace` matching subjects by Ant-style patterns, with path-variable extraction. | +| `SimpleDataSourceFactory` / `SimpleDataSourceConfig` | Build a `DataSource` from a simplified config for tests and examples. | +| `KLogger` | An slf4j `Logger` wrapper with lazily-evaluated message lambdas. | +| `ReadWriteLock` | A non-reentrant suspending read/write lock. | +| `withTimeout` | A coroutine timeout that throws `TimeoutException` rather than a `CancellationException`. | + +@see com.caplin.integration.datasourcex.util.flow.FlowMap +@see com.caplin.integration.datasourcex.util.store.MutableFlowStore + +Participating in an application-owned jOOQ transaction: + +@sample samples.StoreSamples.jooqSample diff --git a/util/api/datasourcex-util.api b/util/api/datasourcex-util.api index cecc302..14874b8 100644 --- a/util/api/datasourcex-util.api +++ b/util/api/datasourcex-util.api @@ -185,9 +185,11 @@ public abstract interface class com/caplin/integration/datasourcex/util/flow/Flo } public final class com/caplin/integration/datasourcex/util/flow/FlowMapKt { + public static final fun flowMapIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun mutableFlowMapOf ()Lcom/caplin/integration/datasourcex/util/flow/MutableFlowMap; public static final fun mutableFlowMapOf ([Lkotlin/Pair;)Lcom/caplin/integration/datasourcex/util/flow/MutableFlowMap; public static final fun runningFoldToMapFlowMapStreamEvent (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; + public static final fun simpleFlowMapIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun simpleToFlowMapIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun toFlowMapIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun toMutableFlowMap (Ljava/util/Map;)Lcom/caplin/integration/datasourcex/util/flow/MutableFlowMap; @@ -371,6 +373,17 @@ public final class com/caplin/integration/datasourcex/util/flow/SetEventKt { public static final fun toEvents (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; } +public final class com/caplin/integration/datasourcex/util/flow/SharedFlowCache { + public final fun get (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/SharedFlow; +} + +public final class com/caplin/integration/datasourcex/util/flow/SharedFlowCacheKt { + public static final fun completingSharedFlowCache (Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;I)Lcom/caplin/integration/datasourcex/util/flow/CompletingSharedFlowCache; + public static synthetic fun completingSharedFlowCache$default (Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;IILjava/lang/Object;)Lcom/caplin/integration/datasourcex/util/flow/CompletingSharedFlowCache; + public static final fun sharedFlowCache (Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;I)Lcom/caplin/integration/datasourcex/util/flow/SharedFlowCache; + public static synthetic fun sharedFlowCache$default (Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;IILjava/lang/Object;)Lcom/caplin/integration/datasourcex/util/flow/SharedFlowCache; +} + public abstract interface class com/caplin/integration/datasourcex/util/flow/SimpleMapEvent { } @@ -464,6 +477,39 @@ public final class com/caplin/integration/datasourcex/util/flow/ValueOrCompletio public static final fun materializeUnboxed (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; } +public abstract interface class com/caplin/integration/datasourcex/util/flow/VersionedMapEvent { + public abstract fun getKey ()Ljava/lang/Object; + public abstract fun getVersion ()J +} + +public final class com/caplin/integration/datasourcex/util/flow/VersionedMapEvent$Removed : com/caplin/integration/datasourcex/util/flow/VersionedMapEvent { + public fun (Ljava/lang/Object;J)V + public final fun component1 ()Ljava/lang/Object; + public final fun component2 ()J + public final fun copy (Ljava/lang/Object;J)Lcom/caplin/integration/datasourcex/util/flow/VersionedMapEvent$Removed; + public static synthetic fun copy$default (Lcom/caplin/integration/datasourcex/util/flow/VersionedMapEvent$Removed;Ljava/lang/Object;JILjava/lang/Object;)Lcom/caplin/integration/datasourcex/util/flow/VersionedMapEvent$Removed; + public fun equals (Ljava/lang/Object;)Z + public fun getKey ()Ljava/lang/Object; + public fun getVersion ()J + public fun hashCode ()I + public fun toString ()Ljava/lang/String; +} + +public final class com/caplin/integration/datasourcex/util/flow/VersionedMapEvent$Upsert : com/caplin/integration/datasourcex/util/flow/VersionedMapEvent { + public fun (Ljava/lang/Object;Ljava/lang/Object;J)V + public final fun component1 ()Ljava/lang/Object; + public final fun component2 ()Ljava/lang/Object; + public final fun component3 ()J + public final fun copy (Ljava/lang/Object;Ljava/lang/Object;J)Lcom/caplin/integration/datasourcex/util/flow/VersionedMapEvent$Upsert; + public static synthetic fun copy$default (Lcom/caplin/integration/datasourcex/util/flow/VersionedMapEvent$Upsert;Ljava/lang/Object;Ljava/lang/Object;JILjava/lang/Object;)Lcom/caplin/integration/datasourcex/util/flow/VersionedMapEvent$Upsert; + public fun equals (Ljava/lang/Object;)Z + public fun getKey ()Ljava/lang/Object; + public final fun getValue ()Ljava/lang/Object; + public fun getVersion ()J + public fun hashCode ()I + public fun toString ()Ljava/lang/String; +} + public final class com/caplin/integration/datasourcex/util/serialization/fory/DataSourceModuleKt { public static final fun registerDataSourceSerializers (Lorg/apache/fory/Fory;Z)Lorg/apache/fory/Fory; public static synthetic fun registerDataSourceSerializers$default (Lorg/apache/fory/Fory;ZILjava/lang/Object;)Lorg/apache/fory/Fory; @@ -518,3 +564,100 @@ public final class com/caplin/integration/datasourcex/util/serialization/jackson public fun toObject (Ltools/jackson/databind/JsonNode;Ljava/lang/Class;)Ljava/lang/Object; } +public abstract interface class com/caplin/integration/datasourcex/util/store/AsyncFlowStore { + public abstract fun asFlow ()Lkotlinx/coroutines/flow/SharedFlow; + public abstract fun asFlow (Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; + public static synthetic fun asFlow$default (Lcom/caplin/integration/datasourcex/util/store/AsyncFlowStore;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; + public abstract fun get (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun valueFlow (Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow; +} + +public final class com/caplin/integration/datasourcex/util/store/AsyncFlowStore$DefaultImpls { + public static synthetic fun asFlow$default (Lcom/caplin/integration/datasourcex/util/store/AsyncFlowStore;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; +} + +public abstract interface class com/caplin/integration/datasourcex/util/store/AsyncMutableFlowStore : com/caplin/integration/datasourcex/util/store/AsyncFlowStore { + public abstract fun get (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun put (Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun putAll (Ljava/util/Map;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun remove (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + +public abstract interface class com/caplin/integration/datasourcex/util/store/FlowStore { + public abstract fun asFlow ()Lkotlinx/coroutines/flow/SharedFlow; + public abstract fun asFlow (Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; + public static synthetic fun asFlow$default (Lcom/caplin/integration/datasourcex/util/store/FlowStore;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; + public abstract fun get (Ljava/lang/Object;)Ljava/lang/Object; + public abstract fun getAsync ()Lcom/caplin/integration/datasourcex/util/store/AsyncFlowStore; + public abstract fun valueFlow (Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow; +} + +public final class com/caplin/integration/datasourcex/util/store/FlowStore$DefaultImpls { + public static synthetic fun asFlow$default (Lcom/caplin/integration/datasourcex/util/store/FlowStore;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; +} + +public final class com/caplin/integration/datasourcex/util/store/FlowStoreKt { + public static final fun flowStore (Lcom/caplin/integration/datasourcex/util/store/StoreReader;Lkotlinx/coroutines/flow/Flow;Lcom/github/benmanes/caffeine/cache/Caffeine;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineDispatcher;I)Lcom/caplin/integration/datasourcex/util/store/FlowStore; + public static synthetic fun flowStore$default (Lcom/caplin/integration/datasourcex/util/store/StoreReader;Lkotlinx/coroutines/flow/Flow;Lcom/github/benmanes/caffeine/cache/Caffeine;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineDispatcher;IILjava/lang/Object;)Lcom/caplin/integration/datasourcex/util/store/FlowStore; + public static final fun flowStoreIn (Lkotlinx/coroutines/flow/Flow;Lcom/caplin/integration/datasourcex/util/store/StoreReader;Lcom/github/benmanes/caffeine/cache/Caffeine;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineDispatcher;I)Lcom/caplin/integration/datasourcex/util/store/FlowStore; + public static synthetic fun flowStoreIn$default (Lkotlinx/coroutines/flow/Flow;Lcom/caplin/integration/datasourcex/util/store/StoreReader;Lcom/github/benmanes/caffeine/cache/Caffeine;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineDispatcher;IILjava/lang/Object;)Lcom/caplin/integration/datasourcex/util/store/FlowStore; +} + +public abstract interface class com/caplin/integration/datasourcex/util/store/MutableFlowStore : com/caplin/integration/datasourcex/util/store/FlowStore { + public abstract fun get (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object; + public abstract fun getAsync ()Lcom/caplin/integration/datasourcex/util/store/AsyncMutableFlowStore; + public abstract fun put (Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)V + public abstract fun putAll (Ljava/util/Map;Ljava/lang/Object;)V + public abstract fun remove (Ljava/lang/Object;Ljava/lang/Object;)V +} + +public final class com/caplin/integration/datasourcex/util/store/MutableFlowStoreKt { + public static final fun mutableFlowStore (Lcom/caplin/integration/datasourcex/util/store/Store;Lcom/github/benmanes/caffeine/cache/Caffeine;Lkotlinx/coroutines/CoroutineDispatcher;Lkotlin/jvm/functions/Function1;)Lcom/caplin/integration/datasourcex/util/store/MutableFlowStore; + public static synthetic fun mutableFlowStore$default (Lcom/caplin/integration/datasourcex/util/store/Store;Lcom/github/benmanes/caffeine/cache/Caffeine;Lkotlinx/coroutines/CoroutineDispatcher;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lcom/caplin/integration/datasourcex/util/store/MutableFlowStore; +} + +public abstract interface class com/caplin/integration/datasourcex/util/store/Store : com/caplin/integration/datasourcex/util/store/StoreReader, com/caplin/integration/datasourcex/util/store/StoreWriter { +} + +public final class com/caplin/integration/datasourcex/util/store/Store$DefaultImpls { + public static fun writeAll (Lcom/caplin/integration/datasourcex/util/store/Store;Ljava/util/Map;Lcom/caplin/integration/datasourcex/util/store/TxContext;)Ljava/util/Map; +} + +public abstract interface class com/caplin/integration/datasourcex/util/store/StoreReader { + public abstract fun load (Ljava/lang/Object;)Lcom/caplin/integration/datasourcex/util/store/Versioned; +} + +public abstract interface class com/caplin/integration/datasourcex/util/store/StoreWriter { + public abstract fun delete (Ljava/lang/Object;Lcom/caplin/integration/datasourcex/util/store/TxContext;)J + public abstract fun load (Ljava/lang/Object;Lcom/caplin/integration/datasourcex/util/store/TxContext;)Lcom/caplin/integration/datasourcex/util/store/Versioned; + public abstract fun write (Ljava/lang/Object;Ljava/lang/Object;Lcom/caplin/integration/datasourcex/util/store/TxContext;)J + public fun writeAll (Ljava/util/Map;Lcom/caplin/integration/datasourcex/util/store/TxContext;)Ljava/util/Map; +} + +public final class com/caplin/integration/datasourcex/util/store/StoreWriter$DefaultImpls { + public static fun writeAll (Lcom/caplin/integration/datasourcex/util/store/StoreWriter;Ljava/util/Map;Lcom/caplin/integration/datasourcex/util/store/TxContext;)Ljava/util/Map; +} + +public abstract interface class com/caplin/integration/datasourcex/util/store/TxContext { + public abstract fun getTransaction ()Ljava/lang/Object; + public abstract fun onCommitEnd (Lkotlin/jvm/functions/Function0;)V + public fun onRollback (Lkotlin/jvm/functions/Function0;)V +} + +public final class com/caplin/integration/datasourcex/util/store/TxContext$DefaultImpls { + public static fun onRollback (Lcom/caplin/integration/datasourcex/util/store/TxContext;Lkotlin/jvm/functions/Function0;)V +} + +public final class com/caplin/integration/datasourcex/util/store/Versioned { + public fun (Ljava/lang/Object;J)V + public final fun component1 ()Ljava/lang/Object; + public final fun component2 ()J + public final fun copy (Ljava/lang/Object;J)Lcom/caplin/integration/datasourcex/util/store/Versioned; + public static synthetic fun copy$default (Lcom/caplin/integration/datasourcex/util/store/Versioned;Ljava/lang/Object;JILjava/lang/Object;)Lcom/caplin/integration/datasourcex/util/store/Versioned; + public fun equals (Ljava/lang/Object;)Z + public final fun getValue ()Ljava/lang/Object; + public final fun getVersion ()J + public fun hashCode ()I + public fun toString ()Ljava/lang/String; +} + diff --git a/util/build.gradle.kts b/util/build.gradle.kts index 0eaf024..7cf0e3a 100644 --- a/util/build.gradle.kts +++ b/util/build.gradle.kts @@ -15,6 +15,7 @@ dependencies { api("org.jetbrains.kotlinx:kotlinx-coroutines-core") api("org.jetbrains.kotlinx:kotlinx-coroutines-core-jvm") api("com.fasterxml.jackson.core:jackson-core") + api("com.github.ben-manes.caffeine:caffeine") implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310") implementation("com.fasterxml.jackson.module:jackson-module-kotlin") implementation(libs.zjsonpatch) @@ -32,12 +33,18 @@ dependencies { compileOnly(libs.fory.core) compileOnly(libs.fory.kotlin) + // Samples (compiled for Dokka only): show MutableFlowStore participating in a blocking jOOQ + // transaction. Off the published runtime classpath; jooq is version-managed by the Spring Boot + // BOM. + samplesImplementation("org.jooq:jooq") + testRuntimeOnly("org.slf4j:slf4j-simple") testImplementation("org.springframework:spring-core") // For testing the RegexPathMatcher testImplementation(libs.turbine) testImplementation(libs.kotest.assertions) testImplementation(libs.kotest.runner) + testImplementation(libs.mockk) testImplementation(libs.fory.core) testImplementation(libs.fory.kotlin) testImplementation(libs.jackson3.databind) @@ -51,4 +58,9 @@ dependencies { jmh { duplicateClassesStrategy.set(DuplicatesStrategy.EXCLUDE) } -dokka { dokkaSourceSets.configureEach { includes.from("README.md") } } +dokka { + dokkaSourceSets.configureEach { + includes.from("README.md") + samples.from(layout.projectDirectory.dir("src/samples/kotlin")) + } +} diff --git a/util/src/jmh/kotlin/com/caplin/integration/datasourcex/util/flow/FlowMapBenchmark.kt b/util/src/jmh/kotlin/com/caplin/integration/datasourcex/util/flow/FlowMapBenchmark.kt index e65d928..4714ce6 100644 --- a/util/src/jmh/kotlin/com/caplin/integration/datasourcex/util/flow/FlowMapBenchmark.kt +++ b/util/src/jmh/kotlin/com/caplin/integration/datasourcex/util/flow/FlowMapBenchmark.kt @@ -192,17 +192,16 @@ open class FlowMapBenchmark { } /** - * Measures the overhead of reconstructing a [FlowMap] from a stream of events using - * [toFlowMapIn]. + * Measures the overhead of reconstructing a [FlowMap] from a stream of events using [flowMapIn]. */ @Benchmark - fun toFlowMapInBenchmark() = runBlocking { + fun flowMapInBenchmark() = runBlocking { val events = flow { repeat(100) { emit(Upsert("key$it", null, it)) } emit(Populated) } val scope = CoroutineScope(Dispatchers.Default) - events.toFlowMapIn(scope) + events.flowMapIn(scope) scope.cancel() } diff --git a/util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/FlowMap.kt b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/FlowMap.kt index c97d6eb..cdee18e 100644 --- a/util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/FlowMap.kt +++ b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/FlowMap.kt @@ -360,7 +360,7 @@ private class FlowMapImpl(initialMap: PersistentMap) : } } -suspend fun Flow>.toFlowMapIn( +suspend fun Flow>.flowMapIn( scope: CoroutineScope ): FlowMap { val flowMap = mutableFlowMapOf() @@ -377,8 +377,8 @@ suspend fun Flow>.toFlowMapIn( return flowMap } -@JvmName("simpleToFlowMapIn") -suspend fun Flow>.toFlowMapIn( +@JvmName("simpleFlowMapIn") +suspend fun Flow>.flowMapIn( scope: CoroutineScope ): FlowMap { val flowMap = mutableFlowMapOf() @@ -394,3 +394,14 @@ suspend fun Flow>.toFlowMapIn( populated.lock() return flowMap } + +@Deprecated("Renamed to flowMapIn", ReplaceWith("this.flowMapIn(scope)")) +suspend fun Flow>.toFlowMapIn( + scope: CoroutineScope +): FlowMap = flowMapIn(scope) + +@JvmName("simpleToFlowMapIn") +@Deprecated("Renamed to flowMapIn", ReplaceWith("this.flowMapIn(scope)")) +suspend fun Flow>.toFlowMapIn( + scope: CoroutineScope +): FlowMap = flowMapIn(scope) diff --git a/util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/SharedFlowCache.kt b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/SharedFlowCache.kt new file mode 100644 index 0000000..99f55e2 --- /dev/null +++ b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/SharedFlowCache.kt @@ -0,0 +1,118 @@ +package com.caplin.integration.datasourcex.util.flow + +import com.caplin.integration.datasourcex.util.flow.ValueOrCompletion.Completion +import java.util.concurrent.ConcurrentHashMap +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.SharingStarted +import kotlinx.coroutines.flow.onCompletion +import kotlinx.coroutines.flow.shareIn +import kotlinx.coroutines.flow.transformWhile +import kotlinx.coroutines.job + +/** + * A cache of [SharedFlow]s keyed by [K], sharing a single upstream collection per key across that + * key's subscribers. Create one with [sharedFlowCache], or [completingSharedFlowCache] for + * completion/error propagation. + * + * A key is created on first [get] and evicted once its upstream ends - it completes or errors, or + * (under a stopping `started` such as [SharingStarted.WhileSubscribed]) its subscribers have all + * gone - so the cache cannot grow without bound. Eviction tears down the key's sharing coroutine. + * + * This exposes a plain [SharedFlow], so subscribers see values only: the upstream's completion is + * not delivered to them, and neither is an unhandled upstream error - it fails the key's own + * sharing coroutine in isolation (the entry is evicted; the cache scope and other keys are + * unaffected) and reaches the scope's `CoroutineExceptionHandler`. Handle errors in the supplier + * (e.g. retry or catch). For completion/error propagation to subscribers, use + * [completingSharedFlowCache]. + * + * [get] resolves the entry lazily, so subscribe to the returned flow promptly: a flow retained + * across its key's eviction will not restart the upstream. + */ +class SharedFlowCache +internal constructor( + private val scope: CoroutineScope, + private val started: SharingStarted, + private val replay: Int, + private val evictWhen: ((V) -> Boolean)? = null, +) { + private val cache = ConcurrentHashMap>() + + /** The shared flow for [key], creating it from [supplier] on first access. */ + operator fun get(key: K, supplier: (K) -> Flow): SharedFlow = + cache.computeIfAbsent(key) { k -> share(k, supplier) } + + private fun share(key: K, supplier: (K) -> Flow): SharedFlow { + val entryScope = + CoroutineScope(scope.coroutineContext + SupervisorJob(scope.coroutineContext.job)) + val evictWhen = evictWhen + lateinit var shared: SharedFlow + val upstream = + if (evictWhen == null) supplier(key) + else + supplier(key).transformWhile { + // Evict before publishing a terminal value, so a re-get racing the completion misses + // and rebuilds rather than re-reading the terminal, then stop the upstream. + val terminal = evictWhen(it) + if (terminal) cache.remove(key, shared) + emit(it) + !terminal + } + shared = + upstream + // Drop the entry (if still ours) and tear the sharing coroutine down once the upstream + // ends: it completed/errored, or `started` cancelled it after the subscribers left. + .onCompletion { + cache.remove(key, shared) + entryScope.cancel() + } + .shareIn(entryScope, started, replay) + return shared + } +} + +/** + * Creates a [SharedFlowCache] - a keyed cache of plain [SharedFlow]s, one shared upstream per key, + * each evicted once its upstream ends (see [SharedFlowCache]). + * + * @param scope parent scope; each key's sharing coroutine is a child of it. + * @param started the [SharingStarted] strategy applied to each key's upstream. + * @param replay number of values replayed to new subscribers of a key. + */ +fun sharedFlowCache( + scope: CoroutineScope, + started: SharingStarted = SharingStarted.WhileSubscribed(), + replay: Int = 0, +): SharedFlowCache = SharedFlowCache(scope, started, replay) + +/** + * Creates a [CompletingSharedFlowCache] backed by a [SharedFlowCache]: each supplier's terminal + * completion/error is materialised into the shared stream and rematerialised per subscriber, so - + * unlike a plain [SharedFlow] - completion and errors propagate downstream. The entry is evicted as + * the terminal is published, so a downstream `retry` re-resolves to a fresh entry (re-running the + * supplier) rather than re-reading the terminal. + * + * Each subscription re-resolves the entry, and the entry is evicted before its terminal is + * published, so a subscriber arriving after completion rebuilds (re-running the supplier) rather + * than attaching to the terminated stream - the terminal is never replayed to a late subscriber, so + * `replay` does not need to cover it. + * + * @param scope parent scope; each key's sharing coroutine is a child of it. + * @param started the [SharingStarted] strategy applied to each key's upstream. + * @param replay number of values replayed to new subscribers of a live (not-yet-completed) entry. + */ +@OptIn(DelicateCoroutinesApi::class) +fun completingSharedFlowCache( + scope: CoroutineScope, + started: SharingStarted = SharingStarted.WhileSubscribed(), + replay: Int = 0, +): CompletingSharedFlowCache { + val cache = SharedFlowCache(scope, started, replay) { it is Completion } + return CompletingSharedFlowCache { key, supplier -> + InternalCompletingSharedFlow { cache.get(key) { k -> supplier(k).materializeUnboxed() } } + } +} diff --git a/util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/VersionedMapEvent.kt b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/VersionedMapEvent.kt new file mode 100644 index 0000000..136bb43 --- /dev/null +++ b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/VersionedMapEvent.kt @@ -0,0 +1,20 @@ +package com.caplin.integration.datasourcex.util.flow + +/** + * A map mutation carrying the [version] it was written at, for distribution to consumers that gate + * on `version >`. Unlike [MapEvent] there is no old value and no `Populated` marker: the stream is + * delta-only and current values are read from the store. + */ +sealed interface VersionedMapEvent { + val key: K + val version: Long + + data class Upsert( + override val key: K, + val value: V, + override val version: Long, + ) : VersionedMapEvent + + data class Removed(override val key: K, override val version: Long) : + VersionedMapEvent +} diff --git a/util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/fory/DataSourceModule.kt b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/fory/DataSourceModule.kt index 8315a2c..20851e4 100644 --- a/util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/fory/DataSourceModule.kt +++ b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/fory/DataSourceModule.kt @@ -5,6 +5,7 @@ import com.caplin.integration.datasourcex.util.flow.MapEvent import com.caplin.integration.datasourcex.util.flow.SetEvent import com.caplin.integration.datasourcex.util.flow.SimpleMapEvent import com.caplin.integration.datasourcex.util.flow.ValueOrCompletion +import com.caplin.integration.datasourcex.util.flow.VersionedMapEvent import org.apache.fory.Fory /** Registers serializers for internal types with the provided [Fory] instance. */ @@ -32,6 +33,7 @@ fun Fory.registerDataSourceSerializers(preserveExceptionTypes: Boolean = false): registerSerializer(MapEvent::class.java, MapEventSerializer::class.java) registerSerializer(SimpleMapEvent::class.java, SimpleMapEventSerializer::class.java) registerSerializer(SetEvent::class.java, SetEventSerializer::class.java) + registerSerializer(VersionedMapEvent::class.java, VersionedMapEventSerializer::class.java) registerSerializer( ValueOrCompletion::class.java, diff --git a/util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/fory/VersionedMapEventSerializer.kt b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/fory/VersionedMapEventSerializer.kt new file mode 100644 index 0000000..bd49059 --- /dev/null +++ b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/fory/VersionedMapEventSerializer.kt @@ -0,0 +1,48 @@ +package com.caplin.integration.datasourcex.util.serialization.fory + +import com.caplin.integration.datasourcex.util.flow.VersionedMapEvent +import org.apache.fory.config.Config +import org.apache.fory.context.ReadContext +import org.apache.fory.context.WriteContext +import org.apache.fory.serializer.Serializer + +internal class VersionedMapEventSerializer(config: Config, type: Class>) : + Serializer>(config, type) { + + private enum class Type { + UPSERT, + REMOVED, + } + + override fun write(writeContext: WriteContext, value: VersionedMapEvent<*, *>) { + when (value) { + is VersionedMapEvent.Upsert -> { + writeContext.writeByte(Type.UPSERT.ordinal.toByte()) + writeContext.writeRef(value.key) + writeContext.writeRef(value.value) + writeContext.writeRef(value.version) + } + is VersionedMapEvent.Removed -> { + writeContext.writeByte(Type.REMOVED.ordinal.toByte()) + writeContext.writeRef(value.key) + writeContext.writeRef(value.version) + } + } + } + + override fun read(readContext: ReadContext): VersionedMapEvent<*, *> { + return when (Type.entries[readContext.readByte().toInt()]) { + Type.UPSERT -> { + val key = readContext.readRef() as Any + val value = readContext.readRef() as Any + val version = readContext.readRef() as Long + VersionedMapEvent.Upsert(key, value, version) + } + Type.REMOVED -> { + val key = readContext.readRef() as Any + val version = readContext.readRef() as Long + VersionedMapEvent.Removed(key, version) + } + } + } +} diff --git a/util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson2/DataSourceModule.kt b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson2/DataSourceModule.kt index d145222..4bff589 100644 --- a/util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson2/DataSourceModule.kt +++ b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson2/DataSourceModule.kt @@ -5,6 +5,7 @@ import com.caplin.integration.datasourcex.util.flow.MapEvent import com.caplin.integration.datasourcex.util.flow.SetEvent import com.caplin.integration.datasourcex.util.flow.SimpleMapEvent import com.caplin.integration.datasourcex.util.flow.ValueOrCompletion +import com.caplin.integration.datasourcex.util.flow.VersionedMapEvent import com.fasterxml.jackson.databind.JsonDeserializer import com.fasterxml.jackson.databind.JsonSerializer import com.fasterxml.jackson.databind.Module @@ -58,6 +59,23 @@ object DataSourceModule : SimpleModule() { simpleMapEventDeserializer as JsonDeserializer>, ) + val versionedMapEventSerializer = VersionedMapEventSerializer() + val versionedMapEventDeserializer = VersionedMapEventDeserializer() + + addSerializer(VersionedMapEvent::class.java, versionedMapEventSerializer) + addDeserializer(VersionedMapEvent::class.java, versionedMapEventDeserializer) + + @Suppress("UNCHECKED_CAST") + addSerializer( + VersionedMapEvent.Upsert::class.java, + versionedMapEventSerializer as JsonSerializer>, + ) + @Suppress("UNCHECKED_CAST") + addSerializer( + VersionedMapEvent.Removed::class.java, + versionedMapEventSerializer as JsonSerializer>, + ) + val setEventSerializer = SetEventSerializer() val setEventDeserializer = SetEventDeserializer() diff --git a/util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson2/VersionedMapEventDeserializer.kt b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson2/VersionedMapEventDeserializer.kt new file mode 100644 index 0000000..bae4a4f --- /dev/null +++ b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson2/VersionedMapEventDeserializer.kt @@ -0,0 +1,42 @@ +package com.caplin.integration.datasourcex.util.serialization.jackson2 + +import com.caplin.integration.datasourcex.util.flow.VersionedMapEvent +import com.fasterxml.jackson.core.JsonParser +import com.fasterxml.jackson.databind.DeserializationContext +import com.fasterxml.jackson.databind.JsonMappingException +import com.fasterxml.jackson.databind.deser.std.StdDeserializer +import com.fasterxml.jackson.databind.node.ObjectNode + +/** + * Keys and values round-trip as JSON-native types only: a String key returns a String, but a + * numeric key comes back as Integer/Long and a structured value as a Map, not its original type. + * Intended for String (or otherwise JSON-native) keys and values. + */ +internal class VersionedMapEventDeserializer : + StdDeserializer>(VersionedMapEvent::class.java) { + override fun deserialize(p: JsonParser, ctxt: DeserializationContext): VersionedMapEvent<*, *> { + val node = p.codec.readTree(p) + val type = + node.get("type")?.asText() + ?: throw JsonMappingException.from(p, "Missing type field for VersionedMapEvent") + val key = + node.get("key")?.let { p.codec.treeToValue(it, Any::class.java) } + ?: throw JsonMappingException.from(p, "Missing key field for VersionedMapEvent") + val version = + node.get("version")?.takeIf { it.isIntegralNumber }?.asLong() + ?: throw JsonMappingException.from( + p, + "Missing or non-integral version field for VersionedMapEvent", + ) + return when (type) { + "upsert" -> { + val value = + node.get("value")?.let { p.codec.treeToValue(it, Any::class.java) } + ?: throw JsonMappingException.from(p, "Missing value field for upsert") + VersionedMapEvent.Upsert(key, value, version) + } + "removed" -> VersionedMapEvent.Removed(key, version) + else -> throw JsonMappingException.from(p, "Unknown VersionedMapEvent type: $type") + } + } +} diff --git a/util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson2/VersionedMapEventSerializer.kt b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson2/VersionedMapEventSerializer.kt new file mode 100644 index 0000000..3b8b997 --- /dev/null +++ b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson2/VersionedMapEventSerializer.kt @@ -0,0 +1,34 @@ +package com.caplin.integration.datasourcex.util.serialization.jackson2 + +import com.caplin.integration.datasourcex.util.flow.VersionedMapEvent +import com.fasterxml.jackson.core.JsonGenerator +import com.fasterxml.jackson.databind.SerializerProvider +import com.fasterxml.jackson.databind.ser.std.StdSerializer + +internal class VersionedMapEventSerializer : + StdSerializer>(VersionedMapEvent::class.java) { + override fun serialize( + value: VersionedMapEvent<*, *>, + gen: JsonGenerator, + provider: SerializerProvider, + ) { + gen.writeStartObject() + when (value) { + is VersionedMapEvent.Upsert -> { + gen.writeStringField("type", "upsert") + gen.writeFieldName("key") + provider.defaultSerializeValue(value.key, gen) + gen.writeFieldName("value") + provider.defaultSerializeValue(value.value, gen) + gen.writeNumberField("version", value.version) + } + is VersionedMapEvent.Removed -> { + gen.writeStringField("type", "removed") + gen.writeFieldName("key") + provider.defaultSerializeValue(value.key, gen) + gen.writeNumberField("version", value.version) + } + } + gen.writeEndObject() + } +} diff --git a/util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson3/DataSourceModule.kt b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson3/DataSourceModule.kt index 794ca83..cd652ea 100644 --- a/util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson3/DataSourceModule.kt +++ b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson3/DataSourceModule.kt @@ -5,6 +5,7 @@ import com.caplin.integration.datasourcex.util.flow.MapEvent import com.caplin.integration.datasourcex.util.flow.SetEvent import com.caplin.integration.datasourcex.util.flow.SimpleMapEvent import com.caplin.integration.datasourcex.util.flow.ValueOrCompletion +import com.caplin.integration.datasourcex.util.flow.VersionedMapEvent import tools.jackson.databind.ValueDeserializer import tools.jackson.databind.ValueSerializer import tools.jackson.databind.json.JsonMapper @@ -64,6 +65,23 @@ object DataSourceModule : SimpleModule() { simpleMapEventDeserializer as ValueDeserializer>, ) + val versionedMapEventSerializer = VersionedMapEventSerializer() + val versionedMapEventDeserializer = VersionedMapEventDeserializer() + + addSerializer(VersionedMapEvent::class.java, versionedMapEventSerializer) + addDeserializer(VersionedMapEvent::class.java, versionedMapEventDeserializer) + + @Suppress("UNCHECKED_CAST") + addSerializer( + VersionedMapEvent.Upsert::class.java, + versionedMapEventSerializer as ValueSerializer>, + ) + @Suppress("UNCHECKED_CAST") + addSerializer( + VersionedMapEvent.Removed::class.java, + versionedMapEventSerializer as ValueSerializer>, + ) + val setEventSerializer = SetEventSerializer() val setEventDeserializer = SetEventDeserializer() diff --git a/util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson3/VersionedMapEventDeserializer.kt b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson3/VersionedMapEventDeserializer.kt new file mode 100644 index 0000000..d2d979d --- /dev/null +++ b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson3/VersionedMapEventDeserializer.kt @@ -0,0 +1,41 @@ +package com.caplin.integration.datasourcex.util.serialization.jackson3 + +import com.caplin.integration.datasourcex.util.flow.VersionedMapEvent +import tools.jackson.core.JsonParser +import tools.jackson.databind.DatabindException +import tools.jackson.databind.DeserializationContext +import tools.jackson.databind.deser.std.StdDeserializer + +/** + * Keys and values round-trip as JSON-native types only: a String key returns a String, but a + * numeric key comes back as Integer/Long and a structured value as a Map, not its original type. + * Intended for String (or otherwise JSON-native) keys and values. + */ +internal class VersionedMapEventDeserializer : + StdDeserializer>(VersionedMapEvent::class.java) { + override fun deserialize(p: JsonParser, ctxt: DeserializationContext): VersionedMapEvent<*, *> { + val node = ctxt.readTree(p) + val type = + node.get("type")?.asString() + ?: throw DatabindException.from(p, "Missing type field for VersionedMapEvent") + val key = + node.get("key")?.let { ctxt.readTreeAsValue(it, Any::class.java) } + ?: throw DatabindException.from(p, "Missing key field for VersionedMapEvent") + val version = + node.get("version")?.takeIf { it.isIntegralNumber }?.asLong() + ?: throw DatabindException.from( + p, + "Missing or non-integral version field for VersionedMapEvent", + ) + return when (type) { + "upsert" -> { + val value = + node.get("value")?.let { ctxt.readTreeAsValue(it, Any::class.java) } + ?: throw DatabindException.from(p, "Missing value field for upsert") + VersionedMapEvent.Upsert(key, value, version) + } + "removed" -> VersionedMapEvent.Removed(key, version) + else -> throw DatabindException.from(p, "Unknown VersionedMapEvent type: $type") + } + } +} diff --git a/util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson3/VersionedMapEventSerializer.kt b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson3/VersionedMapEventSerializer.kt new file mode 100644 index 0000000..e15f932 --- /dev/null +++ b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson3/VersionedMapEventSerializer.kt @@ -0,0 +1,34 @@ +package com.caplin.integration.datasourcex.util.serialization.jackson3 + +import com.caplin.integration.datasourcex.util.flow.VersionedMapEvent +import tools.jackson.core.JsonGenerator +import tools.jackson.databind.SerializationContext +import tools.jackson.databind.ser.std.StdSerializer + +internal class VersionedMapEventSerializer : + StdSerializer>(VersionedMapEvent::class.java) { + override fun serialize( + value: VersionedMapEvent<*, *>, + gen: JsonGenerator, + provider: SerializationContext, + ) { + gen.writeStartObject() + when (value) { + is VersionedMapEvent.Upsert -> { + gen.writeStringProperty("type", "upsert") + gen.writeName("key") + provider.writeValue(gen, value.key) + gen.writeName("value") + provider.writeValue(gen, value.value) + gen.writeNumberProperty("version", value.version) + } + is VersionedMapEvent.Removed -> { + gen.writeStringProperty("type", "removed") + gen.writeName("key") + provider.writeValue(gen, value.key) + gen.writeNumberProperty("version", value.version) + } + } + gen.writeEndObject() + } +} diff --git a/util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/AbstractFlowStore.kt b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/AbstractFlowStore.kt new file mode 100644 index 0000000..55948dc --- /dev/null +++ b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/AbstractFlowStore.kt @@ -0,0 +1,116 @@ +package com.caplin.integration.datasourcex.util.store + +import com.caplin.integration.datasourcex.util.flow.VersionedMapEvent +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.channels.BufferOverflow +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.asSharedFlow +import kotlinx.coroutines.flow.distinctUntilChanged +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.onSubscription +import kotlinx.coroutines.withContext + +internal const val DEFAULT_SIGNAL_BUFFER: Int = 256 + +/** + * Shared core for the store-backed [FlowStore] variants: a delta-only [signal] bus and a versioned + * hot-set [cache] read through to the store on a miss. The blocking read-through load runs on + * [dispatcher]. + */ +internal abstract class AbstractFlowStore( + protected val loader: StoreReader, + protected val cache: FlowStoreCache, + protected val dispatcher: CoroutineDispatcher, + bufferCapacity: Int = DEFAULT_SIGNAL_BUFFER, +) : FlowStore { + + protected val signal = + MutableSharedFlow>( + replay = 0, + extraBufferCapacity = bufferCapacity, + onBufferOverflow = BufferOverflow.SUSPEND, + ) + + override fun asFlow(): SharedFlow> = signal.asSharedFlow() + + override val async: AsyncFlowStore by lazy { AsyncFlowStoreImpl(this) } + + // Blocking on a miss; the caller dispatches it, as with the transactional operations. + override fun get(key: K): V? = readThrough(key)?.valueOrNull() + + private fun readThrough(key: K): CacheEntry? = cache.getOrLoad(key, loader::load) + + // Suspending [get] for the [async] view: a cache hit returns inline; only a miss is dispatched. + internal suspend fun getSuspending(key: K): V? { + cache.getIfPresent(key)?.let { + return it.valueOrNull() + } + return withContext(dispatcher) { cache.getOrLoad(key, loader::load) }?.valueOrNull() + } + + override fun asFlow( + query: () -> Map>, + predicate: (K, V) -> Boolean, + ): Flow> = flow { + val highest = HashMap() // keys in view -> last version emitted + signal + .onSubscription { + val snapshot = withContext(dispatcher) { query() } + snapshot.forEach { (key, v) -> + highest[key] = v.version + this@flow.emit(VersionedMapEvent.Upsert(key, v.value, v.version)) + } + } + .collect { event -> + val baseline = highest[event.key] + if (baseline != null && event.version <= baseline) return@collect + when (event) { + is VersionedMapEvent.Upsert -> + if (predicate(event.key, event.value)) { + highest[event.key] = event.version + this@flow.emit(event) + } else if (baseline != null) { + highest.remove(event.key) + this@flow.emit(VersionedMapEvent.Removed(event.key, event.version)) + } + is VersionedMapEvent.Removed -> + if (baseline != null) { + highest.remove(event.key) + this@flow.emit(event) + } + } + } + } + + override fun valueFlow(key: K): Flow = + flow { + var highest = Long.MIN_VALUE + signal + .onSubscription { + val initial = withContext(dispatcher) { readThrough(key) } + highest = initial?.version ?: Long.MIN_VALUE + this@flow.emit(initial?.valueOrNull()) + } + .collect { event -> + if (event.key == key && event.version > highest) { + highest = event.version + this@flow.emit(event.toEntry().valueOrNull()) + } + } + } + .distinctUntilChanged() +} + +private fun CacheEntry.valueOrNull(): V? = + when (this) { + is Live -> value + is Tombstone -> null + } + +internal fun VersionedMapEvent<*, V>.toEntry(): CacheEntry = + when (this) { + is VersionedMapEvent.Upsert -> Live(value, version) + is VersionedMapEvent.Removed -> Tombstone(version) + } diff --git a/util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/AsyncFlowStore.kt b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/AsyncFlowStore.kt new file mode 100644 index 0000000..a2e0f37 --- /dev/null +++ b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/AsyncFlowStore.kt @@ -0,0 +1,81 @@ +package com.caplin.integration.datasourcex.util.store + +import com.caplin.integration.datasourcex.util.flow.VersionedMapEvent +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.withContext + +/** + * Suspending view of a [FlowStore]: [get] runs the blocking read-through on the store's dispatcher, + * short-circuiting a cache hit inline so only a miss is dispatched. + */ +interface AsyncFlowStore { + fun asFlow(): SharedFlow> + + fun asFlow( + query: () -> Map>, + predicate: (K, V) -> Boolean = { _, _ -> true }, + ): Flow> + + fun valueFlow(key: K): Flow + + suspend fun get(key: K): V? +} + +/** + * Suspending view of a [MutableFlowStore]: each call dispatches its blocking work to the store's + * dispatcher. The mutations take the caller's transaction handle [T], so use this where [T] + * tolerates use from the dispatcher (a transaction managed across threads with serial access); + * where the transaction is driven by a thread-bound, non-suspending callback, use the + * non-suspending [MutableFlowStore] instead. + */ +interface AsyncMutableFlowStore : AsyncFlowStore { + suspend fun get(key: K, tx: T): V? + + suspend fun put(key: K, value: V, tx: T) + + suspend fun putAll(from: Map, tx: T) + + suspend fun remove(key: K, tx: T) +} + +internal class AsyncFlowStoreImpl(private val store: AbstractFlowStore) : + AsyncFlowStore { + override fun asFlow(): SharedFlow> = store.asFlow() + + override fun asFlow( + query: () -> Map>, + predicate: (K, V) -> Boolean, + ): Flow> = store.asFlow(query, predicate) + + override fun valueFlow(key: K): Flow = store.valueFlow(key) + + override suspend fun get(key: K): V? = store.getSuspending(key) +} + +internal class AsyncMutableFlowStoreImpl( + private val store: MutableFlowStoreImpl, + private val dispatcher: CoroutineDispatcher, +) : AsyncMutableFlowStore { + override fun asFlow(): SharedFlow> = store.asFlow() + + override fun asFlow( + query: () -> Map>, + predicate: (K, V) -> Boolean, + ): Flow> = store.asFlow(query, predicate) + + override fun valueFlow(key: K): Flow = store.valueFlow(key) + + override suspend fun get(key: K): V? = store.getSuspending(key) + + override suspend fun get(key: K, tx: T): V? = withContext(dispatcher) { store.get(key, tx) } + + override suspend fun put(key: K, value: V, tx: T) = + withContext(dispatcher) { store.put(key, value, tx) } + + override suspend fun putAll(from: Map, tx: T) = + withContext(dispatcher) { store.putAll(from, tx) } + + override suspend fun remove(key: K, tx: T) = withContext(dispatcher) { store.remove(key, tx) } +} diff --git a/util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/CacheEntry.kt b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/CacheEntry.kt new file mode 100644 index 0000000..90fb15a --- /dev/null +++ b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/CacheEntry.kt @@ -0,0 +1,13 @@ +package com.caplin.integration.datasourcex.util.store + +/** + * A versioned cache entry. [Live] holds a present value; [Tombstone] marks a removed key so a + * stale, older read-through is rejected by version instead of silently repopulating the value. + */ +internal sealed interface CacheEntry { + val version: Long +} + +internal data class Live(val value: V, override val version: Long) : CacheEntry + +internal data class Tombstone(override val version: Long) : CacheEntry diff --git a/util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/FlowStore.kt b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/FlowStore.kt new file mode 100644 index 0000000..4f2891b --- /dev/null +++ b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/FlowStore.kt @@ -0,0 +1,119 @@ +package com.caplin.integration.datasourcex.util.store + +import com.caplin.integration.datasourcex.util.flow.VersionedMapEvent +import com.github.benmanes.caffeine.cache.Caffeine +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.launch + +/** + * Read view of a store-backed map. Exposes the delta stream and per-key access only; there is no + * full-map snapshot, so values are read through to the store on a miss. + * + * The value [V] must be an **aggregate root**: every write replaces a key's value as a whole (no + * field-level or partial updates), so each [VersionedMapEvent.Upsert] carries the full value and a + * miss reads the whole value back. With single-writer-per-key ownership this makes a key's version + * sequence totally ordered. + */ +interface FlowStore { + /** The live, delta-only stream of versioned mutations. */ + fun asFlow(): SharedFlow> + + /** + * A get-and-subscribe view: emits the entries [query] loads now (as [VersionedMapEvent.Upsert]), + * then follows the live stream, version-gated per key so the snapshot and the tail never + * conflict. [query] is the bulk current state (key to value+version); it runs blocking on the + * store's dispatcher and is not written to the cache. [predicate] scopes the live tail to the + * same logical set: a live upsert that matches enters or updates the view, one that stops + * matching leaves it as a [VersionedMapEvent.Removed], and removals forward only for keys in the + * view. [query] and [predicate] must agree (query = current rows matching predicate). + */ + fun asFlow( + query: () -> Map>, + predicate: (K, V) -> Boolean = { _, _ -> true }, + ): Flow> + + /** The latest value for [key], starting from a read-through load then following the stream. */ + fun valueFlow(key: K): Flow + + /** + * A suspending view of this store whose [AsyncFlowStore.get] dispatches the read-through itself. + */ + val async: AsyncFlowStore + + /** + * The latest value for [key]: cache-first, reading through to the store on a miss. The + * read-through is blocking — call it within an IO context, as with the store's writes. Not + * ordered against [asFlow]: a value just published as a delta may not be visible here yet. + */ + fun get(key: K): V? +} + +/** + * Operator form of [flowStore], reading this delta stream as the `inbound` source: `deltas + * .flowStoreIn(reader, caffeine, scope)`. Mirrors `shareIn`/`stateIn` — collection is launched in + * [scope] and the returned [FlowStore] is the read-only consumer. See [flowStore] for the + * lifecycle, caching, and [dispatcher] semantics. + */ +fun Flow>.flowStoreIn( + reader: StoreReader, + caffeine: Caffeine, + scope: CoroutineScope, + dispatcher: CoroutineDispatcher = Dispatchers.IO, + bufferCapacity: Int = DEFAULT_SIGNAL_BUFFER, +): FlowStore = + flowStore( + reader, + this, + caffeine, + scope, + dispatcher, + bufferCapacity, + ) + +/** + * Creates a read-only [FlowStore] consumer fed by an [inbound] delta stream (the owner's published + * mutations) and reading [reader] on a cache miss. The collection of [inbound] is launched in + * [scope], so cancelling [scope] stops it and a failure of [inbound] surfaces through [scope] (its + * parent / exception handler), after which the store serves only stale reads. The hot set is a + * [Caffeine] cache built from [caffeine] (size it to bound memory). The blocking read-through load + * runs on [dispatcher] (IO by default). [V] must be an aggregate root — see [FlowStore]. + */ +fun flowStore( + reader: StoreReader, + inbound: Flow>, + caffeine: Caffeine, + scope: CoroutineScope, + dispatcher: CoroutineDispatcher = Dispatchers.IO, + bufferCapacity: Int = DEFAULT_SIGNAL_BUFFER, +): FlowStore = + FlowStoreImpl( + reader, + caffeine.buildFlowStoreCache(), + inbound, + scope, + dispatcher, + bufferCapacity, + ) + +internal class FlowStoreImpl( + reader: StoreReader, + cache: FlowStoreCache, + inbound: Flow>, + scope: CoroutineScope, + dispatcher: CoroutineDispatcher, + bufferCapacity: Int = DEFAULT_SIGNAL_BUFFER, +) : AbstractFlowStore(reader, cache, dispatcher, bufferCapacity) { + + init { + scope.launch { + inbound.collect { event -> + cache.reflectIfNewer(event) + signal.emit(event) + } + } + } +} diff --git a/util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/FlowStoreCache.kt b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/FlowStoreCache.kt new file mode 100644 index 0000000..7024733 --- /dev/null +++ b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/FlowStoreCache.kt @@ -0,0 +1,55 @@ +package com.caplin.integration.datasourcex.util.store + +import com.caplin.integration.datasourcex.util.flow.VersionedMapEvent +import com.github.benmanes.caffeine.cache.Cache +import com.github.benmanes.caffeine.cache.Caffeine +import com.github.benmanes.caffeine.cache.LoadingCache + +/** + * Builds a [FlowStoreCache] from a caller-supplied [Caffeine] spec. The entry type is internal, so + * the public factories take the untyped builder and stamp the value type here. The value is + * nullable so a read-through miss can cache nothing (Caffeine's compute skips a null result). + */ +@Suppress("UNCHECKED_CAST") +internal fun Caffeine.buildFlowStoreCache(): FlowStoreCache = + FlowStoreCache(build?>()) + +/** + * A synchronous Caffeine [Cache] of versioned [CacheEntry] values that owns the store's version + * gating. A read-through miss loads single-flight under Caffeine's per-key compute; owner writes + * and inbound deltas apply through version-gated [putIfNewer] / [reflectIfNewer]. + */ +internal class FlowStoreCache(private val cache: Cache?>) { + + init { + // The store owns read-through loading; a LoadingCache's loader and refreshAfterWrite would + // write entries that bypass version gating. + require(cache !is LoadingCache<*, *>) { "Pass a plain Cache, not a LoadingCache." } + } + + fun getIfPresent(key: K): CacheEntry? = cache.getIfPresent(key) + + /** + * Cache-first; single-flight loads via [load] on a miss. A null load caches nothing. The atomic + * per-key compute coalesces concurrent misses and serialises loads against deltas for the key. + */ + fun getOrLoad(key: K, load: (K) -> Versioned?): CacheEntry? = + cache.get(key) { load(it)?.let { v -> Live(v.value, v.version) } } + + /** Caches [candidate] unless a newer entry is already resident; returns the resident entry. */ + fun putIfNewer(key: K, candidate: CacheEntry): CacheEntry = + cache.asMap().merge(key, candidate) { old, new -> + if (old.version >= candidate.version) old else new + }!! + + /** + * Applies [event] to the cache, gated on version: a strictly-newer delta updates a resident entry + * or seeds an absent one. Seeding absent keys keeps the cache consistent with the stream the + * consumer has already observed, so a later read-through can never regress to a value older than + * a delta the consumer saw (e.g. a lagging replica read). The [Caffeine] size bound still caps + * the hot set, evicting cold keys as deltas for them arrive. + */ + fun reflectIfNewer(event: VersionedMapEvent) { + putIfNewer(event.key, event.toEntry()) + } +} diff --git a/util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/MutableFlowStore.kt b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/MutableFlowStore.kt new file mode 100644 index 0000000..d848c66 --- /dev/null +++ b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/MutableFlowStore.kt @@ -0,0 +1,108 @@ +package com.caplin.integration.datasourcex.util.store + +import com.caplin.integration.datasourcex.util.flow.VersionedMapEvent +import com.github.benmanes.caffeine.cache.Caffeine +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.Dispatchers + +/** + * Read/write view of a store-backed map. Every mutation takes the caller's transaction handle [T]: + * the write is enlisted on it through [StoreWriter], which assigns the version, and the cache + * update and delta are published only when that transaction commits. + * + * The owner must **serialise writes to a given key** (single-writer-per-key): the version is the + * store's commit order, so unserialised concurrent writes to one key would settle on the wrong + * version. Serialise at the transaction layer — a locking read such as `SELECT … FOR UPDATE` via + * [Store.load], held across the transaction — not an in-process lock, which orders the calls but + * not the commits. [V] must be an aggregate root — see [FlowStore]. + */ +interface MutableFlowStore : FlowStore { + override val async: AsyncMutableFlowStore + + /** + * Reads [key]'s current value within [tx], always through the store and bypassing the cache, so + * it sees this transaction's own uncommitted writes and can take a locking read. Use for + * read-modify-write; use the cache-first [get] outside a transaction. + */ + fun get(key: K, tx: T): V? + + fun put(key: K, value: V, tx: T) + + fun putAll(from: Map, tx: T) + + fun remove(key: K, tx: T) +} + +/** + * Creates a [MutableFlowStore] backed by [store], with a bounded hot set built from [caffeine] + * (size it to bound memory). + * + * Each mutation takes the caller's transaction handle [T], which [txContext] adapts into a + * [TxContext] so the write can enlist on it and register its publish. Mutations are non-suspending + * and run on that transaction; on commit the store refreshes its cache and `tryEmit`s the delta + * onto its stream. The stream's buffer is unbounded so the commit callback never suspends on + * backpressure; a permanently slow consumer therefore grows the buffer without bound (eventually + * OOM) rather than blocking the committer. [store] assigns each write's version; the blocking [get] + * read-through runs on the caller's thread, as the writes do. The suspending [async] operations and + * the [valueFlow] read-through instead run their blocking work on [dispatcher] (IO by default). [V] + * must be an aggregate root — see [FlowStore]. + */ +fun mutableFlowStore( + store: Store, + caffeine: Caffeine, + dispatcher: CoroutineDispatcher = Dispatchers.IO, + txContext: (T) -> TxContext, +): MutableFlowStore = + MutableFlowStoreImpl(store, caffeine.buildFlowStoreCache(), dispatcher, txContext) + +internal class MutableFlowStoreImpl( + private val writer: Store, + cache: FlowStoreCache, + dispatcher: CoroutineDispatcher, + private val txContext: (T) -> TxContext, + // Unbounded buffer so the commit callback's tryEmit always succeeds without suspending: a slow + // consumer grows the buffer, never blocks the committing thread. +) : AbstractFlowStore(writer, cache, dispatcher, Int.MAX_VALUE), MutableFlowStore { + + override val async: AsyncMutableFlowStore by lazy { + AsyncMutableFlowStoreImpl(this, dispatcher) + } + + override fun get(key: K, tx: T): V? = writer.load(key, txContext(tx))?.value + + override fun put(key: K, value: V, tx: T) { + val ctx = txContext(tx) + val version = writer.write(key, value, ctx) + ctx.onCommitEnd { publish(VersionedMapEvent.Upsert(key, value, version)) } + } + + override fun putAll(from: Map, tx: T) { + val ctx = txContext(tx) + val versions = writer.writeAll(from, ctx) + // Build the deltas eagerly, before commit, so a writer that omits a key fails the transaction + // here rather than throwing from the post-commit callback after the writes are durable. + val deltas = + from.map { (key, value) -> + val version = versions[key] ?: error("writeAll returned no version for key $key") + VersionedMapEvent.Upsert(key, value, version) + } + ctx.onCommitEnd { deltas.forEach(::publish) } + } + + override fun remove(key: K, tx: T) { + val ctx = txContext(tx) + val version = writer.delete(key, ctx) + ctx.onCommitEnd { publish(VersionedMapEvent.Removed(key, version)) } + } + + /** + * Updates the cache then emits [event]: a [valueFlow] that subscribes concurrently with the + * commit either reads the already-updated cache in `onSubscription` or receives the delta, never + * losing the update to the gap between the two steps. Both are non-suspending and run inside the + * synchronous commit callback; the unbounded buffer means [signal] never rejects the delta. + */ + private fun publish(event: VersionedMapEvent) { + cache.putIfNewer(event.key, event.toEntry()) + signal.tryEmit(event) + } +} diff --git a/util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/Store.kt b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/Store.kt new file mode 100644 index 0000000..ac0583f --- /dev/null +++ b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/Store.kt @@ -0,0 +1,4 @@ +package com.caplin.integration.datasourcex.util.store + +/** Combines [StoreReader] and [StoreWriter] for a backend that implements both. */ +interface Store : StoreReader, StoreWriter diff --git a/util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/StoreReader.kt b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/StoreReader.kt new file mode 100644 index 0000000..47b74cf --- /dev/null +++ b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/StoreReader.kt @@ -0,0 +1,6 @@ +package com.caplin.integration.datasourcex.util.store + +/** Read half of the store SPI. */ +interface StoreReader { + fun load(key: K): Versioned? +} diff --git a/util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/StoreWriter.kt b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/StoreWriter.kt new file mode 100644 index 0000000..9af866e --- /dev/null +++ b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/StoreWriter.kt @@ -0,0 +1,27 @@ +package com.caplin.integration.datasourcex.util.store + +/** + * Write half of the store SPI. Implementations enlist on [TxContext.transaction] and must not + * commit the transaction themselves. Each write assigns and returns the new version from the + * store's commit order (a sequence, identity, or version column); the caller never supplies it. + */ +interface StoreWriter { + /** + * Reads [key]'s current persisted value within [tx] for read-modify-write, e.g. a locking `SELECT + * … FOR UPDATE` on the transaction's connection. Override it to support [MutableFlowStore.get] + * within a transaction (the default throws); the read should see the transaction's own + * uncommitted writes and serialise concurrent writers. + */ + fun load(key: K, tx: TxContext): Versioned? + + /** Writes [value] for [key] and returns the version the store assigned it. */ + fun write(key: K, value: V, tx: TxContext): Long + + /** Writes many entries, returning the version assigned to each. */ + fun writeAll(values: Map, tx: TxContext): Map = buildMap { + for ((key, value) in values) put(key, write(key, value, tx)) + } + + /** Deletes [key] and returns the version the store assigned the removal. */ + fun delete(key: K, tx: TxContext): Long +} diff --git a/util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/TxContext.kt b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/TxContext.kt new file mode 100644 index 0000000..c70da54 --- /dev/null +++ b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/TxContext.kt @@ -0,0 +1,15 @@ +package com.caplin.integration.datasourcex.util.store + +/** + * A driver-agnostic unit of work. [transaction] is the underlying transaction handle. The owner of + * the transaction begins and commits it; callers only register post-commit side-effects via + * [onCommitEnd] (and optional [onRollback] cleanup), which run inside a synchronous commit/rollback + * callback. + */ +interface TxContext { + val transaction: T + + fun onCommitEnd(action: () -> Unit) + + fun onRollback(action: () -> Unit) {} +} diff --git a/util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/Versioned.kt b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/Versioned.kt new file mode 100644 index 0000000..b625a75 --- /dev/null +++ b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/Versioned.kt @@ -0,0 +1,4 @@ +package com.caplin.integration.datasourcex.util.store + +/** A [value] paired with the monotonically increasing [version] it was written at. */ +data class Versioned(val value: V, val version: Long) diff --git a/util/src/samples/kotlin/samples/StoreSamples.kt b/util/src/samples/kotlin/samples/StoreSamples.kt new file mode 100644 index 0000000..75a714d --- /dev/null +++ b/util/src/samples/kotlin/samples/StoreSamples.kt @@ -0,0 +1,173 @@ +package samples + +import com.caplin.integration.datasourcex.util.flow.VersionedMapEvent +import com.caplin.integration.datasourcex.util.store.FlowStore +import com.caplin.integration.datasourcex.util.store.Store +import com.caplin.integration.datasourcex.util.store.TxContext +import com.caplin.integration.datasourcex.util.store.Versioned +import com.caplin.integration.datasourcex.util.store.mutableFlowStore +import com.github.benmanes.caffeine.cache.Caffeine +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.withContext +import org.jooq.Configuration +import org.jooq.DSLContext +import org.jooq.Record +import org.jooq.TransactionContext +import org.jooq.TransactionListener +import org.jooq.impl.DefaultTransactionListenerProvider +import samples.FlowStorePublishingListener.commitEnd +import samples.FlowStorePublishingListener.rollbackEnd + +class StoreSamples { + + /** The aggregate root stored under each key — replaced as a whole on every write. */ + data class Account(val id: String, val balance: Long) + + /** + * A [Store] over a jOOQ `account` table whose `version` is drawn from a database sequence, so the + * version is the store's durable commit order rather than an in-process counter. + * + * The transactional operations ([write], [delete] and the read-modify-write [load]) run on the + * caller's transaction via `tx.transaction`. The cache-miss read-through [load] has no + * transaction; the store runs its blocking jOOQ call on its IO dispatcher. + */ + class JooqAccountStore(private val dsl: DSLContext) : Store { + + override fun load(key: String): Versioned? = + dsl.fetchOne("select balance, version from account where id = ?", key)?.toVersioned(key) + + /** A locking read on the transaction's connection so a read-modify-write serialises writers. */ + override fun load(key: String, tx: TxContext): Versioned? = + tx.transaction + .dsl() + .fetchOne("select balance, version from account where id = ? for update", key) + ?.toVersioned(key) + + override fun write(key: String, value: Account, tx: TxContext): Long = + tx.transaction + .dsl() + .fetchOne( + """ + insert into account (id, balance, version) + values (?, ?, nextval('account_version')) + on conflict (id) do update + set balance = excluded.balance, version = nextval('account_version') + returning version + """, + key, + value.balance, + )!! + .get("version", Long::class.java) + + override fun delete(key: String, tx: TxContext): Long = + tx.transaction + .dsl() + .fetchOne( + "delete from account where id = ? returning nextval('account_version') as version", + key, + )!! + .get("version", Long::class.java) + + private fun Record.toVersioned(key: String): Versioned = + Versioned(Account(key, get("balance", Long::class.java)), get("version", Long::class.java)) + } + + /** + * Wires a [mutableFlowStore] over a jOOQ `account` table and moves funds between two accounts in + * a single `dsl.transaction { … }`: the locking read-modify-write (`store.get(key, config)`) and + * both writes run on the transaction, and both deltas reach the stream together on commit or + * neither on rollback. [FlowStorePublishingListener], installed once on the [DSLContext], is what + * turns each commit into the delta publish. + */ + suspend fun jooqSample(rootDsl: DSLContext) { + val store = + mutableFlowStore( + JooqAccountStore(rootDsl), + Caffeine.newBuilder().maximumSize(10_000), + txContext = Configuration::asTxContext, + ) + + // Install the publishing listener once on the DSLContext; transactions opened from it run the + // store's buffered commit/rollback actions in their commit/rollback callbacks. + val dsl = + rootDsl + .configuration() + .derive(DefaultTransactionListenerProvider(FlowStorePublishingListener)) + .dsl() + + withContext(Dispatchers.IO) { + dsl.transaction { config -> + val alice = store.get("alice", config) ?: Account("alice", 0) + val bob = store.get("bob", config) ?: Account("bob", 0) + store.put("alice", alice.copy(balance = alice.balance - 10), config) + store.put("bob", bob.copy(balance = bob.balance + 10), config) + } + } + } + + /** + * Snapshot-and-subscribe over the same store: [FlowStore.asFlow] runs a bulk jOOQ query for the + * accounts currently in credit, emits them as the starting state, then follows the live delta + * stream. The predicate keeps the tail scoped to that set, so an account whose balance drops to + * zero leaves the view as a [VersionedMapEvent.Removed]. + */ + fun creditedAccounts( + store: FlowStore, + dsl: DSLContext, + ): Flow> = + store.asFlow( + query = { + dsl.fetch("select id, balance, version from account where balance > 0").associate { row + -> + val id = row.get("id", String::class.java) + id to + Versioned( + Account(id, row.get("balance", Long::class.java)), + row.get("version", Long::class.java), + ) + } + }, + predicate = { _, account -> account.balance > 0 }, + ) +} + +private const val COMMIT_ACTIONS = "datasourcex.flowstore.commit-actions" +private const val ROLLBACK_ACTIONS = "datasourcex.flowstore.rollback-actions" + +/** + * Runs the store's buffered post-commit / rollback actions from within jOOQ's transaction + * lifecycle: [commitEnd] fires the commit actions ([COMMIT_ACTIONS]) and [rollbackEnd] the rollback + * actions ([ROLLBACK_ACTIONS]), each adapted from the transaction's [Configuration] by + * [asTxContext]. + */ +private object FlowStorePublishingListener : TransactionListener { + override fun commitEnd(ctx: TransactionContext) = ctx.configuration().runActions(COMMIT_ACTIONS) + + override fun rollbackEnd(ctx: TransactionContext) = + ctx.configuration().runActions(ROLLBACK_ACTIONS) +} + +/** + * Adapts a jOOQ transaction [Configuration] to a [TxContext], buffering the store's post-commit + * side effects in the transaction-scoped [Configuration.data] so [FlowStorePublishingListener] can + * run them after the commit or discard them on rollback. + */ +private fun Configuration.asTxContext(): TxContext = + object : TxContext { + override val transaction: Configuration = this@asTxContext + + override fun onCommitEnd(action: () -> Unit) { + actions(COMMIT_ACTIONS).add(action) + } + + override fun onRollback(action: () -> Unit) { + actions(ROLLBACK_ACTIONS).add(action) + } + } + +private fun Configuration.runActions(key: String) = actions(key).forEach { it() } + +@Suppress("UNCHECKED_CAST") +private fun Configuration.actions(key: String): MutableList<() -> Unit> = + (data(key) as? MutableList<() -> Unit>) ?: mutableListOf<() -> Unit>().also { data(key, it) } diff --git a/util/src/test/kotlin/com/caplin/integration/datasourcex/util/flow/FlowMapTest.kt b/util/src/test/kotlin/com/caplin/integration/datasourcex/util/flow/FlowMapTest.kt index 23c4ef2..cdbe14a 100644 --- a/util/src/test/kotlin/com/caplin/integration/datasourcex/util/flow/FlowMapTest.kt +++ b/util/src/test/kotlin/com/caplin/integration/datasourcex/util/flow/FlowMapTest.kt @@ -196,7 +196,7 @@ class FlowMapTest : val primaryMap = mutableFlowMapOf("1" to "A", "2" to "Ax") val filteredMap = - primaryMap.asFlow { _, value -> value.contains("x") }.toFlowMapIn(backgroundScope) + primaryMap.asFlow { _, value -> value.contains("x") }.flowMapIn(backgroundScope) filteredMap["1"] shouldBe null filteredMap["2"].shouldNotBeNull() shouldBeEqual "Ax" diff --git a/util/src/test/kotlin/com/caplin/integration/datasourcex/util/flow/SharedFlowCacheTest.kt b/util/src/test/kotlin/com/caplin/integration/datasourcex/util/flow/SharedFlowCacheTest.kt new file mode 100644 index 0000000..43d8350 --- /dev/null +++ b/util/src/test/kotlin/com/caplin/integration/datasourcex/util/flow/SharedFlowCacheTest.kt @@ -0,0 +1,144 @@ +@file:OptIn(ExperimentalCoroutinesApi::class) + +package com.caplin.integration.datasourcex.util.flow + +import app.cash.turbine.test +import com.caplin.integration.datasourcex.util.flow.ValueOrCompletion.Completion +import com.caplin.integration.datasourcex.util.flow.ValueOrCompletion.Value +import io.kotest.core.spec.style.FunSpec +import io.kotest.engine.coroutines.backgroundScope +import io.kotest.matchers.collections.shouldContainExactly +import io.kotest.matchers.collections.shouldContainExactlyInAnyOrder +import io.kotest.matchers.equals.shouldBeEqual +import io.kotest.matchers.types.shouldBeInstanceOf +import io.kotest.matchers.types.shouldNotBeSameInstanceAs +import java.util.concurrent.atomic.AtomicInteger +import kotlin.time.Duration.Companion.milliseconds +import kotlinx.coroutines.CoroutineExceptionHandler +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.SharingStarted +import kotlinx.coroutines.flow.emitAll +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.receiveAsFlow +import kotlinx.coroutines.flow.retry +import kotlinx.coroutines.launch + +class SharedFlowCacheTest : + FunSpec({ + test("shares a single upstream across subscribers") { + val collections = AtomicInteger(0) + val upstream = Channel(Channel.BUFFERED) + val cache = + sharedFlowCache(backgroundScope, SharingStarted.WhileSubscribed(), 1) + val supplier = { _: String -> + flow { + collections.incrementAndGet() + emitAll(upstream.receiveAsFlow()) + } + } + val got = mutableListOf() + + val shared = cache["K", supplier] + backgroundScope.launch { shared.collect { got.add("a:$it") } } + backgroundScope.launch { shared.collect { got.add("b:$it") } } + delay(100.milliseconds) + + upstream.send("X") + delay(100.milliseconds) + + got shouldContainExactlyInAnyOrder listOf("a:X", "b:X") + collections.get() shouldBeEqual 1 + } + + test("evicts a key once all subscribers leave") { + val upstream = Channel(Channel.BUFFERED) + val cache = + sharedFlowCache(backgroundScope, SharingStarted.WhileSubscribed(), 1) + val supplier = { _: String -> upstream.receiveAsFlow() } + + val first = cache["K", supplier] + val subscriber = backgroundScope.launch { first.collect {} } + delay(100.milliseconds) + + subscriber.cancelAndJoin() + delay(100.milliseconds) + + // The only subscriber has gone, so the key is evicted and a fresh get builds a new entry. + cache["K", supplier] shouldNotBeSameInstanceAs first + } + + test("an upstream error on one key does not affect other keys") { + val errors = mutableListOf() + val cacheScope = + CoroutineScope( + backgroundScope.coroutineContext + + CoroutineExceptionHandler { _, e -> errors.add(e) } + ) + val cache = sharedFlowCache(cacheScope, SharingStarted.WhileSubscribed(), 1) + val good = Channel(Channel.BUFFERED) + val gotGood = mutableListOf() + + backgroundScope.launch { + cache["good", { good.receiveAsFlow() }].collect { gotGood.add(it) } + } + backgroundScope.launch { cache["bad", { flow { error("boom") } }].collect {} } + delay(100.milliseconds) + + good.send("X") + delay(100.milliseconds) + + // The "bad" key's sharing coroutine failed in isolation; "good" keeps working. + gotGood shouldContainExactly listOf("X") + } + + test("completing adapter propagates completion") { + val upstream = Channel>(Channel.BUFFERED) + val cache: CompletingSharedFlowCache = + completingSharedFlowCache(backgroundScope, SharingStarted.WhileSubscribed(), 1) + + cache["A", { upstream.receiveAsFlow().dematerialize() }].test { + upstream.send(Value("X")) + awaitItem() shouldBeEqual "X" + upstream.send(Completion()) + awaitComplete() + } + } + + test("completing adapter propagates errors") { + val upstream = Channel>(Channel.BUFFERED) + val cache: CompletingSharedFlowCache = + completingSharedFlowCache(backgroundScope, SharingStarted.WhileSubscribed(), 1) + + cache["A", { upstream.receiveAsFlow().dematerialize() }].test { + upstream.send(Completion(IllegalArgumentException())) + awaitError().shouldBeInstanceOf() + } + } + + test("completing adapter supports downstream retry after an upstream error") { + val attempts = AtomicInteger(0) + val cache: CompletingSharedFlowCache = + completingSharedFlowCache(backgroundScope, SharingStarted.WhileSubscribed(), 1) + val supplier = { _: String -> + flow { + if (attempts.getAndIncrement() == 0) error("boom") + emit("ok") + } + } + + // The first attempt errors; the entry is evicted as the terminal is published, so retry + // re-resolves to a fresh entry, re-runs the supplier, and succeeds - no back-off needed. + cache["K", supplier] + .retry(3) { it is IllegalStateException } + .test { + awaitItem() shouldBeEqual "ok" + awaitComplete() + } + + attempts.get() shouldBeEqual 2 + } + }) diff --git a/util/src/test/kotlin/com/caplin/integration/datasourcex/util/serialization/fory/VersionedMapEventSerializationTest.kt b/util/src/test/kotlin/com/caplin/integration/datasourcex/util/serialization/fory/VersionedMapEventSerializationTest.kt new file mode 100644 index 0000000..ca2e435 --- /dev/null +++ b/util/src/test/kotlin/com/caplin/integration/datasourcex/util/serialization/fory/VersionedMapEventSerializationTest.kt @@ -0,0 +1,31 @@ +package com.caplin.integration.datasourcex.util.serialization.fory + +import com.caplin.integration.datasourcex.util.flow.VersionedMapEvent +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.shouldBe +import org.apache.fory.Fory +import org.apache.fory.config.Language + +class VersionedMapEventSerializationTest : + FunSpec({ + val fory = + Fory.builder() + .withLanguage(Language.JAVA) + .requireClassRegistration(false) + .build() + .registerDataSourceSerializers() + + test("Upsert") { + val event = VersionedMapEvent.Upsert("key", "value", 7L) + val bytes = fory.serialize(event) + val deserialized = fory.deserialize(bytes) + deserialized shouldBe event + } + + test("Removed") { + val event = VersionedMapEvent.Removed("key", 9L) + val bytes = fory.serialize(event) + val deserialized = fory.deserialize(bytes) + deserialized shouldBe event + } + }) diff --git a/util/src/test/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson2/VersionedMapEventSerializationTest.kt b/util/src/test/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson2/VersionedMapEventSerializationTest.kt new file mode 100644 index 0000000..988a18e --- /dev/null +++ b/util/src/test/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson2/VersionedMapEventSerializationTest.kt @@ -0,0 +1,45 @@ +package com.caplin.integration.datasourcex.util.serialization.jackson2 + +import com.caplin.integration.datasourcex.util.flow.VersionedMapEvent +import com.fasterxml.jackson.core.type.TypeReference +import com.fasterxml.jackson.databind.JsonMappingException +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import io.kotest.assertions.throwables.shouldThrow +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.shouldBe + +class VersionedMapEventSerializationTest : + FunSpec({ + val mapper: ObjectMapper = jacksonObjectMapper().registerDataSourceModule() + + test("Upsert") { + val event: VersionedMapEvent = VersionedMapEvent.Upsert("key", "value", 7L) + val json = mapper.writeValueAsString(event) + val deserialized = + mapper.readValue(json, object : TypeReference>() {}) + deserialized shouldBe event + } + + test("Removed") { + val event: VersionedMapEvent = VersionedMapEvent.Removed("key", 9L) + val json = mapper.writeValueAsString(event) + val deserialized = + mapper.readValue(json, object : TypeReference>() {}) + deserialized shouldBe event + } + + test("a non-integral version is rejected") { + val json = """{"type":"upsert","key":"key","value":"value","version":"7"}""" + shouldThrow { + mapper.readValue(json, object : TypeReference>() {}) + } + } + + test("an explicit-null version is rejected") { + val json = """{"type":"upsert","key":"key","value":"value","version":null}""" + shouldThrow { + mapper.readValue(json, object : TypeReference>() {}) + } + } + }) diff --git a/util/src/test/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson3/VersionedMapEventSerializationTest.kt b/util/src/test/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson3/VersionedMapEventSerializationTest.kt new file mode 100644 index 0000000..0402f72 --- /dev/null +++ b/util/src/test/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson3/VersionedMapEventSerializationTest.kt @@ -0,0 +1,44 @@ +package com.caplin.integration.datasourcex.util.serialization.jackson3 + +import com.caplin.integration.datasourcex.util.flow.VersionedMapEvent +import io.kotest.assertions.throwables.shouldThrow +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.shouldBe +import tools.jackson.core.type.TypeReference +import tools.jackson.databind.DatabindException +import tools.jackson.module.kotlin.jacksonMapperBuilder + +class VersionedMapEventSerializationTest : + FunSpec({ + val mapper = jacksonMapperBuilder().addDataSourceModule().build() + + test("Upsert") { + val event: VersionedMapEvent = VersionedMapEvent.Upsert("key", "value", 7L) + val json = mapper.writeValueAsString(event) + val deserialized = + mapper.readValue(json, object : TypeReference>() {}) + deserialized shouldBe event + } + + test("Removed") { + val event: VersionedMapEvent = VersionedMapEvent.Removed("key", 9L) + val json = mapper.writeValueAsString(event) + val deserialized = + mapper.readValue(json, object : TypeReference>() {}) + deserialized shouldBe event + } + + test("a non-integral version is rejected") { + val json = """{"type":"upsert","key":"key","value":"value","version":"7"}""" + shouldThrow { + mapper.readValue(json, object : TypeReference>() {}) + } + } + + test("an explicit-null version is rejected") { + val json = """{"type":"upsert","key":"key","value":"value","version":null}""" + shouldThrow { + mapper.readValue(json, object : TypeReference>() {}) + } + } + }) diff --git a/util/src/test/kotlin/com/caplin/integration/datasourcex/util/store/FlowStoreCacheTest.kt b/util/src/test/kotlin/com/caplin/integration/datasourcex/util/store/FlowStoreCacheTest.kt new file mode 100644 index 0000000..3e578af --- /dev/null +++ b/util/src/test/kotlin/com/caplin/integration/datasourcex/util/store/FlowStoreCacheTest.kt @@ -0,0 +1,63 @@ +package com.caplin.integration.datasourcex.util.store + +import com.caplin.integration.datasourcex.util.flow.VersionedMapEvent +import com.github.benmanes.caffeine.cache.Caffeine +import io.kotest.assertions.throwables.shouldThrow +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.nulls.shouldBeNull +import io.kotest.matchers.shouldBe +import java.util.concurrent.atomic.AtomicInteger + +class FlowStoreCacheTest : + FunSpec({ + fun newCache() = FlowStoreCache(Caffeine.newBuilder().build()) + + test("getOrLoad caches the loaded value and serves it cache-first") { + val calls = AtomicInteger(0) + val cache = newCache() + val load = { k: String -> + calls.incrementAndGet() + Versioned("v:$k", 1L) + } + + cache.getOrLoad("k", load) shouldBe Live("v:k", 1L) + cache.getOrLoad("k", load) shouldBe Live("v:k", 1L) + calls.get() shouldBe 1 + } + + test("a load that finds nothing caches nothing") { + val cache = newCache() + + cache.getOrLoad("missing") { null }.shouldBeNull() + cache.getIfPresent("missing").shouldBeNull() + } + + test("putIfNewer keeps the strictly-newer resident entry") { + val cache = newCache() + + cache.putIfNewer("k", Live("v5", 5L)) shouldBe Live("v5", 5L) + cache.putIfNewer("k", Live("v3", 3L)) shouldBe Live("v5", 5L) // older rejected + cache.putIfNewer("k", Live("v5b", 5L)) shouldBe Live("v5", 5L) // equal rejected + cache.putIfNewer("k", Live("v9", 9L)) shouldBe Live("v9", 9L) + } + + test("reflectIfNewer seeds an absent key and updates a resident entry, version-gated") { + val cache = newCache() + + cache.reflectIfNewer(VersionedMapEvent.Upsert("k", "v1", 1L)) // absent -> seeded + cache.getIfPresent("k") shouldBe Live("v1", 1L) + + cache.reflectIfNewer(VersionedMapEvent.Upsert("k", "v3", 3L)) // newer -> applied + cache.getIfPresent("k") shouldBe Live("v3", 3L) + cache.reflectIfNewer(VersionedMapEvent.Upsert("k", "stale", 2L)) // older -> ignored + cache.getIfPresent("k") shouldBe Live("v3", 3L) + cache.reflectIfNewer(VersionedMapEvent.Removed("k", 6L)) // newer -> tombstone + cache.getIfPresent("k") shouldBe Tombstone(6L) + } + + test("rejects a LoadingCache, whose own loader would bypass version gating") { + val loading = Caffeine.newBuilder().build?> { Live("v", 1L) } + + shouldThrow { FlowStoreCache(loading) } + } + }) diff --git a/util/src/test/kotlin/com/caplin/integration/datasourcex/util/store/FlowStoreTest.kt b/util/src/test/kotlin/com/caplin/integration/datasourcex/util/store/FlowStoreTest.kt new file mode 100644 index 0000000..08d409c --- /dev/null +++ b/util/src/test/kotlin/com/caplin/integration/datasourcex/util/store/FlowStoreTest.kt @@ -0,0 +1,281 @@ +package com.caplin.integration.datasourcex.util.store + +import app.cash.turbine.test +import com.caplin.integration.datasourcex.util.flow.VersionedMapEvent +import com.github.benmanes.caffeine.cache.Caffeine +import io.kotest.core.spec.style.FunSpec +import io.kotest.engine.coroutines.backgroundScope +import io.kotest.matchers.nulls.shouldBeNull +import io.kotest.matchers.shouldBe +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.emptyFlow +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.onSubscription + +class FlowStoreTest : + FunSpec({ + test("get reads through to the store on a miss") { + val store = InMemoryStore() + store.seed("k", "seeded", 1L) + val consumer = flowStore(store, emptyFlow(), Caffeine.newBuilder(), backgroundScope) + + consumer.get("k") shouldBe "seeded" + consumer.get("missing").shouldBeNull() + } + + test("a delta older than a read-through load is dropped; a newer one is applied") { + val store = InMemoryStore() + store.seed("k", "v5", 5L) + val inbound = MutableSharedFlow>(extraBufferCapacity = 16) + val consumer = flowStore(store, inbound, Caffeine.newBuilder(), backgroundScope) + + consumer.asFlow().test { + inbound.subscriptionCount.first { + it >= 1 + } // the store's collector has attached to inbound + + consumer.get("k") shouldBe "v5" + + inbound.emit(VersionedMapEvent.Upsert("k", "stale", 3L)) + awaitItem() // collector processed the delta (gated out) + consumer.get("k") shouldBe "v5" + + inbound.emit(VersionedMapEvent.Upsert("k", "v9", 9L)) + awaitItem() + consumer.get("k") shouldBe "v9" + } + } + + test( + "a delta for an absent key is reflected, so a later read does not regress to a stale value" + ) { + // The owner committed v9 and published the delta, but the read path (a lagging replica, + // modelled by leaving the backing store at v5) has not yet applied it. The consumer has + // already seen v9 on its stream, so it must serve v9 and never regress to the replica's v5. + val store = InMemoryStore() + store.seed("k", "v5", 5L) + val inbound = MutableSharedFlow>(extraBufferCapacity = 16) + val consumer = flowStore(store, inbound, Caffeine.newBuilder(), backgroundScope) + + consumer.asFlow().test { + inbound.subscriptionCount.first { it >= 1 } + + // v9 reaches the stream before any read-through has populated the cache. + inbound.emit(VersionedMapEvent.Upsert("k", "v9", 9L)) + awaitItem() shouldBe VersionedMapEvent.Upsert("k", "v9", 9L) + + // The delta must have been reflected even though the key was absent, so get serves v9 + // without falling back to the stale replica read. (Currently reflectIfNewer only updates + // resident keys, so the delta is dropped and get reads v5 — this assertion fails.) + consumer.get("k") shouldBe "v9" + store.loadCount.get() shouldBe 0 // the delta populated the cache; no read-through needed + } + } + + test("an equal-version delta after a read-through is gated out") { + val store = InMemoryStore() + store.seed("k", "v2", 2L) + val inbound = MutableSharedFlow>(extraBufferCapacity = 16) + val consumer = flowStore(store, inbound, Caffeine.newBuilder(), backgroundScope) + + consumer.asFlow().test { + inbound.subscriptionCount.first { it >= 1 } + + consumer.get("k") shouldBe "v2" + + inbound.emit(VersionedMapEvent.Upsert("k", "wrong", 2L)) + awaitItem() + consumer.get("k") shouldBe "v2" + } + } + + test("a removal tombstones a resident entry so a later read cannot resurrect it") { + val store = InMemoryStore() + store.seed("k", "v5", 5L) + val inbound = MutableSharedFlow>(extraBufferCapacity = 16) + val cache = Caffeine.newBuilder().build?>() + val consumer = + FlowStoreImpl(store, FlowStoreCache(cache), inbound, backgroundScope, Dispatchers.IO) + + consumer.asFlow().test { + inbound.subscriptionCount.first { it >= 1 } + + consumer.get("k") shouldBe "v5" // resident Live(v5, 5) + + inbound.emit(VersionedMapEvent.Removed("k", 6L)) + awaitItem() + cache.getIfPresent("k") shouldBe Tombstone(6L) + consumer.get("k").shouldBeNull() // tombstone short-circuits the read-through + } + } + + test("consumer converges to the owner's state through the delta stream") { + val store = InMemoryStore() + val owner = mutableFlowStore(store, Caffeine.newBuilder(), txContext = inMemoryTxContext) + fun commit(action: (InMemoryTx) -> Unit) = InMemoryTx().also { action(it) }.commit() + val attached = MutableStateFlow(false) + val ownerDeltas = owner.asFlow().onSubscription { attached.value = true } + val consumer = flowStore(store, ownerDeltas, Caffeine.newBuilder(), backgroundScope) + + consumer.valueFlow("k").test { + attached.first { it } // the consumer's collector has attached to the owner's stream + awaitItem().shouldBeNull() + + commit { owner.put("k", "v1", it) } + awaitItem() shouldBe "v1" + + commit { owner.put("k", "v2", it) } + awaitItem() shouldBe "v2" + + commit { owner.remove("k", it) } + awaitItem().shouldBeNull() + } + } + + test("valueFlow ignores deltas for other keys and stale versions") { + val store = InMemoryStore() + store.seed("k", "v5", 5L) + val inbound = MutableSharedFlow>(extraBufferCapacity = 16) + val consumer = flowStore(store, inbound, Caffeine.newBuilder(), backgroundScope) + + consumer.valueFlow("k").test { + awaitItem() shouldBe "v5" // seeded via read-through at version 5 + inbound.subscriptionCount.first { it >= 1 } + + inbound.emit(VersionedMapEvent.Upsert("other", "x", 9L)) // different key -> ignored + inbound.emit(VersionedMapEvent.Upsert("k", "stale", 3L)) // older version -> ignored + inbound.emit(VersionedMapEvent.Upsert("k", "v9", 9L)) // newer -> emitted + awaitItem() shouldBe "v9" + expectNoEvents() + } + } + + test("the consumer's async view reads through and follows the stream") { + val store = InMemoryStore() + store.seed("k", "seeded", 1L) + val inbound = MutableSharedFlow>(extraBufferCapacity = 16) + val consumer = flowStore(store, inbound, Caffeine.newBuilder(), backgroundScope) + + consumer.async.get("k") shouldBe "seeded" // miss -> dispatched read-through + consumer.async.get("k") shouldBe "seeded" // hit -> served inline + consumer.async.get("missing").shouldBeNull() + + consumer.async.valueFlow("k").test { + awaitItem() shouldBe "seeded" + inbound.subscriptionCount.first { it >= 1 } + inbound.emit(VersionedMapEvent.Upsert("k", "v9", 9L)) + awaitItem() shouldBe "v9" + } + + consumer.async.asFlow().replayCache shouldBe emptyList() // delegates to the delta stream + } + + test("asFlow(query) emits the snapshot then follows newer deltas") { + val store = InMemoryStore() + val inbound = MutableSharedFlow>(extraBufferCapacity = 16) + val consumer = flowStore(store, inbound, Caffeine.newBuilder(), backgroundScope) + val query = { mapOf("a" to Versioned("a1", 1L), "b" to Versioned("b2", 2L)) } + + consumer.asFlow(query).test { + setOf(awaitItem(), awaitItem()) shouldBe + setOf( + VersionedMapEvent.Upsert("a", "a1", 1L), + VersionedMapEvent.Upsert("b", "b2", 2L), + ) + inbound.subscriptionCount.first { it >= 1 } + + inbound.emit(VersionedMapEvent.Upsert("a", "a3", 3L)) + awaitItem() shouldBe VersionedMapEvent.Upsert("a", "a3", 3L) + } + } + + test("asFlow(query) gates a delta older-or-equal to the snapshot version") { + val store = InMemoryStore() + val inbound = MutableSharedFlow>(extraBufferCapacity = 16) + val consumer = flowStore(store, inbound, Caffeine.newBuilder(), backgroundScope) + val query = { mapOf("k" to Versioned("v5", 5L)) } + + consumer.asFlow(query).test { + awaitItem() shouldBe VersionedMapEvent.Upsert("k", "v5", 5L) + inbound.subscriptionCount.first { it >= 1 } + + inbound.emit(VersionedMapEvent.Upsert("k", "stale", 5L)) // equal version -> gated + inbound.emit(VersionedMapEvent.Upsert("k", "v9", 9L)) // newer -> emitted + awaitItem() shouldBe VersionedMapEvent.Upsert("k", "v9", 9L) + expectNoEvents() + } + } + + test("asFlow(query) with no predicate follows the whole store: new keys and removals") { + val store = InMemoryStore() + val inbound = MutableSharedFlow>(extraBufferCapacity = 16) + val consumer = flowStore(store, inbound, Caffeine.newBuilder(), backgroundScope) + val query = { emptyMap>() } + + consumer.asFlow(query).test { + inbound.subscriptionCount.first { it >= 1 } + + inbound.emit(VersionedMapEvent.Upsert("new", "x", 1L)) // new key enters the view + awaitItem() shouldBe VersionedMapEvent.Upsert("new", "x", 1L) + + inbound.emit(VersionedMapEvent.Removed("new", 2L)) // removal forwards + awaitItem() shouldBe VersionedMapEvent.Removed("new", 2L) + } + } + + test("asFlow(query, predicate) ignores a non-matching upsert for an untracked key") { + val store = InMemoryStore() + val inbound = MutableSharedFlow>(extraBufferCapacity = 16) + val consumer = flowStore(store, inbound, Caffeine.newBuilder(), backgroundScope) + val query = { emptyMap>() } + val predicate = { _: String, v: String -> v.startsWith("keep") } + + consumer.asFlow(query, predicate).test { + inbound.subscriptionCount.first { it >= 1 } + + inbound.emit( + VersionedMapEvent.Upsert("a", "drop-me", 1L) + ) // untracked, no match -> ignored + inbound.emit(VersionedMapEvent.Upsert("b", "keep-me", 2L)) // matches -> enters + awaitItem() shouldBe VersionedMapEvent.Upsert("b", "keep-me", 2L) + expectNoEvents() + } + } + + test("asFlow(query, predicate) emits Removed when an in-view item stops matching") { + val store = InMemoryStore() + val inbound = MutableSharedFlow>(extraBufferCapacity = 16) + val consumer = flowStore(store, inbound, Caffeine.newBuilder(), backgroundScope) + val query = { mapOf("k" to Versioned("keep-1", 1L)) } + val predicate = { _: String, v: String -> v.startsWith("keep") } + + consumer.asFlow(query, predicate).test { + awaitItem() shouldBe VersionedMapEvent.Upsert("k", "keep-1", 1L) + inbound.subscriptionCount.first { it >= 1 } + + inbound.emit(VersionedMapEvent.Upsert("k", "gone-2", 2L)) // no longer matches -> leaves + awaitItem() shouldBe VersionedMapEvent.Removed("k", 2L) + expectNoEvents() + } + } + + test("asFlow(query, predicate) forwards a removal only for a key in the view") { + val store = InMemoryStore() + val inbound = MutableSharedFlow>(extraBufferCapacity = 16) + val consumer = flowStore(store, inbound, Caffeine.newBuilder(), backgroundScope) + val query = { mapOf("k" to Versioned("keep-1", 1L)) } + val predicate = { _: String, v: String -> v.startsWith("keep") } + + consumer.asFlow(query, predicate).test { + awaitItem() shouldBe VersionedMapEvent.Upsert("k", "keep-1", 1L) + inbound.subscriptionCount.first { it >= 1 } + + inbound.emit(VersionedMapEvent.Removed("other", 2L)) // untracked -> ignored + inbound.emit(VersionedMapEvent.Removed("k", 3L)) // in view -> forwarded + awaitItem() shouldBe VersionedMapEvent.Removed("k", 3L) + expectNoEvents() + } + } + }) diff --git a/util/src/test/kotlin/com/caplin/integration/datasourcex/util/store/InMemoryStore.kt b/util/src/test/kotlin/com/caplin/integration/datasourcex/util/store/InMemoryStore.kt new file mode 100644 index 0000000..c96cf26 --- /dev/null +++ b/util/src/test/kotlin/com/caplin/integration/datasourcex/util/store/InMemoryStore.kt @@ -0,0 +1,70 @@ +package com.caplin.integration.datasourcex.util.store + +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLong + +/** + * A trivial in-memory unit of work standing in for a database transaction: it buffers the store's + * post-commit / rollback side effects until the test drives [commit] or [rollback]. Mirrors how a + * jOOQ `Configuration` carries transaction-scoped state. + */ +class InMemoryTx { + internal val commitActions = mutableListOf<() -> Unit>() + internal val rollbackActions = mutableListOf<() -> Unit>() + + fun commit() = commitActions.forEach { it() } + + fun rollback() = rollbackActions.forEach { it() } +} + +/** Adapts an [InMemoryTx] handle into the [TxContext] the store enlists on. */ +val inMemoryTxContext: (InMemoryTx) -> TxContext = { handle -> + object : TxContext { + override val transaction = handle + + override fun onCommitEnd(action: () -> Unit) { + handle.commitActions += action + } + + override fun onRollback(action: () -> Unit) { + handle.rollbackActions += action + } + } +} + +/** + * In-memory reference implementation of the store SPI for tests. Writes apply immediately to a + * backing [ConcurrentHashMap], versioned from a shared counter that stands in for a DB sequence; + * the [TxContext] is ignored. + */ +class InMemoryStore : Store { + private val backing = ConcurrentHashMap>() + private val sequence = AtomicLong(0L) + val loadCount = AtomicInteger(0) + + /** Seeds a specific version directly, for tests that exercise version gating. */ + fun seed(key: K, value: V, version: Long) { + backing[key] = Versioned(value, version) + sequence.updateAndGet { maxOf(it, version) } + } + + override fun load(key: K): Versioned? { + loadCount.incrementAndGet() + return backing[key] + } + + override fun load(key: K, tx: TxContext): Versioned? = backing[key] + + override fun write(key: K, value: V, tx: TxContext): Long { + val version = sequence.incrementAndGet() + backing[key] = Versioned(value, version) + return version + } + + override fun delete(key: K, tx: TxContext): Long { + val version = sequence.incrementAndGet() + backing.remove(key) + return version + } +} diff --git a/util/src/test/kotlin/com/caplin/integration/datasourcex/util/store/InMemoryStoreTest.kt b/util/src/test/kotlin/com/caplin/integration/datasourcex/util/store/InMemoryStoreTest.kt new file mode 100644 index 0000000..32e1998 --- /dev/null +++ b/util/src/test/kotlin/com/caplin/integration/datasourcex/util/store/InMemoryStoreTest.kt @@ -0,0 +1,19 @@ +package com.caplin.integration.datasourcex.util.store + +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.nulls.shouldBeNull +import io.kotest.matchers.shouldBe + +class InMemoryStoreTest : + FunSpec({ + test("writes are loadable and deletes remove the entry") { + val store = InMemoryStore() + val tx = inMemoryTxContext(InMemoryTx()) + + store.write("a", "A", tx) + store.load("a") shouldBe Versioned("A", 1L) + + store.delete("a", tx) + store.load("a").shouldBeNull() + } + }) diff --git a/util/src/test/kotlin/com/caplin/integration/datasourcex/util/store/MutableFlowStoreTest.kt b/util/src/test/kotlin/com/caplin/integration/datasourcex/util/store/MutableFlowStoreTest.kt new file mode 100644 index 0000000..5c3455b --- /dev/null +++ b/util/src/test/kotlin/com/caplin/integration/datasourcex/util/store/MutableFlowStoreTest.kt @@ -0,0 +1,274 @@ +package com.caplin.integration.datasourcex.util.store + +import app.cash.turbine.test +import com.caplin.integration.datasourcex.util.flow.VersionedMapEvent +import com.github.benmanes.caffeine.cache.Cache +import com.github.benmanes.caffeine.cache.Caffeine +import io.kotest.assertions.throwables.shouldThrow +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.nulls.shouldBeNull +import io.kotest.matchers.shouldBe +import io.mockk.every +import io.mockk.mockk +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch + +private fun newStore( + backing: Store, + cache: Cache?>, +) = MutableFlowStoreImpl(backing, FlowStoreCache(cache), Dispatchers.IO, inMemoryTxContext) + +class MutableFlowStoreTest : + FunSpec({ + test("put publishes a versioned delta and updates the cache only on commit") { + val backing = InMemoryStore() + val cache = Caffeine.newBuilder().build?>() + val store = newStore(backing, cache) + + store.asFlow().test { + val tx = InMemoryTx() + store.put("k", "v", tx) + expectNoEvents() + cache.getIfPresent("k").shouldBeNull() + + tx.commit() + awaitItem() shouldBe VersionedMapEvent.Upsert("k", "v", 1L) + cache.getIfPresent("k") shouldBe Live("v", 1L) + } + } + + test( + "publish updates the cache before emitting, so a delta observer never sees a stale cache" + ) { + val backing = InMemoryStore() + val cache = Caffeine.newBuilder().build?>() + val store = newStore(backing, cache) + + // An unconfined collector resumes synchronously inside the emitting tryEmit, so it captures + // the cache exactly as of the emit. The cache must already reflect the delta — otherwise a + // valueFlow subscribing in that window would seed from a stale entry and lose the update. + val cacheAtEmit = CompletableDeferred?>() + val collector = + CoroutineScope(Dispatchers.Unconfined).launch { + store.asFlow().collect { event -> + cacheAtEmit.complete(cache.getIfPresent(event.key)) + } + } + + InMemoryTx().also { + store.put("k", "v", it) + it.commit() + } + + cacheAtEmit.await() shouldBe Live("v", 1L) + collector.cancel() + } + + test("rollback publishes nothing and the next write gets a strictly greater version") { + val backing = InMemoryStore() + val cache = Caffeine.newBuilder().build?>() + val store = newStore(backing, cache) + + store.asFlow().test { + val tx = InMemoryTx() + store.put("k", "v1", tx) + tx.rollback() + expectNoEvents() + + val tx2 = InMemoryTx() + store.put("k", "v2", tx2) + tx2.commit() + awaitItem() shouldBe VersionedMapEvent.Upsert("k", "v2", 2L) + } + } + + test("a writer failure propagates and leaves the cache and stream untouched") { + val backing = mockk>() + every { backing.write(any(), any(), any()) } throws RuntimeException("boom") + val cache = Caffeine.newBuilder().build?>() + val store = newStore(backing, cache) + + store.asFlow().test { + shouldThrow { store.put("k", "v", InMemoryTx()) } + expectNoEvents() + cache.getIfPresent("k").shouldBeNull() + } + } + + test("get reflects the committed value only after commit") { + val backing = InMemoryStore() + val cache = Caffeine.newBuilder().build?>() + val store = newStore(backing, cache) + + InMemoryTx().also { + store.put("k", "v1", it) + it.commit() + } + store.get("k") shouldBe "v1" + + val tx = InMemoryTx() + store.put("k", "v2", tx) + store.get("k") shouldBe "v1" + + tx.commit() + store.get("k") shouldBe "v2" + } + + test("get(key, tx) reads through the store within the transaction, bypassing the cache") { + val backing = InMemoryStore() + backing.seed("k", "fresh-in-db", 9L) + val cache = Caffeine.newBuilder().build?>() + // A stale, lower-versioned entry sits in the cache. + cache.put("k", Live("stale-in-cache", 1L)) + val store = newStore(backing, cache) + + store.get("k") shouldBe "stale-in-cache" // cache-first + store.get("k", InMemoryTx()) shouldBe "fresh-in-db" // bypasses the cache, reads the store + } + + test("remove publishes a Removed delta and tombstones the cache") { + val backing = InMemoryStore() + val cache = Caffeine.newBuilder().build?>() + val store = newStore(backing, cache) + + store.asFlow().test { + InMemoryTx().also { + store.put("k", "v1", it) + it.commit() + } + awaitItem() shouldBe VersionedMapEvent.Upsert("k", "v1", 1L) + + InMemoryTx().also { + store.remove("k", it) + it.commit() + } + awaitItem() shouldBe VersionedMapEvent.Removed("k", 2L) + cache.getIfPresent("k") shouldBe Tombstone(2L) + store.get("k").shouldBeNull() + } + } + + test("putAll publishes a delta per entry on commit") { + val backing = InMemoryStore() + val cache = Caffeine.newBuilder().build?>() + val store = newStore(backing, cache) + + store.asFlow().test { + val tx = InMemoryTx() + store.putAll(mapOf("a" to "A", "b" to "B"), tx) + expectNoEvents() + + tx.commit() + setOf(awaitItem(), awaitItem()) shouldBe + setOf( + VersionedMapEvent.Upsert("a", "A", 1L), + VersionedMapEvent.Upsert("b", "B", 2L), + ) + } + } + + test("putAll fails before commit when the writer omits a version, publishing nothing") { + val backing = mockk>() + every { backing.writeAll(any(), any()) } returns mapOf("a" to 1L) // "b" missing + val cache = Caffeine.newBuilder().build?>() + val store = newStore(backing, cache) + + store.asFlow().test { + val tx = InMemoryTx() + shouldThrow { store.putAll(mapOf("a" to "A", "b" to "B"), tx) } + tx.commit() + expectNoEvents() + } + } + + test("async.get short-circuits a cache hit and reads through on a miss") { + val backing = InMemoryStore() + val cache = Caffeine.newBuilder().build?>() + val store = newStore(backing, cache) + + InMemoryTx().also { + store.put("k", "v1", it) + it.commit() + } + + store.async.get("k") shouldBe "v1" + backing.loadCount.get() shouldBe 0 // served from cache, no read-through + + store.async.get("missing").shouldBeNull() + backing.loadCount.get() shouldBe 1 // miss dispatched one read-through + } + + test("async mutations write through and publish on commit") { + val backing = InMemoryStore() + val cache = Caffeine.newBuilder().build?>() + val store = newStore(backing, cache) + + store.asFlow().test { + val tx = InMemoryTx() + store.async.put("k", "v1", tx) + expectNoEvents() + + tx.commit() + awaitItem() shouldBe VersionedMapEvent.Upsert("k", "v1", 1L) + store.async.get("k") shouldBe "v1" + } + } + + test("get(key, tx) returns null for an absent key") { + val backing = InMemoryStore() + val cache = Caffeine.newBuilder().build?>() + val store = newStore(backing, cache) + + store.get("missing", InMemoryTx()).shouldBeNull() + } + + test("async putAll and remove write through and publish on commit") { + val backing = InMemoryStore() + val cache = Caffeine.newBuilder().build?>() + val store = newStore(backing, cache) + + store.asFlow().test { + val tx = InMemoryTx() + store.async.putAll(mapOf("a" to "A", "b" to "B"), tx) + expectNoEvents() + tx.commit() + setOf(awaitItem(), awaitItem()) shouldBe + setOf( + VersionedMapEvent.Upsert("a", "A", 1L), + VersionedMapEvent.Upsert("b", "B", 2L), + ) + + val tx2 = InMemoryTx() + store.async.remove("a", tx2) + tx2.commit() + awaitItem() shouldBe VersionedMapEvent.Removed("a", 3L) + } + } + + test("async.get(key, tx) reads through within the transaction") { + val backing = InMemoryStore() + backing.seed("k", "fresh-in-db", 9L) + val cache = Caffeine.newBuilder().build?>() + val store = newStore(backing, cache) + + store.async.get("k", InMemoryTx()) shouldBe "fresh-in-db" + store.async.asFlow().replayCache shouldBe emptyList() // delegates to the delta stream + } + + test("async.valueFlow follows the mutable store") { + val backing = InMemoryStore() + val cache = Caffeine.newBuilder().build?>() + val store = newStore(backing, cache) + + store.async.valueFlow("k").test { + awaitItem().shouldBeNull() + InMemoryTx().also { + store.put("k", "v1", it) + it.commit() + } + awaitItem() shouldBe "v1" + } + } + }) diff --git a/util/src/test/kotlin/com/caplin/integration/datasourcex/util/store/TxContextTest.kt b/util/src/test/kotlin/com/caplin/integration/datasourcex/util/store/TxContextTest.kt new file mode 100644 index 0000000..cbf23ac --- /dev/null +++ b/util/src/test/kotlin/com/caplin/integration/datasourcex/util/store/TxContextTest.kt @@ -0,0 +1,17 @@ +package com.caplin.integration.datasourcex.util.store + +import io.kotest.core.spec.style.FunSpec + +class TxContextTest : + FunSpec({ + test("onRollback defaults to a no-op") { + val ctx = + object : TxContext { + override val transaction = Unit + + override fun onCommitEnd(action: () -> Unit) {} + } + + ctx.onRollback { error("must not run") } // default no-op: registers nothing, never fires + } + })