Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions sdk-core/api/sdk-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -1560,6 +1560,58 @@ public final class org/dexpace/sdk/core/http/sse/ServerSentEventReader {
public final class org/dexpace/sdk/core/http/sse/ServerSentEvents {
public static final fun readServerSentEvents (Lorg/dexpace/sdk/core/io/BufferedSource;)Lkotlin/sequences/Sequence;
public static final fun readServerSentEventsAsIterable (Lorg/dexpace/sdk/core/io/BufferedSource;)Ljava/lang/Iterable;
public static final fun sseStream (Lorg/dexpace/sdk/core/http/response/Response;)Lorg/dexpace/sdk/core/http/sse/SseStream;
public static final fun typed (Lorg/dexpace/sdk/core/http/sse/SseStream;Lorg/dexpace/sdk/core/http/sse/SseEventMapper;)Lorg/dexpace/sdk/core/http/sse/TypedSseStream;
}

public abstract interface class org/dexpace/sdk/core/http/sse/SseEventMapper {
public static final field Companion Lorg/dexpace/sdk/core/http/sse/SseEventMapper$Companion;
public static fun done ()Lorg/dexpace/sdk/core/http/sse/SseEventMapper$Result;
public abstract fun map (Ljava/lang/String;Ljava/lang/String;)Lorg/dexpace/sdk/core/http/sse/SseEventMapper$Result;
public static fun skip ()Lorg/dexpace/sdk/core/http/sse/SseEventMapper$Result;
public static fun value (Ljava/lang/Object;)Lorg/dexpace/sdk/core/http/sse/SseEventMapper$Result;
}

public final class org/dexpace/sdk/core/http/sse/SseEventMapper$Companion {
public final fun done ()Lorg/dexpace/sdk/core/http/sse/SseEventMapper$Result;
public final fun skip ()Lorg/dexpace/sdk/core/http/sse/SseEventMapper$Result;
public final fun value (Ljava/lang/Object;)Lorg/dexpace/sdk/core/http/sse/SseEventMapper$Result;
}

public abstract class org/dexpace/sdk/core/http/sse/SseEventMapper$Result {
}

public final class org/dexpace/sdk/core/http/sse/SseEventMapper$Result$Done : org/dexpace/sdk/core/http/sse/SseEventMapper$Result {
public static final field INSTANCE Lorg/dexpace/sdk/core/http/sse/SseEventMapper$Result$Done;
}

public final class org/dexpace/sdk/core/http/sse/SseEventMapper$Result$Skip : org/dexpace/sdk/core/http/sse/SseEventMapper$Result {
public static final field INSTANCE Lorg/dexpace/sdk/core/http/sse/SseEventMapper$Result$Skip;
}

public final class org/dexpace/sdk/core/http/sse/SseEventMapper$Result$Value : org/dexpace/sdk/core/http/sse/SseEventMapper$Result {
public fun <init> (Ljava/lang/Object;)V
public final fun getModel ()Ljava/lang/Object;
}

public final class org/dexpace/sdk/core/http/sse/SseStream : java/lang/AutoCloseable, java/lang/Iterable, kotlin/jvm/internal/markers/KMappedMarker {
public static final field Companion Lorg/dexpace/sdk/core/http/sse/SseStream$Companion;
public synthetic fun <init> (Lorg/dexpace/sdk/core/http/sse/ServerSentEventReader;Ljava/io/Closeable;Lkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun close ()V
public static final fun from (Lorg/dexpace/sdk/core/io/BufferedSource;Ljava/io/Closeable;)Lorg/dexpace/sdk/core/http/sse/SseStream;
public static final fun fromReader (Lorg/dexpace/sdk/core/http/sse/ServerSentEventReader;Ljava/io/Closeable;)Lorg/dexpace/sdk/core/http/sse/SseStream;
public fun iterator ()Ljava/util/Iterator;
}

public final class org/dexpace/sdk/core/http/sse/SseStream$Companion {
public final fun from (Lorg/dexpace/sdk/core/io/BufferedSource;Ljava/io/Closeable;)Lorg/dexpace/sdk/core/http/sse/SseStream;
public final fun fromReader (Lorg/dexpace/sdk/core/http/sse/ServerSentEventReader;Ljava/io/Closeable;)Lorg/dexpace/sdk/core/http/sse/SseStream;
}

public final class org/dexpace/sdk/core/http/sse/TypedSseStream : java/lang/AutoCloseable, java/lang/Iterable, kotlin/jvm/internal/markers/KMappedMarker {
public fun <init> (Lorg/dexpace/sdk/core/http/sse/SseStream;Lorg/dexpace/sdk/core/http/sse/SseEventMapper;)V
public fun close ()V
public fun iterator ()Ljava/util/Iterator;
}

public final class org/dexpace/sdk/core/instrumentation/ClientLogger {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.dexpace.sdk.core.http.sse

import org.dexpace.sdk.core.http.response.Response
import org.dexpace.sdk.core.io.BufferedSource

/**
Expand Down Expand Up @@ -53,3 +54,36 @@ public fun BufferedSource.readServerSentEvents(): Sequence<ServerSentEvent> {
*/
public fun BufferedSource.readServerSentEventsAsIterable(): Iterable<ServerSentEvent> =
readServerSentEvents().asIterable()

/**
* Opens an [SseStream] over this response's body, binding the stream's lifecycle to the
* response: closing the returned stream (explicitly, via `use {}` / try-with-resources, or
* implicitly when iteration completes) closes the response and releases its connection.
*
* Use this on a streaming response to consume events without stranding the body on a partial
* consume:
*
* ```
* response.sseStream().use { stream ->
* for (event in stream) { /* handle event */ }
* }
* ```
*
* @throws IllegalStateException if the response has no body.
*/
public fun Response.sseStream(): SseStream {
val responseBody = checkNotNull(body) { "Response has no body to stream as Server-Sent Events" }
return SseStream.from(responseBody.source(), this)
}

/**
* Wraps this raw [SseStream] in a [TypedSseStream] that maps each event to a model `T` via
* [mapper], decoding lazily on consume. The returned adapter inherits this stream's lifecycle:
* closing it closes this stream (and the response).
*
* The [mapper] is the per-endpoint seam — it is where the [org.dexpace.sdk.core.serde.Serde]
* SPI is called and where done-sentinel / error-envelope conventions live.
*
* @param mapper Maps a raw event to a [SseEventMapper.Result].
*/
public fun <T> SseStream.typed(mapper: SseEventMapper<T>): TypedSseStream<T> = TypedSseStream(this, mapper)
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright (c) 2026 dexpace and Omar Aljarrah
*
* Licensed under the MIT License. See LICENSE in the project root.
* SPDX-License-Identifier: MIT
*/

package org.dexpace.sdk.core.http.sse

/**
* Maps a raw [ServerSentEvent] to a typed model, or signals that the event should be skipped
* or that the stream is finished.
*
* This is the per-endpoint seam between core's format-agnostic SSE plumbing and an API's
* own conventions. A mapper receives the event's `event:` name (or `null` if the server sent
* no `event:` field) and the joined `data:` payload, and returns:
*
* - a decoded value `T` — yielded to the caller;
* - [Skip] — the event carries no model (a keep-alive comment, a bare `id:` cursor, an
* ignored event type); iteration silently moves on;
* - [Done] — a sentinel event marking end of stream (e.g. OpenAI's `data: [DONE]`); iteration
* terminates cleanly and the underlying [SseStream] closes.
*
* The mapper is the place to call the [org.dexpace.sdk.core.serde.Serde] SPI — for example
* `serde.deserializer.deserialize(data, MyDto::class.java)` — and the place to translate an
* error-envelope event into a thrown exception. Core deliberately holds **no** sentinel or
* error conventions; those are per-API and live entirely in the caller-supplied mapper.
*
* Mapping is applied **lazily, one element at a time**, as the typed iterator is pulled, so a
* partially consumed stream only decodes the events actually taken.
*
* Kotlin callers may pass a lambda; Java callers may use a lambda or implement this interface.
*
* @param T The model type events decode into.
*/
public fun interface SseEventMapper<out T> {
/**
* Maps one event to a [Result].
*
* @param eventName The event's `event:` field, or `null` if the server omitted it.
* @param data The event's `data:` lines joined with `\n` (empty string if the event had
* no `data:` field). Joining matches the conventional WHATWG client-side reconstruction.
* @return [value] to yield a decoded model, [Skip] to drop the event, or [Done] to end
* the stream.
* @throws RuntimeException the mapper may throw (e.g. on a decode failure or a mapped
* error-envelope); the exception propagates to the consumer's pull.
*/
public fun map(
eventName: String?,
data: String,
): Result<T>

/**
* The outcome of mapping a single SSE event: a decoded [value], [Skip], or [Done].
*
* @param T The model type.
*/
public sealed class Result<out T> {
/**
* A successfully decoded model to yield to the consumer.
*
* @property model The decoded value.
*/
public class Value<out T>(public val model: T) : Result<T>()

/**
* The event carries no model and should be silently skipped (keep-alives, bare
* cursors, ignored event types). Iteration continues with the next event.
*/
public object Skip : Result<Nothing>()

/**
* The event is a done-sentinel: iteration terminates cleanly and the backing
* [SseStream] closes. No model is yielded for the sentinel event itself.
*/
public object Done : Result<Nothing>()
}

public companion object {
/** Wraps [model] in a [Result.Value]. */
@JvmStatic
public fun <T> value(model: T): Result<T> = Result.Value(model)

/** The shared [Result.Skip] singleton, exposed for Java callers. */
@JvmStatic
public fun skip(): Result<Nothing> = Result.Skip

/** The shared [Result.Done] singleton, exposed for Java callers. */
@JvmStatic
public fun done(): Result<Nothing> = Result.Done
}
}
150 changes: 150 additions & 0 deletions sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/SseStream.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Copyright (c) 2026 dexpace and Omar Aljarrah
*
* Licensed under the MIT License. See LICENSE in the project root.
* SPDX-License-Identifier: MIT
*/

package org.dexpace.sdk.core.http.sse

import org.dexpace.sdk.core.io.BufferedSource
import java.io.Closeable
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

/**
* An [AutoCloseable] [Iterable] of [ServerSentEvent] whose lifecycle is bound to the
* underlying HTTP response.
*
* `SseStream` closes the [resource] it was opened over — typically the
* [org.dexpace.sdk.core.http.response.Response] (or its body) — whenever the stream is
* [closed][close], whether that close is explicit, via a `use {}` / try-with-resources block,
* or implicit when iteration runs to completion. This mirrors the close-on-partial-consume
* invariant [org.dexpace.sdk.core.http.paging.PagedIterable] enforces: a consumer that pulls
* only the first few events and walks away never strands the response body or its pooled
* connection.
*
* Unlike the bare [Sequence] returned by [BufferedSource.readServerSentEvents], an `SseStream`
* **owns** the response. The [ServerSentEventReader] it drives is single-pass and stateful, so
* the stream is single-pass too:
*
* - [iterator] may be called **once**. A second call throws [IllegalStateException]; this
* surfaces the previously documented "do not iterate twice" warning as an enforced guard.
* - Once [close] has run, [iterator] (and any further pulls from an in-flight iterator) throw
* [IllegalStateException] — the is-closed guard.
* - When the backing reader reports end-of-stream the iterator terminates cleanly **and**
* closes the stream, releasing the response without a separate [close] call.
*
* `close()` is idempotent and propagates to [resource] exactly once. The underlying response
* `close()` is itself expected to be idempotent, so a redundant [close] here is harmless.
*
* **Threading**: not thread-safe for iteration — drive a single iterator from one thread, as
* with the backing reader. [close] is safe to call from another thread (e.g. to cancel a
* long-lived stream); it takes a lock and flips an atomic guard, so a concurrent [close]
* races cleanly with iteration and the iterating thread observes the closed state on its next
* pull.
*
* @property reader The WHATWG parser driving the byte stream. Owned by this stream.
* @property resource The response (or body) whose lifecycle this stream governs; closed once
* when the stream closes.
*/
public class SseStream private constructor(
private val reader: ServerSentEventReader,
private val resource: Closeable,
) : AutoCloseable, Iterable<ServerSentEvent> {
private val closed = AtomicBoolean(false)
private val iteratorTaken = AtomicBoolean(false)
private val closeLock = ReentrantLock()

/**
* Returns the single-pass iterator over the stream's events.
*
* Each pull advances the backing [ServerSentEventReader]. When the reader signals
* end-of-stream the iterator terminates and the stream is [closed][close] eagerly, so a
* fully consumed stream needs no explicit `close()`.
*
* @throws IllegalStateException if the stream is already closed, or if [iterator] was
* already called on this instance (the stream is single-pass).
*/
override fun iterator(): Iterator<ServerSentEvent> {
check(!closed.get()) { "SseStream is closed" }
check(iteratorTaken.compareAndSet(false, true)) {
"SseStream is single-pass; iterator() may only be called once"
}
return SseIterator()
}

/**
* Closes the stream and the underlying [resource]. Idempotent: only the first call
* propagates to [resource]; later calls are no-ops. Safe to call concurrently with
* iteration.
*/
override fun close() {
if (closed.compareAndSet(false, true)) {
closeLock.withLock { resource.close() }
}
}

private inner class SseIterator : AbstractIterator<ServerSentEvent>() {
override fun computeNext() {
// An out-of-band close() (e.g. cancellation from another thread) ends iteration
// cleanly rather than reading from a resource that is being torn down.
if (closed.get()) {
done()
return
}
// Reader exceptions (mid-stream connection drops) propagate to the caller; the
// stream stays open so the caller can still close()/use{} the response on the way
// out. AbstractIterator transitions to FAILED, matching PagedIterable's contract.
val event = reader.next()
if (event == null) {
// Clean end-of-stream: release the response without a separate close() call.
done()
close()
return
}
setNext(event)
}
}

public companion object {
/**
* Opens an [SseStream] that parses [source] and, when closed, closes [resource].
*
* Typical use binds [resource] to the originating
* [org.dexpace.sdk.core.http.response.Response] so closing the stream releases the
* response body and its connection:
*
* ```
* SseStream.from(response.body!!.source(), response).use { stream ->
* for (event in stream) { /* handle event */ }
* }
* ```
*
* [source] and [resource] may be the same object when the source itself owns the
* transport handle.
*
* @param source The byte stream to parse as Server-Sent Events.
* @param resource The handle to close when the stream closes (response, body, etc.).
*/
@JvmStatic
public fun from(
source: BufferedSource,
resource: Closeable,
): SseStream = SseStream(ServerSentEventReader(source), resource)

/**
* Opens an [SseStream] over a pre-built [reader], closing [resource] when the stream
* closes. Use when the caller already holds a configured [ServerSentEventReader].
*
* @param reader The parser to drive. Owned by the returned stream.
* @param resource The handle to close when the stream closes.
*/
@JvmStatic
public fun fromReader(
reader: ServerSentEventReader,
resource: Closeable,
): SseStream = SseStream(reader, resource)
}
}
Loading
Loading