Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 66 additions & 1 deletion util/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,69 @@
Utility classes and functions commonly used in DataSource integrations, such as efficiently observable Maps, and stream
transformations.

@see com.caplin.integration.datasourcex.util.flow.FlowMap
## Key components

### Observable maps and shared flows

| Component | Description |
| --- | --- |
| `FlowMap` / `MutableFlowMap` | A `Map` that is also observable via `asFlow`, `asFlowWithState`, and per-key `valueFlow`. Built with `mutableFlowMapOf` / `toMutableFlowMap`, or collected from an event stream with `flowMapIn`. |
| `CompletingSharedFlow` / `MutableCompletingSharedFlow` | A `SharedFlow` variant that also propagates completion and error events to subscribers. Built with `shareInCompleting`. |
| `SharedFlowCache` | Keyed cache of `SharedFlow`s, sharing one upstream collection per key and evicting a key once its upstream ends so the cache can't grow unbounded. Built with `sharedFlowCache(...)`, or `completingSharedFlowCache(...)` for a `CompletingSharedFlowCache` that propagates completion/errors. |
| `CompletingSharedFlowCache` / `LoadingCompletingSharedFlowCache` | Keyed cache of `CompletingSharedFlow`s, sharing one underlying collection per key. |

### Stores

| Component | Description |
| --- | --- |
| `FlowStore` / `MutableFlowStore` | A store-backed map exposing a delta-only stream plus a read-through, Caffeine-bounded cache. Built with `flowStore` / `flowStoreIn` / `mutableFlowStore`. |
| `AsyncFlowStore` / `AsyncMutableFlowStore` | Suspending views of the stores whose reads/writes dispatch the store I/O themselves. |
| `Store` / `StoreReader` / `StoreWriter` | The SPI you implement to back a `MutableFlowStore`; mutations enlist on the caller's transaction and publish on commit. |
| `TxContext` / `Versioned` | The transaction handle a write enlists on, and a value paired with the store-assigned version. |

### Flow operators

| Operator | Description |
| --- | --- |
| `bufferingDebounce` | Buffers elements until a quiet period elapses, then emits them as a `List`. |
| `throttleLatest` | Emits at most once per interval, keeping the latest value and dropping older ones. |
| `flatMapFirst` | Applies a function to the first element together with the entire upstream flow. |
| `demultiplexBy` | Groups elements by a key selector and processes each group's sub-flow. |
| `retryWithExponentialBackoff` | Retries the upstream on error with an exponential delay between attempts. |
| `cast` | Casts a `Flow<*>` to a `Flow<T>`. |
| `timeoutFirst` / `timeoutFirstOrNull` / `timeoutFirstOrDefault` | Errors / emits null / emits a default if no first element arrives within a timeout. |
| `materialize` / `dematerialize` (and `*Unboxed`) | Convert between flow values and `ValueOrCompletion` events. |

### Event models and folds

| Component | Description |
| --- | --- |
| `MapEvent` / `SimpleMapEvent` / `SetEvent` / `VersionedMapEvent` | Sealed event types describing map and set mutations (with or without old values / versions). |
| `FlowMapStreamEvent` / `ValueOrCompletion` | Materialised stream events: initial-state-plus-deltas, and value-or-terminal-signal. |
| `runningFoldToMap*` / `runningFoldToSet` | Fold a delta stream into a live `Flow` of map / set snapshots. |
| `conflateKeys` / `toEvents` | Collapse per-key updates, and expand a collection stream into entry events. |

### Serialization

| Component | Description |
| --- | --- |
| `registerDataSourceSerializers` / `registerPersistentCollectionSerializers` | Register Fory serializers for the event types and persistent collections. |
| `DataSourceModule`, `registerDataSourceModule` / `addDataSourceModule` | Jackson 2 / Jackson 3 modules that serialize the event types without annotations. |
| `Jackson2JsonHandler` / `Jackson3JsonHandler` | `JsonHandler` implementations backed by a Jackson `ObjectMapper`. |

### DataSource and general utilities

| Component | Description |
| --- | --- |
| `AntPatternNamespace` | A `Namespace` matching subjects by Ant-style patterns, with path-variable extraction. |
| `SimpleDataSourceFactory` / `SimpleDataSourceConfig` | Build a `DataSource` from a simplified config for tests and examples. |
| `KLogger` | An slf4j `Logger` wrapper with lazily-evaluated message lambdas. |
| `ReadWriteLock` | A non-reentrant suspending read/write lock. |
| `withTimeout` | A coroutine timeout that throws `TimeoutException` rather than a `CancellationException`. |

@see com.caplin.integration.datasourcex.util.flow.FlowMap
@see com.caplin.integration.datasourcex.util.store.MutableFlowStore

