Skip to content

xds: Use acceptResolvedAddresses() for WeightedTarget children #12053

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions xds/src/main/java/io/grpc/xds/WeightedTargetLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,19 @@ public Status acceptResolvedAddressesInternal(ResolvedAddresses resolvedAddresse
}
}
targets = newTargets;
Status status = Status.OK;
for (String targetName : targets.keySet()) {
childBalancers.get(targetName).handleResolvedAddresses(
Status newStatus = childBalancers.get(targetName).acceptResolvedAddresses(
resolvedAddresses.toBuilder()
.setAddresses(AddressFilter.filter(resolvedAddresses.getAddresses(), targetName))
.setLoadBalancingPolicyConfig(targets.get(targetName).childConfig)
.setAttributes(resolvedAddresses.getAttributes().toBuilder()
.set(CHILD_NAME, targetName)
.build())
.build());
if (!newStatus.isOk()) {
status = newStatus;
}
}

// Cleanup removed targets.
Expand All @@ -108,7 +112,7 @@ public Status acceptResolvedAddressesInternal(ResolvedAddresses resolvedAddresse
childBalancers.keySet().retainAll(targets.keySet());
childHelpers.keySet().retainAll(targets.keySet());
updateOverallBalancingState();
return Status.OK;
return status;
}

@Override
Expand Down
30 changes: 20 additions & 10 deletions xds/src/test/java/io/grpc/xds/WeightedTargetLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public String getPolicyName() {
public LoadBalancer newLoadBalancer(Helper helper) {
childHelpers.add(helper);
LoadBalancer childBalancer = mock(LoadBalancer.class);
when(childBalancer.acceptResolvedAddresses(any())).thenReturn(Status.OK);
childBalancers.add(childBalancer);
fooLbCreated++;
return childBalancer;
Expand All @@ -139,6 +140,7 @@ public String getPolicyName() {
public LoadBalancer newLoadBalancer(Helper helper) {
childHelpers.add(helper);
LoadBalancer childBalancer = mock(LoadBalancer.class);
when(childBalancer.acceptResolvedAddresses(any())).thenReturn(Status.OK);
childBalancers.add(childBalancer);
barLbCreated++;
return childBalancer;
Expand Down Expand Up @@ -180,7 +182,7 @@ public void tearDown() {
}

@Test
public void handleResolvedAddresses() {
public void acceptResolvedAddresses() {
ArgumentCaptor<ResolvedAddresses> resolvedAddressesCaptor =
ArgumentCaptor.forClass(ResolvedAddresses.class);
Attributes.Key<Object> fakeKey = Attributes.Key.create("fake_key");
Expand All @@ -203,20 +205,21 @@ public void handleResolvedAddresses() {
eag2 = AddressFilter.setPathFilter(eag2, ImmutableList.of("target2"));
EquivalentAddressGroup eag3 = new EquivalentAddressGroup(socketAddresses[3]);
eag3 = AddressFilter.setPathFilter(eag3, ImmutableList.of("target3"));
weightedTargetLb.handleResolvedAddresses(
Status status = weightedTargetLb.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.of(eag0, eag1, eag2, eag3))
.setAttributes(Attributes.newBuilder().set(fakeKey, fakeValue).build())
.setLoadBalancingPolicyConfig(new WeightedTargetConfig(targets))
.build());
assertThat(status.isOk()).isTrue();
verify(helper).updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult()));
assertThat(childBalancers).hasSize(4);
assertThat(childHelpers).hasSize(4);
assertThat(fooLbCreated).isEqualTo(2);
assertThat(barLbCreated).isEqualTo(2);

for (int i = 0; i < childBalancers.size(); i++) {
verify(childBalancers.get(i)).handleResolvedAddresses(resolvedAddressesCaptor.capture());
verify(childBalancers.get(i)).acceptResolvedAddresses(resolvedAddressesCaptor.capture());
ResolvedAddresses resolvedAddresses = resolvedAddressesCaptor.getValue();
assertThat(resolvedAddresses.getLoadBalancingPolicyConfig()).isEqualTo(configs[i]);
assertThat(resolvedAddresses.getAttributes().get(fakeKey)).isEqualTo(fakeValue);
Expand All @@ -226,6 +229,11 @@ public void handleResolvedAddresses() {
.containsExactly(socketAddresses[i]);
}

// Even when a child return an error from the update, the other children should still receive
// their updates.
Status acceptReturnStatus = Status.UNAVAILABLE.withDescription("Didn't like something");
when(childBalancers.get(2).acceptResolvedAddresses(any())).thenReturn(acceptReturnStatus);

// Update new weighted target config for a typical workflow.
// target0 removed. target1, target2, target3 changed weight and config. target4 added.
int[] newWeights = new int[]{11, 22, 33, 44};
Expand All @@ -243,11 +251,12 @@ public void handleResolvedAddresses() {
"target4",
new WeightedPolicySelection(
newWeights[3], newChildConfig(fooLbProvider, newConfigs[3])));
weightedTargetLb.handleResolvedAddresses(
status = weightedTargetLb.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(new WeightedTargetConfig(newTargets))
.build());
assertThat(status.getCode()).isEqualTo(acceptReturnStatus.getCode());
verify(helper, atLeast(2))
.updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult()));
assertThat(childBalancers).hasSize(5);
Expand All @@ -258,7 +267,7 @@ public void handleResolvedAddresses() {
verify(childBalancers.get(0)).shutdown();
for (int i = 1; i < childBalancers.size(); i++) {
verify(childBalancers.get(i), atLeastOnce())
.handleResolvedAddresses(resolvedAddressesCaptor.capture());
.acceptResolvedAddresses(resolvedAddressesCaptor.capture());
assertThat(resolvedAddressesCaptor.getValue().getLoadBalancingPolicyConfig())
.isEqualTo(newConfigs[i - 1]);
}
Expand Down Expand Up @@ -286,7 +295,7 @@ public void handleNameResolutionError() {
"target2", weightedLbConfig2,
// {foo, 40, config3}
"target3", weightedLbConfig3);
weightedTargetLb.handleResolvedAddresses(
weightedTargetLb.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(new WeightedTargetConfig(targets))
Expand All @@ -313,7 +322,7 @@ public void balancingStateUpdatedFromChildBalancers() {
"target2", weightedLbConfig2,
// {foo, 40, config3}
"target3", weightedLbConfig3);
weightedTargetLb.handleResolvedAddresses(
weightedTargetLb.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(new WeightedTargetConfig(targets))
Expand Down Expand Up @@ -395,7 +404,7 @@ public void raceBetweenShutdownAndChildLbBalancingStateUpdate() {
Map<String, WeightedPolicySelection> targets = ImmutableMap.of(
"target0", weightedLbConfig0,
"target1", weightedLbConfig1);
weightedTargetLb.handleResolvedAddresses(
weightedTargetLb.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(new WeightedTargetConfig(targets))
Expand All @@ -421,7 +430,7 @@ public void noDuplicateOverallBalancingStateUpdate() {
weights[0], newChildConfig(fakeLbProvider, configs[0])),
"target3", new WeightedPolicySelection(
weights[3], newChildConfig(fakeLbProvider, configs[3])));
weightedTargetLb.handleResolvedAddresses(
weightedTargetLb.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(new WeightedTargetConfig(targets))
Expand Down Expand Up @@ -470,9 +479,10 @@ static class FakeLoadBalancer extends LoadBalancer {
}

@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(Status.INTERNAL)));
return Status.OK;
}

@Override
Expand Down