Skip to content

Commit

Permalink
xds: support xdstp scheme in resource URIs for federation (#8716)
Browse files Browse the repository at this point in the history
Implement applying `server_listener_resource_name_template` and `client_listener_resource_name_template` with xdstp scheme, extracting authorities from xdstp resource URI and lookup authorities map in bootstrap.
  • Loading branch information
dapengzhang0 authored Nov 22, 2021
1 parent a5f1fb5 commit 5f3a5f8
Show file tree
Hide file tree
Showing 9 changed files with 295 additions and 34 deletions.
9 changes: 7 additions & 2 deletions xds/src/main/java/io/grpc/xds/Bootstrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.grpc.xds;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
Expand All @@ -33,6 +35,8 @@
@Internal
public abstract class Bootstrapper {

static final String XDSTP_SCHEME = "xdstp:";

/**
* Returns system-loaded bootstrap configuration.
*/
Expand Down Expand Up @@ -104,12 +108,13 @@ abstract static class AuthorityInfo {
* <p>If the same server is listed in multiple authorities, the entries will be de-duped (i.e.,
* resources for both authorities will be fetched on the same ADS stream).
*
* <p>If empty, the top-level server list {@link BootstrapInfo#servers()} will be used.
* <p>Defaults to the top-level server list {@link BootstrapInfo#servers()}. Must not be empty.
*/
abstract ImmutableList<ServerInfo> xdsServers();

static AuthorityInfo create(
String clientListenerResourceNameTemplate, List<ServerInfo> xdsServers) {
checkArgument(!xdsServers.isEmpty(), "xdsServers must not be empty");
return new AutoValue_Bootstrapper_AuthorityInfo(
clientListenerResourceNameTemplate, ImmutableList.copyOf(xdsServers));
}
Expand All @@ -121,7 +126,7 @@ static AuthorityInfo create(
@AutoValue
@Internal
public abstract static class BootstrapInfo {
/** Returns the list of xDS servers to be connected to. */
/** Returns the list of xDS servers to be connected to. Must not be empty. */
abstract ImmutableList<ServerInfo> servers();

/** Returns the node identifier to be included in xDS requests. */
Expand Down
2 changes: 1 addition & 1 deletion xds/src/main/java/io/grpc/xds/BootstrapperImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ BootstrapInfo bootstrap(Map<String, ?> rawData) throws XdsInitializationExceptio
JsonUtil.getString(rawAuthority, "client_listener_resource_name_template");
logger.log(
XdsLogLevel.INFO, "client_listener_resource_name_template: {0}", clientListnerTemplate);
String prefix = "xdstp://" + authorityName + "/";
String prefix = XDSTP_SCHEME + "//" + authorityName + "/";
if (clientListnerTemplate == null) {
clientListnerTemplate = prefix + "envoy.config.listener.v3.Listener/%s";
} else if (!clientListnerTemplate.startsWith(prefix)) {
Expand Down
21 changes: 18 additions & 3 deletions xds/src/main/java/io/grpc/xds/ClientXdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.xds.Bootstrapper.XDSTP_SCHEME;

import com.github.udpa.udpa.type.v1.TypedStruct;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -70,6 +71,7 @@
import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.TimeProvider;
import io.grpc.xds.AbstractXdsClient.ResourceType;
import io.grpc.xds.Bootstrapper.AuthorityInfo;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.Endpoints.DropOverload;
import io.grpc.xds.Endpoints.LbEndpoint;
Expand Down Expand Up @@ -98,6 +100,7 @@
import io.grpc.xds.internal.Matchers.FractionMatcher;
import io.grpc.xds.internal.Matchers.HeaderMatcher;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -2267,8 +2270,12 @@ private final class ResourceSubscriber {
ResourceSubscriber(ResourceType type, String resource) {
syncContext.throwIfNotInThisSynchronizationContext();
this.type = type;
// TODO(zdapeng): Validate authority in resource URI for new-style resource name
// when parsing XDS response.
// TODO(zdapeng): Canonicalize the resource name by sorting the context params in normal
// lexicographic order.
this.resource = resource;
this.serverInfo = getServerInfo();
this.serverInfo = getServerInfo(resource);
// Initialize metadata in UNKNOWN state to cover the case when resource subscriber,
// is created but not yet requested because the client is in backoff.
this.metadata = ResourceMetadata.newResourceMetadataUnknown();
Expand All @@ -2280,8 +2287,16 @@ private final class ResourceSubscriber {
restartTimer();
}

// TODO(zdapeng): add resourceName arg and support xdstp:// resources
private ServerInfo getServerInfo() {
private ServerInfo getServerInfo(String resource) {
if (resource.startsWith(XDSTP_SCHEME)) {
URI uri = URI.create(resource);
String authority = uri.getAuthority();
if (authority == null) {
authority = "";
}
AuthorityInfo authorityInfo = bootstrapInfo.authorities().get(authority);
return authorityInfo.xdsServers().get(0);
}
return bootstrapInfo.servers().get(0); // use first server
}

Expand Down
62 changes: 49 additions & 13 deletions xds/src/main/java/io/grpc/xds/XdsNameResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.xds.Bootstrapper.XDSTP_SCHEME;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.google.common.net.UrlEscapers;
import com.google.gson.Gson;
import com.google.protobuf.util.Durations;
import io.grpc.Attributes;
Expand All @@ -45,6 +47,8 @@
import io.grpc.SynchronizationContext;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ObjectPool;
import io.grpc.xds.Bootstrapper.AuthorityInfo;
import io.grpc.xds.Bootstrapper.BootstrapInfo;
import io.grpc.xds.Filter.ClientInterceptorBuilder;
import io.grpc.xds.Filter.FilterConfig;
import io.grpc.xds.Filter.NamedFilterConfig;
Expand Down Expand Up @@ -101,7 +105,9 @@ final class XdsNameResolver extends NameResolver {

private final InternalLogId logId;
private final XdsLogger logger;
private final String authority;
@Nullable
private final String targetAuthority;
private final String serviceAuthority;
private final ServiceConfigParser serviceConfigParser;
private final SynchronizationContext syncContext;
private final ScheduledExecutorService scheduler;
Expand All @@ -120,20 +126,23 @@ final class XdsNameResolver extends NameResolver {
private CallCounterProvider callCounterProvider;
private ResolveState resolveState;

XdsNameResolver(String name, ServiceConfigParser serviceConfigParser,
XdsNameResolver(
@Nullable String targetAuthority, String name, ServiceConfigParser serviceConfigParser,
SynchronizationContext syncContext, ScheduledExecutorService scheduler,
@Nullable Map<String, ?> bootstrapOverride) {
this(name, serviceConfigParser, syncContext, scheduler,
this(targetAuthority, name, serviceConfigParser, syncContext, scheduler,
SharedXdsClientPoolProvider.getDefaultProvider(), ThreadSafeRandomImpl.instance,
FilterRegistry.getDefaultRegistry(), bootstrapOverride);
}

@VisibleForTesting
XdsNameResolver(String name, ServiceConfigParser serviceConfigParser,
XdsNameResolver(
@Nullable String targetAuthority, String name, ServiceConfigParser serviceConfigParser,
SynchronizationContext syncContext, ScheduledExecutorService scheduler,
XdsClientPoolFactory xdsClientPoolFactory, ThreadSafeRandom random,
FilterRegistry filterRegistry, @Nullable Map<String, ?> bootstrapOverride) {
authority = GrpcUtil.checkAuthority(checkNotNull(name, "name"));
this.targetAuthority = targetAuthority;
serviceAuthority = GrpcUtil.checkAuthority(checkNotNull(name, "name"));
this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser");
this.syncContext = checkNotNull(syncContext, "syncContext");
this.scheduler = checkNotNull(scheduler, "scheduler");
Expand All @@ -149,7 +158,7 @@ final class XdsNameResolver extends NameResolver {

@Override
public String getServiceAuthority() {
return authority;
return serviceAuthority;
}

@Override
Expand All @@ -163,11 +172,33 @@ public void start(Listener2 listener) {
return;
}
xdsClient = xdsClientPool.getObject();
BootstrapInfo bootstrapInfo = xdsClient.getBootstrapInfo();
String listenerNameTemplate;
if (targetAuthority == null) {
listenerNameTemplate = bootstrapInfo.clientDefaultListenerResourceNameTemplate();
} else {
AuthorityInfo authorityInfo = bootstrapInfo.authorities().get(targetAuthority);
if (authorityInfo == null) {
listener.onError(Status.INVALID_ARGUMENT.withDescription(
"invalid target URI: target authority not found in the bootstrap"));
return;
}
listenerNameTemplate = authorityInfo.clientListenerResourceNameTemplate();
}
String replacement = serviceAuthority;
if (listenerNameTemplate.startsWith(XDSTP_SCHEME)) {
replacement = UrlEscapers.urlFragmentEscaper().escape(replacement);
}
String ldsResourceName = expandPercentS(listenerNameTemplate, replacement);
callCounterProvider = SharedCallCounterMap.getInstance();
resolveState = new ResolveState();
resolveState = new ResolveState(ldsResourceName);
resolveState.start();
}

private static String expandPercentS(String template, String replacement) {
return template.replace("%s", replacement);
}

@Override
public void shutdown() {
logger.log(XdsLogLevel.INFO, "Shutdown");
Expand Down Expand Up @@ -624,12 +655,17 @@ private class ResolveState implements LdsResourceWatcher {
.setServiceConfig(emptyServiceConfig)
// let channel take action for no config selector
.build();
private final String ldsResourceName;
private boolean stopped;
@Nullable
private Set<String> existingClusters; // clusters to which new requests can be routed
@Nullable
private RouteDiscoveryState routeDiscoveryState;

ResolveState(String ldsResourceName) {
this.ldsResourceName = ldsResourceName;
}

@Override
public void onChanged(final LdsUpdate update) {
syncContext.execute(new Runnable() {
Expand Down Expand Up @@ -686,23 +722,23 @@ public void run() {
}

private void start() {
logger.log(XdsLogLevel.INFO, "Start watching LDS resource {0}", authority);
xdsClient.watchLdsResource(authority, this);
logger.log(XdsLogLevel.INFO, "Start watching LDS resource {0}", ldsResourceName);
xdsClient.watchLdsResource(ldsResourceName, this);
}

private void stop() {
logger.log(XdsLogLevel.INFO, "Stop watching LDS resource {0}", authority);
logger.log(XdsLogLevel.INFO, "Stop watching LDS resource {0}", ldsResourceName);
stopped = true;
cleanUpRouteDiscoveryState();
xdsClient.cancelLdsResourceWatch(authority, this);
xdsClient.cancelLdsResourceWatch(ldsResourceName, this);
}

private void updateRoutes(List<VirtualHost> virtualHosts, long httpMaxStreamDurationNano,
@Nullable List<NamedFilterConfig> filterConfigs) {
VirtualHost virtualHost = findVirtualHostForHostName(virtualHosts, authority);
VirtualHost virtualHost = findVirtualHostForHostName(virtualHosts, ldsResourceName);
if (virtualHost == null) {
logger.log(XdsLogLevel.WARNING,
"Failed to find virtual host matching hostname {0}", authority);
"Failed to find virtual host matching hostname {0}", ldsResourceName);
cleanUpRoutes();
return;
}
Expand Down
7 changes: 4 additions & 3 deletions xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ public XdsNameResolver newNameResolver(URI targetUri, Args args) {
targetPath,
targetUri);
String name = targetPath.substring(1);
return new XdsNameResolver(name, args.getServiceConfigParser(),
args.getSynchronizationContext(), args.getScheduledExecutorService(),
bootstrapOverride);
return new XdsNameResolver(
targetUri.getAuthority(), name, args.getServiceConfigParser(),
args.getSynchronizationContext(), args.getScheduledExecutorService(),
bootstrapOverride);
}
return null;
}
Expand Down
8 changes: 7 additions & 1 deletion xds/src/main/java/io/grpc/xds/XdsServerWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static io.grpc.xds.Bootstrapper.XDSTP_SCHEME;

import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.net.UrlEscapers;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes;
import io.grpc.InternalServerInterceptors;
Expand Down Expand Up @@ -192,7 +194,11 @@ private void internalStart() {
xdsClient = xdsClientPool.returnObject(xdsClient);
return;
}
discoveryState = new DiscoveryState(listenerTemplate.replaceAll("%s", listenerAddress));
String replacement = listenerAddress;
if (listenerTemplate.startsWith(XDSTP_SCHEME)) {
replacement = UrlEscapers.urlFragmentEscaper().escape(replacement);
}
discoveryState = new DiscoveryState(listenerTemplate.replaceAll("%s", replacement));
}

@Override
Expand Down
Loading

0 comments on commit 5f3a5f8

Please sign in to comment.