Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ import java.io.InputStream
import java.io.InterruptedIOException
import java.io.PipedInputStream
import java.io.PipedOutputStream
import java.io.UncheckedIOException
import java.net.http.HttpRequest
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.atomic.AtomicBoolean
import org.dexpace.sdk.core.http.request.RequestBody as SdkRequestBody

/**
Expand Down Expand Up @@ -47,15 +49,28 @@ import org.dexpace.sdk.core.http.request.RequestBody as SdkRequestBody
* pipe coordination overhead measurably exceeds the buffer cost below this size, and
* holding 64 KiB in heap is uncontroversial.
*
* ## Per-subscription pipes
* ## Per-subscription pipes and one-shot bodies
*
* The JDK invokes the `ofInputStream` supplier **once per subscription**, and it re-subscribes
* the same publisher on internal resends — notably the 407 proxy-auth retry driven by the
* `ProxyAuthenticator` this transport installs, and HTTP/2 `GOAWAY` replays. A supplier that
* returned one shared, already-draining stream would hand the second subscription an exhausted
* pipe, so the authenticated retry would carry a truncated or empty body. The supplier therefore
* constructs a new pipe + writer each time, and the streaming path requires the body to be
* **replayable** (see [streamingPublisher]) so a second `writeTo` produces the same bytes.
* constructs a new pipe + writer each time.
*
* A **replayable** body produces the same bytes on every `writeTo`, so each subscription streams
* the full body straight from the source — no buffering, any number of resends.
*
* A **non-replayable** (one-shot) body can be written only once. The streaming path therefore
* **streams the first subscription directly from the source** — it does NOT buffer the body into
* heap up front, which is the whole point of the streaming path (a body that is large or of
* unknown length is exactly the body you must not materialise). A **second** subscription cannot
* replay a consumed one-shot body, so it fails loudly with a clear "one-shot body cannot be
* re-sent — supply a replayable body" `IOException` rather than silently shipping a truncated or
* empty body. This matches the consume-once discipline of `OneShotInputStreamRequestBody` /
* `BufferedSourceRequestBody` in `sdk-core`: a resend of a one-shot body is a caller error, and
* the transport surfaces it instead of corrupting the request. Callers that need the proxy-auth /
* `GOAWAY` resend to succeed must supply a replayable body. See [streamingPublisher].
*
* ## Writer thread lifecycle
*
Expand Down Expand Up @@ -137,63 +152,62 @@ internal object BodyPublishers {
/**
* Streaming publisher for bodies larger than the eager threshold (or of unknown length).
*
* The JDK calls the `ofInputStream` supplier **once per subscription** and re-subscribes
* on internal resends (407 proxy-auth retry, HTTP/2 `GOAWAY` replay). Each subscription
* therefore re-reads the body, so the body must be replayable for the resent request to
* carry the correct bytes. If [body] is not already replayable it is buffered once into an
* in-memory copy via [SdkRequestBody.toReplayable]. A body that cannot be made replayable
* (its `toReplayable` throws mid-write) has already been partially consumed and cannot be
* recovered — the bytes drained by the failed buffering attempt are gone and `writeTo`
* cannot be driven a second time on a consume-once body. This method therefore fails with a
* checked [IOException] that wraps the original cause rather than masking it (a second
* `writeTo` would trip a consume-once guard and surface an `IllegalStateException`) or
* shipping a truncated body. Surfacing it as a checked [IOException] keeps the transport's
* `@Throws(IOException)` contract intact and matches the eager path, which already propagates
* a mid-write failure as an [IOException]; callers that need resilience here must supply a
* replayable body.
* The JDK calls the `ofInputStream` supplier **once per subscription** and re-subscribes on
* internal resends (407 proxy-auth retry, HTTP/2 `GOAWAY` replay). The supplier therefore
* mints a fresh pipe + writer per subscription so each subscription streams the body from its
* source — no up-front heap buffering. This is the whole point of the streaming path: a body
* that is large or of unknown length is exactly the body that must not be materialised into a
* `ByteArray`/`Buffer`.
*
* **Replayable body.** Every `writeTo` produces the same bytes, so every subscription (the
* first and any resend) streams the full body straight from the source.
*
* For each subscription the supplier:
* **Non-replayable (one-shot) body.** The body can be written only once. The first
* subscription streams it directly from the source. A **second** subscription cannot replay a
* consumed one-shot body, so [oneShotSupplier] returns a stream that fails its first read with
* a clear [IOException] ("one-shot request body cannot be re-sent — supply a replayable
* body"). The JDK surfaces that read failure on the resend; the request fails loudly rather
* than shipping a truncated or empty body. This deliberately does NOT buffer the body up
* front to make resends succeed — that would re-import the OOM hazard the streaming path
* exists to avoid (a 2 GiB upload would need 2 GiB of contiguous heap, and a body above the
* byte-array/segment limits would fail outright). Callers that need proxy-auth / `GOAWAY`
* resend support must supply a replayable body.
*
* For each (admitted) subscription the supplier:
* 1. creates a fresh [PipedOutputStream] / [PipedInputStream] pair;
* 2. submits a writer task that drives `body.writeTo(...)` onto the OutputStream side and
* captures its [Future]; and
* 3. returns the InputStream wrapped so its `close()` cancels the writer [Future] and
* closes the OutputStream — unblocking a writer stranded in `PipedOutputStream.write`
* when the JDK abandons the subscription without draining it.
*
* Errors thrown from `body.writeTo` close the OutputStream prematurely (the `use { }` block
* exits abnormally); the JDK reader then observes an `IOException` on its next read and the
* Errors thrown from `body.writeTo` (including a one-shot body's own consume-once guard, or a
* mid-write `IOException`) close the OutputStream prematurely (the `use { }` block exits
* abnormally); the JDK reader then observes an `IOException` on its next read and the
* surrounding future completes exceptionally. Thread interruption in the writer is honoured
* per the repo convention: the flag is restored and an [InterruptedIOException] is surfaced
* so the reader side fails loudly. The DEBUG log records the writer-side failure so it is
* discoverable in tests / production.
*/
private fun streamingPublisher(body: SdkRequestBody): HttpRequest.BodyPublisher {
val replayable: SdkRequestBody =
if (body.isReplayable()) {
body
if (body.isReplayable()) {
return HttpRequest.BodyPublishers.ofInputStream { newSubscriptionStream(body) }
}
// One-shot body: a single AtomicBoolean — created once here, captured by the supplier —
// admits exactly the first subscription to stream the body. Every later subscription
// (proxy-auth retry, HTTP/2 GOAWAY replay) gets a stream that fails loudly, because a
// consumed one-shot body cannot be replayed.
val firstSubscription = AtomicBoolean(true)
return HttpRequest.BodyPublishers.ofInputStream {
if (firstSubscription.compareAndSet(true, false)) {
newSubscriptionStream(body)
} else {
try {
body.toReplayable()
} catch (e: IOException) {
// toReplayable drained the body once and failed mid-write; a consume-once
// body has already flipped its guard, so a second writeTo would trip that
// guard and surface an IllegalStateException that masks this IOException. The
// partially captured bytes are local to toReplayable and gone. Rethrow as a
// checked IOException wrapping the cause — honouring the transport's
// @Throws(IOException) contract and matching the eager path — rather than
// re-driving the body or shipping a truncated copy.
log.atVerbose()
.event("transport.jdkhttp.body.replayable.failed")
.field("error.message", e.message ?: "")
.log("could not buffer streaming body as replayable; failing the request")
throw IOException(
"streaming request body could not be buffered for the JDK transport and " +
"has been partially consumed; supply a replayable body",
e,
)
}
log.atVerbose()
.event("transport.jdkhttp.body.oneshot.resend")
.log("one-shot streaming body re-subscribed; failing the resend loudly")
ResendRefusedInputStream()
}
return HttpRequest.BodyPublishers.ofInputStream { newSubscriptionStream(replayable) }
}
}

/**
Expand Down Expand Up @@ -238,6 +252,41 @@ internal object BodyPublishers {
return KillSwitchInputStream(pipeIn, pipeOut, writer)
}

/**
* Stand-in stream handed to a **resent** subscription of a one-shot body. A consumed one-shot
* body cannot be replayed, so rather than ship a truncated/empty body the stream fails its
* first read.
*
* The failure is raised as an [UncheckedIOException] wrapping the explanatory [IOException],
* **not** a bare checked [IOException]. The JDK's `ofInputStream` reader (`StreamIterator`)
* catches checked `IOException` from `read` and — on the JDK 11 baseline this module targets —
* swallows it as a silent end-of-stream, which would let the resend complete with a truncated
* body. An [UncheckedIOException] is not caught there, so it propagates: on JDK 11 it surfaces
* synchronously on the subscriber's `request` stack, and on later JDKs it is delivered through
* the subscriber's `onError`. Either way the resend fails loudly with the original cause
* preserved. `available()` returns a non-zero value so the JDK is driven to call `read`
* (where the failure surfaces) rather than treating the stream as already at EOF.
*/
private class ResendRefusedInputStream : InputStream() {
override fun read(): Int = throw resendError()

override fun read(
b: ByteArray,
off: Int,
len: Int,
): Int = throw resendError()

override fun available(): Int = 1

private fun resendError(): UncheckedIOException =
UncheckedIOException(
IOException(
"one-shot request body cannot be re-sent (the JDK transport attempted an internal " +
"resend, e.g. proxy-auth retry or HTTP/2 GOAWAY replay); supply a replayable body",
),
)
}

private fun logWriterFailure(t: Throwable) {
log.atVerbose()
.event("transport.jdkhttp.body.write.failed")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,15 +367,68 @@ class JdkHttpTransportTest {
}

@Test
fun `streamingBodyThatCannotBeBufferedFailsLoudly`() {
// A non-replayable streaming body (> 64 KiB, so it takes the piped path) whose writeTo
// aborts with an IOException after a partial write cannot be turned into a replayable
// copy: toReplayable's internal writeTo throws, and the body — like the SDK's
// consume-once bodies — has already flipped its consumed guard. The adapter must NOT
// attempt a second writeTo (which would trip the guard and surface an
// IllegalStateException, masking the real cause). It must instead fail with a checked
// IOException — honouring execute's @Throws(IOException) contract, and matching the eager
// path — that carries a clear message and preserves the original IOException as its cause.
fun `nonReplayableStreamingBodyIsNotBufferedUpFront`() {
// The core regression for #113: a non-replayable streaming body (> 64 KiB, so it takes the
// piped path) must stream straight from its source. The old behaviour forced the body
// replayable via toReplayable() inside adaptBody — draining the ENTIRE body into an
// in-memory Buffer before a single byte was published. The decisive, non-flaky signal:
// after adaptBody() returns, the body's writeTo must NOT have been invoked at all (the
// first writeTo only happens later, on the per-subscription writer thread). Under the old
// buffering path, writeTo had already run to completion by the time adaptBody returned.
val total = 4L * 1024L * 1024L // 4 MiB — far above the eager threshold
val body = CountingNonReplayableBody(total, CommonMediaTypes.APPLICATION_OCTET_STREAM)

val publisher = BodyPublishers.adaptBody(body)
assertEquals(
0L,
body.bytesWritten(),
"non-replayable streaming body must NOT be drained up front; writeTo ran during adaptBody",
)

// And it still streams the full body when actually subscribed.
val drained = drainPublisher(publisher)
assertEquals(total, drained.size.toLong(), "the full body must stream to the subscriber")
assertEquals(total, body.bytesWritten(), "writeTo must have produced exactly the declared bytes")
}

@Test
fun `oneShotStreamingBodyRefusesResendLoudly`() {
// A non-replayable (one-shot) streaming body streams its FIRST subscription directly from
// the source. The JDK re-acquires the publisher's InputStream once per subscription and
// re-subscribes on internal resends (proxy-auth retry, HTTP/2 GOAWAY). A consumed one-shot
// body cannot replay, so the SECOND subscription must fail loudly with an IOException
// rather than ship a truncated/empty body. We drive the publisher directly (a real 407
// flow is awkward over plaintext MockWebServer): first drain succeeds, second errors.
val total = 256L * 1024L
val body = CountingNonReplayableBody(total, CommonMediaTypes.APPLICATION_OCTET_STREAM)
val publisher = BodyPublishers.adaptBody(body)

val first = drainPublisher(publisher)
assertEquals(total, first.size.toLong(), "first subscription must stream the full one-shot body")

val ex =
assertFails {
drainPublisher(publisher)
}
// drainPublisher rethrows the publisher's onError as an AssertionError wrapping the cause.
val cause = generateSequence<Throwable>(ex) { it.cause }.firstOrNull { it is IOException }
assertTrue(
cause is IOException,
"re-subscription of a one-shot body must fail with an IOException, chain was: $ex",
)
assertTrue(
cause.message?.contains("supply a replayable body") == true,
"the resend failure must explain a replayable body is required, was: ${cause.message}",
)
}

@Test
fun `streamingBodyThatFailsMidWriteFailsLoudly`() {
// A non-replayable streaming body (> 64 KiB, piped path) whose writeTo aborts with an
// IOException after a partial write now streams directly: the writer thread's writeTo
// throws, the pipe closes prematurely, and the JDK reader observes an IOException. The
// request must fail loudly (no silent truncation), surfaced through execute's
// @Throws(IOException) contract.
val body = PartialThenFailingBody(CommonMediaTypes.APPLICATION_OCTET_STREAM)
val request =
Request.builder()
Expand All @@ -387,23 +440,13 @@ class JdkHttpTransportTest {
assertFails {
transport.execute(request).close()
}
// IOException is checked; UncheckedIOException and IllegalStateException are
// RuntimeExceptions, so asserting `is IOException` rules out both a contract-bypassing
// unchecked throw and the IllegalStateException masking failure mode.
assertTrue(
ex is IOException,
"expected a checked IOException, got ${ex::class}: ${ex.message}",
)
assertTrue(
ex.message?.contains("supply a replayable body") == true,
"message must explain the body could not be buffered, was: ${ex.message}",
)
val cause = ex.cause
// IOException is checked; the request must fail with one rather than completing as if the
// (truncated) body had been sent successfully.
val ioFailure = generateSequence<Throwable>(ex) { it.cause }.any { it is IOException }
assertTrue(
cause is IOException,
"the original IOException must be preserved as the cause, was: ${cause?.let { it::class }}",
ioFailure,
"expected the mid-write failure to surface as an IOException, chain was: $ex",
)
assertEquals("simulated mid-write failure", cause.message)
}

@Test
Expand Down Expand Up @@ -792,6 +835,44 @@ class JdkHttpTransportTest {
}
}

/**
* Non-replayable streaming body that emits [total] zero bytes in fixed chunks, tracking the
* cumulative count written. Used to prove the streaming path does NOT drain the whole body up
* front: [bytesWritten] must read 0 immediately after `adaptBody`, and rise only once a
* subscription's writer thread runs. A consume-once guard mirrors the SDK's stream-backed
* bodies so a (buggy) second drive would fail loudly rather than silently replay.
*/
private class CountingNonReplayableBody(
private val total: Long,
private val mediaType: MediaType,
) : RequestBody() {
private val written = java.util.concurrent.atomic.AtomicLong(0)
private val consumed = AtomicBoolean(false)

override fun mediaType(): MediaType = mediaType

override fun contentLength(): Long = total

override fun isReplayable(): Boolean = false

fun bytesWritten(): Long = written.get()

override fun writeTo(sink: BufferedSink) {
check(consumed.compareAndSet(false, true)) {
"CountingNonReplayableBody.writeTo was already called — the body is single-use"
}
val chunk = ByteArray(8 * 1024)
var remaining = total
while (remaining > 0) {
val n = minOf(chunk.size.toLong(), remaining).toInt()
sink.write(chunk, 0, n)
sink.flush()
written.addAndGet(n.toLong())
remaining -= n
}
}
}

/**
* Non-replayable body that writes a partial chunk and then aborts with an [IOException]. Its
* declared length is large enough to route through the streaming (piped) publisher path, and
Expand Down
Loading