diff --git a/sdk-core/api/sdk-core.api b/sdk-core/api/sdk-core.api index 7ebcc312..7a456e4f 100644 --- a/sdk-core/api/sdk-core.api +++ b/sdk-core/api/sdk-core.api @@ -1560,6 +1560,58 @@ public final class org/dexpace/sdk/core/http/sse/ServerSentEventReader { public final class org/dexpace/sdk/core/http/sse/ServerSentEvents { public static final fun readServerSentEvents (Lorg/dexpace/sdk/core/io/BufferedSource;)Lkotlin/sequences/Sequence; public static final fun readServerSentEventsAsIterable (Lorg/dexpace/sdk/core/io/BufferedSource;)Ljava/lang/Iterable; + public static final fun sseStream (Lorg/dexpace/sdk/core/http/response/Response;)Lorg/dexpace/sdk/core/http/sse/SseStream; + public static final fun typed (Lorg/dexpace/sdk/core/http/sse/SseStream;Lorg/dexpace/sdk/core/http/sse/SseEventMapper;)Lorg/dexpace/sdk/core/http/sse/TypedSseStream; +} + +public abstract interface class org/dexpace/sdk/core/http/sse/SseEventMapper { + public static final field Companion Lorg/dexpace/sdk/core/http/sse/SseEventMapper$Companion; + public static fun done ()Lorg/dexpace/sdk/core/http/sse/SseEventMapper$Result; + public abstract fun map (Ljava/lang/String;Ljava/lang/String;)Lorg/dexpace/sdk/core/http/sse/SseEventMapper$Result; + public static fun skip ()Lorg/dexpace/sdk/core/http/sse/SseEventMapper$Result; + public static fun value (Ljava/lang/Object;)Lorg/dexpace/sdk/core/http/sse/SseEventMapper$Result; +} + +public final class org/dexpace/sdk/core/http/sse/SseEventMapper$Companion { + public final fun done ()Lorg/dexpace/sdk/core/http/sse/SseEventMapper$Result; + public final fun skip ()Lorg/dexpace/sdk/core/http/sse/SseEventMapper$Result; + public final fun value (Ljava/lang/Object;)Lorg/dexpace/sdk/core/http/sse/SseEventMapper$Result; +} + +public abstract class org/dexpace/sdk/core/http/sse/SseEventMapper$Result { +} + +public final class org/dexpace/sdk/core/http/sse/SseEventMapper$Result$Done : org/dexpace/sdk/core/http/sse/SseEventMapper$Result { + public static final field INSTANCE Lorg/dexpace/sdk/core/http/sse/SseEventMapper$Result$Done; +} + +public final class org/dexpace/sdk/core/http/sse/SseEventMapper$Result$Skip : org/dexpace/sdk/core/http/sse/SseEventMapper$Result { + public static final field INSTANCE Lorg/dexpace/sdk/core/http/sse/SseEventMapper$Result$Skip; +} + +public final class org/dexpace/sdk/core/http/sse/SseEventMapper$Result$Value : org/dexpace/sdk/core/http/sse/SseEventMapper$Result { + public fun (Ljava/lang/Object;)V + public final fun getModel ()Ljava/lang/Object; +} + +public final class org/dexpace/sdk/core/http/sse/SseStream : java/lang/AutoCloseable, java/lang/Iterable, kotlin/jvm/internal/markers/KMappedMarker { + public static final field Companion Lorg/dexpace/sdk/core/http/sse/SseStream$Companion; + public synthetic fun (Lorg/dexpace/sdk/core/http/sse/ServerSentEventReader;Ljava/io/Closeable;Lkotlin/jvm/internal/DefaultConstructorMarker;)V + public fun close ()V + public static final fun from (Lorg/dexpace/sdk/core/io/BufferedSource;Ljava/io/Closeable;)Lorg/dexpace/sdk/core/http/sse/SseStream; + public static final fun fromReader (Lorg/dexpace/sdk/core/http/sse/ServerSentEventReader;Ljava/io/Closeable;)Lorg/dexpace/sdk/core/http/sse/SseStream; + public fun iterator ()Ljava/util/Iterator; +} + +public final class org/dexpace/sdk/core/http/sse/SseStream$Companion { + public final fun from (Lorg/dexpace/sdk/core/io/BufferedSource;Ljava/io/Closeable;)Lorg/dexpace/sdk/core/http/sse/SseStream; + public final fun fromReader (Lorg/dexpace/sdk/core/http/sse/ServerSentEventReader;Ljava/io/Closeable;)Lorg/dexpace/sdk/core/http/sse/SseStream; +} + +public final class org/dexpace/sdk/core/http/sse/TypedSseStream : java/lang/AutoCloseable, java/lang/Iterable, kotlin/jvm/internal/markers/KMappedMarker { + public fun (Lorg/dexpace/sdk/core/http/sse/SseStream;Lorg/dexpace/sdk/core/http/sse/SseEventMapper;)V + public fun close ()V + public fun iterator ()Ljava/util/Iterator; } public final class org/dexpace/sdk/core/instrumentation/ClientLogger { diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/ServerSentEventExtensions.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/ServerSentEventExtensions.kt index 317948ae..fad370e8 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/ServerSentEventExtensions.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/ServerSentEventExtensions.kt @@ -9,6 +9,7 @@ package org.dexpace.sdk.core.http.sse +import org.dexpace.sdk.core.http.response.Response import org.dexpace.sdk.core.io.BufferedSource /** @@ -53,3 +54,36 @@ public fun BufferedSource.readServerSentEvents(): Sequence { */ public fun BufferedSource.readServerSentEventsAsIterable(): Iterable = readServerSentEvents().asIterable() + +/** + * Opens an [SseStream] over this response's body, binding the stream's lifecycle to the + * response: closing the returned stream (explicitly, via `use {}` / try-with-resources, or + * implicitly when iteration completes) closes the response and releases its connection. + * + * Use this on a streaming response to consume events without stranding the body on a partial + * consume: + * + * ``` + * response.sseStream().use { stream -> + * for (event in stream) { /* handle event */ } + * } + * ``` + * + * @throws IllegalStateException if the response has no body. + */ +public fun Response.sseStream(): SseStream { + val responseBody = checkNotNull(body) { "Response has no body to stream as Server-Sent Events" } + return SseStream.from(responseBody.source(), this) +} + +/** + * Wraps this raw [SseStream] in a [TypedSseStream] that maps each event to a model `T` via + * [mapper], decoding lazily on consume. The returned adapter inherits this stream's lifecycle: + * closing it closes this stream (and the response). + * + * The [mapper] is the per-endpoint seam — it is where the [org.dexpace.sdk.core.serde.Serde] + * SPI is called and where done-sentinel / error-envelope conventions live. + * + * @param mapper Maps a raw event to a [SseEventMapper.Result]. + */ +public fun SseStream.typed(mapper: SseEventMapper): TypedSseStream = TypedSseStream(this, mapper) diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/SseEventMapper.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/SseEventMapper.kt new file mode 100644 index 00000000..63ae3853 --- /dev/null +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/SseEventMapper.kt @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. + * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.core.http.sse + +/** + * Maps a raw [ServerSentEvent] to a typed model, or signals that the event should be skipped + * or that the stream is finished. + * + * This is the per-endpoint seam between core's format-agnostic SSE plumbing and an API's + * own conventions. A mapper receives the event's `event:` name (or `null` if the server sent + * no `event:` field) and the joined `data:` payload, and returns: + * + * - a decoded value `T` — yielded to the caller; + * - [Skip] — the event carries no model (a keep-alive comment, a bare `id:` cursor, an + * ignored event type); iteration silently moves on; + * - [Done] — a sentinel event marking end of stream (e.g. OpenAI's `data: [DONE]`); iteration + * terminates cleanly and the underlying [SseStream] closes. + * + * The mapper is the place to call the [org.dexpace.sdk.core.serde.Serde] SPI — for example + * `serde.deserializer.deserialize(data, MyDto::class.java)` — and the place to translate an + * error-envelope event into a thrown exception. Core deliberately holds **no** sentinel or + * error conventions; those are per-API and live entirely in the caller-supplied mapper. + * + * Mapping is applied **lazily, one element at a time**, as the typed iterator is pulled, so a + * partially consumed stream only decodes the events actually taken. + * + * Kotlin callers may pass a lambda; Java callers may use a lambda or implement this interface. + * + * @param T The model type events decode into. + */ +public fun interface SseEventMapper { + /** + * Maps one event to a [Result]. + * + * @param eventName The event's `event:` field, or `null` if the server omitted it. + * @param data The event's `data:` lines joined with `\n` (empty string if the event had + * no `data:` field). Joining matches the conventional WHATWG client-side reconstruction. + * @return [value] to yield a decoded model, [Skip] to drop the event, or [Done] to end + * the stream. + * @throws RuntimeException the mapper may throw (e.g. on a decode failure or a mapped + * error-envelope); the exception propagates to the consumer's pull. + */ + public fun map( + eventName: String?, + data: String, + ): Result + + /** + * The outcome of mapping a single SSE event: a decoded [value], [Skip], or [Done]. + * + * @param T The model type. + */ + public sealed class Result { + /** + * A successfully decoded model to yield to the consumer. + * + * @property model The decoded value. + */ + public class Value(public val model: T) : Result() + + /** + * The event carries no model and should be silently skipped (keep-alives, bare + * cursors, ignored event types). Iteration continues with the next event. + */ + public object Skip : Result() + + /** + * The event is a done-sentinel: iteration terminates cleanly and the backing + * [SseStream] closes. No model is yielded for the sentinel event itself. + */ + public object Done : Result() + } + + public companion object { + /** Wraps [model] in a [Result.Value]. */ + @JvmStatic + public fun value(model: T): Result = Result.Value(model) + + /** The shared [Result.Skip] singleton, exposed for Java callers. */ + @JvmStatic + public fun skip(): Result = Result.Skip + + /** The shared [Result.Done] singleton, exposed for Java callers. */ + @JvmStatic + public fun done(): Result = Result.Done + } +} diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/SseStream.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/SseStream.kt new file mode 100644 index 00000000..1cab3f73 --- /dev/null +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/SseStream.kt @@ -0,0 +1,150 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. + * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.core.http.sse + +import org.dexpace.sdk.core.io.BufferedSource +import java.io.Closeable +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock + +/** + * An [AutoCloseable] [Iterable] of [ServerSentEvent] whose lifecycle is bound to the + * underlying HTTP response. + * + * `SseStream` closes the [resource] it was opened over — typically the + * [org.dexpace.sdk.core.http.response.Response] (or its body) — whenever the stream is + * [closed][close], whether that close is explicit, via a `use {}` / try-with-resources block, + * or implicit when iteration runs to completion. This mirrors the close-on-partial-consume + * invariant [org.dexpace.sdk.core.http.paging.PagedIterable] enforces: a consumer that pulls + * only the first few events and walks away never strands the response body or its pooled + * connection. + * + * Unlike the bare [Sequence] returned by [BufferedSource.readServerSentEvents], an `SseStream` + * **owns** the response. The [ServerSentEventReader] it drives is single-pass and stateful, so + * the stream is single-pass too: + * + * - [iterator] may be called **once**. A second call throws [IllegalStateException]; this + * surfaces the previously documented "do not iterate twice" warning as an enforced guard. + * - Once [close] has run, [iterator] (and any further pulls from an in-flight iterator) throw + * [IllegalStateException] — the is-closed guard. + * - When the backing reader reports end-of-stream the iterator terminates cleanly **and** + * closes the stream, releasing the response without a separate [close] call. + * + * `close()` is idempotent and propagates to [resource] exactly once. The underlying response + * `close()` is itself expected to be idempotent, so a redundant [close] here is harmless. + * + * **Threading**: not thread-safe for iteration — drive a single iterator from one thread, as + * with the backing reader. [close] is safe to call from another thread (e.g. to cancel a + * long-lived stream); it takes a lock and flips an atomic guard, so a concurrent [close] + * races cleanly with iteration and the iterating thread observes the closed state on its next + * pull. + * + * @property reader The WHATWG parser driving the byte stream. Owned by this stream. + * @property resource The response (or body) whose lifecycle this stream governs; closed once + * when the stream closes. + */ +public class SseStream private constructor( + private val reader: ServerSentEventReader, + private val resource: Closeable, +) : AutoCloseable, Iterable { + private val closed = AtomicBoolean(false) + private val iteratorTaken = AtomicBoolean(false) + private val closeLock = ReentrantLock() + + /** + * Returns the single-pass iterator over the stream's events. + * + * Each pull advances the backing [ServerSentEventReader]. When the reader signals + * end-of-stream the iterator terminates and the stream is [closed][close] eagerly, so a + * fully consumed stream needs no explicit `close()`. + * + * @throws IllegalStateException if the stream is already closed, or if [iterator] was + * already called on this instance (the stream is single-pass). + */ + override fun iterator(): Iterator { + check(!closed.get()) { "SseStream is closed" } + check(iteratorTaken.compareAndSet(false, true)) { + "SseStream is single-pass; iterator() may only be called once" + } + return SseIterator() + } + + /** + * Closes the stream and the underlying [resource]. Idempotent: only the first call + * propagates to [resource]; later calls are no-ops. Safe to call concurrently with + * iteration. + */ + override fun close() { + if (closed.compareAndSet(false, true)) { + closeLock.withLock { resource.close() } + } + } + + private inner class SseIterator : AbstractIterator() { + override fun computeNext() { + // An out-of-band close() (e.g. cancellation from another thread) ends iteration + // cleanly rather than reading from a resource that is being torn down. + if (closed.get()) { + done() + return + } + // Reader exceptions (mid-stream connection drops) propagate to the caller; the + // stream stays open so the caller can still close()/use{} the response on the way + // out. AbstractIterator transitions to FAILED, matching PagedIterable's contract. + val event = reader.next() + if (event == null) { + // Clean end-of-stream: release the response without a separate close() call. + done() + close() + return + } + setNext(event) + } + } + + public companion object { + /** + * Opens an [SseStream] that parses [source] and, when closed, closes [resource]. + * + * Typical use binds [resource] to the originating + * [org.dexpace.sdk.core.http.response.Response] so closing the stream releases the + * response body and its connection: + * + * ``` + * SseStream.from(response.body!!.source(), response).use { stream -> + * for (event in stream) { /* handle event */ } + * } + * ``` + * + * [source] and [resource] may be the same object when the source itself owns the + * transport handle. + * + * @param source The byte stream to parse as Server-Sent Events. + * @param resource The handle to close when the stream closes (response, body, etc.). + */ + @JvmStatic + public fun from( + source: BufferedSource, + resource: Closeable, + ): SseStream = SseStream(ServerSentEventReader(source), resource) + + /** + * Opens an [SseStream] over a pre-built [reader], closing [resource] when the stream + * closes. Use when the caller already holds a configured [ServerSentEventReader]. + * + * @param reader The parser to drive. Owned by the returned stream. + * @param resource The handle to close when the stream closes. + */ + @JvmStatic + public fun fromReader( + reader: ServerSentEventReader, + resource: Closeable, + ): SseStream = SseStream(reader, resource) + } +} diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/TypedSseStream.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/TypedSseStream.kt new file mode 100644 index 00000000..19976754 --- /dev/null +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/TypedSseStream.kt @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. + * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.core.http.sse + +/** + * A reusable, per-endpoint adapter that turns an [SseStream] of raw [ServerSentEvent] into an + * [AutoCloseable] [Iterable] of typed models `T`, applying a caller-supplied [SseEventMapper] + * lazily as elements are pulled. + * + * This is the runtime primitive a code generator would target for a streaming endpoint: given + * the AutoCloseable SSE stream (#35) and a mapper that knows the endpoint's `event:` names, + * done-sentinel, and error-envelope conventions, it yields decoded models on demand. It is + * **fully usable by hand today** — construct one with any lambda mapper that calls the + * [org.dexpace.sdk.core.serde.Serde] SPI. + * + * Lifecycle is inherited from the wrapped [SseStream]: + * + * - [close] propagates to the underlying stream, which closes the response/body. Idempotent. + * - Running iteration to completion (mapper end-of-stream or a [SseEventMapper.Result.Done] + * sentinel) closes the stream automatically, so a fully consumed adapter needs no explicit + * `close()`. + * - Single-pass: [iterator] may be called once (the backing [SseStream] enforces this). + * + * **Lazy per-element decode.** The mapper — and therefore any [org.dexpace.sdk.core.serde.Serde] + * deserialize call inside it — runs only when the consumer pulls the next element. A partial + * consume (`first()`, `take(n)`, `stream().findFirst()`) decodes only the events actually + * taken, then `close()` releases the rest of the response. + * + * **Skip / Done.** Events the mapper returns [SseEventMapper.Result.Skip] for are dropped and + * iteration advances to the next event. A [SseEventMapper.Result.Done] ends iteration cleanly + * and closes the stream. + * + * **Errors.** A mapper that throws (decode failure, mapped error-envelope) propagates the + * exception to the consumer's pull; the underlying [SseStream] stays open so the caller's + * `use {}` / try-with-resources still closes the response on the way out. + * + * **Threading**: not thread-safe for iteration — drive a single iterator from one thread. + * [close] is safe from another thread (delegates to [SseStream.close]). + * + * @param T The model type events decode into. + * @property events The underlying raw SSE stream. Owned by this adapter; closed when this + * adapter closes. + * @property mapper The per-endpoint mapping from raw event to typed [Result]. + */ +public class TypedSseStream( + private val events: SseStream, + private val mapper: SseEventMapper, +) : AutoCloseable, Iterable { + /** + * Returns the single-pass iterator over decoded models. Delegates iteration to the wrapped + * [SseStream] and applies [mapper] lazily per element. + * + * @throws IllegalStateException if the underlying [SseStream] is already closed or its + * iterator was already taken. + */ + override fun iterator(): Iterator = MappingIterator(events.iterator()) + + /** + * Closes the adapter and the underlying [SseStream] (and thereby the response). Idempotent. + */ + override fun close() { + events.close() + } + + private inner class MappingIterator( + private val raw: Iterator, + ) : AbstractIterator() { + override fun computeNext() { + // Pull raw events until the mapper yields a value, signals Done, or the stream ends. + // Skips don't surface to the caller; only taken elements are decoded (lazy decode). + while (raw.hasNext()) { + val event = raw.next() + when (val result = mapper.map(event.event, event.data.joinToString("\n"))) { + is SseEventMapper.Result.Value -> { + setNext(result.model) + return + } + SseEventMapper.Result.Skip -> continue + SseEventMapper.Result.Done -> { + done() + close() + return + } + } + } + done() + } + } +} diff --git a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/sse/SseStreamTest.kt b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/sse/SseStreamTest.kt new file mode 100644 index 00000000..8e765e7c --- /dev/null +++ b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/sse/SseStreamTest.kt @@ -0,0 +1,212 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. + * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.core.http.sse + +import org.dexpace.sdk.core.http.common.Protocol +import org.dexpace.sdk.core.http.request.Method +import org.dexpace.sdk.core.http.request.Request +import org.dexpace.sdk.core.http.response.Response +import org.dexpace.sdk.core.http.response.ResponseBody +import org.dexpace.sdk.core.http.response.Status +import org.dexpace.sdk.core.io.BufferedSource +import org.dexpace.sdk.core.io.Io +import org.dexpace.sdk.io.OkioIoProvider +import java.io.Closeable +import java.io.IOException +import java.util.concurrent.atomic.AtomicInteger +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.test.assertFalse +import kotlin.test.assertTrue + +class SseStreamTest { + @BeforeTest + fun installProvider() { + Io.installProvider(OkioIoProvider) + } + + private fun source(text: String): BufferedSource = Io.provider.source(text.toByteArray(Charsets.UTF_8)) + + /** Records how many times close() ran, standing in for a Response/body. */ + private class CountingCloseable : Closeable { + val closeCount = AtomicInteger(0) + + override fun close() { + closeCount.incrementAndGet() + } + + val isClosed: Boolean get() = closeCount.get() > 0 + } + + @Test + fun `full iteration yields all events and auto-closes the resource`() { + val resource = CountingCloseable() + val stream = SseStream.from(source("data: a\n\ndata: b\n\n"), resource) + + val events = stream.toList() + + assertEquals(2, events.size) + assertEquals(listOf("a"), events[0].data) + assertEquals(listOf("b"), events[1].data) + // Clean end-of-stream closes the resource exactly once without an explicit close(). + assertEquals(1, resource.closeCount.get()) + } + + @Test + fun `explicit close propagates to the resource`() { + val resource = CountingCloseable() + val stream = SseStream.from(source("data: a\n\n"), resource) + + assertFalse(resource.isClosed) + stream.close() + assertTrue(resource.isClosed) + } + + @Test + fun `use block closes the resource even on partial consume`() { + val resource = CountingCloseable() + val first = + SseStream.from(source("data: a\n\ndata: b\n\ndata: c\n\n"), resource).use { stream -> + // Pull only the first event, then leave the use block — the rest are abandoned. + stream.iterator().next() + } + + assertEquals(listOf("a"), first.data) + // Partial consume must not strand the resource: use{} closed it. + assertEquals(1, resource.closeCount.get()) + } + + @Test + fun `close is idempotent`() { + val resource = CountingCloseable() + val stream = SseStream.from(source("data: a\n\n"), resource) + + stream.close() + stream.close() + stream.close() + + assertEquals(1, resource.closeCount.get()) + } + + @Test + fun `iterator may only be taken once`() { + val stream = SseStream.from(source("data: a\n\n"), CountingCloseable()) + + stream.iterator() + assertFailsWith { stream.iterator() } + } + + @Test + fun `iterator on a closed stream throws`() { + val stream = SseStream.from(source("data: a\n\n"), CountingCloseable()) + stream.close() + + assertFailsWith { stream.iterator() } + } + + @Test + fun `out-of-band close ends an in-flight iteration cleanly`() { + val resource = CountingCloseable() + val stream = SseStream.from(source("data: a\n\ndata: b\n\ndata: c\n\n"), resource) + val iter = stream.iterator() + + // Consume the first event, then close mid-iteration (e.g. cancellation). + assertTrue(iter.hasNext()) + assertEquals(listOf("a"), iter.next().data) + stream.close() + + // The next pull observes the closed state and terminates without error. + assertFalse(iter.hasNext()) + assertEquals(1, resource.closeCount.get()) + } + + @Test + fun `reader exception propagates and leaves the resource closeable`() { + val resource = CountingCloseable() + val goodPart = "data: good\n\n".toByteArray(Charsets.UTF_8) + val backing = Io.provider.buffer().also { it.write(goodPart) } + val failingSource = + object : BufferedSource by backing { + override fun exhausted(): Boolean = false + + override fun readByte(): Byte { + if (backing.size == 0L) throw IOException("simulated connection drop") + return backing.readByte() + } + + override fun peek(): BufferedSource = backing.peek() + } + + val stream = SseStream.from(failingSource, resource) + val iter = stream.iterator() + + assertEquals(listOf("good"), iter.next().data) + // The mid-stream drop surfaces on the next pull. + assertFailsWith { iter.hasNext() } + // The stream stayed open after the failure, so the caller can still release it. + stream.close() + assertEquals(1, resource.closeCount.get()) + } + + @Test + fun `Response sseStream binds the stream to the response body lifecycle`() { + val closed = AtomicInteger(0) + val backing = source("data: a\n\ndata: b\n\n") + val body = + object : ResponseBody() { + override fun mediaType() = null + + override fun contentLength(): Long = -1L + + override fun source(): BufferedSource = backing + + override fun close() { + closed.incrementAndGet() + backing.close() + } + } + val response = + Response.builder() + .request(Request.builder().method(Method.GET).url("https://example.test/stream").build()) + .protocol(Protocol.HTTP_1_1) + .status(Status.OK) + .body(body) + .build() + + val events = response.sseStream().use { it.toList() } + + assertEquals(2, events.size) + // Closing the SSE stream closed the response body. + assertEquals(1, closed.get()) + } + + @Test + fun `Response sseStream throws when the response has no body`() { + val response = + Response.builder() + .request(Request.builder().method(Method.GET).url("https://example.test/stream").build()) + .protocol(Protocol.HTTP_1_1) + .status(Status.OK) + .build() + + assertFailsWith { response.sseStream() } + } + + @Test + fun `fromReader binds an existing reader to a resource`() { + val resource = CountingCloseable() + val reader = ServerSentEventReader(source("data: x\n\n")) + val stream = SseStream.fromReader(reader, resource) + + val events = stream.toList() + assertEquals(listOf("x"), events.single().data) + assertEquals(1, resource.closeCount.get()) + } +} diff --git a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/sse/TypedSseStreamTest.kt b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/sse/TypedSseStreamTest.kt new file mode 100644 index 00000000..691f3ddf --- /dev/null +++ b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/sse/TypedSseStreamTest.kt @@ -0,0 +1,183 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. + * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.core.http.sse + +import org.dexpace.sdk.core.io.BufferedSource +import org.dexpace.sdk.core.io.Io +import org.dexpace.sdk.core.serde.Deserializer +import org.dexpace.sdk.io.OkioIoProvider +import java.io.Closeable +import java.io.InputStream +import java.util.concurrent.atomic.AtomicInteger +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.test.assertTrue + +class TypedSseStreamTest { + @BeforeTest + fun installProvider() { + Io.installProvider(OkioIoProvider) + } + + private fun source(text: String): BufferedSource = Io.provider.source(text.toByteArray(Charsets.UTF_8)) + + /** Builds a single `data:`-only SSE event block carrying [payload]. */ + private fun event(payload: String): String = "data: $payload\n\n" + + /** Toy JSON for a [Chunk] with [text]. */ + private fun chunkJson(text: String): String = """{"text":"$text"}""" + + private fun stream( + text: String, + resource: Closeable = CountingCloseable(), + ): SseStream = SseStream.from(source(text), resource) + + private class CountingCloseable : Closeable { + val closeCount = AtomicInteger(0) + + override fun close() { + closeCount.incrementAndGet() + } + } + + private data class Chunk(val text: String) + + /** + * Minimal [Deserializer] standing in for a real adapter (e.g. sdk-serde-jackson): it + * records every decode call and parses the toy `{"text":"..."}` shape into [Chunk], + * letting the test assert both typed output and the Serde SPI being exercised lazily. + */ + private class RecordingDeserializer : Deserializer { + val decoded = mutableListOf() + + @Suppress("UNCHECKED_CAST") + override fun deserialize( + input: String, + type: Class, + ): T { + decoded.add(input) + val text = Regex("\"text\"\\s*:\\s*\"([^\"]*)\"").find(input)?.groupValues?.get(1) ?: "" + return Chunk(text) as T + } + + override fun deserialize( + input: ByteArray, + type: Class, + ): T = deserialize(String(input, Charsets.UTF_8), type) + + override fun deserialize( + inputStream: InputStream, + type: Class, + ): T = deserialize(inputStream.readBytes().toString(Charsets.UTF_8), type) + } + + /** Endpoint mapper: `[DONE]` ends the stream, comment-only events skip, else decode. */ + private fun chunkMapper(deserializer: Deserializer): SseEventMapper = + SseEventMapper { _, data -> + when { + data == "[DONE]" -> SseEventMapper.Result.Done + data.isEmpty() -> SseEventMapper.Result.Skip + else -> SseEventMapper.Result.Value(deserializer.deserialize(data, Chunk::class.java)) + } + } + + @Test + fun `maps raw events to typed models via the deserializer`() { + val deser = RecordingDeserializer() + val raw = stream(event(chunkJson("hi")) + event(chunkJson("bye"))) + + val chunks = TypedSseStream(raw, chunkMapper(deser)).toList() + + assertEquals(listOf(Chunk("hi"), Chunk("bye")), chunks) + assertEquals(2, deser.decoded.size) + } + + @Test + fun `done sentinel ends iteration and closes the underlying stream`() { + val resource = CountingCloseable() + val deser = RecordingDeserializer() + val raw = stream(event(chunkJson("a")) + event("[DONE]") + event(chunkJson("never")), resource) + + val chunks = TypedSseStream(raw, chunkMapper(deser)).toList() + + // Only the pre-sentinel element is yielded; the post-sentinel event is never decoded. + assertEquals(listOf(Chunk("a")), chunks) + assertEquals(1, deser.decoded.size) + // Reaching the done-sentinel closes the underlying stream/resource. + assertEquals(1, resource.closeCount.get()) + } + + @Test + fun `skip results are dropped and do not surface to the caller`() { + val deser = RecordingDeserializer() + // The middle event is a comment-only keep-alive: its data is empty, so the mapper skips it. + val raw = stream(event(chunkJson("a")) + ":keep-alive\n\n" + event(chunkJson("b"))) + + val chunks = TypedSseStream(raw, chunkMapper(deser)).toList() + + assertEquals(listOf(Chunk("a"), Chunk("b")), chunks) + } + + @Test + fun `decode happens lazily per element on consume`() { + val deser = RecordingDeserializer() + val raw = stream(event(chunkJson("a")) + event(chunkJson("b"))) + val iter = TypedSseStream(raw, chunkMapper(deser)).iterator() + + // Nothing decoded before the first pull. + assertEquals(0, deser.decoded.size) + assertEquals(Chunk("a"), iter.next()) + // Only the first element has been decoded; the second is untouched until pulled. + assertEquals(1, deser.decoded.size) + assertEquals(Chunk("b"), iter.next()) + assertEquals(2, deser.decoded.size) + } + + @Test + fun `close propagates from the typed adapter to the underlying resource`() { + val resource = CountingCloseable() + val raw = stream(event(chunkJson("a")), resource) + val typed = TypedSseStream(raw, chunkMapper(RecordingDeserializer())) + + typed.close() + assertEquals(1, resource.closeCount.get()) + } + + @Test + fun `mapper exceptions propagate and leave the stream closeable`() { + val resource = CountingCloseable() + val raw = stream(event("boom"), resource) + val typed = + TypedSseStream(raw) { _, _ -> + error("decode failed") + } + + assertFailsWith { typed.iterator().next() } + // The underlying stream stayed open; the caller can still release it. + typed.close() + assertEquals(1, resource.closeCount.get()) + } + + @Test + fun `result factory helpers build the expected variants`() { + val value = SseEventMapper.value(Chunk("v")) + assertTrue(value is SseEventMapper.Result.Value) + assertEquals(Chunk("v"), value.model) + assertTrue(SseEventMapper.skip() === SseEventMapper.Result.Skip) + assertTrue(SseEventMapper.done() === SseEventMapper.Result.Done) + } + + @Test + fun `typed extension wraps a stream`() { + val deser = RecordingDeserializer() + val chunks = stream(event(chunkJson("z"))).typed(chunkMapper(deser)).toList() + assertTrue(chunks.contains(Chunk("z"))) + } +}