Skip to content

Commit

Permalink
xds: fix ring-hash-picker behaviour (1.46.x backport) (#9096)
Browse files Browse the repository at this point in the history
Previously the picker logic appears to have been wrong, e.g. it did not request a connection on a subchannel that was in TRANSIENT_FAILURE.
Refactored the logic to mirror the pseudo-code more closely so that it is easier to understand.
  • Loading branch information
YifeiZhuang authored Apr 20, 2022
1 parent f557fe2 commit afc52a0
Show file tree
Hide file tree
Showing 2 changed files with 273 additions and 30 deletions.
67 changes: 42 additions & 25 deletions xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.Map;
import java.util.Random;
import java.util.Set;
import javax.annotation.Nullable;

/**
* A {@link LoadBalancer} that provides consistent hashing based load balancing to upstream hosts.
Expand Down Expand Up @@ -399,10 +400,12 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
// Try finding a READY subchannel. Starting from the ring entry next to the RPC's hash.
// If one of the first two subchannels is not in TRANSIENT_FAILURE, return result
// based on that subchannel. Otherwise, fail the pick unless a READY subchannel is found.
// Meanwhile, trigger connection for the first subchannel that is in IDLE if no subchannel
// before it is in CONNECTING or READY.
boolean hasPending = false; // true if having subchannel(s) in CONNECTING or IDLE
boolean canBuffer = true; // true if RPCs can be buffered with a pending subchannel
// Meanwhile, trigger connection for the channel and status:
// For the first subchannel that is in IDLE or TRANSIENT_FAILURE;
// And for the second subchannel that is in IDLE or TRANSIENT_FAILURE;
// And for each of the following subchannels that is in TRANSIENT_FAILURE or IDLE,
// stop until we find the first subchannel that is in CONNECTING or IDLE status.
boolean foundFirstNonFailed = false; // true if having subchannel(s) in CONNECTING or IDLE
Subchannel firstSubchannel = null;
Subchannel secondSubchannel = null;
for (int i = 0; i < ring.size(); i++) {
Expand All @@ -417,36 +420,50 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
// are failed unless there is a READY connection.
if (firstSubchannel == null) {
firstSubchannel = subchannel.subchannel;
} else if (subchannel.subchannel != firstSubchannel) {
if (secondSubchannel == null) {
secondSubchannel = subchannel.subchannel;
} else if (subchannel.subchannel != secondSubchannel) {
canBuffer = false;
PickResult maybeBuffer = pickSubchannelsNonReady(subchannel);
if (maybeBuffer != null) {
return maybeBuffer;
}
}
if (subchannel.stateInfo.getState() == TRANSIENT_FAILURE) {
continue;
}
if (!hasPending) { // first non-failing subchannel
if (subchannel.stateInfo.getState() == IDLE) {
final Subchannel finalSubchannel = subchannel.subchannel;
syncContext.execute(new Runnable() {
@Override
public void run() {
finalSubchannel.requestConnection();
}
});
} else if (subchannel.subchannel != firstSubchannel && secondSubchannel == null) {
secondSubchannel = subchannel.subchannel;
PickResult maybeBuffer = pickSubchannelsNonReady(subchannel);
if (maybeBuffer != null) {
return maybeBuffer;
}
if (canBuffer) { // done if this is the first or second two subchannel
return PickResult.withNoResult(); // queue the pick and re-process later
} else if (subchannel.subchannel != firstSubchannel
&& subchannel.subchannel != secondSubchannel) {
if (!foundFirstNonFailed) {
pickSubchannelsNonReady(subchannel);
if (subchannel.stateInfo.getState() != TRANSIENT_FAILURE) {
foundFirstNonFailed = true;
}
}
hasPending = true;
}
}
// Fail the pick with error status of the original subchannel hit by hash.
SubchannelView originalSubchannel = pickableSubchannels.get(ring.get(mid).addrKey);
return PickResult.withError(originalSubchannel.stateInfo.getStatus());
}

/**
 * Handles a subchannel by its current connectivity state.
 *
 * <p>If the subchannel is in TRANSIENT_FAILURE or IDLE, a {@code requestConnection()} call is
 * scheduled on {@code syncContext}. If it is in CONNECTING or IDLE, the pick is answered with
 * {@link PickResult#withNoResult()}.
 *
 * @param subchannel view of the subchannel whose state is inspected
 * @return {@code PickResult.withNoResult()} when the subchannel is CONNECTING or IDLE, otherwise
 *     {@code null}
 */
@Nullable
private PickResult pickSubchannelsNonReady(SubchannelView subchannel) {
  final boolean isIdle = subchannel.stateInfo.getState() == IDLE;
  final boolean isFailed = subchannel.stateInfo.getState() == TRANSIENT_FAILURE;
  final boolean isConnecting = subchannel.stateInfo.getState() == CONNECTING;

  if (isFailed || isIdle) {
    // The connection request is handed to syncContext rather than issued inline.
    final Subchannel target = subchannel.subchannel;
    syncContext.execute(new Runnable() {
      @Override
      public void run() {
        target.requestConnection();
      }
    });
  }

  if (isConnecting || isIdle) {
    return PickResult.withNoResult();
  }
  return null;
}
}

