Skip to content

feat: add lifecycle-bound SSE stream and typed per-endpoint adapter#153

Open
OmarAlJarrah wants to merge 1 commit into
mainfrom
feat/sse-stream-lifecycle
Open

feat: add lifecycle-bound SSE stream and typed per-endpoint adapter#153
OmarAlJarrah wants to merge 1 commit into
mainfrom
feat/sse-stream-lifecycle

Conversation

@OmarAlJarrah

Copy link
Copy Markdown
Member

Summary

Adds a lifecycle-bound Server-Sent Events stream and a reusable typed adapter, so streaming endpoints can be consumed safely and decoded into models without per-API conventions leaking into core.

SseStream — AutoCloseable Iterable bound to the response (#35)

The SSE surface was a bare Sequence whose reader explicitly disclaimed ownership of the response, so a partial consume could strand the connection. SseStream now wraps the WHATWG parser and owns the response:

  • AutoCloseable, Iterable<ServerSentEvent>. Closing it — explicitly, via use {} / try-with-resources, or implicitly when iteration runs to completion — closes the underlying response/body and releases its pooled connection. This mirrors the close-on-partial-consume invariant PagedIterable enforces.
  • The previously doc-only "do not iterate twice" warning is now an enforced single-pass guard; iteration after close() is rejected; close() is idempotent and safe to call concurrently (e.g. cancellation from another thread).
  • Reader exceptions (mid-stream drops) propagate to the caller and leave the stream closeable so a surrounding use {} still releases the response.
  • Response.sseStream() extension opens a stream bound to the response body lifecycle.

TypedSseStream<T> + SseEventMapper<T> — per-endpoint typed adapter (#62)

A reusable runtime adapter turning SseStream into AutoCloseable Iterable<T> by applying a caller-supplied (eventName, data) -> Result<T> mapper:

  • The mapper returns a decoded value, Skip (keep-alives, bare cursors), or a Done sentinel that ends the stream and closes it. It is the seam where the Serde SPI is invoked and where per-API done-sentinel / error-envelope conventions live — core holds none of them.
  • Lazy per-element decode: the mapper (and any Serde deserialize inside it) runs only when the consumer pulls the next element, so a partial consume decodes only the events taken.
  • Closing the adapter propagates to the underlying stream and response.
  • SseStream.typed(mapper) extension for ergonomic wrapping.

This is a hand-written runtime primitive — no code generation — fully usable today; a generator can target it later without embedding any per-API convention in core.

Tests

SseStreamTest and TypedSseStreamTest cover: full-iteration auto-close, explicit close, use {} close on partial consume, idempotent close, single-pass and is-closed guards, out-of-band close mid-iteration, reader-exception propagation, Response.sseStream() lifecycle binding and the no-body error, typed mapping via a recording Deserializer, lazy per-element decode, Skip/Done handling, mapper-exception propagation, and close propagation.

Gated build (scoped, --no-daemon)

./gradlew :sdk-core:test :sdk-core:ktlintCheck :sdk-core:detekt :sdk-core:apiCheck --no-daemon

Result: BUILD SUCCESSFUL. :sdk-core:apiDump was run and the regenerated sdk-core/api/sdk-core.api is committed.

Closes #35
Closes #62

Introduce SseStream: an AutoCloseable Iterable<ServerSentEvent> that owns
the underlying HTTP response. Closing the stream — explicitly, via use {} /
try-with-resources, or implicitly when iteration runs to completion — closes
the response and releases its connection, so a partial consume never strands
the body. This mirrors the close-on-partial-consume invariant PagedIterable
enforces. The previously doc-only "do not iterate twice" warning is now an
enforced single-pass guard, and iteration after close is rejected.

Add a reusable per-endpoint adapter, TypedSseStream<T>, that maps raw events
to typed models via a caller-supplied SseEventMapper. The mapper receives the
event name and joined data and returns a decoded value, Skip, or a Done
sentinel; it is the seam where the Serde SPI is invoked and where per-API
done-sentinel and error-envelope conventions live. Mapping is applied lazily,
one element at a time, so a partial consume decodes only the events taken.
Closing the typed adapter propagates to the underlying stream.

Both surfaces are hand-written runtime primitives usable today; a code
generator can target them later without embedding any per-API convention in
core.

Closes #35
Closes #62
@OmarAlJarrah

Copy link
Copy Markdown
Member Author

This adds two SSE consumption primitives to http.sse: SseStream, an AutoCloseable single-pass Iterable<ServerSentEvent> that binds the reader's lifecycle to the originating Response (closing on explicit close, use {}, or iteration completion), and TypedSseStream, a per-endpoint adapter that lazily maps raw events to typed models via a SseEventMapper. The design mirrors PagedIterable's close-on-partial-consume invariant nicely, the API snapshot is regenerated correctly, and the test suites are thorough. A couple of things to fix before merging, plus one cleanup.

Issues

close() overrides drop @Throws(IOException)SseStream.kt:83-87 (and TypedSseStream.kt:66-68). Both close() methods forward to resource.close() — a java.io.Closeable, and the documented resource is a Response whose close() is @Throws(IOException::class) — but neither override carries the annotation, so the compiled signature is public void close() with no throws clause. Every sibling resource-owning close in sdk-core (Response.close(), PagedResponse.close(), ParsedResponse.close(), TeeSink.close()) annotates it. The effect for a Java caller in try-with-resources is no compile-time signal to handle IOException, and a real close failure gets sneaky-thrown as an undeclared checked exception. Add @Throws(IOException::class) to both (this is a public-API change, so apiDump will need rerunning).

closeLock is dead synchronizationSseStream.kt:84-86. resource.close() already runs inside if (closed.compareAndSet(false, true)), which guarantees exactly one thread ever executes the body, so wrapping it in closeLock.withLock {} guards against nothing. Harmless but misleading — it implies a concurrency hazard the CAS has already eliminated. Drop the lock and its import.

Worth double-checking

  • The KDoc (SseStream.kt:42-46) and the computeNext comment (SseStream.kt:91-96) say an out-of-band close() "races cleanly" with iteration. Does that actually hold for a close that lands during a reader.next() read rather than between pulls? closeLock guards only resource.close(), not reader.next() (line 100), so if thread B closes after thread A passes the closed.get() check at line 93 but before/while next() reads, the source is torn down under the in-flight read and next() throws IOException rather than terminating cleanly. The one test for this (SseStreamTest.kt:114) only covers the between-pulls case. If the mid-read interleaving can't actually terminate cleanly, the wording is worth softening so the guarantee isn't overstated.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Generate a per-endpoint SSE adapter mapping events to typed models Make the SSE stream an AutoCloseable Iterable bound to response close

1 participant