Skip to content

Commit

Permalink
XdsClient Fallback working
Browse files Browse the repository at this point in the history
  • Loading branch information
larry-safran committed Aug 21, 2024
1 parent 25aa454 commit d8f7094
Show file tree
Hide file tree
Showing 24 changed files with 1,350 additions and 237 deletions.
9 changes: 8 additions & 1 deletion census/src/test/java/io/grpc/census/CensusModulesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import io.grpc.ClientInterceptors;
import io.grpc.ClientStreamTracer;
import io.grpc.Context;
import io.grpc.KnownLength;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
Expand Down Expand Up @@ -99,6 +100,7 @@
import io.opencensus.trace.Tracer;
import io.opencensus.trace.propagation.BinaryFormat;
import io.opencensus.trace.propagation.SpanContextParseException;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -136,7 +138,7 @@ public class CensusModulesTest {
ClientStreamTracer.StreamInfo.newBuilder()
.setCallOptions(CallOptions.DEFAULT.withOption(NAME_RESOLUTION_DELAYED, 10L)).build();

private static class StringInputStream extends InputStream {
private static class StringInputStream extends InputStream implements KnownLength {
final String string;

StringInputStream(String string) {
Expand All @@ -149,6 +151,11 @@ public int read() {
// passed to the InProcess server and consumed by MARSHALLER.parse().
throw new UnsupportedOperationException("Should not be called");
}

@Override
public int available() throws IOException {
return string == null ? 0 : string.length();
}
}

private static final MethodDescriptor.Marshaller<String> MARSHALLER =
Expand Down
42 changes: 30 additions & 12 deletions xds/src/main/java/io/grpc/xds/CsdsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import io.grpc.xds.client.XdsClient.ResourceMetadata.ResourceMetadataStatus;
import io.grpc.xds.client.XdsClient.ResourceMetadata.UpdateFailureState;
import io.grpc.xds.client.XdsResourceType;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -126,20 +126,32 @@ private boolean handleRequest(
Status.INVALID_ARGUMENT.withDescription("node_matchers not supported"));
} else {
List<String> targets = xdsClientPoolFactory.getTargets();
List<ClientConfig> clientConfigs = new ArrayList<>(targets.size());

for (int i = 0; i < targets.size() && error == null; i++) {
String target = targets.get(i);
try {
responseObserver.onNext(getConfigDumpForRequest(target, request));
ClientConfig clientConfig = getConfigForRequest(targets.get(i));
if (clientConfig != null) {
clientConfigs.add(clientConfig);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.log(Level.FINE, "Server interrupted while building CSDS config dump", e);
error = Status.ABORTED.withDescription("Thread interrupted").withCause(e).asException();
} catch (RuntimeException e) {
logger.log(Level.WARNING, "Unexpected error while building CSDS config dump", e);
error =
Status.INTERNAL.withDescription("Unexpected internal error").withCause(e).asException();
error = Status.INTERNAL.withDescription("Unexpected internal error").withCause(e)
.asException();
}
}

try {
responseObserver.onNext(getStatusResponse(clientConfigs));
} catch (RuntimeException e) {
logger.log(Level.WARNING, "Unexpected error while processing CSDS config dump", e);
error = Status.INTERNAL.withDescription("Unexpected internal error").withCause(e)
.asException();
}
}

if (error == null) {
Expand All @@ -149,29 +161,35 @@ private boolean handleRequest(
return false;
}

private ClientStatusResponse getConfigDumpForRequest(String target, ClientStatusRequest request)
throws InterruptedException {
private ClientConfig getConfigForRequest(String target) throws InterruptedException {
ObjectPool<XdsClient> xdsClientPool = xdsClientPoolFactory.get(target);
if (xdsClientPool == null) {
return ClientStatusResponse.getDefaultInstance();
return null;
}

XdsClient xdsClient = null;
try {
xdsClient = xdsClientPool.getObject();
return ClientStatusResponse.newBuilder()
.addConfig(getClientConfigForXdsClient(xdsClient))
.build();
return getClientConfigForXdsClient(xdsClient, target);
} finally {
if (xdsClient != null) {
xdsClientPool.returnObject(xdsClient);
}
}
}

private ClientStatusResponse getStatusResponse(List<ClientConfig> clientConfigs) {
if (clientConfigs.isEmpty()) {
return ClientStatusResponse.getDefaultInstance();
}
return ClientStatusResponse.newBuilder().addAllConfig(clientConfigs).build();
}

@VisibleForTesting
static ClientConfig getClientConfigForXdsClient(XdsClient xdsClient) throws InterruptedException {
static ClientConfig getClientConfigForXdsClient(XdsClient xdsClient, String target)
throws InterruptedException {
ClientConfig.Builder builder = ClientConfig.newBuilder()
.setClientScope(target)
.setNode(xdsClient.getBootstrapInfo().node().toEnvoyProtoNode());

Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType =
Expand Down
7 changes: 7 additions & 0 deletions xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.grpc.CallOptions;
import io.grpc.ChannelCredentials;
import io.grpc.ClientCall;
import io.grpc.ConnectivityState;
import io.grpc.Context;
import io.grpc.Grpc;
import io.grpc.ManagedChannel;
Expand Down Expand Up @@ -84,6 +85,12 @@ public void shutdown() {
channel.shutdown();
}

@Override
public boolean isConnected() {
ConnectivityState state = channel.getState(false);
return state == ConnectivityState.READY;
}

private class XdsStreamingCall<ReqT, RespT> implements
XdsTransportFactory.StreamingCall<ReqT, RespT> {

Expand Down
41 changes: 26 additions & 15 deletions xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.sun.javafx.UnmodifiableArrayList;
import io.grpc.internal.ExponentialBackoffPolicy;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ObjectPool;
Expand All @@ -33,7 +32,6 @@
import io.grpc.xds.client.XdsClientImpl;
import io.grpc.xds.client.XdsInitializationException;
import io.grpc.xds.internal.security.TlsContextManagerImpl;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -57,8 +55,7 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory {
private final Bootstrapper bootstrapper;
private final Object lock = new Object();
private final AtomicReference<Map<String, ?>> bootstrapOverride = new AtomicReference<>();
// private volatile ObjectPool<XdsClient> xdsClientPool;
private Map<String, ObjectPool<XdsClient>> targetToXdsClientMap = new ConcurrentHashMap<>();
private final Map<String, ObjectPool<XdsClient>> targetToXdsClientMap = new ConcurrentHashMap<>();

SharedXdsClientPoolProvider() {
this(new GrpcBootstrapperImpl());
Expand Down Expand Up @@ -101,7 +98,7 @@ public ObjectPool<XdsClient> getOrCreate(String target) throws XdsInitialization
if (bootstrapInfo.servers().isEmpty()) {
throw new XdsInitializationException("No xDS server provided");
}
ref = new RefCountedXdsClientObjectPool(bootstrapInfo);
ref = new RefCountedXdsClientObjectPool(bootstrapInfo, target);
targetToXdsClientMap.put(target, ref);
}
}
Expand All @@ -122,7 +119,11 @@ private static class SharedXdsClientPoolProviderHolder {
@ThreadSafe
@VisibleForTesting
static class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {

private static final ExponentialBackoffPolicy.Provider BACKOFF_POLICY_PROVIDER =
new ExponentialBackoffPolicy.Provider();
private final BootstrapInfo bootstrapInfo;
private final String target; // The target associated with the xDS client.
private final Object lock = new Object();
@GuardedBy("lock")
private ScheduledExecutorService scheduler;
Expand All @@ -132,8 +133,9 @@ static class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {
private int refCount;

@VisibleForTesting
RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo) {
RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo, String target) {
this.bootstrapInfo = checkNotNull(bootstrapInfo);
this.target = target;
}

@Override
Expand All @@ -144,15 +146,19 @@ public XdsClient getObject() {
log.log(Level.INFO, "xDS node ID: {0}", bootstrapInfo.node().getId());
}
scheduler = SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE);
xdsClient = new XdsClientImpl(
DEFAULT_XDS_TRANSPORT_FACTORY,
bootstrapInfo,
scheduler,
new ExponentialBackoffPolicy.Provider(),
GrpcUtil.STOPWATCH_SUPPLIER,
TimeProvider.SYSTEM_TIME_PROVIDER,
MessagePrinter.INSTANCE,
new TlsContextManagerImpl(bootstrapInfo));
try {
xdsClient = new XdsClientImpl(
DEFAULT_XDS_TRANSPORT_FACTORY,
bootstrapInfo,
scheduler,
BACKOFF_POLICY_PROVIDER,
GrpcUtil.STOPWATCH_SUPPLIER,
TimeProvider.SYSTEM_TIME_PROVIDER,
MessagePrinter.INSTANCE,
new TlsContextManagerImpl(bootstrapInfo));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
refCount++;
return xdsClient;
Expand All @@ -179,5 +185,10 @@ XdsClient getXdsClientForTest() {
return xdsClient;
}
}

public String getTarget() {
return target;
}
}

}
5 changes: 5 additions & 0 deletions xds/src/main/java/io/grpc/xds/XdsListenerResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ protected LdsUpdate doParse(Args args, Message unpackedMessage)
}
}

@Override
public boolean isSharedName() {
return true;
}

private LdsUpdate processClientSideListener(Listener listener)
throws ResourceInvalidException {
// Unpack HttpConnectionManager from the Listener.
Expand Down
6 changes: 3 additions & 3 deletions xds/src/main/java/io/grpc/xds/XdsNameResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,9 @@ final class XdsNameResolver extends NameResolver {
ServiceConfigParser serviceConfigParser,
SynchronizationContext syncContext, ScheduledExecutorService scheduler,
@Nullable Map<String, ?> bootstrapOverride) {
this(targetUri, targetUri.getAuthority(), name, overrideAuthority, serviceConfigParser, syncContext, scheduler,
SharedXdsClientPoolProvider.getDefaultProvider(), ThreadSafeRandomImpl.instance,
FilterRegistry.getDefaultRegistry(), bootstrapOverride);
this(targetUri, targetUri.getAuthority(), name, overrideAuthority, serviceConfigParser,
syncContext, scheduler, SharedXdsClientPoolProvider.getDefaultProvider(),
ThreadSafeRandomImpl.instance, FilterRegistry.getDefaultRegistry(), bootstrapOverride);
}

@VisibleForTesting
Expand Down
2 changes: 1 addition & 1 deletion xds/src/main/java/io/grpc/xds/XdsServerWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public void run() {

private void internalStart() {
try {
xdsClientPool = xdsClientPoolFactory.getOrCreate();
xdsClientPool = xdsClientPoolFactory.getOrCreate("");
} catch (Exception e) {
StatusException statusException = Status.UNAVAILABLE.withDescription(
"Failed to initialize xDS").withCause(e).asException();
Expand Down
16 changes: 16 additions & 0 deletions xds/src/main/java/io/grpc/xds/client/BootstrapperImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
@Internal
public abstract class BootstrapperImpl extends Bootstrapper {

public static final String GRPC_EXPERIMENTAL_XDS_FALLBACK =
"GRPC_EXPERIMENTAL_XDS_FALLBACK";

// Client features.
@VisibleForTesting
public static final String CLIENT_FEATURE_DISABLE_OVERPROVISIONING =
Expand All @@ -59,11 +62,18 @@ protected BootstrapperImpl() {
logger = XdsLogger.withLogId(InternalLogId.allocate("bootstrapper", null));
}

// Delayed initialization of xdsFallbackEnabled to allow for flag initialization.
public static boolean isEnabledXdsFallback() {
return GrpcUtil.getFlag(GRPC_EXPERIMENTAL_XDS_FALLBACK, false);
}

protected abstract String getJsonContent() throws IOException, XdsInitializationException;

protected abstract Object getImplSpecificConfig(Map<String, ?> serverConfig, String serverUri)
throws XdsInitializationException;



/**
* Reads and parses bootstrap config. The config is expected to be in JSON format.
*/
Expand Down Expand Up @@ -102,6 +112,9 @@ protected BootstrapInfo.Builder bootstrapBuilder(Map<String, ?> rawData)
throw new XdsInitializationException("Invalid bootstrap: 'xds_servers' does not exist.");
}
List<ServerInfo> servers = parseServerInfos(rawServerConfigs, logger);
if (servers.size() > 1 && !isEnabledXdsFallback()) {
servers = ImmutableList.of(servers.get(0));
}
builder.servers(servers);

Node.Builder nodeBuilder = Node.newBuilder();
Expand Down Expand Up @@ -208,6 +221,9 @@ protected BootstrapInfo.Builder bootstrapBuilder(Map<String, ?> rawData)
if (rawAuthorityServers == null || rawAuthorityServers.isEmpty()) {
authorityServers = servers;
} else {
if (rawAuthorityServers.size() > 1 && !isEnabledXdsFallback()) {
rawAuthorityServers = ImmutableList.of(rawAuthorityServers.get(0));
}
authorityServers = parseServerInfos(rawAuthorityServers, logger);
}
authorityInfoMapBuilder.put(
Expand Down
Loading

0 comments on commit d8f7094

Please sign in to comment.