/**
Expand Down
236 changes: 231 additions & 5 deletions xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -618,9 +618,8 @@ public void skipFailingHosts_firstTwoHostsFailed_pickNextFirstReady() {
assertThat(result.getStatus().getDescription()).isEqualTo("unreachable");
verify(subchannels.get(Collections.singletonList(servers.get(1))))
.requestConnection(); // kickoff connection to server3 (next first non-failing)
// TODO: zivy@
//verify(subchannels.get(Collections.singletonList(servers.get(0)))).requestConnection();
//verify(subchannels.get(Collections.singletonList(servers.get(2)))).requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(0)))).requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(2)))).requestConnection();

// Now connecting to server1.
deliverSubchannelState(
Expand Down Expand Up @@ -664,6 +663,7 @@ public void allSubchannelsInTransientFailure() {
}
verify(helper, atLeastOnce())
.updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
verifyConnection(3);

// Picking subchannel triggers connection. RPC hash hits server0.
PickSubchannelArgs args = new PickSubchannelArgsImpl(
Expand All @@ -674,6 +674,233 @@ public void allSubchannelsInTransientFailure() {
assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(result.getStatus().getDescription())
.isEqualTo("[FakeSocketAddress-server0] unreachable");
verify(subchannels.get(Collections.singletonList(servers.get(0))))
.requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(1))))
.requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(2))))
.requestConnection();
}

/**
 * With server1 in TRANSIENT_FAILURE and the other subchannels untouched, a pick must return an
 * OK-status result and request a connection on server0 only.
 */
@Test
public void firstSubchannelIdle() {
  // Map each server address to exactly one ring entry.
  RingHashConfig ringConfig = new RingHashConfig(3, 3);
  List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
  ResolvedAddresses resolved = ResolvedAddresses.newBuilder()
      .setAddresses(servers)
      .setLoadBalancingPolicyConfig(ringConfig)
      .build();
  loadBalancer.handleResolvedAddresses(resolved);
  verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
  verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));

  // Only server1 reports a failure.
  deliverSubchannelState(
      subchannels.get(Collections.singletonList(servers.get(1))),
      ConnectivityStateInfo.forTransientFailure(
          Status.UNAVAILABLE.withDescription("unreachable")));
  verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
  verifyConnection(1);

  // Picking subchannel triggers connection. RPC hash hits server0.
  CallOptions hashedOptions =
      CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid());
  PickSubchannelArgs pickArgs = new PickSubchannelArgsImpl(
      TestMethodDescriptors.voidMethod(), new Metadata(), hashedOptions);
  SubchannelPicker picker = pickerCaptor.getValue();
  PickResult picked = picker.pickSubchannel(pickArgs);
  assertThat(picked.getStatus().isOk()).isTrue();

  // Only server0 is asked to connect by the pick.
  verify(subchannels.get(Collections.singletonList(servers.get(0))))
      .requestConnection();
  for (int untouched : new int[] {1, 2}) {
    verify(subchannels.get(Collections.singletonList(servers.get(untouched))), never())
        .requestConnection();
  }
}

