Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED }

private static final long VALIDATE_AFTER_INACTIVITY_GRANULARITY_NANOS = TimeUnit.SECONDS.toNanos(1);
private final Timeout validateAfterInactivity;
private final Timeout pingAckTimeout;
private volatile long lastActivityNanos;

AbstractH2StreamMultiplexer(
Expand All @@ -174,6 +175,20 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED }
final H2Config h2Config,
final H2StreamListener streamListener,
final Timeout validateAfterInactivity) {
this(ioSession, frameFactory, idGenerator, httpProcessor, charCodingConfig, h2Config, streamListener,
validateAfterInactivity, Timeout.ofSeconds(5));
}

AbstractH2StreamMultiplexer(
final ProtocolIOSession ioSession,
final FrameFactory frameFactory,
final StreamIdGenerator idGenerator,
final HttpProcessor httpProcessor,
final CharCodingConfig charCodingConfig,
final H2Config h2Config,
final H2StreamListener streamListener,
final Timeout validateAfterInactivity,
final Timeout pingAckTimeout) {
this.ioSession = Args.notNull(ioSession, "IO session");
this.frameFactory = Args.notNull(frameFactory, "Frame factory");
this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
Expand Down Expand Up @@ -202,6 +217,7 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED }
this.streamListener = streamListener;
this.lastActivityNanos = System.nanoTime();
this.validateAfterInactivity = validateAfterInactivity;
this.pingAckTimeout = Args.notNull(pingAckTimeout, "PING ACK timeout");
}

