-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Read InputStream bodies straight into the buffer, no staging array #2233
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -58,7 +58,6 @@ private static class InputStreamBody implements Body { | |
|
|
||
| private final InputStream inputStream; | ||
| private final long contentLength; | ||
| private byte[] chunk; | ||
|
|
||
| private InputStreamBody(InputStream inputStream, long contentLength) { | ||
| this.inputStream = inputStream; | ||
|
|
@@ -72,23 +71,17 @@ public long getContentLength() { | |
|
|
||
| @Override | ||
| public BodyState transferTo(ByteBuf target) { | ||
|
|
||
| // To be safe. | ||
| chunk = new byte[target.writableBytes() - 10]; | ||
|
|
||
| int read = -1; | ||
| boolean write = false; | ||
| // Read straight from the stream into the target buffer: no per-call staging byte[] and no extra | ||
| // copy (ByteBuf.writeBytes(InputStream, int) fills the buffer directly, like FileLikeMultipartPart). | ||
| // The "- 10" margin preserves the previous behaviour of never fully filling the writable region. | ||
| int read; | ||
| try { | ||
| read = inputStream.read(chunk); | ||
| read = target.writeBytes(inputStream, target.writableBytes() - 10); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we drop the Keeping it also preserves two edge-case bugs. If This is reachable via something like |
||
| } catch (IOException ex) { | ||
| LOGGER.warn("Unable to read", ex); | ||
| return BodyState.STOP; | ||
| } | ||
|
|
||
| if (read > 0) { | ||
| target.writeBytes(chunk, 0, read); | ||
| write = true; | ||
| } | ||
| return write ? BodyState.CONTINUE : BodyState.STOP; | ||
| return read > 0 ? BodyState.CONTINUE : BodyState.STOP; | ||
| } | ||
|
|
||
| @Override | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,95 @@ | ||
| /* | ||
| * Copyright (c) 2026 AsyncHttpClient Project. All rights reserved. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.asynchttpclient.request.body.generator; | ||
|
|
||
| import io.netty.buffer.ByteBuf; | ||
| import io.netty.buffer.Unpooled; | ||
| import org.asynchttpclient.request.body.Body; | ||
| import org.asynchttpclient.request.body.Body.BodyState; | ||
| import org.junit.jupiter.api.Test; | ||
|
|
||
| import java.io.ByteArrayInputStream; | ||
| import java.io.ByteArrayOutputStream; | ||
| import java.io.IOException; | ||
| import java.util.Random; | ||
|
|
||
| import static org.junit.jupiter.api.Assertions.assertArrayEquals; | ||
| import static org.junit.jupiter.api.Assertions.assertEquals; | ||
|
|
||
| /** | ||
| * Covers {@link InputStreamBodyGenerator}'s {@link Body#transferTo(ByteBuf)}, which now reads straight from | ||
| * the stream into the target buffer (no per-call staging {@code byte[]} and no extra copy). The whole stream | ||
| * must still be transferred byte-for-byte, CONTINUE while data remains and STOP at EOF. | ||
| */ | ||
| public class InputStreamBodyGeneratorTest { | ||
|
|
||
| private static final int CHUNK_SIZE = 1024 * 8; | ||
|
|
||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor: the existing tests in this package (
|
||
| @Test | ||
| public void streamsAllBytesAcrossMultipleReads() throws IOException { | ||
| final byte[] src = new byte[3 * CHUNK_SIZE + 42]; | ||
| new Random().nextBytes(src); | ||
|
|
||
| Body body = new InputStreamBodyGenerator(new ByteArrayInputStream(src)).createBody(); | ||
| ByteBuf chunkBuffer = Unpooled.buffer(CHUNK_SIZE); | ||
| ByteArrayOutputStream collected = new ByteArrayOutputStream(); | ||
| try { | ||
| BodyState state; | ||
| while ((state = body.transferTo(chunkBuffer)) != BodyState.STOP) { | ||
| assertEquals(BodyState.CONTINUE, state, "a stream with data left must report CONTINUE"); | ||
| byte[] b = new byte[chunkBuffer.readableBytes()]; | ||
| chunkBuffer.readBytes(b); | ||
| collected.write(b); | ||
| chunkBuffer.clear(); | ||
| } | ||
| assertArrayEquals(src, collected.toByteArray(), "the whole stream must be transferred unchanged"); | ||
| } finally { | ||
| chunkBuffer.release(); | ||
| body.close(); | ||
| } | ||
| } | ||
|
|
||
| @Test | ||
| public void singleReadDrainsASmallStream() throws IOException { | ||
| final byte[] src = new byte[CHUNK_SIZE - 100]; // fits under writableBytes - 10, so one read drains it | ||
| new Random().nextBytes(src); | ||
|
|
||
| Body body = new InputStreamBodyGenerator(new ByteArrayInputStream(src)).createBody(); | ||
| ByteBuf chunkBuffer = Unpooled.buffer(CHUNK_SIZE); | ||
| try { | ||
| assertEquals(BodyState.CONTINUE, body.transferTo(chunkBuffer)); | ||
| assertEquals(src.length, chunkBuffer.readableBytes(), "one read should drain a small stream"); | ||
| chunkBuffer.clear(); | ||
| assertEquals(BodyState.STOP, body.transferTo(chunkBuffer), "body at EOF"); | ||
| } finally { | ||
| chunkBuffer.release(); | ||
| body.close(); | ||
| } | ||
| } | ||
|
|
||
| @Test | ||
| public void emptyStreamStopsImmediately() throws IOException { | ||
| Body body = new InputStreamBodyGenerator(new ByteArrayInputStream(new byte[0])).createBody(); | ||
| ByteBuf chunkBuffer = Unpooled.buffer(CHUNK_SIZE); | ||
| try { | ||
| assertEquals(BodyState.STOP, body.transferTo(chunkBuffer), "an empty stream must STOP immediately"); | ||
| assertEquals(0, chunkBuffer.readableBytes(), "nothing should be written for an empty stream"); | ||
| } finally { | ||
| chunkBuffer.release(); | ||
| body.close(); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Two corrections are needed in this comment.
First, "no per-call staging
byte[]and no extra copy" is only true for heap buffers.InputStream.read()can only read into abyte[], so for direct buffers Netty still has to stage through heap memory internally. For an 8 KB chunk,threadLocalTempArrayallocates a new array on every call because the size exceeds the 1024-byte thread-local cache, and the unsafe path acquires a pooled heap buffer and performs acopyMemory()on every call.BodyChunkedInputallocates from the channel allocator, which prefers direct buffers by default, so on that path this change is roughly allocation-for-allocation equivalent to the previous implementation. The improvement is real for heap buffers, which is what the new test exercises, so I'd reword the comment to make that distinction.Second,
FileLikeMultipartPartis the wrong reference. It's abstract and never callswriteBytes(InputStream, int). The class that actually uses this pattern isInputStreamMultipartPart, and it passes the fullwritableBytes()without reserving any margin.