/**
 * With server0 and server1 both CONNECTING, a pick must return an OK-status result and must not
 * request a connection on any of the three subchannels.
 */
@Test
public void firstSubchannelConnecting() {
  // Map each server address to exactly one ring entry.
  RingHashConfig ringConfig = new RingHashConfig(3, 3);
  List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
  ResolvedAddresses resolved = ResolvedAddresses.newBuilder()
      .setAddresses(servers)
      .setLoadBalancingPolicyConfig(ringConfig)
      .build();
  loadBalancer.handleResolvedAddresses(resolved);
  verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
  verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));

  // server0, then server1, move to CONNECTING.
  for (int connectingIdx = 0; connectingIdx < 2; connectingIdx++) {
    deliverSubchannelState(
        subchannels.get(Collections.singletonList(servers.get(connectingIdx))),
        ConnectivityStateInfo.forNonError(CONNECTING));
  }
  verify(helper, times(2)).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());

  // Perform the pick with the fixed RPC hash.
  CallOptions hashedOptions =
      CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid());
  PickSubchannelArgs pickArgs = new PickSubchannelArgsImpl(
      TestMethodDescriptors.voidMethod(), new Metadata(), hashedOptions);
  SubchannelPicker picker = pickerCaptor.getValue();
  PickResult picked = picker.pickSubchannel(pickArgs);
  assertThat(picked.getStatus().isOk()).isTrue();

  // No subchannel may have been asked to connect.
  for (EquivalentAddressGroup server : servers) {
    verify(subchannels.get(Collections.singletonList(server)), never())
        .requestConnection();
  }
}

/**
 * With only server0 in TRANSIENT_FAILURE, a pick must return an OK-status result and request a
 * connection on server0 and server2, but never on server1.
 */
@Test
public void firstSubchannelFailure() {
  // Map each server address to exactly one ring entry.
  RingHashConfig ringConfig = new RingHashConfig(3, 3);
  List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
  ResolvedAddresses resolved = ResolvedAddresses.newBuilder()
      .setAddresses(servers)
      .setLoadBalancingPolicyConfig(ringConfig)
      .build();
  loadBalancer.handleResolvedAddresses(resolved);
  verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
  verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
  // ring:
  //   "[FakeSocketAddress-server1]_0"
  //   "[FakeSocketAddress-server0]_0"
  //   "[FakeSocketAddress-server2]_0"

  // Only server0 reports a failure.
  deliverSubchannelState(
      subchannels.get(Collections.singletonList(servers.get(0))),
      ConnectivityStateInfo.forTransientFailure(
          Status.UNAVAILABLE.withDescription("unreachable")));
  verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
  verifyConnection(1);

  // Picking subchannel triggers connection.
  CallOptions hashedOptions =
      CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid());
  PickSubchannelArgs pickArgs = new PickSubchannelArgsImpl(
      TestMethodDescriptors.voidMethod(), new Metadata(), hashedOptions);
  SubchannelPicker picker = pickerCaptor.getValue();
  PickResult picked = picker.pickSubchannel(pickArgs);
  assertThat(picked.getStatus().isOk()).isTrue();

  // server0 and server2 are asked to connect; server1 is left alone.
  for (int requested : new int[] {0, 2}) {
    verify(subchannels.get(Collections.singletonList(servers.get(requested))))
        .requestConnection();
  }
  verify(subchannels.get(Collections.singletonList(servers.get(1))), never())
      .requestConnection();
}

