Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,11 @@ abstract class AbstractH2StreamMultiplexer implements Identifiable, HttpConnecti

private static final long CONNECTION_WINDOW_LOW_MARK = 10 * 1024 * 1024;

enum ConnectionHandshake { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN }
enum ConnectionHandshake { READY, ACTIVE, DRAINING, GRACEFUL_SHUTDOWN, SHUTDOWN }
enum SettingsHandshake { READY, TRANSMITTED, ACKED }

private final ProtocolIOSession ioSession;
private final FrameFactory frameFactory;
final FrameFactory frameFactory;
private final HttpProcessor httpProcessor;
private final H2Config localConfig;
private final BasicH2TransportMetrics inputMetrics;
Expand All @@ -121,14 +121,14 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED }
private final Deque<RawFrame> outputQueue;
private final HPackEncoder hPackEncoder;
private final HPackDecoder hPackDecoder;
private final H2Streams streams;
final H2Streams streams;
private final Queue<AsyncPingHandler> pingHandlers;
private final AtomicInteger connInputWindow;
private final AtomicInteger connOutputWindow;
private final AtomicInteger outputRequests;
private final H2StreamListener streamListener;

private ConnectionHandshake connState = ConnectionHandshake.READY;
ConnectionHandshake connState = ConnectionHandshake.READY;
private SettingsHandshake localSettingState = SettingsHandshake.READY;
private SettingsHandshake remoteSettingState = SettingsHandshake.READY;

Expand Down Expand Up @@ -297,7 +297,7 @@ private void commitFrameInternal(final RawFrame frame) throws IOException {
ioSession.setEvent(SelectionKey.OP_WRITE);
}

private void commitFrame(final RawFrame frame) throws IOException {
void commitFrame(final RawFrame frame) throws IOException {
Args.notNull(frame, "Frame");
ioSession.getLock().lock();
try {
Expand Down Expand Up @@ -506,6 +506,10 @@ public final void onOutput() throws HttpException, IOException {
ioSession.getLock().unlock();
}

if (beforeOutputProcessing()) {
return;
}

if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) {

if (connOutputWindow.get() > 0 && remoteSettingState == SettingsHandshake.ACKED) {
Expand Down Expand Up @@ -581,23 +585,7 @@ public final void onOutput() throws HttpException, IOException {
validateStreamTimeouts();
}

if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) == 0) {
int liveStreams = 0;
for (final Iterator<H2Stream> it = streams.iterator(); it.hasNext(); ) {
final H2Stream stream = it.next();
if (stream.isClosedPastLingerDeadline()) {
streams.dropStreamId(stream.getId());
it.remove();
} else {
if (streams.isSameSide(stream.getId()) || stream.getId() <= streams.getLastRemoteId()) {
liveStreams++;
}
}
}
if (liveStreams == 0) {
connState = ConnectionHandshake.SHUTDOWN;
}
}
maybeTransitionFromGracefulShutdown();
if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) >= 0) {
for (;;) {
final Command command = ioSession.poll();
Expand Down Expand Up @@ -628,6 +616,9 @@ public final void onOutput() throws HttpException, IOException {
}

public final void onTimeout(final Timeout timeout) throws HttpException, IOException {
if (onShutdownTimeout(timeout)) {
return;
}
connState = ConnectionHandshake.SHUTDOWN;

final RawFrame goAway;
Expand Down Expand Up @@ -665,14 +656,100 @@ private void executeShutdown(final ShutdownCommand shutdownCommand) throws IOExc
connState = ConnectionHandshake.SHUTDOWN;
} else {
if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
final RawFrame goAway = frameFactory.createGoAway(streams.getLastRemoteId(), H2Error.NO_ERROR, "Graceful shutdown");
commitFrame(goAway);
connState = streams.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
initiateGracefulShutdown();
}
}
}

/**
* Initiates the graceful shutdown sequence. The default implementation emits a single
* GOAWAY frame carrying the last processed remote stream id and transitions to
* {@code GRACEFUL_SHUTDOWN} (or straight to {@code SHUTDOWN} when no streams are open).
* Subclasses may override to implement alternate shutdown sequences.
*/
void initiateGracefulShutdown() throws IOException {
final RawFrame goAway = frameFactory.createGoAway(streams.getLastRemoteId(), H2Error.NO_ERROR, "Graceful shutdown");
commitFrame(goAway);
connState = streams.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
}

