From d5ff2ac63452fecf639240b6ed6219179c490039 Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Sun, 17 Apr 2022 20:45:34 -0700 Subject: [PATCH 1/2] xds: change ring_hash LB aggregation rule to handles transient_failures (#9084) --- .../io/grpc/xds/RingHashLoadBalancer.java | 104 +++++++++---- .../io/grpc/xds/RingHashLoadBalancerTest.java | 138 +++++++++++++++++- 2 files changed, 202 insertions(+), 40 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java index a8f517a8967..f4f8ee2e6ee 100644 --- a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java @@ -40,8 +40,10 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; /** @@ -67,6 +69,8 @@ final class RingHashLoadBalancer extends LoadBalancer { private List ring; private ConnectivityState currentState; + private Iterator connectionAttemptIterator = subchannels.values().iterator(); + private final Random random = new Random(); RingHashLoadBalancer(Helper helper) { this.helper = checkNotNull(helper, "helper"); @@ -142,6 +146,14 @@ public void onSubchannelState(ConnectivityStateInfo newState) { for (EquivalentAddressGroup addr : removedAddrs) { removedSubchannels.add(subchannels.remove(addr)); } + // If we need to proactively start connecting, iterate through all the subchannels, starting + // at a random position. + // Alternatively, we should better start at the same position. + connectionAttemptIterator = subchannels.values().iterator(); + int randomAdvance = random.nextInt(subchannels.size()); + while (randomAdvance-- > 0) { + connectionAttemptIterator.next(); + } // Update the picker before shutting down the subchannels, to reduce the chance of race // between picking a subchannel and shutting it down. @@ -203,53 +215,77 @@ public void shutdown() { * TRANSIENT_FAILURE *
  • If there is at least one subchannel in CONNECTING state, overall state is * CONNECTING
  • + *
  • If there is one subchannel in TRANSIENT_FAILURE state and there is + * more than one subchannel, report CONNECTING
  • *
  • If there is at least one subchannel in IDLE state, overall state is IDLE
  • *
  • Otherwise, overall state is TRANSIENT_FAILURE
  • * */ private void updateBalancingState() { checkState(!subchannels.isEmpty(), "no subchannel has been created"); - int failureCount = 0; - boolean hasConnecting = false; - Subchannel idleSubchannel = null; - ConnectivityState overallState = null; + boolean start_connection_attempt = false; + int num_idle_ = 0; + int num_ready_ = 0; + int num_connecting_ = 0; + int num_transient_failure_ = 0; for (Subchannel subchannel : subchannels.values()) { ConnectivityState state = getSubchannelStateInfoRef(subchannel).value.getState(); if (state == READY) { - overallState = READY; + num_ready_++; break; - } - if (state == TRANSIENT_FAILURE) { - failureCount++; - } else if (state == CONNECTING) { - hasConnecting = true; + } else if (state == TRANSIENT_FAILURE) { + num_transient_failure_++; + } else if (state == CONNECTING ) { + num_connecting_++; } else if (state == IDLE) { - if (idleSubchannel == null) { - idleSubchannel = subchannel; - } + num_idle_++; } } - if (overallState == null) { - if (failureCount >= 2) { - // This load balancer may not get any pick requests from the upstream if it's reporting - // TRANSIENT_FAILURE. It needs to recover by itself by attempting to connect to at least - // one subchannel that has not failed at any given time. - if (!hasConnecting && idleSubchannel != null) { - idleSubchannel.requestConnection(); - } - overallState = TRANSIENT_FAILURE; - } else if (hasConnecting) { - overallState = CONNECTING; - } else if (idleSubchannel != null) { - overallState = IDLE; - } else { - overallState = TRANSIENT_FAILURE; - } + ConnectivityState overallState; + if (num_ready_ > 0) { + overallState = READY; + } else if (num_transient_failure_ >= 2) { + overallState = TRANSIENT_FAILURE; + start_connection_attempt = true; + } else if (num_connecting_ > 0) { + overallState = CONNECTING; + } else if (num_transient_failure_ == 1 && subchannels.size() > 1) { + overallState = CONNECTING; + start_connection_attempt = true; + } else if (num_idle_ > 0) { + overallState = IDLE; + } else { + overallState = TRANSIENT_FAILURE; + start_connection_attempt = true; } RingHashPicker picker = new RingHashPicker(syncContext, ring, subchannels); // TODO(chengyuanzhang): avoid unnecessary reprocess caused by duplicated server addr updates helper.updateBalancingState(overallState, picker); currentState = overallState; + // While the ring_hash policy is reporting TRANSIENT_FAILURE, it will + // not be getting any pick requests from the priority policy. + // However, because the ring_hash policy does not attempt to + // reconnect to subchannels unless it is getting pick requests, + // it will need special handling to ensure that it will eventually + // recover from TRANSIENT_FAILURE state once the problem is resolved. + // Specifically, it will make sure that it is attempting to connect to + // at least one subchannel at any given time. After a given subchannel + // fails a connection attempt, it will move on to the next subchannel + // in the ring. It will keep doing this until one of the subchannels + // successfully connects, at which point it will report READY and stop + // proactively trying to connect. The policy will remain in + // TRANSIENT_FAILURE until at least one subchannel becomes connected, + // even if subchannels are in state CONNECTING during that time. + // + // Note that we do the same thing when the policy is in state + // CONNECTING, just to ensure that we don't remain in CONNECTING state + // indefinitely if there are no new picks coming in. + if (start_connection_attempt) { + if (!connectionAttemptIterator.hasNext()) { + connectionAttemptIterator = subchannels.values().iterator(); + } + connectionAttemptIterator.next().requestConnection(); + } } private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) { @@ -259,18 +295,22 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo if (stateInfo.getState() == TRANSIENT_FAILURE || stateInfo.getState() == IDLE) { helper.refreshNameResolution(); } - Ref subchannelStateRef = getSubchannelStateInfoRef(subchannel); + updateConnectivityState(subchannel, stateInfo); + updateBalancingState(); + } + private void updateConnectivityState(Subchannel subchannel, ConnectivityStateInfo stateInfo) { + Ref subchannelStateRef = getSubchannelStateInfoRef(subchannel); + ConnectivityState previousConnectivityState = subchannelStateRef.value.getState(); // Don't proactively reconnect if the subchannel enters IDLE, even if previously was connected. // If the subchannel was previously in TRANSIENT_FAILURE, it is considered to stay in // TRANSIENT_FAILURE until it becomes READY. - if (subchannelStateRef.value.getState() == TRANSIENT_FAILURE) { + if (previousConnectivityState == TRANSIENT_FAILURE) { if (stateInfo.getState() == CONNECTING || stateInfo.getState() == IDLE) { return; } } subchannelStateRef.value = stateInfo; - updateBalancingState(); } private static void shutdownSubchannel(Subchannel subchannel) { diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java index 5a9bb7ff4a8..9edbe02f098 100644 --- a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java @@ -25,6 +25,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -56,8 +57,10 @@ import io.grpc.xds.RingHashLoadBalancer.RingHashConfig; import java.lang.Thread.UncaughtExceptionHandler; import java.net.SocketAddress; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; +import java.util.Deque; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -95,6 +98,7 @@ public void uncaughtException(Thread t, Throwable e) { private final Map, Subchannel> subchannels = new HashMap<>(); private final Map subchannelStateListeners = new HashMap<>(); + private final Deque connectionRequestedQueue = new ArrayDeque<>(); private final XxHash64 hashFunc = XxHash64.INSTANCE; @Mock private Helper helper; @@ -123,6 +127,13 @@ public Void answer(InvocationOnMock invocation) throws Throwable { return null; } }).when(subchannel).start(any(SubchannelStateListener.class)); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + connectionRequestedQueue.offer(subchannel); + return null; + } + }).when(subchannel).requestConnection(); return subchannel; } }); @@ -138,6 +149,7 @@ public void tearDown() { for (Subchannel subchannel : subchannels.values()) { verify(subchannel).shutdown(); } + connectionRequestedQueue.clear(); } @Test @@ -216,18 +228,21 @@ public void aggregateSubchannelStates_connectingReadyIdleFailure() { subchannels.get(Collections.singletonList(servers.get(0))), ConnectivityStateInfo.forNonError(CONNECTING)); inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); + verifyConnection(0); // two in CONNECTING deliverSubchannelState( subchannels.get(Collections.singletonList(servers.get(1))), ConnectivityStateInfo.forNonError(CONNECTING)); inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); + verifyConnection(0); // one in CONNECTING, one in READY deliverSubchannelState( subchannels.get(Collections.singletonList(servers.get(1))), ConnectivityStateInfo.forNonError(READY)); inOrder.verify(helper).updateBalancingState(eq(READY), any(SubchannelPicker.class)); + verifyConnection(0); // one in TRANSIENT_FAILURE, one in READY deliverSubchannelState( @@ -236,17 +251,28 @@ public void aggregateSubchannelStates_connectingReadyIdleFailure() { Status.UNKNOWN.withDescription("unknown failure"))); inOrder.verify(helper).refreshNameResolution(); inOrder.verify(helper).updateBalancingState(eq(READY), any(SubchannelPicker.class)); + verifyConnection(0); // one in TRANSIENT_FAILURE, one in IDLE deliverSubchannelState( subchannels.get(Collections.singletonList(servers.get(1))), ConnectivityStateInfo.forNonError(IDLE)); inOrder.verify(helper).refreshNameResolution(); - inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); + verifyConnection(1); verifyNoMoreInteractions(helper); } + private void verifyConnection(int times) { + for (int i = 0; i < times; i++) { + Subchannel connectOnce = connectionRequestedQueue.poll(); + assertThat(connectOnce).isNotNull(); + clearInvocations(connectOnce); + } + assertThat(connectionRequestedQueue.poll()).isNull(); + } + @Test public void aggregateSubchannelStates_twoOrMoreSubchannelsInTransientFailure() { RingHashConfig config = new RingHashConfig(10, 100); @@ -264,7 +290,8 @@ public void aggregateSubchannelStates_twoOrMoreSubchannelsInTransientFailure() { ConnectivityStateInfo.forTransientFailure( Status.UNAVAILABLE.withDescription("not found"))); inOrder.verify(helper).refreshNameResolution(); - inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); + verifyConnection(1); // two in TRANSIENT_FAILURE, two in IDLE deliverSubchannelState( @@ -274,6 +301,7 @@ public void aggregateSubchannelStates_twoOrMoreSubchannelsInTransientFailure() { inOrder.verify(helper).refreshNameResolution(); inOrder.verify(helper) .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); + verifyConnection(1); // two in TRANSIENT_FAILURE, one in CONNECTING, one in IDLE // The overall state is dominated by the two in TRANSIENT_FAILURE. @@ -282,6 +310,7 @@ public void aggregateSubchannelStates_twoOrMoreSubchannelsInTransientFailure() { ConnectivityStateInfo.forNonError(CONNECTING)); inOrder.verify(helper) .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); + verifyConnection(1); // three in TRANSIENT_FAILURE, one in CONNECTING deliverSubchannelState( @@ -291,12 +320,14 @@ public void aggregateSubchannelStates_twoOrMoreSubchannelsInTransientFailure() { inOrder.verify(helper).refreshNameResolution(); inOrder.verify(helper) .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); + verifyConnection(1); // three in TRANSIENT_FAILURE, one in READY deliverSubchannelState( subchannels.get(Collections.singletonList(servers.get(2))), ConnectivityStateInfo.forNonError(READY)); inOrder.verify(helper).updateBalancingState(eq(READY), any(SubchannelPicker.class)); + verifyConnection(0); verifyNoMoreInteractions(helper); } @@ -320,15 +351,20 @@ public void subchannelStayInTransientFailureUntilBecomeReady() { verify(helper, times(3)).refreshNameResolution(); // Stays in IDLE when until there are two or more subchannels in TRANSIENT_FAILURE. - verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); verify(helper, times(2)) .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); + verifyConnection(3); verifyNoMoreInteractions(helper); + reset(helper); // Simulate underlying subchannel auto reconnect after backoff. for (Subchannel subchannel : subchannels.values()) { deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING)); } + verify(helper, times(3)) + .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); + verifyConnection(3); verifyNoMoreInteractions(helper); // Simulate one subchannel enters READY. @@ -337,6 +373,51 @@ public void subchannelStayInTransientFailureUntilBecomeReady() { verify(helper).updateBalancingState(eq(READY), any(SubchannelPicker.class)); } + @Test + public void updateConnectionIterator() { + RingHashConfig config = new RingHashConfig(10, 100); + List servers = createWeightedServerAddrs(1, 1, 1); + InOrder inOrder = Mockito.inOrder(helper); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); + verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + + deliverSubchannelState( + subchannels.get(Collections.singletonList(servers.get(0))), + ConnectivityStateInfo.forTransientFailure( + Status.UNAVAILABLE.withDescription("connection lost"))); + inOrder.verify(helper).refreshNameResolution(); + inOrder.verify(helper) + .updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); + verifyConnection(1); + + servers = createWeightedServerAddrs(1,1); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + inOrder.verify(helper) + .updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); + verifyConnection(1); + + deliverSubchannelState( + subchannels.get(Collections.singletonList(servers.get(1))), + ConnectivityStateInfo.forTransientFailure( + Status.UNAVAILABLE.withDescription("connection lost"))); + inOrder.verify(helper).refreshNameResolution(); + inOrder.verify(helper) + .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); + verifyConnection(1); + + deliverSubchannelState( + subchannels.get(Collections.singletonList(servers.get(0))), + ConnectivityStateInfo.forNonError(CONNECTING)); + inOrder.verify(helper) + .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); + verifyConnection(1); + } + @Test public void ignoreShutdownSubchannelStateChange() { RingHashConfig config = new RingHashConfig(10, 100); @@ -466,7 +547,8 @@ public void skipFailingHosts_pickNextNonFailingHostInFirstTwoHosts() { subchannels.get(Collections.singletonList(servers.get(0))), ConnectivityStateInfo.forTransientFailure( Status.UNAVAILABLE.withDescription("unreachable"))); - verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); + verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + verifyConnection(1); PickResult result = pickerCaptor.getValue().pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); @@ -476,6 +558,7 @@ public void skipFailingHosts_pickNextNonFailingHostInFirstTwoHosts() { verify(subchannels.get(Collections.singletonList(servers.get(1))), never()) .requestConnection(); // no excessive connection + reset(helper); deliverSubchannelState( subchannels.get(Collections.singletonList(servers.get(2))), ConnectivityStateInfo.forNonError(CONNECTING)); @@ -526,16 +609,18 @@ public void skipFailingHosts_firstTwoHostsFailed_pickNextFirstReady() { ConnectivityStateInfo.forTransientFailure( Status.PERMISSION_DENIED.withDescription("permission denied"))); verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); - verify(subchannels.get(Collections.singletonList(servers.get(1)))) - .requestConnection(); // LB attempts to recover by itself + verifyConnection(2); // LB attempts to recover by itself PickResult result = pickerCaptor.getValue().pickSubchannel(args); assertThat(result.getStatus().isOk()).isFalse(); // fail the RPC assertThat(result.getStatus().getCode()) .isEqualTo(Code.UNAVAILABLE); // with error status for the original server hit by hash assertThat(result.getStatus().getDescription()).isEqualTo("unreachable"); - verify(subchannels.get(Collections.singletonList(servers.get(1))), times(2)) - .requestConnection(); // kickoff connection to server3 (next first non-failing) + verify(subchannels.get(Collections.singletonList(servers.get(1)))) + .requestConnection(); // kickoff connection to server3 (next first non-failing) + // TODO: zivy@ + //verify(subchannels.get(Collections.singletonList(servers.get(0)))).requestConnection(); + //verify(subchannels.get(Collections.singletonList(servers.get(2)))).requestConnection(); // Now connecting to server1. deliverSubchannelState( @@ -591,6 +676,43 @@ public void allSubchannelsInTransientFailure() { .isEqualTo("[FakeSocketAddress-server0] unreachable"); } + @Test + public void stickyTransientFailure() { + // Map each server address to exactly one ring entry. + RingHashConfig config = new RingHashConfig(3, 3); + List servers = createWeightedServerAddrs(1, 1, 1); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); + verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + + // Bring one subchannel to TRANSIENT_FAILURE. + Subchannel firstSubchannel = subchannels.get(Collections.singletonList(servers.get(0))); + deliverSubchannelState(firstSubchannel, + ConnectivityStateInfo.forTransientFailure( + Status.UNAVAILABLE.withDescription( + firstSubchannel.getAddresses().getAddresses() + " unreachable"))); + + verify(helper).updateBalancingState(eq(CONNECTING), any()); + verifyConnection(1); + deliverSubchannelState(firstSubchannel, ConnectivityStateInfo.forNonError(IDLE)); + verify(helper, times(2)).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + verifyConnection(1); + + // Picking subchannel triggers connection. RPC hash hits server0. + PickSubchannelArgs args = new PickSubchannelArgsImpl( + TestMethodDescriptors.voidMethod(), new Metadata(), + CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid())); + PickResult result = pickerCaptor.getValue().pickSubchannel(args); + assertThat(result.getStatus().isOk()).isTrue(); + // enabled me. there is a bug in picker behavior + // verify(subchannels.get(Collections.singletonList(servers.get(0)))).requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(2)))).requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(1))), never()) + .requestConnection(); + } + @Test public void hostSelectionProportionalToWeights() { RingHashConfig config = new RingHashConfig(10000, 100000); // large ring From 795a4311b06b2f16d2a8e2e735f30b9ddacbfc11 Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Mon, 18 Apr 2022 12:16:08 -0700 Subject: [PATCH 2/2] xds: fix ring-hash-picker behaviour (#9085) --- .../io/grpc/xds/RingHashLoadBalancer.java | 67 +++-- .../io/grpc/xds/RingHashLoadBalancerTest.java | 236 +++++++++++++++++- 2 files changed, 273 insertions(+), 30 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java index f4f8ee2e6ee..34d5e2efcbc 100644 --- a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java @@ -45,6 +45,7 @@ import java.util.Map; import java.util.Random; import java.util.Set; +import javax.annotation.Nullable; /** * A {@link LoadBalancer} that provides consistent hashing based load balancing to upstream hosts. @@ -399,10 +400,12 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { // Try finding a READY subchannel. Starting from the ring entry next to the RPC's hash. // If the one of the first two subchannels is not in TRANSIENT_FAILURE, return result // based on that subchannel. Otherwise, fail the pick unless a READY subchannel is found. - // Meanwhile, trigger connection for the first subchannel that is in IDLE if no subchannel - // before it is in CONNECTING or READY. - boolean hasPending = false; // true if having subchannel(s) in CONNECTING or IDLE - boolean canBuffer = true; // true if RPCs can be buffered with a pending subchannel + // Meanwhile, trigger connection for the channel and status: + // For the first subchannel that is in IDLE or TRANSIENT_FAILURE; + // And for the second subchannel that is in IDLE or TRANSIENT_FAILURE; + // And for each of the following subchannels that is in TRANSIENT_FAILURE or IDLE, + // stop until we find the first subchannel that is in CONNECTING or IDLE status. + boolean foundFirstNonFailed = false; // true if having subchannel(s) in CONNECTING or IDLE Subchannel firstSubchannel = null; Subchannel secondSubchannel = null; for (int i = 0; i < ring.size(); i++) { @@ -417,36 +420,50 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { // are failed unless there is a READY connection. if (firstSubchannel == null) { firstSubchannel = subchannel.subchannel; - } else if (subchannel.subchannel != firstSubchannel) { - if (secondSubchannel == null) { - secondSubchannel = subchannel.subchannel; - } else if (subchannel.subchannel != secondSubchannel) { - canBuffer = false; + PickResult maybeBuffer = pickSubchannelsNonReady(subchannel); + if (maybeBuffer != null) { + return maybeBuffer; } - } - if (subchannel.stateInfo.getState() == TRANSIENT_FAILURE) { - continue; - } - if (!hasPending) { // first non-failing subchannel - if (subchannel.stateInfo.getState() == IDLE) { - final Subchannel finalSubchannel = subchannel.subchannel; - syncContext.execute(new Runnable() { - @Override - public void run() { - finalSubchannel.requestConnection(); - } - }); + } else if (subchannel.subchannel != firstSubchannel && secondSubchannel == null) { + secondSubchannel = subchannel.subchannel; + PickResult maybeBuffer = pickSubchannelsNonReady(subchannel); + if (maybeBuffer != null) { + return maybeBuffer; } - if (canBuffer) { // done if this is the first or second two subchannel - return PickResult.withNoResult(); // queue the pick and re-process later + } else if (subchannel.subchannel != firstSubchannel + && subchannel.subchannel != secondSubchannel) { + if (!foundFirstNonFailed) { + pickSubchannelsNonReady(subchannel); + if (subchannel.stateInfo.getState() != TRANSIENT_FAILURE) { + foundFirstNonFailed = true; + } } - hasPending = true; } } // Fail the pick with error status of the original subchannel hit by hash. SubchannelView originalSubchannel = pickableSubchannels.get(ring.get(mid).addrKey); return PickResult.withError(originalSubchannel.stateInfo.getStatus()); } + + @Nullable + private PickResult pickSubchannelsNonReady(SubchannelView subchannel) { + if (subchannel.stateInfo.getState() == TRANSIENT_FAILURE + || subchannel.stateInfo.getState() == IDLE ) { + final Subchannel finalSubchannel = subchannel.subchannel; + syncContext.execute(new Runnable() { + @Override + public void run() { + finalSubchannel.requestConnection(); + } + }); + } + if (subchannel.stateInfo.getState() == CONNECTING + || subchannel.stateInfo.getState() == IDLE) { + return PickResult.withNoResult(); + } else { + return null; + } + } } /** diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java index 9edbe02f098..8b205e8ad85 100644 --- a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java @@ -618,9 +618,8 @@ public void skipFailingHosts_firstTwoHostsFailed_pickNextFirstReady() { assertThat(result.getStatus().getDescription()).isEqualTo("unreachable"); verify(subchannels.get(Collections.singletonList(servers.get(1)))) .requestConnection(); // kickoff connection to server3 (next first non-failing) - // TODO: zivy@ - //verify(subchannels.get(Collections.singletonList(servers.get(0)))).requestConnection(); - //verify(subchannels.get(Collections.singletonList(servers.get(2)))).requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(0)))).requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(2)))).requestConnection(); // Now connecting to server1. deliverSubchannelState( @@ -664,6 +663,7 @@ public void allSubchannelsInTransientFailure() { } verify(helper, atLeastOnce()) .updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + verifyConnection(3); // Picking subchannel triggers connection. RPC hash hits server0. PickSubchannelArgs args = new PickSubchannelArgsImpl( @@ -674,6 +674,233 @@ public void allSubchannelsInTransientFailure() { assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); assertThat(result.getStatus().getDescription()) .isEqualTo("[FakeSocketAddress-server0] unreachable"); + verify(subchannels.get(Collections.singletonList(servers.get(0)))) + .requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(1)))) + .requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(2)))) + .requestConnection(); + } + + @Test + public void firstSubchannelIdle() { + // Map each server address to exactly one ring entry. + RingHashConfig config = new RingHashConfig(3, 3); + List servers = createWeightedServerAddrs(1, 1, 1); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); + verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + + deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(1))), + ConnectivityStateInfo.forTransientFailure( + Status.UNAVAILABLE.withDescription("unreachable"))); + verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + verifyConnection(1); + + // Picking subchannel triggers connection. RPC hash hits server0. + PickSubchannelArgs args = new PickSubchannelArgsImpl( + TestMethodDescriptors.voidMethod(), new Metadata(), + CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid())); + PickResult result = pickerCaptor.getValue().pickSubchannel(args); + assertThat(result.getStatus().isOk()).isTrue(); + verify(subchannels.get(Collections.singletonList(servers.get(0)))) + .requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(1))), never()) + .requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(2))), never()) + .requestConnection(); + } + + @Test + public void firstSubchannelConnecting() { + // Map each server address to exactly one ring entry. + RingHashConfig config = new RingHashConfig(3, 3); + List servers = createWeightedServerAddrs(1, 1, 1); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); + verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + + deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(0))), + ConnectivityStateInfo.forNonError(CONNECTING)); + deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(1))), + ConnectivityStateInfo.forNonError(CONNECTING)); + verify(helper, times(2)).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + + // Picking subchannel triggers connection. + PickSubchannelArgs args = new PickSubchannelArgsImpl( + TestMethodDescriptors.voidMethod(), new Metadata(), + CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid())); + PickResult result = pickerCaptor.getValue().pickSubchannel(args); + assertThat(result.getStatus().isOk()).isTrue(); + verify(subchannels.get(Collections.singletonList(servers.get(0))), never()) + .requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(1))), never()) + .requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(2))), never()) + .requestConnection(); + } + + @Test + public void firstSubchannelFailure() { + // Map each server address to exactly one ring entry. + RingHashConfig config = new RingHashConfig(3, 3); + List servers = createWeightedServerAddrs(1, 1, 1); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); + verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + // ring: + // "[FakeSocketAddress-server1]_0" + // "[FakeSocketAddress-server0]_0" + // "[FakeSocketAddress-server2]_0" + + deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(0))), + ConnectivityStateInfo.forTransientFailure( + Status.UNAVAILABLE.withDescription("unreachable"))); + verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + verifyConnection(1); + + // Picking subchannel triggers connection. + PickSubchannelArgs args = new PickSubchannelArgsImpl( + TestMethodDescriptors.voidMethod(), new Metadata(), + CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid())); + PickResult result = pickerCaptor.getValue().pickSubchannel(args); + assertThat(result.getStatus().isOk()).isTrue(); + verify(subchannels.get(Collections.singletonList(servers.get(0)))) + .requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(2)))) + .requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(1))), never()) + .requestConnection(); + } + + @Test + public void secondSubchannelConnecting() { + // Map each server address to exactly one ring entry. + RingHashConfig config = new RingHashConfig(3, 3); + List servers = createWeightedServerAddrs(1, 1, 1); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); + verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + // ring: + // "[FakeSocketAddress-server1]_0" + // "[FakeSocketAddress-server0]_0" + // "[FakeSocketAddress-server2]_0" + + Subchannel firstSubchannel = subchannels.get(Collections.singletonList(servers.get(0))); + deliverSubchannelState(firstSubchannel, + ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE.withDescription( + firstSubchannel.getAddresses().getAddresses() + "unreachable"))); + deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(2))), + ConnectivityStateInfo.forNonError(CONNECTING)); + verify(helper, times(2)).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + verifyConnection(1); + + // Picking subchannel triggers connection. + PickSubchannelArgs args = new PickSubchannelArgsImpl( + TestMethodDescriptors.voidMethod(), new Metadata(), + CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid())); + PickResult result = pickerCaptor.getValue().pickSubchannel(args); + assertThat(result.getStatus().isOk()).isTrue(); + verify(subchannels.get(Collections.singletonList(servers.get(0)))) + .requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(2))), never()) + .requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(1))), never()) + .requestConnection(); + } + + @Test + public void secondSubchannelFailure() { + // Map each server address to exactly one ring entry. + RingHashConfig config = new RingHashConfig(3, 3); + List servers = createWeightedServerAddrs(1, 1, 1); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); + verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + // ring: + // "[FakeSocketAddress-server1]_0" + // "[FakeSocketAddress-server0]_0" + // "[FakeSocketAddress-server2]_0" + + Subchannel firstSubchannel = subchannels.get(Collections.singletonList(servers.get(0))); + deliverSubchannelState(firstSubchannel, + ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE.withDescription( + firstSubchannel.getAddresses().getAddresses() + " unreachable"))); + deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(2))), + ConnectivityStateInfo.forTransientFailure( + Status.UNAVAILABLE.withDescription("unreachable"))); + verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + verifyConnection(2); + + // Picking subchannel triggers connection. + PickSubchannelArgs args = new PickSubchannelArgsImpl( + TestMethodDescriptors.voidMethod(), new Metadata(), + CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid())); + PickResult result = pickerCaptor.getValue().pickSubchannel(args); + assertThat(result.getStatus().isOk()).isFalse(); + assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(result.getStatus().getDescription()) + .isEqualTo("[FakeSocketAddress-server0] unreachable"); + verify(subchannels.get(Collections.singletonList(servers.get(0)))) + .requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(2)))) + .requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(1)))) + .requestConnection(); + } + + @Test + public void thirdSubchannelConnecting() { + // Map each server address to exactly one ring entry. + RingHashConfig config = new RingHashConfig(3, 3); + List servers = createWeightedServerAddrs(1, 1, 1); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); + verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + // ring: + // "[FakeSocketAddress-server1]_0" + // "[FakeSocketAddress-server0]_0" + // "[FakeSocketAddress-server2]_0" + + Subchannel firstSubchannel = subchannels.get(Collections.singletonList(servers.get(0))); + deliverSubchannelState(firstSubchannel, + ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE.withDescription( + firstSubchannel.getAddresses().getAddresses() + " unreachable"))); + deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(2))), + ConnectivityStateInfo.forTransientFailure( + Status.UNAVAILABLE.withDescription("unreachable"))); + deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(1))), + ConnectivityStateInfo.forNonError(CONNECTING)); + verify(helper, times(2)).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + verifyConnection(3); + + // Picking subchannel triggers connection. + PickSubchannelArgs args = new PickSubchannelArgsImpl( + TestMethodDescriptors.voidMethod(), new Metadata(), + CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid())); + PickResult result = pickerCaptor.getValue().pickSubchannel(args); + assertThat(result.getStatus().isOk()).isFalse(); + assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(result.getStatus().getDescription()) + .isEqualTo("[FakeSocketAddress-server0] unreachable"); + verify(subchannels.get(Collections.singletonList(servers.get(0)))) + .requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(2)))) + .requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(1))), never()) + .requestConnection(); } @Test @@ -706,8 +933,7 @@ public void stickyTransientFailure() { CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid())); PickResult result = pickerCaptor.getValue().pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); - // enabled me. there is a bug in picker behavior - // verify(subchannels.get(Collections.singletonList(servers.get(0)))).requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(0)))).requestConnection(); verify(subchannels.get(Collections.singletonList(servers.get(2)))).requestConnection(); verify(subchannels.get(Collections.singletonList(servers.get(1))), never()) .requestConnection();