Skip to content

Commit

Permalink
xds: change ring_hash LB aggregation rule to handle transient_failur…
Browse files Browse the repository at this point in the history
…es (#9084) (#9094)

per gRFC change grpc/grpc#29332:
Apply new aggregation rule: If there is at least one subchannel in state TRANSIENT_FAILURE and there is more than one subchannel, report state CONNECTING. If we hit this rule, proactively start a subchannel connection attempt.
  • Loading branch information
YifeiZhuang authored Apr 19, 2022
1 parent 78ccc81 commit f557fe2
Show file tree
Hide file tree
Showing 2 changed files with 202 additions and 40 deletions.
104 changes: 72 additions & 32 deletions xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;

/**
Expand All @@ -67,6 +69,8 @@ final class RingHashLoadBalancer extends LoadBalancer {

private List<RingEntry> ring;
private ConnectivityState currentState;
private Iterator<Subchannel> connectionAttemptIterator = subchannels.values().iterator();
private final Random random = new Random();

RingHashLoadBalancer(Helper helper) {
this.helper = checkNotNull(helper, "helper");
Expand Down Expand Up @@ -142,6 +146,14 @@ public void onSubchannelState(ConnectivityStateInfo newState) {
for (EquivalentAddressGroup addr : removedAddrs) {
removedSubchannels.add(subchannels.remove(addr));
}
// If we need to proactively start connecting, iterate through all the subchannels, starting
// at a random position.
// Alternatively, it might be better to always start at the same position.
connectionAttemptIterator = subchannels.values().iterator();
int randomAdvance = random.nextInt(subchannels.size());
while (randomAdvance-- > 0) {
connectionAttemptIterator.next();
}

// Update the picker before shutting down the subchannels, to reduce the chance of race
// between picking a subchannel and shutting it down.
Expand Down Expand Up @@ -203,53 +215,77 @@ public void shutdown() {
* TRANSIENT_FAILURE</li>
* <li>If there is at least one subchannel in CONNECTING state, overall state is
* CONNECTING</li>
* <li>If there is exactly one subchannel in TRANSIENT_FAILURE state and there is
* more than one subchannel, overall state is CONNECTING</li>
* <li>If there is at least one subchannel in IDLE state, overall state is IDLE</li>
* <li>Otherwise, overall state is TRANSIENT_FAILURE</li>
* </ol>
*/
private void updateBalancingState() {
checkState(!subchannels.isEmpty(), "no subchannel has been created");
int failureCount = 0;
boolean hasConnecting = false;
Subchannel idleSubchannel = null;
ConnectivityState overallState = null;
boolean start_connection_attempt = false;
int num_idle_ = 0;
int num_ready_ = 0;
int num_connecting_ = 0;
int num_transient_failure_ = 0;
for (Subchannel subchannel : subchannels.values()) {
ConnectivityState state = getSubchannelStateInfoRef(subchannel).value.getState();
if (state == READY) {
overallState = READY;
num_ready_++;
break;
}
if (state == TRANSIENT_FAILURE) {
failureCount++;
} else if (state == CONNECTING) {
hasConnecting = true;
} else if (state == TRANSIENT_FAILURE) {
num_transient_failure_++;
} else if (state == CONNECTING ) {
num_connecting_++;
} else if (state == IDLE) {
if (idleSubchannel == null) {
idleSubchannel = subchannel;
}
num_idle_++;
}
}
if (overallState == null) {
if (failureCount >= 2) {
// This load balancer may not get any pick requests from the upstream if it's reporting
// TRANSIENT_FAILURE. It needs to recover by itself by attempting to connect to at least
// one subchannel that has not failed at any given time.
if (!hasConnecting && idleSubchannel != null) {
idleSubchannel.requestConnection();
}
overallState = TRANSIENT_FAILURE;
} else if (hasConnecting) {
overallState = CONNECTING;
} else if (idleSubchannel != null) {
overallState = IDLE;
} else {
overallState = TRANSIENT_FAILURE;
}
ConnectivityState overallState;
if (num_ready_ > 0) {
overallState = READY;
} else if (num_transient_failure_ >= 2) {
overallState = TRANSIENT_FAILURE;
start_connection_attempt = true;
} else if (num_connecting_ > 0) {
overallState = CONNECTING;
} else if (num_transient_failure_ == 1 && subchannels.size() > 1) {
overallState = CONNECTING;
start_connection_attempt = true;
} else if (num_idle_ > 0) {
overallState = IDLE;
} else {
overallState = TRANSIENT_FAILURE;
start_connection_attempt = true;
}
RingHashPicker picker = new RingHashPicker(syncContext, ring, subchannels);
// TODO(chengyuanzhang): avoid unnecessary reprocess caused by duplicated server addr updates
helper.updateBalancingState(overallState, picker);
currentState = overallState;
// While the ring_hash policy is reporting TRANSIENT_FAILURE, it will
// not be getting any pick requests from the priority policy.
// However, because the ring_hash policy does not attempt to
// reconnect to subchannels unless it is getting pick requests,
// it will need special handling to ensure that it will eventually
// recover from TRANSIENT_FAILURE state once the problem is resolved.
// Specifically, it will make sure that it is attempting to connect to
// at least one subchannel at any given time. After a given subchannel
// fails a connection attempt, it will move on to the next subchannel
// in the ring. It will keep doing this until one of the subchannels
// successfully connects, at which point it will report READY and stop
// proactively trying to connect. The policy will remain in
// TRANSIENT_FAILURE until at least one subchannel becomes connected,
// even if subchannels are in state CONNECTING during that time.
//
// Note that we do the same thing when the policy is in state
// CONNECTING, just to ensure that we don't remain in CONNECTING state
// indefinitely if there are no new picks coming in.
if (start_connection_attempt) {
if (!connectionAttemptIterator.hasNext()) {
connectionAttemptIterator = subchannels.values().iterator();
}
connectionAttemptIterator.next().requestConnection();
}
}

private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
Expand All @@ -259,18 +295,22 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo
if (stateInfo.getState() == TRANSIENT_FAILURE || stateInfo.getState() == IDLE) {
helper.refreshNameResolution();
}
Ref<ConnectivityStateInfo> subchannelStateRef = getSubchannelStateInfoRef(subchannel);
updateConnectivityState(subchannel, stateInfo);
updateBalancingState();
}

private void updateConnectivityState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
Ref<ConnectivityStateInfo> subchannelStateRef = getSubchannelStateInfoRef(subchannel);
ConnectivityState previousConnectivityState = subchannelStateRef.value.getState();
// Don't proactively reconnect if the subchannel enters IDLE, even if it was previously connected.
// If the subchannel was previously in TRANSIENT_FAILURE, it is considered to stay in
// TRANSIENT_FAILURE until it becomes READY.
if (subchannelStateRef.value.getState() == TRANSIENT_FAILURE) {
if (previousConnectivityState == TRANSIENT_FAILURE) {
if (stateInfo.getState() == CONNECTING || stateInfo.getState() == IDLE) {
return;
}
}
subchannelStateRef.value = stateInfo;
updateBalancingState();
}

private static void shutdownSubchannel(Subchannel subchannel) {
Expand Down
Loading

0 comments on commit f557fe2

Please sign in to comment.