/**
 * With server0 in TRANSIENT_FAILURE and server2 CONNECTING, a pick must return an OK-status
 * result and request a connection on server0 only; server2 and server1 must not be asked to
 * connect.
 */
@Test
public void secondSubchannelConnecting() {
  // Map each server address to exactly one ring entry.
  RingHashConfig config = new RingHashConfig(3, 3);
  List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
  loadBalancer.handleResolvedAddresses(
      ResolvedAddresses.newBuilder()
          .setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
  verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
  verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
  // ring:
  //   "[FakeSocketAddress-server1]_0"
  //   "[FakeSocketAddress-server0]_0"
  //   "[FakeSocketAddress-server2]_0"

  Subchannel firstSubchannel = subchannels.get(Collections.singletonList(servers.get(0)));
  // The description carries a separating space, matching the other tests in this class that
  // build "<addresses> unreachable".
  deliverSubchannelState(firstSubchannel,
      ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE.withDescription(
          firstSubchannel.getAddresses().getAddresses() + " unreachable")));
  deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(2))),
      ConnectivityStateInfo.forNonError(CONNECTING));
  verify(helper, times(2)).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
  verifyConnection(1);

  // Picking subchannel triggers connection.
  PickSubchannelArgs args = new PickSubchannelArgsImpl(
      TestMethodDescriptors.voidMethod(), new Metadata(),
      CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid()));
  PickResult result = pickerCaptor.getValue().pickSubchannel(args);
  assertThat(result.getStatus().isOk()).isTrue();
  // Only the failed server0 is asked to reconnect.
  verify(subchannels.get(Collections.singletonList(servers.get(0))))
      .requestConnection();
  verify(subchannels.get(Collections.singletonList(servers.get(2))), never())
      .requestConnection();
  verify(subchannels.get(Collections.singletonList(servers.get(1))), never())
      .requestConnection();
}

/**
 * With server0 and server2 both in TRANSIENT_FAILURE, a pick must fail with UNAVAILABLE and
 * server0's description, and a connection must be requested on all three subchannels.
 */
@Test
public void secondSubchannelFailure() {
  // Map each server address to exactly one ring entry.
  RingHashConfig ringConfig = new RingHashConfig(3, 3);
  List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
  ResolvedAddresses resolved = ResolvedAddresses.newBuilder()
      .setAddresses(servers)
      .setLoadBalancingPolicyConfig(ringConfig)
      .build();
  loadBalancer.handleResolvedAddresses(resolved);
  verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
  verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
  // ring:
  //   "[FakeSocketAddress-server1]_0"
  //   "[FakeSocketAddress-server0]_0"
  //   "[FakeSocketAddress-server2]_0"

  // server0 fails with a description that embeds its own address list.
  Subchannel server0Subchannel = subchannels.get(Collections.singletonList(servers.get(0)));
  Status server0Failure = Status.UNAVAILABLE.withDescription(
      server0Subchannel.getAddresses().getAddresses() + " unreachable");
  deliverSubchannelState(
      server0Subchannel, ConnectivityStateInfo.forTransientFailure(server0Failure));
  // server2 fails as well.
  deliverSubchannelState(
      subchannels.get(Collections.singletonList(servers.get(2))),
      ConnectivityStateInfo.forTransientFailure(
          Status.UNAVAILABLE.withDescription("unreachable")));
  verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
  verifyConnection(2);

  // Picking subchannel triggers connection.
  CallOptions hashedOptions =
      CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid());
  PickSubchannelArgs pickArgs = new PickSubchannelArgsImpl(
      TestMethodDescriptors.voidMethod(), new Metadata(), hashedOptions);
  SubchannelPicker picker = pickerCaptor.getValue();
  PickResult picked = picker.pickSubchannel(pickArgs);

  // The pick fails with the status recorded for server0.
  Status pickStatus = picked.getStatus();
  assertThat(pickStatus.isOk()).isFalse();
  assertThat(pickStatus.getCode()).isEqualTo(Code.UNAVAILABLE);
  assertThat(pickStatus.getDescription())
      .isEqualTo("[FakeSocketAddress-server0] unreachable");

  // Every subchannel has been asked to connect.
  for (EquivalentAddressGroup server : servers) {
    verify(subchannels.get(Collections.singletonList(server)))
        .requestConnection();
  }
}

