diff --git a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java index f8f5c94f5ba..ab60a024e1f 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java @@ -333,6 +333,16 @@ void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo case CONNECTING: rawConnectivityState = CONNECTING; + // If a newly resolved address list arrives via acceptResolvedAddresses while we are + // CONNECTING, we will try to call updateAddresses on the currently connecting + // subchannel if it exists in the new list. + // On entering CONNECTING, seek the index to this subchannel's address if the index is + // invalid (Happy Eyeballs disabled) or its current address has no subchannel. + if ((!enableHappyEyeballs && !addressIndex.isValid()) + || (addressIndex.isValid() && !subchannels.containsKey( + addressIndex.getCurrentAddress()))) { + addressIndex.seekTo(getAddress(subchannelData.subchannel)); + } updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult())); break; @@ -636,6 +646,11 @@ ConnectivityState getConcludedConnectivityState() { return this.concludedState; } + @VisibleForTesting + ConnectivityState getRawConnectivityState() { + return this.rawConnectivityState; + } + /** * Picker that requests connection during the first pick, and returns noResult. 
*/ diff --git a/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java b/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java index eb7b40257c0..0467e57223d 100644 --- a/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java @@ -642,6 +642,139 @@ public void healthCheckFlow() { verifyNoMoreInteractions(mockHelper); } + // Reproduces #12796: resolver update in CONNECTING with an index that has no subchannel. + @Test + public void healthCheckWithTF_AllowsStateInconsistency() { + assumeTrue(!serializeRetries); + + when(mockSubchannel1.getAttributes()).thenReturn( + Attributes.newBuilder().set(HAS_HEALTH_PRODUCER_LISTENER_KEY, true).build()); + + Attributes petioleAttributes = + Attributes.newBuilder().set(IS_PETIOLE_POLICY, true).build(); + + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses( + Lists.newArrayList( + /* server 1 */servers.get(0), + /* server 3 */servers.get(2) + )) + .setAttributes(petioleAttributes) + .build()); + + // Get the state and health listener for subchannel 1 + verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + SubchannelStateListener healthListener1 = + createArgsCaptor.getValue().getOption(HEALTH_CONSUMER_LISTENER_ARG_KEY); + verify(mockSubchannel1).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener1 = stateListenerCaptor.getValue(); + + // As start() was called, we transition subchannel 1 to CONNECTING... + stateListener1.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + healthListener1.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + + // ...which eventually ends up READY. 
+ stateListener1.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + healthListener1.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + + // Let the fun begin: subchannel 1's health turns into TRANSIENT_FAILURE + healthListener1.onSubchannelState( + ConnectivityStateInfo.forTransientFailure( + Status.UNAVAILABLE.withDescription("health failure"))); + // HealthListener.onSubchannelState gets called. It updates the LB's balancing + // state/concludedState. + assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState()); + assertEquals(READY, loadBalancer.getRawConnectivityState()); + + // Subchannel 1's transport goes idle + stateListener1.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE)); + + // LB's raw connectivity state stays READY while the concluded state stays TRANSIENT_FAILURE + assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState()); + assertEquals(READY, loadBalancer.getRawConnectivityState()); + assertEquals(0, loadBalancer.getIndexLocation()); + + // LB tries to reconnect subchannel 1. + verify(mockSubchannel1, times(2)).requestConnection(); + + stateListener1.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + + // LB is waiting for subchannel 1 to report status. + assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState()); + assertEquals(READY, loadBalancer.getRawConnectivityState()); + assertEquals(0, loadBalancer.getIndexLocation()); + + // Subchannel 1's new connection attempt fails and reports TRANSIENT_FAILURE. + stateListener1.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR)); + + // LB increments the index and tries to connect to server 3. 
+ assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState()); + assertEquals(READY, loadBalancer.getRawConnectivityState()); + assertEquals(1, loadBalancer.getIndexLocation()); + verify(mockSubchannel3).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener3 = stateListenerCaptor.getValue(); + verify(mockSubchannel3).requestConnection(); + stateListener3.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + + // Subchannel 3 connection did not change the state as we are + // still in TRANSIENT_FAILURE health state. + assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState()); + assertEquals(READY, loadBalancer.getRawConnectivityState()); + assertEquals(1, loadBalancer.getIndexLocation()); + + List<EquivalentAddressGroup> newServers = + Lists.newArrayList( + /* server 2 */ + servers.get(1), + /* server 1 */ + servers.get(0) + ); + + // The resolver update removes the (current) subchannel 3, keeps server 1, and + // resets addressIndex to server 2, which has no subchannel. + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(newServers) + .setAttributes(petioleAttributes) + .build()); + + verify(mockSubchannel3, times(1)).shutdown(); + + // LB thinks that there are no subchannels that are trying to connect. + assertEquals(IDLE, loadBalancer.getRawConnectivityState()); + assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); + // As mentioned, the LB resets the index to 0 by calling addressIndex.updateGroups. + // Given the new list, it is now pointing to server 2 which does not have a subchannel. + assertEquals(0, loadBalancer.getIndexLocation()); + + // Subchannel 1 is still in TRANSIENT_FAILURE state. Its backoff expires, + // and now it is retrying to connect. This state listener transitions the LB to CONNECTING. 
+ stateListener1.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + + // As our health state is IDLE now, the LB handles the CONNECTING subchannel state change + // by transitioning into CONNECTING itself. + assertEquals(CONNECTING, loadBalancer.getRawConnectivityState()); + assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + + // Before the fix, the index still pointed to server 2 (index 0) at this point, + // although the LB had not created a subchannel for that address yet. + // With the fix, the index has been moved to the connecting subchannel's address. + + // The index is now pointing to server 1 + assertEquals(1, loadBalancer.getIndexLocation()); + + // The resolver refreshes and provides the same addresses. + // As the LB is in CONNECTING, acceptResolvedAddresses tries + // to get the subchannel for the address at the current index and + // update its addresses. Before the fix that was server 2, which has no subchannel (NPE). + assertEquals(Status.OK, loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(newServers) + .setAttributes(petioleAttributes) + .build())); + } + @Test public void pickAfterStateChangeAfterResolution() { InOrder inOrder =