diff --git a/sdk-transport-jdkhttp/src/main/kotlin/org/dexpace/sdk/transport/jdkhttp/internal/BodyPublishers.kt b/sdk-transport-jdkhttp/src/main/kotlin/org/dexpace/sdk/transport/jdkhttp/internal/BodyPublishers.kt index 38b8f151..77a7985e 100644 --- a/sdk-transport-jdkhttp/src/main/kotlin/org/dexpace/sdk/transport/jdkhttp/internal/BodyPublishers.kt +++ b/sdk-transport-jdkhttp/src/main/kotlin/org/dexpace/sdk/transport/jdkhttp/internal/BodyPublishers.kt @@ -15,10 +15,12 @@ import java.io.InputStream import java.io.InterruptedIOException import java.io.PipedInputStream import java.io.PipedOutputStream +import java.io.UncheckedIOException import java.net.http.HttpRequest import java.util.concurrent.ExecutorService import java.util.concurrent.Executors import java.util.concurrent.Future +import java.util.concurrent.atomic.AtomicBoolean import org.dexpace.sdk.core.http.request.RequestBody as SdkRequestBody /** @@ -47,15 +49,28 @@ import org.dexpace.sdk.core.http.request.RequestBody as SdkRequestBody * pipe coordination overhead measurably exceeds the buffer cost below this size, and * holding 64 KiB in heap is uncontroversial. * - * ## Per-subscription pipes + * ## Per-subscription pipes and one-shot bodies * * The JDK invokes the `ofInputStream` supplier **once per subscription**, and it re-subscribes * the same publisher on internal resends — notably the 407 proxy-auth retry driven by the * `ProxyAuthenticator` this transport installs, and HTTP/2 `GOAWAY` replays. A supplier that * returned one shared, already-draining stream would hand the second subscription an exhausted * pipe, so the authenticated retry would carry a truncated or empty body. The supplier therefore - * constructs a new pipe + writer each time, and the streaming path requires the body to be - * **replayable** (see [streamingPublisher]) so a second `writeTo` produces the same bytes. + * constructs a new pipe + writer each time. 
+ * + * A **replayable** body produces the same bytes on every `writeTo`, so each subscription streams + * the full body straight from the source — no buffering, any number of resends. + * + * A **non-replayable** (one-shot) body can be written only once. The streaming path therefore + * **streams the first subscription directly from the source** — it does NOT buffer the body into + * heap up front, which is the whole point of the streaming path (a body that is large or of + * unknown length is exactly the body you must not materialise). A **second** subscription cannot + * replay a consumed one-shot body, so it fails loudly with a clear "one-shot body cannot be + * re-sent — supply a replayable body" `IOException` rather than silently shipping a truncated or + * empty body. This matches the consume-once discipline of `OneShotInputStreamRequestBody` / + * `BufferedSourceRequestBody` in `sdk-core`: a resend of a one-shot body is a caller error, and + * the transport surfaces it instead of corrupting the request. Callers that need the proxy-auth / + * `GOAWAY` resend to succeed must supply a replayable body. See [streamingPublisher]. * * ## Writer thread lifecycle * @@ -137,22 +152,28 @@ internal object BodyPublishers { /** * Streaming publisher for bodies larger than the eager threshold (or of unknown length). * - * The JDK calls the `ofInputStream` supplier **once per subscription** and re-subscribes - * on internal resends (407 proxy-auth retry, HTTP/2 `GOAWAY` replay). Each subscription - * therefore re-reads the body, so the body must be replayable for the resent request to - * carry the correct bytes. If [body] is not already replayable it is buffered once into an - * in-memory copy via [SdkRequestBody.toReplayable]. 
A body that cannot be made replayable - * (its `toReplayable` throws mid-write) has already been partially consumed and cannot be - * recovered — the bytes drained by the failed buffering attempt are gone and `writeTo` - * cannot be driven a second time on a consume-once body. This method therefore fails with a - * checked [IOException] that wraps the original cause rather than masking it (a second - * `writeTo` would trip a consume-once guard and surface an `IllegalStateException`) or - * shipping a truncated body. Surfacing it as a checked [IOException] keeps the transport's - * `@Throws(IOException)` contract intact and matches the eager path, which already propagates - * a mid-write failure as an [IOException]; callers that need resilience here must supply a - * replayable body. + * The JDK calls the `ofInputStream` supplier **once per subscription** and re-subscribes on + * internal resends (407 proxy-auth retry, HTTP/2 `GOAWAY` replay). The supplier therefore + * mints a fresh pipe + writer per subscription so each subscription streams the body from its + * source — no up-front heap buffering. This is the whole point of the streaming path: a body + * that is large or of unknown length is exactly the body that must not be materialised into a + * `ByteArray`/`Buffer`. + * + * **Replayable body.** Every `writeTo` produces the same bytes, so every subscription (the + * first and any resend) streams the full body straight from the source. * - * For each subscription the supplier: + * **Non-replayable (one-shot) body.** The body can be written only once. The first + * subscription streams it directly from the source. A **second** subscription cannot replay a + * consumed one-shot body, so the supplier returns a [ResendRefusedInputStream] whose first read + * throws an [UncheckedIOException] wrapping an [IOException] ("... supply a replayable + * body").
The JDK surfaces that read failure on the resend; the request fails loudly rather + * than shipping a truncated or empty body. This deliberately does NOT buffer the body up + * front to make resends succeed — that would re-import the OOM hazard the streaming path + * exists to avoid (a 2 GiB upload would need 2 GiB of contiguous heap, and a body above the + * byte-array/segment limits would fail outright). Callers that need proxy-auth / `GOAWAY` + * resend support must supply a replayable body. + * + * For each (admitted) subscription the supplier: * 1. creates a fresh [PipedOutputStream] / [PipedInputStream] pair; * 2. submits a writer task that drives `body.writeTo(...)` onto the OutputStream side and * captures its [Future]; and @@ -160,40 +181,33 @@ internal object BodyPublishers { * closes the OutputStream — unblocking a writer stranded in `PipedOutputStream.write` * when the JDK abandons the subscription without draining it. * - * Errors thrown from `body.writeTo` close the OutputStream prematurely (the `use { }` block - * exits abnormally); the JDK reader then observes an `IOException` on its next read and the + * Errors thrown from `body.writeTo` (including a one-shot body's own consume-once guard, or a + * mid-write `IOException`) close the OutputStream prematurely (the `use { }` block exits + * abnormally); the JDK reader then observes an `IOException` on its next read and the * surrounding future completes exceptionally. Thread interruption in the writer is honoured * per the repo convention: the flag is restored and an [InterruptedIOException] is surfaced * so the reader side fails loudly. The DEBUG log records the writer-side failure so it is * discoverable in tests / production. 
*/ private fun streamingPublisher(body: SdkRequestBody): HttpRequest.BodyPublisher { - val replayable: SdkRequestBody = - if (body.isReplayable()) { - body + if (body.isReplayable()) { + return HttpRequest.BodyPublishers.ofInputStream { newSubscriptionStream(body) } + } + // One-shot body: a single AtomicBoolean — created once here, captured by the supplier — + // admits exactly the first subscription to stream the body. Every later subscription + // (proxy-auth retry, HTTP/2 GOAWAY replay) gets a stream that fails loudly, because a + // consumed one-shot body cannot be replayed. + val firstSubscription = AtomicBoolean(true) + return HttpRequest.BodyPublishers.ofInputStream { + if (firstSubscription.compareAndSet(true, false)) { + newSubscriptionStream(body) } else { - try { - body.toReplayable() - } catch (e: IOException) { - // toReplayable drained the body once and failed mid-write; a consume-once - // body has already flipped its guard, so a second writeTo would trip that - // guard and surface an IllegalStateException that masks this IOException. The - // partially captured bytes are local to toReplayable and gone. Rethrow as a - // checked IOException wrapping the cause — honouring the transport's - // @Throws(IOException) contract and matching the eager path — rather than - // re-driving the body or shipping a truncated copy. 
- log.atVerbose() - .event("transport.jdkhttp.body.replayable.failed") - .field("error.message", e.message ?: "") - .log("could not buffer streaming body as replayable; failing the request") - throw IOException( - "streaming request body could not be buffered for the JDK transport and " + - "has been partially consumed; supply a replayable body", - e, - ) - } + log.atVerbose() + .event("transport.jdkhttp.body.oneshot.resend") + .log("one-shot streaming body re-subscribed; failing the resend loudly") + ResendRefusedInputStream() } - return HttpRequest.BodyPublishers.ofInputStream { newSubscriptionStream(replayable) } + } } /** @@ -238,6 +252,41 @@ internal object BodyPublishers { return KillSwitchInputStream(pipeIn, pipeOut, writer) } + /** + * Stand-in stream handed to a **resent** subscription of a one-shot body. A consumed one-shot + * body cannot be replayed, so rather than ship a truncated/empty body the stream fails its + * first read. + * + * The failure is raised as an [UncheckedIOException] wrapping the explanatory [IOException], + * **not** a bare checked [IOException]. The JDK's `ofInputStream` reader (`StreamIterator`) + * catches checked `IOException` from `read` and — on the JDK 11 baseline this module targets — + * swallows it as a silent end-of-stream, which would let the resend complete with a truncated + * body. An [UncheckedIOException] is not caught there, so it propagates: on JDK 11 it surfaces + * synchronously on the subscriber's `request` stack, and on later JDKs it is delivered through + * the subscriber's `onError`. Either way the resend fails loudly with the original cause + * preserved. `available()` returns a non-zero value so the JDK is driven to call `read` + * (where the failure surfaces) rather than treating the stream as already at EOF. 
+ */ + private class ResendRefusedInputStream : InputStream() { + override fun read(): Int = throw resendError() + + override fun read( + b: ByteArray, + off: Int, + len: Int, + ): Int = throw resendError() + + override fun available(): Int = 1 + + private fun resendError(): UncheckedIOException = + UncheckedIOException( + IOException( + "one-shot request body cannot be re-sent (the JDK transport attempted an internal " + + "resend, e.g. proxy-auth retry or HTTP/2 GOAWAY replay); supply a replayable body", + ), + ) + } + private fun logWriterFailure(t: Throwable) { log.atVerbose() .event("transport.jdkhttp.body.write.failed") diff --git a/sdk-transport-jdkhttp/src/test/kotlin/org/dexpace/sdk/transport/jdkhttp/JdkHttpTransportTest.kt b/sdk-transport-jdkhttp/src/test/kotlin/org/dexpace/sdk/transport/jdkhttp/JdkHttpTransportTest.kt index 3e5ec734..a5554f5e 100644 --- a/sdk-transport-jdkhttp/src/test/kotlin/org/dexpace/sdk/transport/jdkhttp/JdkHttpTransportTest.kt +++ b/sdk-transport-jdkhttp/src/test/kotlin/org/dexpace/sdk/transport/jdkhttp/JdkHttpTransportTest.kt @@ -367,15 +367,68 @@ class JdkHttpTransportTest { } @Test - fun `streamingBodyThatCannotBeBufferedFailsLoudly`() { - // A non-replayable streaming body (> 64 KiB, so it takes the piped path) whose writeTo - // aborts with an IOException after a partial write cannot be turned into a replayable - // copy: toReplayable's internal writeTo throws, and the body — like the SDK's - // consume-once bodies — has already flipped its consumed guard. The adapter must NOT - // attempt a second writeTo (which would trip the guard and surface an - // IllegalStateException, masking the real cause). It must instead fail with a checked - // IOException — honouring execute's @Throws(IOException) contract, and matching the eager - // path — that carries a clear message and preserves the original IOException as its cause. 
+ fun `nonReplayableStreamingBodyIsNotBufferedUpFront`() { + // The core regression for #113: a non-replayable streaming body (> 64 KiB, so it takes the + // piped path) must stream straight from its source. The old behaviour forced the body + // replayable via toReplayable() inside adaptBody — draining the ENTIRE body into an + // in-memory Buffer before a single byte was published. The decisive, non-flaky signal: + // after adaptBody() returns, the body's writeTo must NOT have been invoked at all (the + // first writeTo only happens later, on the per-subscription writer thread). Under the old + // buffering path, writeTo had already run to completion by the time adaptBody returned. + val total = 4L * 1024L * 1024L // 4 MiB — far above the eager threshold + val body = CountingNonReplayableBody(total, CommonMediaTypes.APPLICATION_OCTET_STREAM) + + val publisher = BodyPublishers.adaptBody(body) + assertEquals( + 0L, + body.bytesWritten(), + "non-replayable streaming body must NOT be drained up front; writeTo ran during adaptBody", + ) + + // And it still streams the full body when actually subscribed. + val drained = drainPublisher(publisher) + assertEquals(total, drained.size.toLong(), "the full body must stream to the subscriber") + assertEquals(total, body.bytesWritten(), "writeTo must have produced exactly the declared bytes") + } + + @Test + fun `oneShotStreamingBodyRefusesResendLoudly`() { + // A non-replayable (one-shot) streaming body streams its FIRST subscription directly from + // the source. The JDK re-acquires the publisher's InputStream once per subscription and + // re-subscribes on internal resends (proxy-auth retry, HTTP/2 GOAWAY). A consumed one-shot + // body cannot replay, so the SECOND subscription must fail loudly with an IOException + // rather than ship a truncated/empty body. We drive the publisher directly (a real 407 + // flow is awkward over plaintext MockWebServer): first drain succeeds, second errors. 
+ val total = 256L * 1024L + val body = CountingNonReplayableBody(total, CommonMediaTypes.APPLICATION_OCTET_STREAM) + val publisher = BodyPublishers.adaptBody(body) + + val first = drainPublisher(publisher) + assertEquals(total, first.size.toLong(), "first subscription must stream the full one-shot body") + + val ex = + assertFails { + drainPublisher(publisher) + } + // drainPublisher rethrows the publisher's onError as an AssertionError wrapping the cause. + val cause = generateSequence(ex) { it.cause }.firstOrNull { it is IOException } + assertTrue( + cause is IOException, + "re-subscription of a one-shot body must fail with an IOException, chain was: $ex", + ) + assertTrue( + cause.message?.contains("supply a replayable body") == true, + "the resend failure must explain a replayable body is required, was: ${cause.message}", + ) + } + + @Test + fun `streamingBodyThatFailsMidWriteFailsLoudly`() { + // A non-replayable streaming body (> 64 KiB, piped path) whose writeTo aborts with an + // IOException after a partial write now streams directly: the writer thread's writeTo + // throws, the pipe closes prematurely, and the JDK reader observes an IOException. The + // request must fail loudly (no silent truncation), surfaced through execute's + // @Throws(IOException) contract. val body = PartialThenFailingBody(CommonMediaTypes.APPLICATION_OCTET_STREAM) val request = Request.builder() @@ -387,23 +440,13 @@ class JdkHttpTransportTest { assertFails { transport.execute(request).close() } - // IOException is checked; UncheckedIOException and IllegalStateException are - // RuntimeExceptions, so asserting `is IOException` rules out both a contract-bypassing - // unchecked throw and the IllegalStateException masking failure mode. 
- assertTrue( - ex is IOException, - "expected a checked IOException, got ${ex::class}: ${ex.message}", - ) - assertTrue( - ex.message?.contains("supply a replayable body") == true, - "message must explain the body could not be buffered, was: ${ex.message}", - ) - val cause = ex.cause + // IOException is checked; the request must fail with one rather than completing as if the + // (truncated) body had been sent successfully. + val ioFailure = generateSequence(ex) { it.cause }.any { it is IOException } assertTrue( - cause is IOException, - "the original IOException must be preserved as the cause, was: ${cause?.let { it::class }}", + ioFailure, + "expected the mid-write failure to surface as an IOException, chain was: $ex", ) - assertEquals("simulated mid-write failure", cause.message) } @Test @@ -792,6 +835,44 @@ class JdkHttpTransportTest { } } + /** + * Non-replayable streaming body that emits [total] zero bytes in fixed chunks, tracking the + * cumulative count written. Used to prove the streaming path does NOT drain the whole body up + * front: [bytesWritten] must read 0 immediately after `adaptBody`, and rise only once a + * subscription's writer thread runs. A consume-once guard mirrors the SDK's stream-backed + * bodies so a (buggy) second drive would fail loudly rather than silently replay. 
+ */ + private class CountingNonReplayableBody( + private val total: Long, + private val mediaType: MediaType, + ) : RequestBody() { + private val written = java.util.concurrent.atomic.AtomicLong(0) + private val consumed = AtomicBoolean(false) + + override fun mediaType(): MediaType = mediaType + + override fun contentLength(): Long = total + + override fun isReplayable(): Boolean = false + + fun bytesWritten(): Long = written.get() + + override fun writeTo(sink: BufferedSink) { + check(consumed.compareAndSet(false, true)) { + "CountingNonReplayableBody.writeTo was already called — the body is single-use" + } + val chunk = ByteArray(8 * 1024) + var remaining = total + while (remaining > 0) { + val n = minOf(chunk.size.toLong(), remaining).toInt() + sink.write(chunk, 0, n) + sink.flush() + written.addAndGet(n.toLong()) + remaining -= n + } + } + } + /** * Non-replayable body that writes a partial chunk and then aborts with an [IOException]. Its * declared length is large enough to route through the streaming (piped) publisher path, and