/**
* Whether the multiplexer is currently accepting new remote-initiated streams.
* The default allows new streams while the connection is in {@code READY} or {@code ACTIVE}.
*/
boolean canAcceptNewRemoteStream() {
return connState.compareTo(ConnectionHandshake.ACTIVE) <= 0;
}

/**
* Notification hook fired right after a remote-initiated stream has been created.
* The default is a no-op.
*/
void onRemoteStreamAccepted(final int streamId) {
}

/**
* Timeout hook that lets subclasses intercept inactivity-driven shutdowns.
* Returning {@code true} signals that the subclass has handled the timeout and the
* default full-shutdown path should be skipped.
*/
boolean onShutdownTimeout(final Timeout timeout) throws HttpException, IOException {
return false;
}

/**
* Early {@code onOutput} hook fired after the output buffer has been flushed and before
* the main state-machine processing. Returning {@code true} aborts the rest of the
* {@code onOutput} cycle, allowing subclasses to interleave additional frames between
* flush and further work.
*/
boolean beforeOutputProcessing() throws HttpException, IOException {
return false;
}

void applyRemoteGracefulGoAway(final int processedLocalStreamId) {
if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
for (final Iterator<H2Stream> it = streams.iterator(); it.hasNext(); ) {
final H2Stream stream = it.next();
final int activeStreamId = stream.getId();
if (!streams.isSameSide(activeStreamId) && activeStreamId > processedLocalStreamId) {
stream.fail(new RequestNotExecutedException());
it.remove();
}
}
}
connState = streams.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
}

/**
* Evaluates whether the {@code GRACEFUL_SHUTDOWN} state can be promoted to
* {@code SHUTDOWN}. The default transitions once no live streams remain.
*/
void maybeTransitionFromGracefulShutdown() {
if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) == 0) {
int liveStreams = 0;
for (final Iterator<H2Stream> it = streams.iterator(); it.hasNext(); ) {
final H2Stream stream = it.next();
if (stream.isClosedPastLingerDeadline()) {
streams.dropStreamId(stream.getId());
it.remove();
} else {
if (streams.isSameSide(stream.getId()) || stream.getId() <= streams.getLastRemoteId()) {
liveStreams++;
}
}
}
if (liveStreams == 0) {
connState = ConnectionHandshake.SHUTDOWN;
}
}
}

private void executePing(final PingCommand pingCommand) throws IOException {
boolean hasPendingOutput() {
return !outputBuffer.isEmpty() || !outputQueue.isEmpty();
}

void executePing(final PingCommand pingCommand) throws IOException {
final AsyncPingHandler handler = pingCommand.getHandler();
pingHandlers.add(handler);
final RawFrame ping = frameFactory.createPing(handler.getData());
Expand Down Expand Up @@ -817,8 +894,9 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio
}

final H2StreamChannel channel = createChannel(streamId);
if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
if (canAcceptNewRemoteStream()) {
stream = streams.createActive(channel, incomingRequest(channel));
onRemoteStreamAccepted(streamId);
streams.resetIfExceedsMaxConcurrentLimit(stream, localConfig.getMaxConcurrentStreams());
} else {
channel.localReset(H2Error.REFUSED_STREAM);
Expand Down Expand Up @@ -1026,8 +1104,9 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio

final H2StreamChannel channel = createChannel(promisedStreamId);
final H2Stream promisedStream;
if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
if (canAcceptNewRemoteStream()) {
promisedStream = streams.createReserved(channel, incomingPushPromise(channel, stream.getPushHandlerFactory()));
onRemoteStreamAccepted(promisedStreamId);
} else {
channel.localReset(H2Error.REFUSED_STREAM);
promisedStream = streams.createActive(channel, NoopH2StreamHandler.INSTANCE);
Expand All @@ -1053,17 +1132,7 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio
final int errorCode = payload.getInt();
goAwayReceived = true;
if (errorCode == H2Error.NO_ERROR.getCode()) {
if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
for (final Iterator<H2Stream> it = streams.iterator(); it.hasNext(); ) {
final H2Stream stream = it.next();
final int activeStreamId = stream.getId();
if (!streams.isSameSide(activeStreamId) && activeStreamId > processedLocalStreamId) {
stream.fail(new RequestNotExecutedException());
it.remove();
}
}
}
connState = streams.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
applyRemoteGracefulGoAway(processedLocalStreamId);
} else {
for (final Iterator<H2Stream> it = streams.iterator(); it.hasNext(); ) {
final H2Stream stream = it.next();
Expand Down
Loading
Loading