Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,11 @@ private <T> ListenableFuture<T> sendRequestRoundRobin(Request request, AsyncHand

// Round-robin resolves up front — before the pool check and before the per-host semaphore — so
// every eligible request resolves first, even one that immediately reuses a pooled connection and
// even a single-IP host. With a caching resolver this is cheap. One side effect on a pooled hit:
// the request timeout is scheduled here (in resolveAddresses) and again in
// sendRequestWithOpenChannel; the second schedule cancels the first (see
// NettyResponseFuture.setTimeoutsHolder), so it's redundant work, not a leak.
resolveAddresses(request, proxyServer, newFuture, asyncHandler).addListener(new SimpleFutureListener<List<InetSocketAddress>>() {
// even a single-IP host. With a caching resolver this is cheap. Pass scheduleTimeout=false: the
// reuse-or-connect path reached from dispatchResolved schedules the request timeout exactly once
// (sendRequestWithOpenChannel for a pooled hit, or the round-robin branch of sendRequestWithNewChannel
// for a new connection), rather than scheduling it here and then cancelling it on a pooled hit.
resolveAddresses(request, proxyServer, newFuture, asyncHandler, false).addListener(new SimpleFutureListener<List<InetSocketAddress>>() {

@Override
protected void onSuccess(List<InetSocketAddress> addresses) {
Expand Down Expand Up @@ -447,14 +447,17 @@ private <T> ListenableFuture<T> sendRequestWithNewChannel(Request request, Proxy
}

// In round-robin mode the addresses were already resolved (and rotated) before polling the pool,
// so reuse them directly instead of resolving a second time.
// so reuse them directly instead of resolving a second time. The up-front resolve deliberately did
// NOT schedule the request timeout (see sendRequestRoundRobin), so schedule it here — once — for this
// new-connection path, before connecting.
List<InetSocketAddress> roundRobinAddresses = future.getRoundRobinAddresses();
if (roundRobinAddresses != null) {
scheduleRequestTimeout(future, roundRobinAddresses.get(0));
connectWithAddresses(request, proxy, future, asyncHandler, roundRobinAddresses);
return future;
}

resolveAddresses(request, proxy, future, asyncHandler).addListener(new SimpleFutureListener<List<InetSocketAddress>>() {
resolveAddresses(request, proxy, future, asyncHandler, true).addListener(new SimpleFutureListener<List<InetSocketAddress>>() {

@Override
protected void onSuccess(List<InetSocketAddress> addresses) {
Expand Down Expand Up @@ -494,20 +497,34 @@ private <T> void connectWithAddresses(Request request, ProxyServer proxy, NettyR
}
}

private <T> Future<List<InetSocketAddress>> resolveAddresses(Request request, ProxyServer proxy, NettyResponseFuture<T> future, AsyncHandler<T> asyncHandler) {
/**
* Resolves the request's remote addresses. When {@code scheduleTimeout} is {@code true} the request
* timeout is scheduled here, before resolution — the behaviour the DEFAULT-mode new-channel path relies
* on so the timeout also bounds DNS. ROUND_ROBIN resolves up front for every request (it needs the IP to
* key the pool) and passes {@code false}: scheduling here and then again on the reuse-or-connect path
* would allocate a {@code TimeoutsHolder} and a timer entry only to cancel them on a pooled hit. Instead
* the chosen path schedules exactly once — {@link #sendRequestWithOpenChannel} for a reuse, or the
* round-robin branch of {@link #sendRequestWithNewChannel} for a new connection.
*/
private <T> Future<List<InetSocketAddress>> resolveAddresses(Request request, ProxyServer proxy, NettyResponseFuture<T> future,
AsyncHandler<T> asyncHandler, boolean scheduleTimeout) {
Uri uri = request.getUri();
final Promise<List<InetSocketAddress>> promise = ImmediateEventExecutor.INSTANCE.newPromise();

if (proxy != null && !proxy.isIgnoredForHost(uri.getHost()) && proxy.getProxyType().isHttp()) {
int port = ProxyType.HTTPS.equals(proxy.getProxyType()) || uri.isSecured() ? proxy.getSecuredPort() : proxy.getPort();
InetSocketAddress unresolvedRemoteAddress = InetSocketAddress.createUnresolved(proxy.getHost(), port);
scheduleRequestTimeout(future, unresolvedRemoteAddress);
if (scheduleTimeout) {
scheduleRequestTimeout(future, unresolvedRemoteAddress);
}
return resolveHostname(request, unresolvedRemoteAddress, asyncHandler);
} else {
int port = uri.getExplicitPort();

InetSocketAddress unresolvedRemoteAddress = InetSocketAddress.createUnresolved(uri.getHost(), port);
scheduleRequestTimeout(future, unresolvedRemoteAddress);
if (scheduleTimeout) {
scheduleRequestTimeout(future, unresolvedRemoteAddress);
}

if (request.getAddress() != null) {
// bypass resolution
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.asynchttpclient.Dsl.asyncHttpClient;
Expand All @@ -46,6 +48,8 @@
import static org.asynchttpclient.test.TestUtils.TIMEOUT;
import static org.asynchttpclient.test.TestUtils.addHttpConnector;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;

/**
* End-to-end coverage for {@link LoadBalance#ROUND_ROBIN}: a single host that resolves to several
Expand Down Expand Up @@ -186,6 +190,44 @@ public void roundRobinFailsOverWhenSelectedIpIsDown() throws Exception {
}
}

@Test
public void roundRobinRequestStillHitsRequestTimeout() throws Exception {
// Regression guard: round-robin resolves up front WITHOUT scheduling the request timeout there; the
// reuse-or-connect path must still schedule it exactly once. Point at a server that never responds
// within the request timeout and assert the request times out (i.e. the new-connection round-robin
// path did schedule the timeout).
Server slow = new Server();
ServerConnector connector = addHttpConnector(slow);
slow.setHandler(new AbstractHandler() {
@Override
public void handle(String t, Request base, HttpServletRequest req, HttpServletResponse resp) {
base.setHandled(true);
try {
Thread.sleep(4000); // never responds within the 500ms request timeout below
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
slow.start();
int slowPort = connector.getLocalPort();
try {
NameResolver<InetAddress> resolver = fixedResolver("127.0.0.1");
try (AsyncHttpClient client = asyncHttpClient(config()
.setLoadBalance(LoadBalance.ROUND_ROBIN)
.setRequestTimeout(java.time.Duration.ofMillis(500))
.setMaxRequestRetry(0).build())) {
ExecutionException thrown = assertThrows(ExecutionException.class, () ->
client.executeRequest(get("http://roundrobin.test:" + slowPort + "/").setNameResolver(resolver))
.get(TIMEOUT, SECONDS));
assertInstanceOf(TimeoutException.class, thrown.getCause(),
"round-robin request must still hit the request timeout; got " + thrown.getCause());
}
} finally {
slow.stop();
}
}

@Test
public void roundRobinReResolvesAcrossSameHostPortChangingRedirect() throws Exception {
// Regression test for the same-base redirect leak: round-robin caches the resolved addresses
Expand Down
Loading