Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.PushbackInputStream;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
Expand Down Expand Up @@ -551,45 +550,232 @@ public void connectFailed(URI uri, SocketAddress sa, IOException ioe) {
}
}

static class HttpClientPipedOutputStream extends PipedOutputStream {
/**
* Circular byte-buffer pipe whose write side enforces a configurable timeout.
* Replaces PipedInputStream/PipedOutputStream to fix CXF-8926: when the network
* stalls, the HttpClient reader stops consuming from the pipe and
* PipedOutputStream.write() would block indefinitely in awaitSpace(). Here, writes
* instead time out after writeTimeoutMs and throw IOException.
*
* When writeTimeoutMs is 0 (receiveTimeout not configured), the pipe waits
* indefinitely, preserving the pre-fix behaviour.
*/
static final class TimedBlockingPipe {
private final byte[] buf;
private final int capacity;
private final long writeTimeoutMs;
private int readPos;
private int writePos;
private int count;
private boolean writerClosed;
private boolean readerClosed;

TimedBlockingPipe(int capacity, long writeTimeoutMs) {
this.buf = new byte[capacity];
this.capacity = capacity;
this.writeTimeoutMs = writeTimeoutMs;
}

OutputStream newOutputStream() {
return new PipeOut();
}

InputStream newInputStream() {
return new PipeIn();
}

private synchronized void doWrite(byte[] b, int off, int len) throws IOException {
long deadline = writeTimeoutMs > 0
? System.nanoTime() + writeTimeoutMs * 1_000_000L
: Long.MAX_VALUE;
int srcOff = off;
int remaining = len;
while (remaining > 0) {
if (readerClosed) {
throw new IOException("Pipe closed");
}
int space = capacity - count;
if (space == 0) {
long waitMs = (deadline - System.nanoTime()) / 1_000_000L;
if (waitMs <= 0) {
throw new IOException(
"Request body write timed out after " + writeTimeoutMs
+ "ms: the receiving end stopped consuming data."
+ " Configure a longer receiveTimeout if the server"
+ " is expected to be slow.");
}
try {
wait(waitMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new InterruptedIOException("Pipe write interrupted");
}
continue;
}
int batch = Math.min(remaining, space);
int toEnd = capacity - writePos;
int first = Math.min(batch, toEnd);
System.arraycopy(b, srcOff, buf, writePos, first);
writePos = (writePos + first) % capacity;
count += first;
srcOff += first;
remaining -= first;
if (first < batch) {
int second = batch - first;
System.arraycopy(b, srcOff, buf, 0, second);
writePos = second;
count += second;
srcOff += second;
remaining -= second;
}
notifyAll();
}
}

private synchronized int doRead(byte[] b, int off, int len) throws IOException {
while (count == 0 && !writerClosed) {
if (readerClosed) {
return -1;
}
try {
wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new InterruptedIOException("Pipe read interrupted");
}
}
if (count == 0) {
return -1;
}
int batch = Math.min(len, count);
int toEnd = capacity - readPos;
int first = Math.min(batch, toEnd);
System.arraycopy(buf, readPos, b, off, first);
readPos = (readPos + first) % capacity;
count -= first;
if (first < batch) {
int second = batch - first;
System.arraycopy(buf, 0, b, off + first, second);
readPos = second;
count -= second;
}
notifyAll();
return batch;
}

private synchronized void closeWriter() {
writerClosed = true;
notifyAll();
}

private synchronized void closeReader() {
readerClosed = true;
notifyAll();
}

private final class PipeOut extends OutputStream {
private boolean closed;

@Override
public void write(int b) throws IOException {
write(new byte[] {(byte) b}, 0, 1);
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
if (closed) {
throw new IOException("Stream already closed");
}
doWrite(b, off, len);
}

@Override
public void close() throws IOException {
if (!closed) {
closed = true;
closeWriter();
}
}
}

private final class PipeIn extends InputStream {
private boolean closed;

@Override
public int read() throws IOException {
byte[] b = new byte[1];
int n = read(b, 0, 1);
return n == -1 ? -1 : (b[0] & 0xFF);
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
if (closed) {
throw new IOException("Stream already closed");
}
return doRead(b, off, len);
}

@Override
public void close() throws IOException {
if (!closed) {
closed = true;
closeReader();
}
}
}
}

/**
* Output stream for the request body that feeds the HttpClient's body publisher
* via a {@link TimedBlockingPipe}. Writes are held until the TCP connection is
* established (connectionComplete), after which they go directly into the pipe.
*/
static final class HttpClientTimedOutputStream extends OutputStream {
HttpClientWrappedOutputStream stream;
HTTPClientPolicy csPolicy;
CloseableBodyPublisher publisher;
HttpClientPipedOutputStream(HttpClientWrappedOutputStream s,
PipedInputStream pin,
private final OutputStream delegate;

HttpClientTimedOutputStream(HttpClientWrappedOutputStream s,
TimedBlockingPipe pipe,
HTTPClientPolicy cp,
CloseableBodyPublisher bp) throws IOException {
super(pin);
stream = s;
csPolicy = cp;
publisher = bp;
CloseableBodyPublisher bp) {
this.delegate = pipe.newOutputStream();
this.stream = s;
this.csPolicy = cp;
this.publisher = bp;
}

@Override
public void close() throws IOException {
super.close();
delegate.close();
csPolicy = null;
stream = null;
if (publisher != null) {
publisher.close();
publisher = null;
}
}

synchronized boolean canWrite() throws IOException {
return stream.isConnectionAttemptCompleted(csPolicy, this);
}

@Override
public void write(int b) throws IOException {
if (stream != null && (stream.connectionComplete || canWrite())) {
super.write(b);
delegate.write(b);
}
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
if (stream != null && (stream.connectionComplete || canWrite())) {
super.write(b, off, len);
delegate.write(b, off, len);
}
}

};
}
private static final class HttpClientFilteredInputStream extends FilterInputStream {
boolean closed;

Expand Down Expand Up @@ -950,7 +1136,7 @@ public void setProtocolHeadersInBuilder(HttpRequest.Builder rb) throws IOExcepti
}
}

private boolean isConnectionAttemptCompleted(HTTPClientPolicy csPolicy, PipedOutputStream out)
private boolean isConnectionAttemptCompleted(HTTPClientPolicy csPolicy, Object monitor)
throws IOException {
if (!connectionComplete) {
// if we haven't connected yet, we'll see if an exception is the reason
Expand All @@ -967,7 +1153,7 @@ private boolean isConnectionAttemptCompleted(HTTPClientPolicy csPolicy, PipedOut
return false;
}
try {
out.wait(csPolicy.getConnectionTimeout());
monitor.wait(csPolicy.getConnectionTimeout());
} catch (InterruptedException e) {
//ignore
}
Expand Down Expand Up @@ -1001,11 +1187,13 @@ protected void setProtocolHeaders() throws IOException {
}

if (csPolicy.isAllowChunking() || contentLen >= 0) {
final PipedInputStream pin = new PipedInputStream(csPolicy.getChunkLength() <= 0
? 4096 : csPolicy.getChunkLength());
int pipeCapacity = csPolicy.getChunkLength() <= 0 ? 4096 : csPolicy.getChunkLength();
long writeTimeout = csPolicy.getReceiveTimeout();
final TimedBlockingPipe pipe = new TimedBlockingPipe(pipeCapacity, writeTimeout);
final InputStream pin = pipe.newInputStream();
this.publisher = new HttpClientBodyPublisher(this, () -> pin);
if (contentLen != 0) {
pout = new HttpClientPipedOutputStream(this, pin, csPolicy, publisher);
pout = new HttpClientTimedOutputStream(this, pipe, csPolicy, publisher);
}
} else if (contentLen != 0) {
// If chunking is not allowed but the contentLen is unknown (-1), we need to
Expand Down
Loading