From 3158f91e7538424df0c1c2961fed507e4bf864f6 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Sat, 11 May 2024 10:10:02 -0700 Subject: [PATCH] rls: Guarantee backoff will update RLS picker Previously, picker was likely null if entering backoff soon after start-up. This prevented the picker from being updated and directing queued RPCs to the fallback. It would work for new RPCs if RLS returned extremely rapidly; both ManagedChannelImpl and DelayedClientTransport do a pick before enqueuing so the ManagedChannelImpl pick could request from RLS and DelayedClientTransport could use the response. So the test uses a delay to purposefully avoid that unlikely-in-real-life case. Creating a resolving OOB channel for InProcess doesn't actually change the destination from the parent, because InProcess uses directaddress. Thus the fakeRlsServiceImpl is now being added to the fake backend server, because the same server is used for RLS within the test. b/333185213 --- .../java/io/grpc/rls/CachingRlsLbClient.java | 23 +++---- .../io/grpc/rls/ChildLoadBalancerHelper.java | 4 ++ .../io/grpc/rls/LbPolicyConfiguration.java | 4 ++ .../java/io/grpc/rls/RlsLoadBalancerTest.java | 60 ++++++++++++++++++- 4 files changed, 76 insertions(+), 15 deletions(-) diff --git a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java index fcfd5933c8e..2c94e9e0a66 100644 --- a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java +++ b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java @@ -248,6 +248,12 @@ public void accept(BatchRecorder recorder) { logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created"); } + void init() { + synchronized (lock) { + refCountedChildPolicyWrapperFactory.init(); + } + } + /** * Convert the status to UNAVAILBLE and enhance the error message. * @param status status as provided by server @@ -385,7 +391,7 @@ private void pendingRpcComplete(PendingCacheEntry entry) { } catch (Exception e) { createBackOffEntry(entry.request, Status.fromThrowable(e), entry.backoffPolicy); // Cache updated. updateBalancingState() to reattempt picks - helper.propagateRlsError(); + helper.triggerPendingRpcProcessing(); } } } @@ -457,19 +463,8 @@ public void updateBalancingState(ConnectivityState newState, SubchannelPicker ne super.updateBalancingState(newState, newPicker); } - void propagateRlsError() { - getSynchronizationContext().execute(new Runnable() { - @Override - public void run() { - if (picker != null) { - // Refresh the channel state and let pending RPCs reprocess the picker. - updateBalancingState(state, picker); - } - } - }); - } - void triggerPendingRpcProcessing() { + checkState(state != null, "updateBalancingState hasn't yet been called"); helper.getSynchronizationContext().execute( () -> super.updateBalancingState(state, picker)); } @@ -842,7 +837,7 @@ Builder setBackoffProvider(BackoffPolicy.Provider provider) { CachingRlsLbClient build() { CachingRlsLbClient client = new CachingRlsLbClient(this); - helper.updateBalancingState(ConnectivityState.CONNECTING, client.rlsPicker); + client.init(); return client; } } diff --git a/rls/src/main/java/io/grpc/rls/ChildLoadBalancerHelper.java b/rls/src/main/java/io/grpc/rls/ChildLoadBalancerHelper.java index 3131aba7551..7a5d5dcc645 100644 --- a/rls/src/main/java/io/grpc/rls/ChildLoadBalancerHelper.java +++ b/rls/src/main/java/io/grpc/rls/ChildLoadBalancerHelper.java @@ -77,6 +77,10 @@ static final class ChildLoadBalancerHelperProvider { this.picker = checkNotNull(picker, "picker"); } + void init() { + helper.updateBalancingState(ConnectivityState.CONNECTING, picker); + } + ChildLoadBalancerHelper forTarget(String target) { return new ChildLoadBalancerHelper(target, helper, subchannelStateManager, picker); } diff --git a/rls/src/main/java/io/grpc/rls/LbPolicyConfiguration.java b/rls/src/main/java/io/grpc/rls/LbPolicyConfiguration.java index be067732bac..4d6ceed9235 100644 --- a/rls/src/main/java/io/grpc/rls/LbPolicyConfiguration.java +++ b/rls/src/main/java/io/grpc/rls/LbPolicyConfiguration.java @@ -225,6 +225,10 @@ public RefCountedChildPolicyWrapperFactory( this.childLbStatusListener = checkNotNull(childLbStatusListener, "childLbStatusListener"); } + void init() { + childLbHelperProvider.init(); + } + ChildPolicyWrapper createOrGet(String target) { // TODO(creamsoup) check if the target is valid or not RefCountedChildPolicyWrapper pooledChildPolicyWrapper = childPolicyMap.get(target); diff --git a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java index ce2926919ba..f3986cb89d5 100644 --- a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java +++ b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java @@ -134,7 +134,8 @@ public void uncaughtException(Thread t, Throwable e) { private final FakeHelper helperDelegate = new FakeHelper(); private final Helper helper = mock(Helper.class, AdditionalAnswers.delegatesTo(helperDelegate)); - private final FakeRlsServerImpl fakeRlsServerImpl = new FakeRlsServerImpl(); + private final FakeRlsServerImpl fakeRlsServerImpl = new FakeRlsServerImpl( + fakeClock.getScheduledExecutorService()); private final Deque subchannels = new LinkedList<>(); private final FakeThrottler fakeThrottler = new FakeThrottler(); private final String channelTarget = "channelTarget"; @@ -296,6 +297,38 @@ public void lb_working_withDefaultTarget_rlsResponding() throws Exception { verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "fail"); } + @Test + public void fallbackWithDelay_succeeds() throws Exception { + fakeRlsServerImpl.setResponseDelay(100, TimeUnit.MILLISECONDS); + grpcCleanupRule.register( + InProcessServerBuilder.forName("fake-bigtable.googleapis.com") + .addService(ServerServiceDefinition.builder("com.google") + .addMethod(fakeSearchMethod, (call, headers) -> { + call.sendHeaders(new Metadata()); + call.sendMessage(null); + call.close(Status.OK, new Metadata()); + return new ServerCall.Listener() {}; + }) + .build()) + .addService(fakeRlsServerImpl) + .directExecutor() + .build() + .start()); + ManagedChannel channel = grpcCleanupRule.register( + InProcessChannelBuilder.forName("fake-bigtable.googleapis.com") + .defaultServiceConfig(parseJson(getServiceConfigJsonStr())) + .directExecutor() + .build()); + + StreamRecorder recorder = StreamRecorder.create(); + StreamObserver requestObserver = ClientCalls.asyncClientStreamingCall( + channel.newCall(fakeSearchMethod, CallOptions.DEFAULT), recorder); + requestObserver.onCompleted(); + fakeClock.forwardTime(100, TimeUnit.MILLISECONDS); + assertThat(recorder.awaitCompletion(10, TimeUnit.SECONDS)).isTrue(); + assertThat(recorder.getError()).isNull(); + } + @Test public void metricsWithRealChannel() throws Exception { grpcCleanupRule.register( @@ -308,6 +341,7 @@ public void metricsWithRealChannel() throws Exception { return new ServerCall.Listener() {}; }) .build()) + .addService(fakeRlsServerImpl) .directExecutor() .build() .start()); @@ -761,17 +795,41 @@ private static final class FakeRlsServerImpl private static final Converter RESPONSE_CONVERTER = new RouteLookupResponseConverter().reverse(); + private final ScheduledExecutorService scheduler; + private long delay; + private TimeUnit delayUnit; + + public FakeRlsServerImpl(ScheduledExecutorService scheduler) { + this.scheduler = scheduler; + } + private Map lookupTable = ImmutableMap.of(); private void setLookupTable(Map lookupTable) { this.lookupTable = checkNotNull(lookupTable, "lookupTable"); } + void setResponseDelay(long delay, TimeUnit unit) { + this.delay = delay; + this.delayUnit = unit; + } + @Override + @SuppressWarnings("FutureReturnValueIgnored") public void routeLookup(io.grpc.lookup.v1.RouteLookupRequest request, StreamObserver responseObserver) { RouteLookupResponse response = lookupTable.get(REQUEST_CONVERTER.convert(request)); + Runnable sendResponse = () -> sendResponse(response, responseObserver); + if (delay != 0) { + scheduler.schedule(sendResponse, delay, delayUnit); + } else { + sendResponse.run(); + } + } + + private void sendResponse(RouteLookupResponse response, + StreamObserver responseObserver) { if (response == null) { responseObserver.onError(new RuntimeException("not found")); } else {