Participating in an application-owned jOOQ transaction:

@sample samples.StoreSamples.jooqSample
143 changes: 143 additions & 0 deletions util/api/datasourcex-util.api
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,11 @@ public abstract interface class com/caplin/integration/datasourcex/util/flow/Flo
}

public final class com/caplin/integration/datasourcex/util/flow/FlowMapKt {
public static final fun flowMapIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun mutableFlowMapOf ()Lcom/caplin/integration/datasourcex/util/flow/MutableFlowMap;
public static final fun mutableFlowMapOf ([Lkotlin/Pair;)Lcom/caplin/integration/datasourcex/util/flow/MutableFlowMap;
public static final fun runningFoldToMapFlowMapStreamEvent (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun simpleFlowMapIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun simpleToFlowMapIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun toFlowMapIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun toMutableFlowMap (Ljava/util/Map;)Lcom/caplin/integration/datasourcex/util/flow/MutableFlowMap;
Expand Down Expand Up @@ -371,6 +373,17 @@ public final class com/caplin/integration/datasourcex/util/flow/SetEventKt {
public static final fun toEvents (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
}

public final class com/caplin/integration/datasourcex/util/flow/SharedFlowCache {
public final fun get (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/SharedFlow;
}

public final class com/caplin/integration/datasourcex/util/flow/SharedFlowCacheKt {
public static final fun completingSharedFlowCache (Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;I)Lcom/caplin/integration/datasourcex/util/flow/CompletingSharedFlowCache;
public static synthetic fun completingSharedFlowCache$default (Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;IILjava/lang/Object;)Lcom/caplin/integration/datasourcex/util/flow/CompletingSharedFlowCache;
public static final fun sharedFlowCache (Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;I)Lcom/caplin/integration/datasourcex/util/flow/SharedFlowCache;
public static synthetic fun sharedFlowCache$default (Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;IILjava/lang/Object;)Lcom/caplin/integration/datasourcex/util/flow/SharedFlowCache;
}

public abstract interface class com/caplin/integration/datasourcex/util/flow/SimpleMapEvent {
}

Expand Down Expand Up @@ -464,6 +477,39 @@ public final class com/caplin/integration/datasourcex/util/flow/ValueOrCompletio
public static final fun materializeUnboxed (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
}

public abstract interface class com/caplin/integration/datasourcex/util/flow/VersionedMapEvent {
public abstract fun getKey ()Ljava/lang/Object;
public abstract fun getVersion ()J
}

public final class com/caplin/integration/datasourcex/util/flow/VersionedMapEvent$Removed : com/caplin/integration/datasourcex/util/flow/VersionedMapEvent {
public fun <init> (Ljava/lang/Object;J)V
public final fun component1 ()Ljava/lang/Object;
public final fun component2 ()J
public final fun copy (Ljava/lang/Object;J)Lcom/caplin/integration/datasourcex/util/flow/VersionedMapEvent$Removed;
public static synthetic fun copy$default (Lcom/caplin/integration/datasourcex/util/flow/VersionedMapEvent$Removed;Ljava/lang/Object;JILjava/lang/Object;)Lcom/caplin/integration/datasourcex/util/flow/VersionedMapEvent$Removed;
public fun equals (Ljava/lang/Object;)Z
public fun getKey ()Ljava/lang/Object;
public fun getVersion ()J
public fun hashCode ()I
public fun toString ()Ljava/lang/String;
}

public final class com/caplin/integration/datasourcex/util/flow/VersionedMapEvent$Upsert : com/caplin/integration/datasourcex/util/flow/VersionedMapEvent {
public fun <init> (Ljava/lang/Object;Ljava/lang/Object;J)V
public final fun component1 ()Ljava/lang/Object;
public final fun component2 ()Ljava/lang/Object;
public final fun component3 ()J
public final fun copy (Ljava/lang/Object;Ljava/lang/Object;J)Lcom/caplin/integration/datasourcex/util/flow/VersionedMapEvent$Upsert;
public static synthetic fun copy$default (Lcom/caplin/integration/datasourcex/util/flow/VersionedMapEvent$Upsert;Ljava/lang/Object;Ljava/lang/Object;JILjava/lang/Object;)Lcom/caplin/integration/datasourcex/util/flow/VersionedMapEvent$Upsert;
public fun equals (Ljava/lang/Object;)Z
public fun getKey ()Ljava/lang/Object;
public final fun getValue ()Ljava/lang/Object;
public fun getVersion ()J
public fun hashCode ()I
public fun toString ()Ljava/lang/String;
}

public final class com/caplin/integration/datasourcex/util/serialization/fory/DataSourceModuleKt {
public static final fun registerDataSourceSerializers (Lorg/apache/fory/Fory;Z)Lorg/apache/fory/Fory;
public static synthetic fun registerDataSourceSerializers$default (Lorg/apache/fory/Fory;ZILjava/lang/Object;)Lorg/apache/fory/Fory;
Expand Down Expand Up @@ -518,3 +564,100 @@ public final class com/caplin/integration/datasourcex/util/serialization/jackson
public fun toObject (Ltools/jackson/databind/JsonNode;Ljava/lang/Class;)Ljava/lang/Object;
}

public abstract interface class com/caplin/integration/datasourcex/util/store/AsyncFlowStore {
public abstract fun asFlow ()Lkotlinx/coroutines/flow/SharedFlow;
public abstract fun asFlow (Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun asFlow$default (Lcom/caplin/integration/datasourcex/util/store/AsyncFlowStore;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public abstract fun get (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun valueFlow (Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
}

public final class com/caplin/integration/datasourcex/util/store/AsyncFlowStore$DefaultImpls {
public static synthetic fun asFlow$default (Lcom/caplin/integration/datasourcex/util/store/AsyncFlowStore;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
}

public abstract interface class com/caplin/integration/datasourcex/util/store/AsyncMutableFlowStore : com/caplin/integration/datasourcex/util/store/AsyncFlowStore {
public abstract fun get (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun put (Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun putAll (Ljava/util/Map;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun remove (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class com/caplin/integration/datasourcex/util/store/FlowStore {
public abstract fun asFlow ()Lkotlinx/coroutines/flow/SharedFlow;
public abstract fun asFlow (Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun asFlow$default (Lcom/caplin/integration/datasourcex/util/store/FlowStore;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public abstract fun get (Ljava/lang/Object;)Ljava/lang/Object;
public abstract fun getAsync ()Lcom/caplin/integration/datasourcex/util/store/AsyncFlowStore;
public abstract fun valueFlow (Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
}

public final class com/caplin/integration/datasourcex/util/store/FlowStore$DefaultImpls {
public static synthetic fun asFlow$default (Lcom/caplin/integration/datasourcex/util/store/FlowStore;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
}

public final class com/caplin/integration/datasourcex/util/store/FlowStoreKt {
public static final fun flowStore (Lcom/caplin/integration/datasourcex/util/store/StoreReader;Lkotlinx/coroutines/flow/Flow;Lcom/github/benmanes/caffeine/cache/Caffeine;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineDispatcher;I)Lcom/caplin/integration/datasourcex/util/store/FlowStore;
public static synthetic fun flowStore$default (Lcom/caplin/integration/datasourcex/util/store/StoreReader;Lkotlinx/coroutines/flow/Flow;Lcom/github/benmanes/caffeine/cache/Caffeine;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineDispatcher;IILjava/lang/Object;)Lcom/caplin/integration/datasourcex/util/store/FlowStore;
public static final fun flowStoreIn (Lkotlinx/coroutines/flow/Flow;Lcom/caplin/integration/datasourcex/util/store/StoreReader;Lcom/github/benmanes/caffeine/cache/Caffeine;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineDispatcher;I)Lcom/caplin/integration/datasourcex/util/store/FlowStore;
public static synthetic fun flowStoreIn$default (Lkotlinx/coroutines/flow/Flow;Lcom/caplin/integration/datasourcex/util/store/StoreReader;Lcom/github/benmanes/caffeine/cache/Caffeine;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineDispatcher;IILjava/lang/Object;)Lcom/caplin/integration/datasourcex/util/store/FlowStore;
}

public abstract interface class com/caplin/integration/datasourcex/util/store/MutableFlowStore : com/caplin/integration/datasourcex/util/store/FlowStore {
public abstract fun get (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
public abstract fun getAsync ()Lcom/caplin/integration/datasourcex/util/store/AsyncMutableFlowStore;
public abstract fun put (Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)V
public abstract fun putAll (Ljava/util/Map;Ljava/lang/Object;)V
public abstract fun remove (Ljava/lang/Object;Ljava/lang/Object;)V
}

public final class com/caplin/integration/datasourcex/util/store/MutableFlowStoreKt {
public static final fun mutableFlowStore (Lcom/caplin/integration/datasourcex/util/store/Store;Lcom/github/benmanes/caffeine/cache/Caffeine;Lkotlinx/coroutines/CoroutineDispatcher;Lkotlin/jvm/functions/Function1;)Lcom/caplin/integration/datasourcex/util/store/MutableFlowStore;
public static synthetic fun mutableFlowStore$default (Lcom/caplin/integration/datasourcex/util/store/Store;Lcom/github/benmanes/caffeine/cache/Caffeine;Lkotlinx/coroutines/CoroutineDispatcher;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lcom/caplin/integration/datasourcex/util/store/MutableFlowStore;
}

public abstract interface class com/caplin/integration/datasourcex/util/store/Store : com/caplin/integration/datasourcex/util/store/StoreReader, com/caplin/integration/datasourcex/util/store/StoreWriter {
}

public final class com/caplin/integration/datasourcex/util/store/Store$DefaultImpls {
public static fun writeAll (Lcom/caplin/integration/datasourcex/util/store/Store;Ljava/util/Map;Lcom/caplin/integration/datasourcex/util/store/TxContext;)Ljava/util/Map;
}

public abstract interface class com/caplin/integration/datasourcex/util/store/StoreReader {
public abstract fun load (Ljava/lang/Object;)Lcom/caplin/integration/datasourcex/util/store/Versioned;
}

public abstract interface class com/caplin/integration/datasourcex/util/store/StoreWriter {
public abstract fun delete (Ljava/lang/Object;Lcom/caplin/integration/datasourcex/util/store/TxContext;)J
public abstract fun load (Ljava/lang/Object;Lcom/caplin/integration/datasourcex/util/store/TxContext;)Lcom/caplin/integration/datasourcex/util/store/Versioned;
public abstract fun write (Ljava/lang/Object;Ljava/lang/Object;Lcom/caplin/integration/datasourcex/util/store/TxContext;)J
public fun writeAll (Ljava/util/Map;Lcom/caplin/integration/datasourcex/util/store/TxContext;)Ljava/util/Map;
}

public final class com/caplin/integration/datasourcex/util/store/StoreWriter$DefaultImpls {
public static fun writeAll (Lcom/caplin/integration/datasourcex/util/store/StoreWriter;Ljava/util/Map;Lcom/caplin/integration/datasourcex/util/store/TxContext;)Ljava/util/Map;
}

public abstract interface class com/caplin/integration/datasourcex/util/store/TxContext {
public abstract fun getTransaction ()Ljava/lang/Object;
public abstract fun onCommitEnd (Lkotlin/jvm/functions/Function0;)V
public fun onRollback (Lkotlin/jvm/functions/Function0;)V
}

public final class com/caplin/integration/datasourcex/util/store/TxContext$DefaultImpls {
public static fun onRollback (Lcom/caplin/integration/datasourcex/util/store/TxContext;Lkotlin/jvm/functions/Function0;)V
}

public final class com/caplin/integration/datasourcex/util/store/Versioned {
public fun <init> (Ljava/lang/Object;J)V
public final fun component1 ()Ljava/lang/Object;
public final fun component2 ()J
public final fun copy (Ljava/lang/Object;J)Lcom/caplin/integration/datasourcex/util/store/Versioned;
public static synthetic fun copy$default (Lcom/caplin/integration/datasourcex/util/store/Versioned;Ljava/lang/Object;JILjava/lang/Object;)Lcom/caplin/integration/datasourcex/util/store/Versioned;
public fun equals (Ljava/lang/Object;)Z
public final fun getValue ()Ljava/lang/Object;
public final fun getVersion ()J
public fun hashCode ()I
public fun toString ()Ljava/lang/String;
}

14 changes: 13 additions & 1 deletion util/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ dependencies {
api("org.jetbrains.kotlinx:kotlinx-coroutines-core")
api("org.jetbrains.kotlinx:kotlinx-coroutines-core-jvm")
api("com.fasterxml.jackson.core:jackson-core")
api("com.github.ben-manes.caffeine:caffeine")
Comment thread
rossdanderson marked this conversation as resolved.
implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation(libs.zjsonpatch)
Expand All @@ -32,12 +33,18 @@ dependencies {
compileOnly(libs.fory.core)
compileOnly(libs.fory.kotlin)

// Samples (compiled for Dokka only): show MutableFlowStore participating in a blocking jOOQ
// transaction. Off the published runtime classpath; jooq is version-managed by the Spring Boot
// BOM.
samplesImplementation("org.jooq:jooq")

testRuntimeOnly("org.slf4j:slf4j-simple")

testImplementation("org.springframework:spring-core") // For testing the RegexPathMatcher
testImplementation(libs.turbine)
testImplementation(libs.kotest.assertions)
testImplementation(libs.kotest.runner)
testImplementation(libs.mockk)
testImplementation(libs.fory.core)
testImplementation(libs.fory.kotlin)
testImplementation(libs.jackson3.databind)
Expand All @@ -51,4 +58,9 @@ dependencies {

jmh { duplicateClassesStrategy.set(DuplicatesStrategy.EXCLUDE) }

dokka { dokkaSourceSets.configureEach { includes.from("README.md") } }
dokka {
dokkaSourceSets.configureEach {
includes.from("README.md")
samples.from(layout.projectDirectory.dir("src/samples/kotlin"))
}
}
Loading
Loading