/**
 * With server0 and server2 in TRANSIENT_FAILURE and server1 CONNECTING, a pick must fail with
 * UNAVAILABLE and server0's description. Connections must have been requested on server0 and
 * server2 but never on server1.
 */
@Test
public void thirdSubchannelConnecting() {
  // Map each server address to exactly one ring entry.
  RingHashConfig ringConfig = new RingHashConfig(3, 3);
  List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
  ResolvedAddresses resolved = ResolvedAddresses.newBuilder()
      .setAddresses(servers)
      .setLoadBalancingPolicyConfig(ringConfig)
      .build();
  loadBalancer.handleResolvedAddresses(resolved);
  verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
  verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
  // ring:
  //   "[FakeSocketAddress-server1]_0"
  //   "[FakeSocketAddress-server0]_0"
  //   "[FakeSocketAddress-server2]_0"

  // server0 fails with a description that embeds its own address list.
  Subchannel server0Subchannel = subchannels.get(Collections.singletonList(servers.get(0)));
  Status server0Failure = Status.UNAVAILABLE.withDescription(
      server0Subchannel.getAddresses().getAddresses() + " unreachable");
  deliverSubchannelState(
      server0Subchannel, ConnectivityStateInfo.forTransientFailure(server0Failure));
  // server2 fails, then server1 starts connecting.
  deliverSubchannelState(
      subchannels.get(Collections.singletonList(servers.get(2))),
      ConnectivityStateInfo.forTransientFailure(
          Status.UNAVAILABLE.withDescription("unreachable")));
  deliverSubchannelState(
      subchannels.get(Collections.singletonList(servers.get(1))),
      ConnectivityStateInfo.forNonError(CONNECTING));
  verify(helper, times(2)).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
  verifyConnection(3);

  // Picking subchannel triggers connection.
  CallOptions hashedOptions =
      CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid());
  PickSubchannelArgs pickArgs = new PickSubchannelArgsImpl(
      TestMethodDescriptors.voidMethod(), new Metadata(), hashedOptions);
  SubchannelPicker picker = pickerCaptor.getValue();
  PickResult picked = picker.pickSubchannel(pickArgs);

  // The pick fails with the status recorded for server0.
  Status pickStatus = picked.getStatus();
  assertThat(pickStatus.isOk()).isFalse();
  assertThat(pickStatus.getCode()).isEqualTo(Code.UNAVAILABLE);
  assertThat(pickStatus.getDescription())
      .isEqualTo("[FakeSocketAddress-server0] unreachable");

  // The failed subchannels were asked to connect; the CONNECTING one was not.
  for (int requested : new int[] {0, 2}) {
    verify(subchannels.get(Collections.singletonList(servers.get(requested))))
        .requestConnection();
  }
  verify(subchannels.get(Collections.singletonList(servers.get(1))), never())
      .requestConnection();
}

@Test
Expand Down Expand Up @@ -706,8 +933,7 @@ public void stickyTransientFailure() {
CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid()));
PickResult result = pickerCaptor.getValue().pickSubchannel(args);
assertThat(result.getStatus().isOk()).isTrue();
// enabled me. there is a bug in picker behavior
// verify(subchannels.get(Collections.singletonList(servers.get(0)))).requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(0)))).requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(2)))).requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(1))), never())
.requestConnection();
Expand Down

0 comments on commit afc52a0

Please sign in to comment.