Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,12 @@
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -135,6 +137,13 @@ public class ChannelManager {
private final ChannelPool channelPool;
private final ChannelGroup openChannels;
private final ConcurrentHashMap<Object, Channel> http2Connections = new ConcurrentHashMap<>();
// Requests that could not acquire a connection permit and are waiting — off the event loop, WITHOUT
// blocking their caller thread — for a sibling HTTP/2 connection to the same origin to be registered so
// they can multiplex onto it. Keyed by the same partition key {@link #registerHttp2Connection} uses.
// Each waiter is invoked with the registered channel when one appears, or with {@code null} when the
// client is closing so it can fail its request rather than hang (its request-timeout is not scheduled
// yet at this point). See NettyRequestSender's HTTP/2 deferral.
private final ConcurrentHashMap<Object, Set<Consumer<Channel>>> http2ConnectionWaiters = new ConcurrentHashMap<>();

private AsyncHttpClientHandler wsHandler;
private Http2Handler http2Handler;
Expand Down Expand Up @@ -395,6 +404,52 @@ public void registerHttp2Connection(Object partitionKey, Channel channel) {
"HTTP/2 connection closed before a stream could be opened"));
}
});

// Wake any requests parked waiting for an HTTP/2 connection to this origin (they failed to acquire a
// connection permit and can multiplex onto this one without one). Wake with the currently-registered
// canonical connection — which may be an already-registered one this call lost the race to — so a
// "redundant" duplicate still lets the waiters resume onto the live connection.
Channel registered = http2Connections.get(partitionKey);
if (registered != null && registered.isActive()) {
wakeHttp2ConnectionWaiters(partitionKey, registered);
}
}

/**
* Registers a one-shot waiter to be invoked when an HTTP/2 connection is registered under
* {@code partitionKey} (or with {@code null} on client close). See the {@link #http2ConnectionWaiters}
* field and NettyRequestSender's HTTP/2 deferral. The waiter must be idempotent — it may be invoked by
* a registration, by the client-close sweep, or removed and invoked by its own timeout concurrently.
*/
public void addHttp2ConnectionWaiter(Object partitionKey, Consumer<Channel> onConnection) {
http2ConnectionWaiters.computeIfAbsent(partitionKey, k -> ConcurrentHashMap.newKeySet()).add(onConnection);
}

public void removeHttp2ConnectionWaiter(Object partitionKey, Consumer<Channel> onConnection) {
Set<Consumer<Channel>> waiters = http2ConnectionWaiters.get(partitionKey);
if (waiters != null) {
waiters.remove(onConnection);
}
}

private void wakeHttp2ConnectionWaiters(Object partitionKey, Channel channel) {
Set<Consumer<Channel>> waiters = http2ConnectionWaiters.remove(partitionKey);
if (waiters != null) {
for (Consumer<Channel> waiter : waiters) {
waiter.accept(channel);
}
}
}

private void failHttp2ConnectionWaiters() {
for (Object key : http2ConnectionWaiters.keySet()) {
Set<Consumer<Channel>> waiters = http2ConnectionWaiters.remove(key);
if (waiters != null) {
for (Consumer<Channel> waiter : waiters) {
waiter.accept(null);
}
}
}
}

