Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public class LoggableResponseBody
private var closed = false

// True once the whole body fit within the cap and was drained into [captured]; the drain
// path closes the source via `.use {}` in that case, so [source] can hand out repeatable
// path closes the source (best-effort) in that case, so [source] can hand out repeatable
// peek() views and close() must not double-close the delegate.
@Volatile
private var fullyCaptured = false
Expand Down Expand Up @@ -207,11 +207,18 @@ public class LoggableResponseBody
* both through this single guard prevents a double-close on a delegate whose [source]
* returns the same instance and whose [close] closes it (some sockets / streams throw on
* double-close). Callers must hold [lock].
*
* The guard flips in a `finally`, so a delegate whose `close()` throws is still marked
* closed: the failing close propagates to the caller, but the delegate is never closed a
* second time. This matches the drain path, which likewise marks the delegate closed after
* a best-effort source close so a later [close] is a no-op.
*/
@Throws(IOException::class)
private fun closeDelegateOnce() {
if (!delegateClosed) {
if (delegateClosed) return
try {
delegate.close()
} finally {
delegateClosed = true
}
}
Expand All @@ -236,10 +243,11 @@ public class LoggableResponseBody
* the source was never obtained, so the delegate remains open and a subsequent [close]
* will still close it.
*
* If EOF is reached within the cap the body is fully captured: the source is closed and
* [fullyCaptured] is set, preserving the fully-repeatable behavior. If the cap is hit with
* bytes still pending the delegate is **left open** and retained as [liveTail] so the
* consumer still receives the rest of the body via [source].
* If EOF is reached within the cap the body is fully captured: the source is closed
* (best-effort — a close failure does not become a [drainError], since the capture already
* succeeded) and [fullyCaptured] is set, preserving the fully-repeatable behavior. If the
* cap is hit with bytes still pending the delegate is **left open** and retained as
* [liveTail] so the consumer still receives the rest of the body via [source].
*
* If `provider.buffer()` throws (rare; would indicate a misconfigured provider), the error
* is cached in [drainError] and an empty buffer is used as the fallback capture so
Expand Down Expand Up @@ -272,8 +280,19 @@ public class LoggableResponseBody
remaining -= n
}
if (fullyCaptured) {
// Whole body captured: close the source (and via ownership the delegate).
capturedSource.close()
// Whole body captured: close the source (and via ownership the delegate). The
// capture already succeeded, so a close failure here is best-effort cleanup, not
// a drain failure — swallow it (as the read-failure handler below does) so the
// complete captured body stays readable and is never reported as a [drainError].
// Mark the delegate closed whether or not close() throws, so a later close() does
// not close the same source a second time (some sockets / streams throw on
// double-close).
try {
capturedSource.close()
} catch (_: Throwable) {
// Best-effort: the body is already fully captured; a close failure must not
// poison it or leave the single-close guard unset.
}
delegateClosed = true
} else {
// The cap was hit (or maxCaptureBytes was <= 0) with bytes still pending: retain
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,34 @@ class LoggableResponseBodyTest {
}
}

/**
* A body that fits the (unbounded) cap and whose source's [close] throws every time it is
* called, counting invocations. Models a delegate whose handle is not safe to close twice and
* whose close can fail — exercises the full-capture path's best-effort, single-close behavior.
*/
private fun fullyCapturedBodyWithThrowingClose(
text: String,
sourceCloseCount: AtomicInteger,
): ResponseBody {
val backing = Io.provider.buffer().also { it.writeUtf8(text) }
val throwingOnClose =
object : BufferedSource by backing {
override fun close() {
sourceCloseCount.incrementAndGet()
throw IOException("source close failed")
}
}
return object : ResponseBody() {
override fun mediaType(): MediaType? = MediaType.parse("text/plain")

override fun contentLength(): Long = text.toByteArray(Charsets.UTF_8).size.toLong()

override fun source(): BufferedSource = throwingOnClose

override fun close() {}
}
}

// ----- source() that throws before entering .use {} -----

@Test
Expand Down Expand Up @@ -166,6 +194,44 @@ class LoggableResponseBodyTest {
)
}

@Test
fun `fully captured body whose source close throws stays readable and is not poisoned`() {
// Full-capture path: the body fits the cap, so the drain reads it all and then closes the
// source. If that close throws, the capture itself already succeeded — the complete body is
// in the buffer — so source() must still return it and captureException must stay null. A
// close failure after a complete capture is best-effort cleanup, not a drain failure.
val sourceCloseCount = AtomicInteger(0)
val wrapper = LoggableResponseBody(fullyCapturedBodyWithThrowingClose("hello", sourceCloseCount))

assertEquals(
"hello",
wrapper.source().readUtf8(),
"a complete capture must stay readable even when the source close fails",
)
assertNull(
wrapper.captureException,
"a successful capture must not surface a drain error for a best-effort close failure",
)
}

@Test
fun `fully captured body whose source close throws is closed exactly once`() {
// The best-effort close must still happen exactly once: the drain closes the source and,
// because that close threw, the guard must be marked closed so a later wrapper.close() does
// not close the same source a second time (some sockets / streams throw on double-close).
val sourceCloseCount = AtomicInteger(0)
val wrapper = LoggableResponseBody(fullyCapturedBodyWithThrowingClose("hello", sourceCloseCount))

wrapper.source().readUtf8()
wrapper.close()

assertEquals(
1,
sourceCloseCount.get(),
"a fully-captured source whose close() throws must be closed exactly once across drain and wrapper close",
)
}

// ----- additional failure-semantics tests -----

@Test
Expand Down Expand Up @@ -474,6 +540,42 @@ class LoggableResponseBodyTest {
)
}

@Test
fun `delegate whose close throws is invoked only once across two close calls`() {
// Over-cap path: the one-shot source close and the wrapper close both funnel through the
// single-close guard. If the first close() throws, the guard must still flip so the second
// close() is a no-op — a delegate whose handle is not safe to close twice must see exactly
// one close even when that close fails.
val delegateCloseCount = AtomicInteger(0)
val payload = "abcdefghijklmnopqrstuvwxyz" // 26 bytes > cap
val delegate =
object : ResponseBody() {
override fun mediaType(): MediaType? = MediaType.parse("text/plain")

override fun contentLength(): Long = payload.toByteArray(Charsets.UTF_8).size.toLong()

override fun source(): BufferedSource = Io.provider.buffer().also { it.writeUtf8(payload) }

override fun close() {
delegateCloseCount.incrementAndGet()
throw IOException("delegate close failed")
}
}
val wrapper = LoggableResponseBody.bounded(delegate, Io.provider, 5L)

val tail = wrapper.source()
// First close reaches the delegate and throws; the guard must still flip.
assertFailsWith<IOException> { tail.close() }
// Second close must be a no-op — the delegate is not closed again.
wrapper.close()

assertEquals(
1,
delegateCloseCount.get(),
"a delegate whose close() throws must still be closed exactly once across two close() calls",
)
}

@Test
fun `over-cap close then source close closes the underlying source exactly once`() {
// The reverse order: wrapper.close() first, then closing the already-handed-out one-shot
Expand Down
Loading