@Override
Expand Down Expand Up @@ -544,7 +560,7 @@ public final void onOutput() throws HttpException, IOException {
final boolean hasBeenIdleTooLong = t > 0 && System.nanoTime() - lastActivityNanos > t;
if (hasBeenIdleTooLong && ioSession.hasCommands() && pingHandlers.isEmpty()) {
final Timeout socketTimeout = ioSession.getSocketTimeout();
ioSession.setSocketTimeout(Timeout.ofSeconds(5));
ioSession.setSocketTimeout(pingAckTimeout);
executePing(new PingCommand(new BasicPingHandler(result -> {
// restore timeout
ioSession.setSocketTimeout(socketTimeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,24 @@ public class ClientH2StreamMultiplexer extends AbstractH2StreamMultiplexer {

private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;

/**
* @since 5.5
*/
public ClientH2StreamMultiplexer(
final ProtocolIOSession ioSession,
final FrameFactory frameFactory,
final HttpProcessor httpProcessor,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final H2Config h2Config,
final CharCodingConfig charCodingConfig,
final H2StreamListener streamListener,
final Timeout validateAfterInactivity,
final Timeout pingAckTimeout) {
super(ioSession, frameFactory, StreamIdGenerator.ODD, httpProcessor, charCodingConfig, h2Config,
streamListener, validateAfterInactivity, pingAckTimeout);
this.pushHandlerFactory = pushHandlerFactory;
}

public ClientH2StreamMultiplexer(
final ProtocolIOSession ioSession,
final FrameFactory frameFactory,
Expand All @@ -69,8 +87,8 @@ public ClientH2StreamMultiplexer(
final CharCodingConfig charCodingConfig,
final H2StreamListener streamListener,
final Timeout validateAfterInactivity) {
super(ioSession, frameFactory, StreamIdGenerator.ODD, httpProcessor, charCodingConfig, h2Config, streamListener,
validateAfterInactivity);
super(ioSession, frameFactory, StreamIdGenerator.ODD, httpProcessor, charCodingConfig, h2Config,
streamListener, validateAfterInactivity);
this.pushHandlerFactory = pushHandlerFactory;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,22 +59,40 @@ public final class ClientH2StreamMultiplexerFactory {
private final H2StreamListener streamListener;
private final FrameFactory frameFactory;
private final Supplier<TimeValue> validateAfterInactivitySupplier;
private final Timeout pingAckTimeout;

/**
* @since 5.5
*/
public ClientH2StreamMultiplexerFactory(
final HttpProcessor httpProcessor,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final H2Config h2Config,
final CharCodingConfig charCodingConfig,
final H2StreamListener streamListener,
final FrameFactory frameFactory,
final Supplier<TimeValue> validateAfterInactivitySupplier) {
final Supplier<TimeValue> validateAfterInactivitySupplier,
final Timeout pingAckTimeout) {
this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
this.pushHandlerFactory = pushHandlerFactory;
this.h2Config = h2Config != null ? h2Config : H2Config.DEFAULT;
this.charCodingConfig = charCodingConfig != null ? charCodingConfig : CharCodingConfig.DEFAULT;
this.streamListener = streamListener;
this.frameFactory = frameFactory != null ? frameFactory : DefaultFrameFactory.INSTANCE;
this.validateAfterInactivitySupplier = validateAfterInactivitySupplier;
this.pingAckTimeout = pingAckTimeout;
}

public ClientH2StreamMultiplexerFactory(
final HttpProcessor httpProcessor,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final H2Config h2Config,
final CharCodingConfig charCodingConfig,
final H2StreamListener streamListener,
final FrameFactory frameFactory,
final Supplier<TimeValue> validateAfterInactivitySupplier) {
this(httpProcessor, pushHandlerFactory, h2Config, charCodingConfig, streamListener, frameFactory,
validateAfterInactivitySupplier, null);
}

public ClientH2StreamMultiplexerFactory(
Expand All @@ -84,7 +102,7 @@ public ClientH2StreamMultiplexerFactory(
final CharCodingConfig charCodingConfig,
final H2StreamListener streamListener,
final FrameFactory frameFactory) {
this(httpProcessor, pushHandlerFactory, h2Config, charCodingConfig, streamListener, frameFactory, null);
this(httpProcessor, pushHandlerFactory, h2Config, charCodingConfig, streamListener, frameFactory, null, null);
}

public ClientH2StreamMultiplexerFactory(
Expand All @@ -111,8 +129,14 @@ public ClientH2StreamMultiplexerFactory(
}

public ClientH2StreamMultiplexer create(final ProtocolIOSession ioSession) {
final Timeout validateAfterInactivity = resolveValidateAfterInactivity();
if (pingAckTimeout != null) {
return new ClientH2StreamMultiplexer(ioSession, frameFactory, httpProcessor,
pushHandlerFactory, h2Config, charCodingConfig, streamListener,
validateAfterInactivity, pingAckTimeout);
}
return new ClientH2StreamMultiplexer(ioSession, frameFactory, httpProcessor,
pushHandlerFactory, h2Config, charCodingConfig, streamListener, resolveValidateAfterInactivity());
pushHandlerFactory, h2Config, charCodingConfig, streamListener, validateAfterInactivity);
}

private Timeout resolveValidateAfterInactivity() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.hc.core5.reactor.IOSessionListener;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;

/**
* {@link H2MultiplexingRequester} bootstrap.
Expand All @@ -81,6 +82,8 @@ public class H2MultiplexingRequesterBootstrap {

private int maxCommandsPerConnection;

private Timeout pingAckTimeout;

private H2MultiplexingRequesterBootstrap() {
this.routeEntries = new ArrayList<>();
}
Expand Down Expand Up @@ -202,6 +205,18 @@ public final H2MultiplexingRequesterBootstrap setMaxCommandsPerConnection(final
return this;
}

/**
* Sets the timeout applied while waiting for the HTTP/2 PING ACK emitted during
* pre-flight connection validation. When unset, the default of 5 seconds is used.
*
* @return this instance.
* @since 5.5
*/
public final H2MultiplexingRequesterBootstrap setPingAckTimeout(final Timeout pingAckTimeout) {
this.pingAckTimeout = pingAckTimeout;
return this;
}

/**
* Sets {@link H2StreamListener} instance.
*
Expand Down Expand Up @@ -287,7 +302,8 @@ public H2MultiplexingRequester create() {
charCodingConfig != null ? charCodingConfig : CharCodingConfig.DEFAULT,
streamListener,
frameFactory,
validateAfterInactivityRef::get);
validateAfterInactivityRef::get,
pingAckTimeout);
return new H2MultiplexingRequester(
ioReactorConfig,
(ioSession, attachment) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public class H2RequesterBootstrap {
private IOReactorMetricsListener threadPoolListener;
private FrameFactory frameFactory;
private int maxPendingCommandsPerConnection;
private Timeout pingAckTimeout;


private H2RequesterBootstrap() {
Expand Down Expand Up @@ -216,6 +217,18 @@ public final H2RequesterBootstrap setMaxPendingCommandsPerConnection(final int m
return this;
}

/**
* Sets the timeout applied while waiting for the HTTP/2 PING ACK emitted during
* pre-flight connection validation. When unset, the default of 5 seconds is used.
*
* @return this instance.
* @since 5.5
*/
public final H2RequesterBootstrap setPingAckTimeout(final Timeout pingAckTimeout) {
this.pingAckTimeout = pingAckTimeout;
return this;
}

/**
* Sets {@link TlsStrategy} instance.
*
Expand Down Expand Up @@ -405,7 +418,9 @@ public H2AsyncRequester create() {
h2Config != null ? h2Config : H2Config.DEFAULT,
charCodingConfig != null ? charCodingConfig : CharCodingConfig.DEFAULT,
streamListener,
frameFactory);
frameFactory,
null,
pingAckTimeout);

final TlsStrategy actualTlsStrategy = tlsStrategy != null ? tlsStrategy : new H2ClientTlsStrategy();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public H2StreamMultiplexerImpl(
final H2StreamListener streamListener,
final Supplier<H2StreamHandler> streamHandlerSupplier) {
this(ioSession, frameFactory, idGenerator, httpProcessor, charCodingConfig, h2Config, streamListener,
streamHandlerSupplier, null);
streamHandlerSupplier, null, Timeout.ofSeconds(5));
}

public H2StreamMultiplexerImpl(
Expand All @@ -133,8 +133,23 @@ public H2StreamMultiplexerImpl(
final H2StreamListener streamListener,
final Supplier<H2StreamHandler> streamHandlerSupplier,
final Timeout validateAfterInactivity) {
this(ioSession, frameFactory, idGenerator, httpProcessor, charCodingConfig, h2Config, streamListener,
streamHandlerSupplier, validateAfterInactivity, Timeout.ofSeconds(5));
}

public H2StreamMultiplexerImpl(
final ProtocolIOSession ioSession,
final FrameFactory frameFactory,
final StreamIdGenerator idGenerator,
final HttpProcessor httpProcessor,
final CharCodingConfig charCodingConfig,
final H2Config h2Config,
final H2StreamListener streamListener,
final Supplier<H2StreamHandler> streamHandlerSupplier,
final Timeout validateAfterInactivity,
final Timeout pingAckTimeout) {
super(ioSession, frameFactory, idGenerator, httpProcessor, charCodingConfig, h2Config, streamListener,
validateAfterInactivity);
validateAfterInactivity, pingAckTimeout);
this.streamHandlerSupplier = streamHandlerSupplier;
}

Expand Down Expand Up @@ -1632,6 +1647,49 @@ void testValidateAfterInactivityPingAckRestoresPreviousTimeout() throws Exceptio
Mockito.verify(protocolIOSession, Mockito.atLeastOnce()).setSocketTimeout(ArgumentMatchers.eq(previousTimeout));
}

@Test
void testValidateAfterInactivityUsesConfiguredPingAckTimeout() throws Exception {
final List<byte[]> writes = new ArrayList<>();
Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class)))
.thenAnswer(inv -> {
final ByteBuffer b = inv.getArgument(0, ByteBuffer.class);
final byte[] copy = new byte[b.remaining()];
b.get(copy);
writes.add(copy);
return copy.length;
});
Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt());
Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt());

Mockito.when(protocolIOSession.hasCommands()).thenReturn(true);
Mockito.when(protocolIOSession.getSocketTimeout()).thenReturn(Timeout.ofSeconds(30));

final Timeout customPingAckTimeout = Timeout.ofSeconds(15);
final H2Config h2Config = H2Config.custom()
.build();
final Timeout validateAfterInactivity = Timeout.ofMilliseconds(1);

final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl(
protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD,
httpProcessor, CharCodingConfig.DEFAULT, h2Config, h2StreamListener, () -> streamHandler,
validateAfterInactivity, customPingAckTimeout);

mux.onConnect();
completeSettingsHandshake(mux);

writes.clear();
makeMuxIdle(mux, validateAfterInactivity);

mux.onOutput();

Mockito.verify(protocolIOSession, Mockito.atLeastOnce())
.setSocketTimeout(ArgumentMatchers.eq(customPingAckTimeout));

final List<FrameStub> frames = parseFrames(concat(writes));
Assertions.assertTrue(frames.stream().anyMatch(f -> f.isPing() && !f.isAck()),
"Must emit pre-flight PING");
}

@Test
void testKeepAliveAckTimeoutShutsDownAndFailsStreams() throws Exception {
final List<byte[]> writes = new ArrayList<>();
Expand Down
Loading