Skip to content

Commit

Permalink
xds router performance optimization
Browse files Browse the repository at this point in the history
Signed-off-by: daizhenyu <[email protected]>
  • Loading branch information
daizhenyu committed Nov 28, 2024
1 parent 76b68a4 commit 988ab2d
Show file tree
Hide file tree
Showing 23 changed files with 272 additions and 192 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ public interface XdsLoadBalanceService {
*/
XdsLbPolicy getLbPolicyOfCluster(String clusterName);

/**
* get lb policy of cluster
*
* @param serviceName service name
* @param clusterName cluster name
* @return route rules
*/
XdsLbPolicy getLbPolicyOfCluster(String serviceName, String clusterName);

/**
* get lb policy of service (base cluster)
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,13 @@ public interface XdsRouteService {
* @return route rules
*/
boolean isLocalityRoute(String clusterName);

/**
* get lb policy of cluster
*
* @param serviceName service name
* @param clusterName cluster name
* @return route rules
*/
boolean isLocalityRoute(String serviceName, String clusterName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ public interface XdsServiceDiscovery {
*/
Optional<XdsClusterLoadAssigment> getClusterServiceInstance(String clusterName);

/**
* get service instance of service cluster
*
* @param serviceName service name
* @param clusterName cluster name
* @return XdsClusterInstance
*/
Optional<XdsClusterLoadAssigment> getClusterServiceInstance(String serviceName, String clusterName);

/**
* subscribe service instance without tag by service name, the listener will be triggered when the service instance
* changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,12 @@ public Optional<XdsClusterLoadAssigment> getClusterServiceInstance(String cluste
if (!serviceNameOptional.isPresent()) {
return Optional.empty();
}

String serviceName = serviceNameOptional.get();
return getClusterServiceInstance(serviceName, clusterName);
}

@Override
public Optional<XdsClusterLoadAssigment> getClusterServiceInstance(String serviceName, String clusterName) {
// first check the cache and return if service instance exists
if (XdsDataCache.isContainsRequestObserver(serviceName)) {
return XdsDataCache.getClusterServiceInstance(serviceName, clusterName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ public XdsLbPolicy getLbPolicyOfCluster(String clusterName) {
return XdsLbPolicy.UNRECOGNIZED;
}

@Override
public XdsLbPolicy getLbPolicyOfCluster(String serviceName, String clusterName) {
return XdsDataCache.getLbPolicyOfCluster(serviceName, clusterName);
}

@Override
public XdsLbPolicy getBaseLbPolicyOfService(String serviceName) {
return XdsDataCache.getBaseLbPolicyOfService(serviceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,9 @@ public boolean isLocalityRoute(String clusterName) {
.map(serviceName -> XdsDataCache.isLocalityLb(serviceName, clusterName))
.orElse(false);
}

@Override
public boolean isLocalityRoute(String serviceName, String clusterName) {
return XdsDataCache.isLocalityLb(serviceName, clusterName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ public class MetricsManager {
static {
try {
metricService = ServiceManager.getService(MetricService.class);
TAG_KEY_MAP.put("service","service_meta_service");
TAG_KEY_MAP.put("version","service_meta_version");
TAG_KEY_MAP.put("application","service_meta_application");
TAG_KEY_MAP.put("zone","service_meta_zone");
TAG_KEY_MAP.put("project","service_meta_project");
TAG_KEY_MAP.put("environment","service_meta_environment");
TAG_KEY_MAP.put("parameters","service_meta_parameters");
TAG_KEY_MAP.put("service", "service_meta_service");
TAG_KEY_MAP.put("version", "service_meta_version");
TAG_KEY_MAP.put("application", "service_meta_application");
TAG_KEY_MAP.put("zone", "service_meta_zone");
TAG_KEY_MAP.put("project", "service_meta_project");
TAG_KEY_MAP.put("environment", "service_meta_environment");
TAG_KEY_MAP.put("parameters", "service_meta_parameters");
} catch (IllegalArgumentException e) {
LOGGER.log(Level.SEVERE, "Failed to load metrics service", e);
}
Expand Down Expand Up @@ -92,7 +92,7 @@ public static void addOrUpdateCounterMetricValue(String metricName, Map<String,
} else {
tagsMap = new HashMap<>();
}
tagsMap.put(RouterConstant.SCOPE,"service-router");
tagsMap.put(RouterConstant.SCOPE, "service-router");
Counter counter = COUNT_MAP.computeIfAbsent(new MetricInfo(metricName, tagsMap),
metricInfo -> metricService.counter(metricName, Tags.of(tagsMap)));
counter.increment(value);
Expand Down Expand Up @@ -130,6 +130,27 @@ private static void collectRequestCountMetric(String address) {
addOrUpdateCounterMetricValue(RouterConstant.ROUTER_REQUEST_COUNT, tagsMap, 1);
}

/**
* collect xDS router destination tag count metric
*
* @param cluster cluster name
*/
public static void collectXdsRouterDestinationTagCountMetric(String cluster) {
if (!ROUTER_CONFIG.isEnableMetric()) {
return;
}
Map<String, String> tagsMap = new HashMap<>();
MetricsManager.getAllTagKey().forEach(key -> tagsMap.put(key, StringUtils.EMPTY));
tagsMap.put(RouterConstant.SERVICE_META_PARAMETERS, "cluster: " + cluster);
if (StringUtils.isEmpty(DubboCache.INSTANCE.getAppName())) {
tagsMap.put(RouterConstant.CLIENT_SERVICE_NAME, AppCache.INSTANCE.getAppName());
} else {
tagsMap.put(RouterConstant.CLIENT_SERVICE_NAME, DubboCache.INSTANCE.getAppName());
}
tagsMap.put(RouterConstant.PROTOCOL, RouterConstant.XDS_PROTOCOL);
MetricsManager.addOrUpdateCounterMetricValue(RouterConstant.ROUTER_DESTINATION_TAG_COUNT, tagsMap, 1);
}

/**
* Get the key of the metric tag
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class XdsRouterUtils {
*/
private static XdsLocality selfServiceLocality;

private static volatile boolean localityObtainedFlag = false;

private XdsRouterUtils() {
}

Expand All @@ -54,13 +56,14 @@ private XdsRouterUtils() {
* @return XdsLocality
*/
public static Optional<XdsLocality> getLocalityInfoOfSelfService() {
if (selfServiceLocality != null) {
return Optional.of(selfServiceLocality);
if (localityObtainedFlag) {
return Optional.ofNullable(selfServiceLocality);
}
synchronized (XdsRouterUtils.class) {
if (selfServiceLocality != null) {
return Optional.of(selfServiceLocality);
if (localityObtainedFlag) {
return Optional.ofNullable(selfServiceLocality);
}
localityObtainedFlag = true;
String podIp = NetworkUtils.getKubernetesPodIp();
if (StringUtils.isEmpty(podIp)) {
return Optional.empty();
Expand Down Expand Up @@ -88,6 +91,15 @@ public static void updateServiceDiscovery(XdsServiceDiscovery xdsServiceDiscover
}
}

/**
* update localityObtainedFlag
*
* @param flag locality obtained flag
*/
public static void updateLocalityObtainedFlag(boolean flag) {
localityObtainedFlag = flag;
}

private static Optional<ServiceInstance> getMatchedServiceInstanceByPodIp(Set<ServiceInstance> serviceInstances,
String podIp) {
return serviceInstances.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,14 @@
import io.sermant.core.service.xds.entity.XdsRouteMatch;
import io.sermant.core.utils.CollectionUtils;
import io.sermant.core.utils.StringUtils;
import io.sermant.router.common.cache.AppCache;
import io.sermant.router.common.cache.DubboCache;
import io.sermant.router.common.constants.RouterConstant;
import io.sermant.router.common.metric.MetricsManager;
import io.sermant.router.common.utils.XdsRouterUtils;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
Expand Down Expand Up @@ -153,26 +151,17 @@ private Set<ServiceInstance> handleXdsRoute(XdsRoute route, String serviceName)
if (routeAction.isWeighted()) {
cluster = selectClusterByWeight(routeAction.getWeightedClusters());
}
Map<String, String> tagsMap = new HashMap<>();
MetricsManager.getAllTagKey().forEach(key -> tagsMap.put(key, StringUtils.EMPTY));
tagsMap.put(RouterConstant.SERVICE_META_PARAMETERS, "cluster: " + cluster);
if (StringUtils.isEmpty(DubboCache.INSTANCE.getAppName())) {
tagsMap.put(RouterConstant.CLIENT_SERVICE_NAME, AppCache.INSTANCE.getAppName());
} else {
tagsMap.put(RouterConstant.CLIENT_SERVICE_NAME, DubboCache.INSTANCE.getAppName());
}
tagsMap.put(RouterConstant.PROTOCOL, RouterConstant.XDS_PROTOCOL);
MetricsManager.addOrUpdateCounterMetricValue(RouterConstant.ROUTER_DESTINATION_TAG_COUNT, tagsMap, 1);
MetricsManager.collectXdsRouterDestinationTagCountMetric(cluster);

// get service instance of cluster
Optional<XdsClusterLoadAssigment> loadAssigmentOptional =
serviceDiscovery.getClusterServiceInstance(cluster);
serviceDiscovery.getClusterServiceInstance(serviceName, cluster);
if (!loadAssigmentOptional.isPresent()) {
return serviceDiscovery.getServiceInstance(serviceName);
}
XdsClusterLoadAssigment clusterLoadAssigment = loadAssigmentOptional.get();

if (!routeService.isLocalityRoute(clusterLoadAssigment.getClusterName())) {
if (!routeService.isLocalityRoute(serviceName, clusterLoadAssigment.getClusterName())) {
Set<ServiceInstance> serviceInstances = getServiceInstanceOfCluster(clusterLoadAssigment);
return serviceInstances.isEmpty() ? serviceDiscovery.getServiceInstance(serviceName) : serviceInstances;
}
Expand All @@ -194,24 +183,47 @@ private Set<ServiceInstance> handleXdsRoute(XdsRoute route, String serviceName)
private Set<ServiceInstance> getServiceInstanceOfLocalityCluster(XdsClusterLoadAssigment clusterLoadAssigment,
XdsLocality locality) {
return clusterLoadAssigment.getLocalityInstances().entrySet().stream()
.filter(xdsLocalitySetEntry -> xdsLocalitySetEntry.getKey().equals(locality))
.filter(xdsLocalitySetEntry -> isSameLocality(locality, xdsLocalitySetEntry.getKey()))
.flatMap(xdsLocalitySetEntry -> xdsLocalitySetEntry.getValue().stream())
.collect(Collectors.toSet());
}

private boolean isSameLocality(XdsLocality selfLocality, XdsLocality serviceLocality) {
if (!selfLocality.getRegion().equals(serviceLocality.getRegion())) {
return false;
}
if (StringUtils.isEmpty(selfLocality.getZone())) {
return true;
}
if (!selfLocality.getZone().equals(serviceLocality.getZone())) {
return false;
}
if (StringUtils.isEmpty(selfLocality.getSubZone())) {
return true;
}
return selfLocality.getSubZone().equals(serviceLocality.getSubZone());
}

private Set<ServiceInstance> getServiceInstanceOfCluster(XdsClusterLoadAssigment clusterLoadAssigment) {
return clusterLoadAssigment.getLocalityInstances().entrySet().stream()
.flatMap(instanceEntry -> instanceEntry.getValue().stream())
.collect(Collectors.toSet());
Set<ServiceInstance> serviceInstances = new HashSet<>();
for (Entry<XdsLocality, Set<ServiceInstance>> xdsLocalitySetEntry : clusterLoadAssigment.getLocalityInstances()
.entrySet()) {
serviceInstances.addAll(xdsLocalitySetEntry.getValue());
}
return serviceInstances;
}

private boolean isPathMatched(XdsPathMatcher matcher, String path) {
return matcher.isMatch(path);
}

private boolean isHeadersMatched(List<XdsHeaderMatcher> matchers, Map<String, String> headers) {
return matchers.stream()
.allMatch(xdsHeaderMatcher -> xdsHeaderMatcher.isMatch(headers));
for (XdsHeaderMatcher matcher : matchers) {
if (!matcher.isMatch(headers)) {
return false;
}
}
return true;
}

private String selectClusterByWeight(XdsWeightedClusters weightedClusters) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,16 @@ private static XdsLoadBalancer getRandomLoadBalancer() {
/**
* getLoadBalancer
*
* @param serviceName service name
* @param clusterName cluster name
* @return XdsLoadBalancer
*/
public static XdsLoadBalancer getLoadBalancer(String clusterName) {
public static XdsLoadBalancer getLoadBalancer(String serviceName, String clusterName) {
if (loadBalanceService == null) {
LOGGER.severe("xDS service not open for xDS routing.");
return getRoundRobinLoadBalancer(clusterName);
}
XdsLbPolicy lbPolicy = loadBalanceService.getLbPolicyOfCluster(clusterName);
XdsLbPolicy lbPolicy = loadBalanceService.getLbPolicyOfCluster(serviceName, clusterName);
switch (lbPolicy) {
case RANDOM:
return getRandomLoadBalancer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public void testGetLocalityInfoOfSelfService() {

// find matched service instance
meta.setService("serviceA");
XdsRouterUtils.updateLocalityObtainedFlag(false);
localityInfo = XdsRouterUtils.getLocalityInfoOfSelfService();
Assert.assertTrue(localityInfo.isPresent());
Assert.assertEquals("127.0.0.1", localityInfo.get().getRegion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.sermant.core.service.xds.entity.XdsRouteAction.XdsWeightedClusters;
import io.sermant.core.service.xds.entity.XdsRouteMatch;
import io.sermant.core.service.xds.entity.match.ExactMatchStrategy;
import io.sermant.router.common.metric.MetricsManager;
import io.sermant.router.common.utils.XdsRouterUtils;

import org.junit.AfterClass;
Expand All @@ -57,10 +58,14 @@
public class XdsRouterHandlerTest {
private static final String CLUSTER_NAME = "outbound|8080||serviceA.default.svc.cluster.local";

private static final String SERVICE_NAME = "serviceA";

private static MockedStatic<ServiceManager> serviceManager;

private static MockedStatic<XdsRouterUtils> xdsRouterUtil;

private static MockedStatic<MetricsManager> metricsManager;

private static XdsLocality locality1;

private static XdsLocality locality3;
Expand All @@ -72,7 +77,7 @@ public static void setUp() {
XdsCoreService xdsCoreService = Mockito.mock(XdsCoreService.class);
Mockito.when(xdsCoreService.getXdsRouteService()).thenReturn(routeService);
Mockito.when(xdsCoreService.getXdsServiceDiscovery()).thenReturn(serviceDiscovery);
Mockito.when(routeService.isLocalityRoute(CLUSTER_NAME)).thenReturn(true);
Mockito.when(routeService.isLocalityRoute(SERVICE_NAME, CLUSTER_NAME)).thenReturn(true);
Mockito.when(routeService.getServiceRoute("serviceA")).thenReturn(createXdsRoute());

serviceManager = Mockito.mockStatic(ServiceManager.class);
Expand All @@ -84,7 +89,7 @@ public static void setUp() {
Mockito.when(XdsRouterUtils.getLocalityInfoOfSelfService()).thenReturn(Optional.of(locality));

Mockito.when(serviceDiscovery.getServiceInstance("serviceA")).thenReturn(createServiceInstance4Service());
Mockito.when(serviceDiscovery.getClusterServiceInstance(CLUSTER_NAME))
Mockito.when(serviceDiscovery.getClusterServiceInstance(SERVICE_NAME, CLUSTER_NAME))
.thenReturn(Optional.of(createXdsClusterInstance(CLUSTER_NAME,
Arrays.asList("test-region-1", "test-region-2"))));

Expand All @@ -93,12 +98,15 @@ public static void setUp() {

locality3 = new XdsLocality();
locality3.setRegion("test-region-3");

metricsManager = Mockito.mockStatic(MetricsManager.class);
}

@AfterClass
public static void tearDown() {
serviceManager.close();
xdsRouterUtil.close();
metricsManager.close();
}

@Test
Expand Down
Loading

0 comments on commit 988ab2d

Please sign in to comment.