/**
Expand Down Expand Up @@ -469,6 +524,12 @@ private void doClose() {
}

public void close() {
// Fail any requests parked waiting for a sibling HTTP/2 connection to register (see the
// http2ConnectionWaiters field): the client is closing, so no connection will arrive and their
// request-timeout backstop is not scheduled yet. Do this synchronously up front — doClose() only
// runs after the (possibly long) graceful EventLoopGroup shutdown, and the nettyTimer that would
// otherwise fire their deadline is being stopped in parallel.
failHttp2ConnectionWaiters();
// Close the resolver group first while the EventLoopGroup is still active,
// since Netty DNS resolvers may need a live EventLoop for clean shutdown.
if (addressResolverGroup != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.netty.resolver.AddressResolver;
import io.netty.resolver.AddressResolverGroup;
import io.netty.util.AsciiString;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
Expand Down Expand Up @@ -91,6 +92,8 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import static io.netty.handler.codec.http.HttpHeaderNames.EXPECT;
Expand Down Expand Up @@ -430,12 +433,14 @@ private <T> ListenableFuture<T> sendRequestWithNewChannel(Request request, Proxy
try {
future.acquirePartitionLockLazily();
} catch (IOException semaphoreException) {
// If HTTP/2 is enabled, another thread may be establishing an H2 connection.
// Poll the H2 registry with brief retries before giving up.
// The per-host permit is exhausted, but a sibling request may be establishing an HTTP/2
// connection to this origin we can multiplex onto (stream reuse needs no permit). Reuse one
// if it exists; otherwise wait for one WITHOUT blocking the caller thread. Returns the
// pending future when handled; null means we should fail with the permit exception.
if (config.isHttp2Enabled()) {
Channel h2Channel = waitForHttp2Connection(request, proxy, future);
if (h2Channel != null) {
return sendRequestWithOpenChannel(future, asyncHandler, h2Channel);
ListenableFuture<T> handled = reuseOrDeferHttp2Connection(request, proxy, future, asyncHandler, semaphoreException);
if (handled != null) {
return handled;
}
}
throw semaphoreException;
Expand Down Expand Up @@ -1098,60 +1103,132 @@ private static void validateWebSocketRequest(Request request, AsyncHandler<?> as
}

/**
* Waits briefly for an HTTP/2 connection to appear in the registry, so a request that just failed to
* acquire a connection permit can still multiplex onto an HTTP/2 connection another thread is
* establishing to the same origin.
* Tries to reuse — or, failing that, wait for an HTTP/2 connection to this origin that a sibling
* request is establishing, after this request failed to acquire a connection permit. HTTP/2 stream reuse
* multiplexes onto an existing connection and needs no permit, so this lets an over-cap request proceed.
*
* <p><b>{@link org.asynchttpclient.LoadBalance#ROUND_ROBIN} limitation.</b> The HTTP/2 registry is an
* exact-key map, and in round-robin mode each connection is registered under its per-IP key
* ({@code RoundRobinPartitionKey(base, IP)}; see {@link org.asynchttpclient.netty.channel.NettyConnectListener}).
* A request pinned to {@code IP_B} therefore polls only {@code (base, IP_B)} and never discovers a
* sibling HTTP/2 connection already open on {@code IP_A}. The connection permit, however, is per host
* ({@code maxConnectionsPerHost}), so once the host is at its cap such a request can neither open a new
* connection nor reuse the sibling one: off the event loop it spins here for the full
* {@code connectTimeout} and then fails with the original permit exception. This only bites when
* {@code maxConnectionsPerHost} is configured (the default is unlimited). Note that falling back to a
* poll on the per-host base key would be a no-op — nothing is ever registered under it — so reusing a
* sibling-IP connection requires indexing the registry by base key (tracked in issue #2214).
* <p>If a connection is already registered it is used immediately. Otherwise, <b>off the event loop</b>, a
* one-shot {@link Http2ConnectionWaiter} is registered that resumes the send when a matching connection is
* registered ({@link ChannelManager#registerHttp2Connection}) — WITHOUT parking the caller thread — bounded
* by a {@code connectTimeout} deadline that fails the request. The previous implementation instead
* {@code Thread.sleep}-polled the registry here, blocking the caller thread (the synchronous part of
* {@code execute()}) for up to the full {@code connectTimeout} and burning CPU.
*
* <p><b>On the event loop</b> we can neither block nor usefully defer: a redirect / 401 / 407 retry
* re-enters here on the loop, and the connection we would wait for is being established on that SAME loop,
* so waiting could self-deadlock. There we do the single immediate poll and give up.
*
* <p><b>{@link org.asynchttpclient.LoadBalance#ROUND_ROBIN} limitation (#2214).</b> The registry is keyed
* per-IP in round-robin mode, so a request pinned to {@code IP_B} is only woken by a connection registered
* for {@code IP_B}, never a sibling on {@code IP_A}; such a request waits out the deadline and then fails
* with the permit exception — the same outcome as before, but now without occupying the caller thread.
*
* @return the (pending) future when the request was reused or deferred; {@code null} if it should be
* failed with {@code semaphoreException} (a WebSocket request, or on the event loop with no
* connection available)
*/
private Channel waitForHttp2Connection(Request request, ProxyServer proxy, NettyResponseFuture<?> future) {
Uri uri = request.getUri();
private <T> ListenableFuture<T> reuseOrDeferHttp2Connection(Request request, ProxyServer proxy,
NettyResponseFuture<T> future, AsyncHandler<T> asyncHandler, IOException semaphoreException) {
// WebSocket requests must never multiplex onto an HTTP/2 connection (no RFC 8441 support). See #2160.
if (uri.isWebSocket()) {
if (request.getUri().isWebSocket()) {
return null;
}
String virtualHost = request.getVirtualHost();
// In round-robin mode, only multiplex onto the H2 connection for the IP this request is pinned to.
Object override = future != null ? future.getPartitionKeyOverride() : null;

Channel h2Channel = pollHttp2(override, uri, virtualHost, proxy, request);
Object override = future.getPartitionKeyOverride();
Channel h2Channel = pollHttp2(override, request.getUri(), request.getVirtualHost(), proxy, request);
if (h2Channel != null) {
return h2Channel;
return sendRequestWithOpenChannel(future, asyncHandler, h2Channel);
}

// NEVER block an event-loop thread here. A redirect / 401 / 407 retry re-enters sendRequest ON the
// event loop, and the HTTP/2 connection we would wait for is being established on that SAME loop —
// a Thread.sleep would freeze the loop and can self-deadlock (the connection never finishes because
// its loop is parked here). On the loop, do the single non-blocking poll above and give up; the
// caller then proceeds as if no poolable connection was found.
if (isOnEventLoop()) {
return null;
}
new Http2ConnectionWaiter<>(request, proxy, future, asyncHandler, override, semaphoreException).arm();
return future;
}

long deadline = System.nanoTime() + config.getConnectTimeout().toNanos();
while (System.nanoTime() < deadline) {
h2Channel = pollHttp2(override, uri, virtualHost, proxy, request);
if (h2Channel != null) {
return h2Channel;
/**
* A one-shot, off-event-loop waiter for a sibling HTTP/2 connection to the request's origin, used when a
* request could not acquire a connection permit. Instead of blocking the caller thread polling the
* registry, it registers itself with the {@link ChannelManager} and returns; it fires exactly once —
* whichever of these happens first:
* <ul>
* <li>a matching connection is registered → resume the send onto it via {@link #sendRequestWithOpenChannel};</li>
* <li>the {@code connectTimeout} deadline elapses → fail the request with the original permit exception;</li>
* <li>the client closes → {@link ChannelManager} invokes it with {@code null} → fail the request.</li>
* </ul>
* The {@link #claimed} CAS makes those sources mutually exclusive.
*/
private final class Http2ConnectionWaiter<T> implements Consumer<Channel> {

private final Request request;
private final ProxyServer proxy;
private final NettyResponseFuture<T> future;
private final AsyncHandler<T> asyncHandler;
private final Object override;
private final IOException semaphoreException;
// The key a matching connection registers under (registerHttp2Connection uses future.getPartitionKey())
// — the same key pollHttp2 effectively polls, so a registration wakes exactly the right waiters.
private final Object waitKey;
private final AtomicBoolean claimed = new AtomicBoolean();
private volatile Timeout deadline;

Http2ConnectionWaiter(Request request, ProxyServer proxy, NettyResponseFuture<T> future,
AsyncHandler<T> asyncHandler, Object override, IOException semaphoreException) {
this.request = request;
this.proxy = proxy;
this.future = future;
this.asyncHandler = asyncHandler;
this.override = override;
this.semaphoreException = semaphoreException;
this.waitKey = future.getPartitionKey();
}

void arm() {
channelManager.addHttp2ConnectionWaiter(waitKey, this);
// Assign the deadline before the recheck so it exists (and is cancellable) if a wake races in.
deadline = nettyTimer.newTimeout(t -> fireTimeout(),
config.getConnectTimeout().toMillis(), TimeUnit.MILLISECONDS);
if (claimed.get()) {
// A registration woke us between addHttp2ConnectionWaiter and assigning `deadline`; cancel
// the now-orphaned timeout (accept() could not, as `deadline` was still null then).
deadline.cancel();
return;
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
// A connection may have registered between the caller's poll (in reuseOrDeferHttp2Connection) and
// addHttp2ConnectionWaiter above — a lost wakeup. Re-poll now that the waiter is registered.
Channel raced = pollHttp2(override, request.getUri(), request.getVirtualHost(), proxy, request);
if (raced != null) {
accept(raced);
}
}

private void fireTimeout() {
if (claimed.compareAndSet(false, true)) {
channelManager.removeHttp2ConnectionWaiter(waitKey, this);
abort(null, future, semaphoreException);
}
}

// Invoked with the registered connection to resume onto, or with null when the client is closing.
@Override
public void accept(Channel channel) {
if (!claimed.compareAndSet(false, true)) {
return;
}
channelManager.removeHttp2ConnectionWaiter(waitKey, this);
Timeout d = deadline;
if (d != null) {
d.cancel();
}
if (future.isDone()) {
return;
}
if (channel == null) {
abort(null, future, semaphoreException);
} else {
sendRequestWithOpenChannel(future, asyncHandler, channel);
}
}
return null;
}

// Polls the HTTP/2 registry, using the IP-aware key in round-robin mode and the regular key otherwise.
Expand Down
Loading
Loading