diff --git a/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java b/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java index f6b82775f1e..b5384616925 100644 --- a/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java +++ b/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java @@ -61,6 +61,8 @@ * the xDS RPC stream. */ final class AbstractXdsClient { + + public static final String CLOSED_BY_SERVER = "Closed by server"; private final SynchronizationContext syncContext; private final InternalLogId logId; private final XdsLogger logger; @@ -217,6 +219,11 @@ void readyHandler() { return; } + if (isInBackoff()) { + rpcRetryTimer.cancel(); + rpcRetryTimer = null; + } + timerLaunch.startSubscriberTimersIfNeeded(serverInfo); } @@ -315,21 +322,25 @@ final void handleRpcError(Throwable t) { } final void handleRpcCompleted() { - handleRpcStreamClosed(Status.UNAVAILABLE.withDescription("Closed by server")); + handleRpcStreamClosed(Status.UNAVAILABLE.withDescription(CLOSED_BY_SERVER)); } private void handleRpcStreamClosed(Status error) { - checkArgument(!error.isOk(), "unexpected OK status"); if (closed) { return; } + + checkArgument(!error.isOk(), "unexpected OK status"); + String errorMsg = error.getDescription() != null + && error.getDescription().equals(CLOSED_BY_SERVER) + ? "ADS stream closed with status {0}: {1}. Cause: {2}" + : "ADS stream failed with status {0}: {1}. Cause: {2}"; logger.log( - XdsLogLevel.ERROR, - "ADS stream closed with status {0}: {1}. Cause: {2}", - error.getCode(), error.getDescription(), error.getCause()); + XdsLogLevel.ERROR, errorMsg, error.getCode(), error.getDescription(), error.getCause()); closed = true; xdsResponseHandler.handleStreamClosed(error); cleanUp(); + if (responseReceived || retryBackoffPolicy == null) { // Reset the backoff sequence if had received a response, or backoff sequence // has never been initialized. @@ -423,7 +434,7 @@ public void run() { }); } }; - requestWriter = stub.withWaitForReady().streamAggregatedResources(responseReader); + requestWriter = stub.streamAggregatedResources(responseReader); } @Override diff --git a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java index e7ed64aea87..e67bff12871 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java @@ -514,11 +514,22 @@ private final class ResourceSubscriber { // Initialize metadata in UNKNOWN state to cover the case when resource subscriber, // is created but not yet requested because the client is in backoff. this.metadata = ResourceMetadata.newResourceMetadataUnknown(); - maybeCreateXdsChannelWithLrs(serverInfo); - this.xdsChannel = serverChannelMap.get(serverInfo); - if (xdsChannel.isInBackoff()) { + + AbstractXdsClient xdsChannelTemp = null; + try { + maybeCreateXdsChannelWithLrs(serverInfo); + xdsChannelTemp = serverChannelMap.get(serverInfo); + if (xdsChannelTemp.isInBackoff()) { + return; + } + } catch (IllegalArgumentException e) { + xdsChannelTemp = null; + this.errorDescription = "Bad configuration: " + e.getMessage(); return; + } finally { + this.xdsChannel = xdsChannelTemp; } + restartTimer(); } diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java index 3ee6c916c2e..0f18d3d387f 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java @@ -18,6 +18,7 @@ import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertWithMessage; +import static io.grpc.xds.XdsClientImpl.XdsChannelFactory.DEFAULT_XDS_CHANNEL_FACTORY; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.mock; @@ -70,6 +71,7 @@ import io.grpc.stub.StreamObserver; import io.grpc.testing.GrpcCleanupRule; import io.grpc.xds.Bootstrapper.AuthorityInfo; +import io.grpc.xds.Bootstrapper.BootstrapInfo; import io.grpc.xds.Bootstrapper.CertificateProviderInfo; import io.grpc.xds.Bootstrapper.ServerInfo; import io.grpc.xds.Endpoints.DropOverload; @@ -114,6 +116,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatchers; import org.mockito.Captor; import org.mockito.InOrder; import org.mockito.Mock; @@ -3226,7 +3229,8 @@ public void streamClosedAndRetryWithBackoff() { // Management server closes the RPC stream with an error. call.sendError(Status.UNKNOWN.asException()); - verify(ldsResourceWatcher).onError(errorCaptor.capture()); + verify(ldsResourceWatcher, Mockito.timeout(1000).times(1)) + .onError(errorCaptor.capture()); verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNKNOWN, ""); verify(rdsResourceWatcher).onError(errorCaptor.capture()); verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNKNOWN, ""); @@ -3336,7 +3340,8 @@ public void streamClosedAndRetryRaceWithAddRemoveWatchers() { RDS_RESOURCE, rdsResourceWatcher); DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); call.sendError(Status.UNAVAILABLE.asException()); - verify(ldsResourceWatcher).onError(errorCaptor.capture()); + verify(ldsResourceWatcher, Mockito.timeout(1000).times(1)) + .onError(errorCaptor.capture()); verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); verify(rdsResourceWatcher).onError(errorCaptor.capture()); verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); @@ -3573,13 +3578,18 @@ public void sendingToStoppedServer() throws Exception { .build() .start()); fakeClock.forwardTime(5, TimeUnit.SECONDS); + verify(ldsResourceWatcher, never()).onResourceDoesNotExist(LDS_RESOURCE); + fakeClock.forwardTime(20, TimeUnit.SECONDS); // Trigger rpcRetryTimer DiscoveryRpcCall call = resourceDiscoveryCalls.poll(3, TimeUnit.SECONDS); + if (call == null) { // The first rpcRetry may have happened before the channel was ready + fakeClock.forwardTime(50, TimeUnit.SECONDS); + call = resourceDiscoveryCalls.poll(3, TimeUnit.SECONDS); + } // NOTE: There is a ScheduledExecutorService that may get involved due to the reconnect // so you cannot rely on the logic being single threaded. The timeout() in verifyRequest // is therefore necessary to avoid flakiness. // Send a response and do verifications - verify(ldsResourceWatcher, never()).onResourceDoesNotExist(LDS_RESOURCE); call.sendResponse(LDS, mf.buildWrappedResource(testListenerVhosts), VERSION_1, "0001"); call.verifyRequest(LDS, LDS_RESOURCE, VERSION_1, "0001", NODE); verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture()); @@ -3592,6 +3602,66 @@ public void sendingToStoppedServer() throws Exception { } } + @Test + public void sendToBadUrl() throws Exception { + // Setup xdsClient to fail on stream creation + XdsClientImpl client = createXdsClient("some. garbage"); + + client.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE, ldsResourceWatcher); + fakeClock.forwardTime(20, TimeUnit.SECONDS); + verify(ldsResourceWatcher, Mockito.timeout(5000).times(1)).onError(ArgumentMatchers.any()); + client.shutdown(); + } + + @Test + public void sendToNonexistentHost() throws Exception { + // Setup xdsClient to fail on stream creation + XdsClientImpl client = createXdsClient("some.garbage"); + client.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE, ldsResourceWatcher); + fakeClock.forwardTime(20, TimeUnit.SECONDS); + + verify(ldsResourceWatcher, Mockito.timeout(5000).times(1)).onError(ArgumentMatchers.any()); + fakeClock.forwardTime(50, TimeUnit.SECONDS); // Trigger rpcRetry if appropriate + assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); + client.shutdown(); + } + + private XdsClientImpl createXdsClient(String serverUri) { + BootstrapInfo bootstrapInfo = buildBootStrap(serverUri); + return new XdsClientImpl( + DEFAULT_XDS_CHANNEL_FACTORY, + bootstrapInfo, + Context.ROOT, + fakeClock.getScheduledExecutorService(), + backoffPolicyProvider, + fakeClock.getStopwatchSupplier(), + timeProvider, + tlsContextManager); + } + + private BootstrapInfo buildBootStrap(String serverUri) { + + ServerInfo xdsServerInfo = ServerInfo.create(serverUri, CHANNEL_CREDENTIALS, + ignoreResourceDeletion()); + + return Bootstrapper.BootstrapInfo.builder() + .servers(Collections.singletonList(xdsServerInfo)) + .node(NODE) + .authorities(ImmutableMap.of( + "authority.xds.com", + AuthorityInfo.create( + "xdstp://authority.xds.com/envoy.config.listener.v3.Listener/%s", + ImmutableList.of(Bootstrapper.ServerInfo.create( + SERVER_URI_CUSTOME_AUTHORITY, CHANNEL_CREDENTIALS))), + "", + AuthorityInfo.create( + "xdstp:///envoy.config.listener.v3.Listener/%s", + ImmutableList.of(Bootstrapper.ServerInfo.create( + SERVER_URI_EMPTY_AUTHORITY, CHANNEL_CREDENTIALS))))) + .certProviders(ImmutableMap.of("cert-instance-name", + CertificateProviderInfo.create("file-watcher", ImmutableMap.of()))) + .build(); + } private DiscoveryRpcCall startResourceWatcher( XdsResourceType type, String name, ResourceWatcher watcher) {