Skip to content

Commit

Permalink
xds router performance optimization
Browse files Browse the repository at this point in the history
Signed-off-by: daizhenyu <[email protected]>
  • Loading branch information
daizhenyu committed Nov 26, 2024
1 parent 76b68a4 commit b62a479
Show file tree
Hide file tree
Showing 27 changed files with 289 additions and 183 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ public interface XdsLoadBalanceService {
*/
XdsLbPolicy getLbPolicyOfCluster(String clusterName);

/**
* get lb policy of cluster
*
* @param serviceName service name
* @param clusterName cluster name
* @return route rules
*/
XdsLbPolicy getLbPolicyOfCluster(String serviceName, String clusterName);

/**
* get lb policy of service (base cluster)
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,13 @@ public interface XdsRouteService {
* @return route rules
*/
boolean isLocalityRoute(String clusterName);

/**
* get lb policy of cluster
*
* @param serviceName service name
* @param clusterName cluster name
* @return route rules
*/
boolean isLocalityRoute(String serviceName, String clusterName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ public interface XdsServiceDiscovery {
*/
Optional<XdsClusterLoadAssigment> getClusterServiceInstance(String clusterName);

/**
* get service instance of service cluster
*
* @param serviceName service name
* @param clusterName cluster name
* @return XdsClusterInstance
*/
Optional<XdsClusterLoadAssigment> getClusterServiceInstance(String serviceName, String clusterName);

/**
* subscribe service instance without tag by service name, the listener will be triggered when the service instance
* changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,12 @@ public Optional<XdsClusterLoadAssigment> getClusterServiceInstance(String cluste
if (!serviceNameOptional.isPresent()) {
return Optional.empty();
}

String serviceName = serviceNameOptional.get();
return getClusterServiceInstance(serviceName, clusterName);
}

@Override
public Optional<XdsClusterLoadAssigment> getClusterServiceInstance(String serviceName, String clusterName) {
// first check the cache and return if service instance exists
if (XdsDataCache.isContainsRequestObserver(serviceName)) {
return XdsDataCache.getClusterServiceInstance(serviceName, clusterName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ public XdsLbPolicy getLbPolicyOfCluster(String clusterName) {
return XdsLbPolicy.UNRECOGNIZED;
}

@Override
public XdsLbPolicy getLbPolicyOfCluster(String serviceName, String clusterName) {
return XdsDataCache.getLbPolicyOfCluster(serviceName, clusterName);
}

@Override
public XdsLbPolicy getBaseLbPolicyOfService(String serviceName) {
return XdsDataCache.getBaseLbPolicyOfService(serviceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,9 @@ public boolean isLocalityRoute(String clusterName) {
.map(serviceName -> XdsDataCache.isLocalityLb(serviceName, clusterName))
.orElse(false);
}

@Override
public boolean isLocalityRoute(String serviceName, String clusterName) {
return XdsDataCache.isLocalityLb(serviceName, clusterName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ spec:
value: "true"
- name: router_plugin_enabled_xds_route
value: "true"
- name: router_plugin_enabled_simple_http_route
value: "true"
- name: xds_service_discovery_enabled
value: "false"
- name: JAVA_TOOL_OPTIONS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ spec:
value: "true"
- name: router_plugin_enabled_xds_route
value: "true"
- name: router_plugin_enabled_simple_http_route
value: "true"
- name: ZOOKEEPER_IP
value: "zookeeper.default.svc.cluster.local"
- name: JAVA_TOOL_OPTIONS
Expand Down
2 changes: 2 additions & 0 deletions sermant-plugins/sermant-router/config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ router.plugin:
enabled-registry-plugin-adaptation: false
# whether to use xds route
enabled-xds-route: false
# whether to use xds route for httpclient, httpasyncclient, okhttp, httpurlconnection
enabled-simple-http-xds-route: false
# whether to use secure protocol to invoke spring cloud downstream service with xds route, example: http or https
enabled-springcloud-xds-route-secure: false
# Whether to use request information for routing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ public class RouterConfig implements PluginConfig {
@ConfigFieldKey("enabled-xds-route")
private boolean enabledXdsRoute;

/**
* whether to use xds route for httpclient, httpasyncclient, okhttp, httpurlconnection
*/
@ConfigFieldKey("enabled-simple-http-xds-route")
private boolean enabledSimpleHttpXdsRoute;

/**
* whether to use secure protocol to invoke downstream service with xds route, example: http or https
*/
Expand Down Expand Up @@ -224,6 +230,14 @@ public void setEnabledXdsRoute(boolean enabledXdsRoute) {
this.enabledXdsRoute = enabledXdsRoute;
}

public boolean isEnabledSimpleHttpXdsRoute() {
return enabledSimpleHttpXdsRoute;
}

public void setEnabledSimpleHttpXdsRoute(boolean enabledSimpleHttpXdsRoute) {
this.enabledSimpleHttpXdsRoute = enabledSimpleHttpXdsRoute;
}

public boolean isEnabledSpringCloudXdsRouteSecure() {
return enabledSpringCloudXdsRouteSecure;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ public class MetricsManager {
static {
try {
metricService = ServiceManager.getService(MetricService.class);
TAG_KEY_MAP.put("service","service_meta_service");
TAG_KEY_MAP.put("version","service_meta_version");
TAG_KEY_MAP.put("application","service_meta_application");
TAG_KEY_MAP.put("zone","service_meta_zone");
TAG_KEY_MAP.put("project","service_meta_project");
TAG_KEY_MAP.put("environment","service_meta_environment");
TAG_KEY_MAP.put("parameters","service_meta_parameters");
TAG_KEY_MAP.put("service", "service_meta_service");
TAG_KEY_MAP.put("version", "service_meta_version");
TAG_KEY_MAP.put("application", "service_meta_application");
TAG_KEY_MAP.put("zone", "service_meta_zone");
TAG_KEY_MAP.put("project", "service_meta_project");
TAG_KEY_MAP.put("environment", "service_meta_environment");
TAG_KEY_MAP.put("parameters", "service_meta_parameters");
} catch (IllegalArgumentException e) {
LOGGER.log(Level.SEVERE, "Failed to load metrics service", e);
}
Expand Down Expand Up @@ -92,7 +92,7 @@ public static void addOrUpdateCounterMetricValue(String metricName, Map<String,
} else {
tagsMap = new HashMap<>();
}
tagsMap.put(RouterConstant.SCOPE,"service-router");
tagsMap.put(RouterConstant.SCOPE, "service-router");
Counter counter = COUNT_MAP.computeIfAbsent(new MetricInfo(metricName, tagsMap),
metricInfo -> metricService.counter(metricName, Tags.of(tagsMap)));
counter.increment(value);
Expand Down Expand Up @@ -130,6 +130,27 @@ private static void collectRequestCountMetric(String address) {
addOrUpdateCounterMetricValue(RouterConstant.ROUTER_REQUEST_COUNT, tagsMap, 1);
}

/**
* collect xDS router destination tag count metric
*
* @param cluster cluster name
*/
public static void collectXdsRouterDestinationTagCountMetric(String cluster) {
if (!ROUTER_CONFIG.isEnableMetric()) {
return;
}
Map<String, String> tagsMap = new HashMap<>();
MetricsManager.getAllTagKey().forEach(key -> tagsMap.put(key, StringUtils.EMPTY));
tagsMap.put(RouterConstant.SERVICE_META_PARAMETERS, "cluster: " + cluster);
if (StringUtils.isEmpty(DubboCache.INSTANCE.getAppName())) {
tagsMap.put(RouterConstant.CLIENT_SERVICE_NAME, AppCache.INSTANCE.getAppName());
} else {
tagsMap.put(RouterConstant.CLIENT_SERVICE_NAME, DubboCache.INSTANCE.getAppName());
}
tagsMap.put(RouterConstant.PROTOCOL, RouterConstant.XDS_PROTOCOL);
MetricsManager.addOrUpdateCounterMetricValue(RouterConstant.ROUTER_DESTINATION_TAG_COUNT, tagsMap, 1);
}

/**
* Get the key of the metric tag
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class XdsRouterUtils {
*/
private static XdsLocality selfServiceLocality;

private static volatile boolean localityObtainedFlag = false;

private XdsRouterUtils() {
}

Expand All @@ -54,13 +56,14 @@ private XdsRouterUtils() {
* @return XdsLocality
*/
public static Optional<XdsLocality> getLocalityInfoOfSelfService() {
if (selfServiceLocality != null) {
return Optional.of(selfServiceLocality);
if (localityObtainedFlag) {
return Optional.ofNullable(selfServiceLocality);
}
synchronized (XdsRouterUtils.class) {
if (selfServiceLocality != null) {
return Optional.of(selfServiceLocality);
if (localityObtainedFlag) {
return Optional.ofNullable(selfServiceLocality);
}
localityObtainedFlag = true;
String podIp = NetworkUtils.getKubernetesPodIp();
if (StringUtils.isEmpty(podIp)) {
return Optional.empty();
Expand Down Expand Up @@ -88,6 +91,15 @@ public static void updateServiceDiscovery(XdsServiceDiscovery xdsServiceDiscover
}
}

/**
* update localityObtainedFlag
*
* @param flag locality obtained flag
*/
public static void updateLocalityObtainedFlag(boolean flag) {
localityObtainedFlag = flag;
}

private static Optional<ServiceInstance> getMatchedServiceInstanceByPodIp(Set<ServiceInstance> serviceInstances,
String podIp) {
return serviceInstances.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,14 @@
import io.sermant.core.service.xds.entity.XdsRouteMatch;
import io.sermant.core.utils.CollectionUtils;
import io.sermant.core.utils.StringUtils;
import io.sermant.router.common.cache.AppCache;
import io.sermant.router.common.cache.DubboCache;
import io.sermant.router.common.constants.RouterConstant;
import io.sermant.router.common.metric.MetricsManager;
import io.sermant.router.common.utils.XdsRouterUtils;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
Expand Down Expand Up @@ -153,26 +151,17 @@ private Set<ServiceInstance> handleXdsRoute(XdsRoute route, String serviceName)
if (routeAction.isWeighted()) {
cluster = selectClusterByWeight(routeAction.getWeightedClusters());
}
Map<String, String> tagsMap = new HashMap<>();
MetricsManager.getAllTagKey().forEach(key -> tagsMap.put(key, StringUtils.EMPTY));
tagsMap.put(RouterConstant.SERVICE_META_PARAMETERS, "cluster: " + cluster);
if (StringUtils.isEmpty(DubboCache.INSTANCE.getAppName())) {
tagsMap.put(RouterConstant.CLIENT_SERVICE_NAME, AppCache.INSTANCE.getAppName());
} else {
tagsMap.put(RouterConstant.CLIENT_SERVICE_NAME, DubboCache.INSTANCE.getAppName());
}
tagsMap.put(RouterConstant.PROTOCOL, RouterConstant.XDS_PROTOCOL);
MetricsManager.addOrUpdateCounterMetricValue(RouterConstant.ROUTER_DESTINATION_TAG_COUNT, tagsMap, 1);
MetricsManager.collectXdsRouterDestinationTagCountMetric(cluster);

// get service instance of cluster
Optional<XdsClusterLoadAssigment> loadAssigmentOptional =
serviceDiscovery.getClusterServiceInstance(cluster);
serviceDiscovery.getClusterServiceInstance(serviceName, cluster);
if (!loadAssigmentOptional.isPresent()) {
return serviceDiscovery.getServiceInstance(serviceName);
}
XdsClusterLoadAssigment clusterLoadAssigment = loadAssigmentOptional.get();

if (!routeService.isLocalityRoute(clusterLoadAssigment.getClusterName())) {
if (!routeService.isLocalityRoute(serviceName, clusterLoadAssigment.getClusterName())) {
Set<ServiceInstance> serviceInstances = getServiceInstanceOfCluster(clusterLoadAssigment);
return serviceInstances.isEmpty() ? serviceDiscovery.getServiceInstance(serviceName) : serviceInstances;
}
Expand All @@ -194,24 +183,47 @@ private Set<ServiceInstance> handleXdsRoute(XdsRoute route, String serviceName)
private Set<ServiceInstance> getServiceInstanceOfLocalityCluster(XdsClusterLoadAssigment clusterLoadAssigment,
XdsLocality locality) {
return clusterLoadAssigment.getLocalityInstances().entrySet().stream()
.filter(xdsLocalitySetEntry -> xdsLocalitySetEntry.getKey().equals(locality))
.filter(xdsLocalitySetEntry -> isSameLocality(locality, xdsLocalitySetEntry.getKey()))
.flatMap(xdsLocalitySetEntry -> xdsLocalitySetEntry.getValue().stream())
.collect(Collectors.toSet());
}

private boolean isSameLocality(XdsLocality selfLocality, XdsLocality serviceLocality) {
if (!selfLocality.getRegion().equals(serviceLocality.getRegion())) {
return false;
}
if (StringUtils.isEmpty(selfLocality.getZone())) {
return true;
}
if (!selfLocality.getZone().equals(serviceLocality.getZone())) {
return false;
}
if (StringUtils.isEmpty(selfLocality.getSubZone())) {
return true;
}
return selfLocality.getSubZone().equals(serviceLocality.getSubZone());
}

private Set<ServiceInstance> getServiceInstanceOfCluster(XdsClusterLoadAssigment clusterLoadAssigment) {
return clusterLoadAssigment.getLocalityInstances().entrySet().stream()
.flatMap(instanceEntry -> instanceEntry.getValue().stream())
.collect(Collectors.toSet());
Set<ServiceInstance> serviceInstances = new HashSet<>();
for (Entry<XdsLocality, Set<ServiceInstance>> xdsLocalitySetEntry : clusterLoadAssigment.getLocalityInstances()
.entrySet()) {
serviceInstances.addAll(xdsLocalitySetEntry.getValue());
}
return serviceInstances;
}

private boolean isPathMatched(XdsPathMatcher matcher, String path) {
return matcher.isMatch(path);
}

private boolean isHeadersMatched(List<XdsHeaderMatcher> matchers, Map<String, String> headers) {
return matchers.stream()
.allMatch(xdsHeaderMatcher -> xdsHeaderMatcher.isMatch(headers));
for (XdsHeaderMatcher matcher : matchers) {
if (!matcher.isMatch(headers)) {
return false;
}
}
return true;
}

private String selectClusterByWeight(XdsWeightedClusters weightedClusters) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,16 @@ private static XdsLoadBalancer getRandomLoadBalancer() {
/**
* getLoadBalancer
*
* @param serviceName service name
* @param clusterName cluster name
* @return XdsLoadBalancer
*/
public static XdsLoadBalancer getLoadBalancer(String clusterName) {
public static XdsLoadBalancer getLoadBalancer(String serviceName, String clusterName) {
if (loadBalanceService == null) {
LOGGER.severe("xDS service not open for xDS routing.");
return getRoundRobinLoadBalancer(clusterName);
}
XdsLbPolicy lbPolicy = loadBalanceService.getLbPolicyOfCluster(clusterName);
XdsLbPolicy lbPolicy = loadBalanceService.getLbPolicyOfCluster(serviceName, clusterName);
switch (lbPolicy) {
case RANDOM:
return getRandomLoadBalancer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public void testGetLocalityInfoOfSelfService() {

// find matched service instance
meta.setService("serviceA");
XdsRouterUtils.updateLocalityObtainedFlag(false);
localityInfo = XdsRouterUtils.getLocalityInfoOfSelfService();
Assert.assertTrue(localityInfo.isPresent());
Assert.assertEquals("127.0.0.1", localityInfo.get().getRegion());
Expand Down
Loading

0 comments on commit b62a479

Please sign in to comment.