Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,16 @@ void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo

case CONNECTING:
rawConnectivityState = CONNECTING;
// If a newly resolved address list arrives via acceptResolvedAddresses while we are
// CONNECTING, we try to updateAddresses() the currently connecting subchannel, if it
// exists in the new list.
// Therefore, when transitioning to CONNECTING, we need to make sure that a subchannel
// exists for the address the (valid) index currently points to.
if ((!enableHappyEyeballs && !addressIndex.isValid())
|| (addressIndex.isValid() && !subchannels.containsKey(
addressIndex.getCurrentAddress()))) {
addressIndex.seekTo(getAddress(subchannelData.subchannel));
}
updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult()));
break;

Expand Down Expand Up @@ -636,6 +646,11 @@ ConnectivityState getConcludedConnectivityState() {
return this.concludedState;
}

/** Returns the raw connectivity state currently recorded by this load balancer (test hook). */
@VisibleForTesting
ConnectivityState getRawConnectivityState() {
  return rawConnectivityState;
}

/**
* Picker that requests connection during the first pick, and returns noResult.
*/
Expand Down
133 changes: 133 additions & 0 deletions core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,139 @@ public void healthCheckFlow() {
verifyNoMoreInteractions(mockHelper);
}

// Regression test for #12796: a subchannel reports CONNECTING while the address index points
// at an address that has no subchannel. A subsequent acceptResolvedAddresses call must still
// return Status.OK (per the notes below, it used to fail with an NPE).
@Test
public void healthCheckWithTF_AllowsStateInconsistency() {
// This scenario is only exercised when serializeRetries is disabled.
assumeTrue(!serializeRetries);

when(mockSubchannel1.getAttributes()).thenReturn(
Attributes.newBuilder().set(HAS_HEALTH_PRODUCER_LISTENER_KEY, true).build());

Attributes petioleAttributes =
Attributes.newBuilder().set(IS_PETIOLE_POLICY, true).build();

loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(
Lists.newArrayList(
/* server 1 */servers.get(0),
/* server 3 */servers.get(2)
))
.setAttributes(petioleAttributes)
.build());

// Get the state listener and the health listener for subchannel 1.
verify(mockHelper).createSubchannel(createArgsCaptor.capture());
SubchannelStateListener healthListener1 =
createArgsCaptor.getValue().getOption(HEALTH_CONSUMER_LISTENER_ARG_KEY);
verify(mockSubchannel1).start(stateListenerCaptor.capture());
SubchannelStateListener stateListener1 = stateListenerCaptor.getValue();

// As start() was called, we transition subchannel 1 to CONNECTING...
stateListener1.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));
healthListener1.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));

// ...which eventually ends up READY.
stateListener1.onSubchannelState(ConnectivityStateInfo.forNonError(READY));
healthListener1.onSubchannelState(ConnectivityStateInfo.forNonError(READY));

// Now subchannel 1's health turns into TRANSIENT_FAILURE.
healthListener1.onSubchannelState(
ConnectivityStateInfo.forTransientFailure(
Status.UNAVAILABLE.withDescription("health failure")));
// The health listener update changes the LB's concluded state to TRANSIENT_FAILURE,
// while the raw connectivity state stays READY.
assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState());
assertEquals(READY, loadBalancer.getRawConnectivityState());

// Subchannel 1's transport goes idle.
stateListener1.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE));

// The LB's raw connectivity state stays READY and its concluded state stays
// TRANSIENT_FAILURE (the health state); the index still points at position 0.
assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState());
assertEquals(READY, loadBalancer.getRawConnectivityState());
assertEquals(0, loadBalancer.getIndexLocation());

// The LB tries to reconnect subchannel 1: requestConnection() has now been called twice.
verify(mockSubchannel1, times(2)).requestConnection();

stateListener1.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));

// The LB is waiting for subchannel 1 to report its status; nothing has changed yet.
assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState());
assertEquals(READY, loadBalancer.getRawConnectivityState());
assertEquals(0, loadBalancer.getIndexLocation());

// Subchannel 1's new connection attempt fails and reports TRANSIENT_FAILURE.
stateListener1.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR));

// The LB increments the index and tries to connect to server 3.
assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState());
assertEquals(READY, loadBalancer.getRawConnectivityState());
assertEquals(1, loadBalancer.getIndexLocation());
verify(mockSubchannel3).start(stateListenerCaptor.capture());
SubchannelStateListener stateListener3 = stateListenerCaptor.getValue();
verify(mockSubchannel3).requestConnection();
stateListener3.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));

// Subchannel 3 going CONNECTING does not change the LB's state, as we are
// still in the TRANSIENT_FAILURE health state.
assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState());
assertEquals(READY, loadBalancer.getRawConnectivityState());
assertEquals(1, loadBalancer.getIndexLocation());

List<EquivalentAddressGroup> newServers =
Lists.newArrayList(
/* server 2 */
servers.get(1),
/* server 1 */
servers.get(0)
);

// The resolver update removes the (current) subchannel 3, keeps server 1, and
// resets addressIndex to server 2, which has no subchannel.
loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(newServers)
.setAttributes(petioleAttributes)
.build());

verify(mockSubchannel3, times(1)).shutdown();

// The LB now believes that no subchannel is trying to connect.
assertEquals(IDLE, loadBalancer.getRawConnectivityState());
assertEquals(IDLE, loadBalancer.getConcludedConnectivityState());
// As mentioned, the LB resets the index to 0 by calling addressIndex.updateGroups.
// Given the new list, it now points to server 2, which does not have a subchannel.
assertEquals(0, loadBalancer.getIndexLocation());

// Subchannel 1 is still in TRANSIENT_FAILURE state. Its backoff expires and it retries
// to connect. This state listener call transitions the LB to CONNECTING.
stateListener1.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));

// As the LB's state is IDLE now, it handles the CONNECTING subchannel state change
// by transitioning into CONNECTING itself.
assertEquals(CONNECTING, loadBalancer.getRawConnectivityState());
assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState());

// Before the fix, the index still pointed to server 2 here (getIndexLocation() == 0),
// for which the LB had not created a subchannel yet.

// With the fix, the index now points to server 1, the subchannel that reported CONNECTING.
assertEquals(1, loadBalancer.getIndexLocation());

// The resolver refreshes and provides the same addresses.
// As the LB is in CONNECTING, acceptResolvedAddresses tries to get the subchannel
// for the current index and update its addresses. Before the fix the index pointed to
// server 2, whose subchannel does not exist, and an NPE was thrown; now the call
// is expected to succeed.
assertEquals(Status.OK, loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(newServers)
.setAttributes(petioleAttributes)
.build()));
}

@Test
public void pickAfterStateChangeAfterResolution() {
InOrder inOrder =
Expand Down
Loading