Skip to content

Commit

Permalink
rls: Guarantee backoff will update RLS picker
Browse files Browse the repository at this point in the history
Previously, picker was likely null if entering backoff soon after
start-up. This prevented the picker from being updated and directing
queued RPCs to the fallback. It would work for new RPCs if RLS returned
extremely rapidly; both ManagedChannelImpl and DelayedClientTransport do
a pick before enqueuing so the ManagedChannelImpl pick could request
from RLS and DelayedClientTransport could use the response. So the test
uses a delay to purposefully avoid that unlikely-in-real-life case.

Creating a resolving OOB channel for InProcess doesn't actually change
the destination from the parent, because InProcess uses directaddress.
Thus the fakeRlsServiceImpl is now being added to the fake backend
server, because the same server is used for RLS within the test.

b/333185213
  • Loading branch information
ejona86 committed May 14, 2024
1 parent 80f872e commit 3158f91
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 15 deletions.
23 changes: 9 additions & 14 deletions rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,12 @@ public void accept(BatchRecorder recorder) {
logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created");
}

void init() {
synchronized (lock) {
refCountedChildPolicyWrapperFactory.init();
}
}

/**
* Convert the status to UNAVAILBLE and enhance the error message.
* @param status status as provided by server
Expand Down Expand Up @@ -385,7 +391,7 @@ private void pendingRpcComplete(PendingCacheEntry entry) {
} catch (Exception e) {
createBackOffEntry(entry.request, Status.fromThrowable(e), entry.backoffPolicy);
// Cache updated. updateBalancingState() to reattempt picks
helper.propagateRlsError();
helper.triggerPendingRpcProcessing();
}
}
}
Expand Down Expand Up @@ -457,19 +463,8 @@ public void updateBalancingState(ConnectivityState newState, SubchannelPicker ne
super.updateBalancingState(newState, newPicker);
}

void propagateRlsError() {
getSynchronizationContext().execute(new Runnable() {
@Override
public void run() {
if (picker != null) {
// Refresh the channel state and let pending RPCs reprocess the picker.
updateBalancingState(state, picker);
}
}
});
}

void triggerPendingRpcProcessing() {
checkState(state != null, "updateBalancingState hasn't yet been called");
helper.getSynchronizationContext().execute(
() -> super.updateBalancingState(state, picker));
}
Expand Down Expand Up @@ -842,7 +837,7 @@ Builder setBackoffProvider(BackoffPolicy.Provider provider) {

CachingRlsLbClient build() {
CachingRlsLbClient client = new CachingRlsLbClient(this);
helper.updateBalancingState(ConnectivityState.CONNECTING, client.rlsPicker);
client.init();
return client;
}
}
Expand Down
4 changes: 4 additions & 0 deletions rls/src/main/java/io/grpc/rls/ChildLoadBalancerHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ static final class ChildLoadBalancerHelperProvider {
this.picker = checkNotNull(picker, "picker");
}

void init() {
helper.updateBalancingState(ConnectivityState.CONNECTING, picker);
}

ChildLoadBalancerHelper forTarget(String target) {
return new ChildLoadBalancerHelper(target, helper, subchannelStateManager, picker);
}
Expand Down
4 changes: 4 additions & 0 deletions rls/src/main/java/io/grpc/rls/LbPolicyConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ public RefCountedChildPolicyWrapperFactory(
this.childLbStatusListener = checkNotNull(childLbStatusListener, "childLbStatusListener");
}

void init() {
childLbHelperProvider.init();
}

ChildPolicyWrapper createOrGet(String target) {
// TODO(creamsoup) check if the target is valid or not
RefCountedChildPolicyWrapper pooledChildPolicyWrapper = childPolicyMap.get(target);
Expand Down
60 changes: 59 additions & 1 deletion rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ public void uncaughtException(Thread t, Throwable e) {
private final FakeHelper helperDelegate = new FakeHelper();
private final Helper helper =
mock(Helper.class, AdditionalAnswers.delegatesTo(helperDelegate));
private final FakeRlsServerImpl fakeRlsServerImpl = new FakeRlsServerImpl();
private final FakeRlsServerImpl fakeRlsServerImpl = new FakeRlsServerImpl(
fakeClock.getScheduledExecutorService());
private final Deque<FakeSubchannel> subchannels = new LinkedList<>();
private final FakeThrottler fakeThrottler = new FakeThrottler();
private final String channelTarget = "channelTarget";
Expand Down Expand Up @@ -296,6 +297,38 @@ public void lb_working_withDefaultTarget_rlsResponding() throws Exception {
verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "fail");
}

@Test
public void fallbackWithDelay_succeeds() throws Exception {
fakeRlsServerImpl.setResponseDelay(100, TimeUnit.MILLISECONDS);
grpcCleanupRule.register(
InProcessServerBuilder.forName("fake-bigtable.googleapis.com")
.addService(ServerServiceDefinition.builder("com.google")
.addMethod(fakeSearchMethod, (call, headers) -> {
call.sendHeaders(new Metadata());
call.sendMessage(null);
call.close(Status.OK, new Metadata());
return new ServerCall.Listener<Void>() {};
})
.build())
.addService(fakeRlsServerImpl)
.directExecutor()
.build()
.start());
ManagedChannel channel = grpcCleanupRule.register(
InProcessChannelBuilder.forName("fake-bigtable.googleapis.com")
.defaultServiceConfig(parseJson(getServiceConfigJsonStr()))
.directExecutor()
.build());

StreamRecorder<Void> recorder = StreamRecorder.create();
StreamObserver<Void> requestObserver = ClientCalls.asyncClientStreamingCall(
channel.newCall(fakeSearchMethod, CallOptions.DEFAULT), recorder);
requestObserver.onCompleted();
fakeClock.forwardTime(100, TimeUnit.MILLISECONDS);
assertThat(recorder.awaitCompletion(10, TimeUnit.SECONDS)).isTrue();
assertThat(recorder.getError()).isNull();
}

@Test
public void metricsWithRealChannel() throws Exception {
grpcCleanupRule.register(
Expand All @@ -308,6 +341,7 @@ public void metricsWithRealChannel() throws Exception {
return new ServerCall.Listener<Void>() {};
})
.build())
.addService(fakeRlsServerImpl)
.directExecutor()
.build()
.start());
Expand Down Expand Up @@ -761,17 +795,41 @@ private static final class FakeRlsServerImpl
private static final Converter<RouteLookupResponse, io.grpc.lookup.v1.RouteLookupResponse>
RESPONSE_CONVERTER = new RouteLookupResponseConverter().reverse();

private final ScheduledExecutorService scheduler;
private long delay;
private TimeUnit delayUnit;

public FakeRlsServerImpl(ScheduledExecutorService scheduler) {
this.scheduler = scheduler;
}

private Map<RouteLookupRequest, RouteLookupResponse> lookupTable = ImmutableMap.of();

private void setLookupTable(Map<RouteLookupRequest, RouteLookupResponse> lookupTable) {
this.lookupTable = checkNotNull(lookupTable, "lookupTable");
}

void setResponseDelay(long delay, TimeUnit unit) {
this.delay = delay;
this.delayUnit = unit;
}

@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void routeLookup(io.grpc.lookup.v1.RouteLookupRequest request,
StreamObserver<io.grpc.lookup.v1.RouteLookupResponse> responseObserver) {
RouteLookupResponse response =
lookupTable.get(REQUEST_CONVERTER.convert(request));
Runnable sendResponse = () -> sendResponse(response, responseObserver);
if (delay != 0) {
scheduler.schedule(sendResponse, delay, delayUnit);
} else {
sendResponse.run();
}
}

private void sendResponse(RouteLookupResponse response,
StreamObserver<io.grpc.lookup.v1.RouteLookupResponse> responseObserver) {
if (response == null) {
responseObserver.onError(new RuntimeException("not found"));
} else {
Expand Down

0 comments on commit 3158f91

Please sign in to comment.