Skip to content

Commit

Permalink
xds router performance optimization
Browse files Browse the repository at this point in the history
Signed-off-by: daizhenyu <[email protected]>
  • Loading branch information
daizhenyu committed Nov 29, 2024
1 parent ebcda83 commit 1fb95ec
Show file tree
Hide file tree
Showing 22 changed files with 243 additions and 186 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ public interface XdsLoadBalanceService {
*/
XdsLbPolicy getLbPolicyOfCluster(String clusterName);

/**
* get lb policy of cluster
*
* @param serviceName service name
* @param clusterName cluster name
* @return route rules
*/
XdsLbPolicy getLbPolicyOfCluster(String serviceName, String clusterName);

/**
* get lb policy of service (base cluster)
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,13 @@ public interface XdsRouteService {
* @return route rules
*/
boolean isLocalityRoute(String clusterName);

/**
* get lb policy of cluster
*
* @param serviceName service name
* @param clusterName cluster name
* @return route rules
*/
boolean isLocalityRoute(String serviceName, String clusterName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ public interface XdsServiceDiscovery {
*/
Optional<XdsClusterLoadAssigment> getClusterServiceInstance(String clusterName);

/**
* get service instance of service cluster
*
* @param serviceName service name
* @param clusterName cluster name
* @return XdsClusterInstance
*/
Optional<XdsClusterLoadAssigment> getClusterServiceInstance(String serviceName, String clusterName);

/**
* subscribe service instance without tag by service name, the listener will be triggered when the service instance
* changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,12 @@ public Optional<XdsClusterLoadAssigment> getClusterServiceInstance(String cluste
if (!serviceNameOptional.isPresent()) {
return Optional.empty();
}

String serviceName = serviceNameOptional.get();
return getClusterServiceInstance(serviceName, clusterName);
}

@Override
public Optional<XdsClusterLoadAssigment> getClusterServiceInstance(String serviceName, String clusterName) {
// first check the cache and return if service instance exists
if (XdsDataCache.isContainsRequestObserver(serviceName)) {
return XdsDataCache.getClusterServiceInstance(serviceName, clusterName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ public XdsLbPolicy getLbPolicyOfCluster(String clusterName) {
return XdsLbPolicy.UNRECOGNIZED;
}

@Override
public XdsLbPolicy getLbPolicyOfCluster(String serviceName, String clusterName) {
return XdsDataCache.getLbPolicyOfCluster(serviceName, clusterName);
}

@Override
public XdsLbPolicy getBaseLbPolicyOfService(String serviceName) {
return XdsDataCache.getBaseLbPolicyOfService(serviceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,9 @@ public boolean isLocalityRoute(String clusterName) {
.map(serviceName -> XdsDataCache.isLocalityLb(serviceName, clusterName))
.orElse(false);
}
}

@Override
public boolean isLocalityRoute(String serviceName, String clusterName) {
return XdsDataCache.isLocalityLb(serviceName, clusterName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class XdsRouterUtils {
*/
private static XdsLocality selfServiceLocality;

private static volatile boolean localityObtainedFlag = false;

private XdsRouterUtils() {
}

Expand All @@ -54,13 +56,14 @@ private XdsRouterUtils() {
* @return XdsLocality
*/
public static Optional<XdsLocality> getLocalityInfoOfSelfService() {
if (selfServiceLocality != null) {
return Optional.of(selfServiceLocality);
if (localityObtainedFlag) {
return Optional.ofNullable(selfServiceLocality);
}
synchronized (XdsRouterUtils.class) {
if (selfServiceLocality != null) {
return Optional.of(selfServiceLocality);
if (localityObtainedFlag) {
return Optional.ofNullable(selfServiceLocality);
}
localityObtainedFlag = true;
String podIp = NetworkUtils.getKubernetesPodIp();
if (StringUtils.isEmpty(podIp)) {
return Optional.empty();
Expand Down Expand Up @@ -88,6 +91,15 @@ public static void updateServiceDiscovery(XdsServiceDiscovery xdsServiceDiscover
}
}

/**
* update localityObtainedFlag
*
* @param flag locality obtained flag
*/
public static void updateLocalityObtainedFlag(boolean flag) {
localityObtainedFlag = flag;
}

private static Optional<ServiceInstance> getMatchedServiceInstanceByPodIp(Set<ServiceInstance> serviceInstances,
String podIp) {
return serviceInstances.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@
import io.sermant.router.common.utils.XdsRouterUtils;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
Expand Down Expand Up @@ -151,13 +153,13 @@ private Set<ServiceInstance> handleXdsRoute(XdsRoute route, String serviceName)

// get service instance of cluster
Optional<XdsClusterLoadAssigment> loadAssigmentOptional =
serviceDiscovery.getClusterServiceInstance(cluster);
serviceDiscovery.getClusterServiceInstance(serviceName, cluster);
if (!loadAssigmentOptional.isPresent()) {
return serviceDiscovery.getServiceInstance(serviceName);
}
XdsClusterLoadAssigment clusterLoadAssigment = loadAssigmentOptional.get();

if (!routeService.isLocalityRoute(clusterLoadAssigment.getClusterName())) {
if (!routeService.isLocalityRoute(serviceName, clusterLoadAssigment.getClusterName())) {
Set<ServiceInstance> serviceInstances = getServiceInstanceOfCluster(clusterLoadAssigment);
return serviceInstances.isEmpty() ? serviceDiscovery.getServiceInstance(serviceName) : serviceInstances;
}
Expand All @@ -179,24 +181,47 @@ private Set<ServiceInstance> handleXdsRoute(XdsRoute route, String serviceName)
private Set<ServiceInstance> getServiceInstanceOfLocalityCluster(XdsClusterLoadAssigment clusterLoadAssigment,
XdsLocality locality) {
return clusterLoadAssigment.getLocalityInstances().entrySet().stream()
.filter(xdsLocalitySetEntry -> xdsLocalitySetEntry.getKey().equals(locality))
.filter(xdsLocalitySetEntry -> isSameLocality(locality, xdsLocalitySetEntry.getKey()))
.flatMap(xdsLocalitySetEntry -> xdsLocalitySetEntry.getValue().stream())
.collect(Collectors.toSet());
}

private boolean isSameLocality(XdsLocality selfLocality, XdsLocality serviceLocality) {
if (!selfLocality.getRegion().equals(serviceLocality.getRegion())) {
return false;
}
if (StringUtils.isEmpty(selfLocality.getZone())) {
return true;
}
if (!selfLocality.getZone().equals(serviceLocality.getZone())) {
return false;
}
if (StringUtils.isEmpty(selfLocality.getSubZone())) {
return true;
}
return selfLocality.getSubZone().equals(serviceLocality.getSubZone());
}

private Set<ServiceInstance> getServiceInstanceOfCluster(XdsClusterLoadAssigment clusterLoadAssigment) {
return clusterLoadAssigment.getLocalityInstances().entrySet().stream()
.flatMap(instanceEntry -> instanceEntry.getValue().stream())
.collect(Collectors.toSet());
Set<ServiceInstance> serviceInstances = new HashSet<>();
for (Entry<XdsLocality, Set<ServiceInstance>> xdsLocalitySetEntry : clusterLoadAssigment.getLocalityInstances()
.entrySet()) {
serviceInstances.addAll(xdsLocalitySetEntry.getValue());
}
return serviceInstances;
}

private boolean isPathMatched(XdsPathMatcher matcher, String path) {
return matcher.isMatch(path);
}

private boolean isHeadersMatched(List<XdsHeaderMatcher> matchers, Map<String, String> headers) {
return matchers.stream()
.allMatch(xdsHeaderMatcher -> xdsHeaderMatcher.isMatch(headers));
for (XdsHeaderMatcher matcher : matchers) {
if (!matcher.isMatch(headers)) {
return false;
}
}
return true;
}

private String selectClusterByWeight(XdsWeightedClusters weightedClusters) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,16 @@ private static XdsLoadBalancer getRandomLoadBalancer() {
/**
* getLoadBalancer
*
* @param serviceName service name
* @param clusterName cluster name
* @return XdsLoadBalancer
*/
public static XdsLoadBalancer getLoadBalancer(String clusterName) {
public static XdsLoadBalancer getLoadBalancer(String serviceName, String clusterName) {
if (loadBalanceService == null) {
LOGGER.severe("xDS service not open for xDS routing.");
return getRoundRobinLoadBalancer(clusterName);
}
XdsLbPolicy lbPolicy = loadBalanceService.getLbPolicyOfCluster(clusterName);
XdsLbPolicy lbPolicy = loadBalanceService.getLbPolicyOfCluster(serviceName, clusterName);
switch (lbPolicy) {
case RANDOM:
return getRandomLoadBalancer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public void testGetLocalityInfoOfSelfService() {

// find matched service instance
meta.setService("serviceA");
XdsRouterUtils.updateLocalityObtainedFlag(false);
localityInfo = XdsRouterUtils.getLocalityInfoOfSelfService();
Assert.assertTrue(localityInfo.isPresent());
Assert.assertEquals("127.0.0.1", localityInfo.get().getRegion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
public class XdsRouterHandlerTest {
private static final String CLUSTER_NAME = "outbound|8080||serviceA.default.svc.cluster.local";

private static final String SERVICE_NAME = "serviceA";

private static MockedStatic<ServiceManager> serviceManager;

private static MockedStatic<XdsRouterUtils> xdsRouterUtil;
Expand All @@ -72,7 +74,7 @@ public static void setUp() {
XdsCoreService xdsCoreService = Mockito.mock(XdsCoreService.class);
Mockito.when(xdsCoreService.getXdsRouteService()).thenReturn(routeService);
Mockito.when(xdsCoreService.getXdsServiceDiscovery()).thenReturn(serviceDiscovery);
Mockito.when(routeService.isLocalityRoute(CLUSTER_NAME)).thenReturn(true);
Mockito.when(routeService.isLocalityRoute(SERVICE_NAME, CLUSTER_NAME)).thenReturn(true);
Mockito.when(routeService.getServiceRoute("serviceA")).thenReturn(createXdsRoute());

serviceManager = Mockito.mockStatic(ServiceManager.class);
Expand All @@ -84,7 +86,7 @@ public static void setUp() {
Mockito.when(XdsRouterUtils.getLocalityInfoOfSelfService()).thenReturn(Optional.of(locality));

Mockito.when(serviceDiscovery.getServiceInstance("serviceA")).thenReturn(createServiceInstance4Service());
Mockito.when(serviceDiscovery.getClusterServiceInstance(CLUSTER_NAME))
Mockito.when(serviceDiscovery.getClusterServiceInstance(SERVICE_NAME, CLUSTER_NAME))
.thenReturn(Optional.of(createXdsClusterInstance(CLUSTER_NAME,
Arrays.asList("test-region-1", "test-region-2"))));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@
import io.sermant.core.service.xds.XdsLoadBalanceService;
import io.sermant.core.service.xds.entity.XdsLbPolicy;

import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.MockedStatic;
Expand Down Expand Up @@ -59,21 +57,23 @@ public static void tearDown() {
@Test
public void testGetLoadBalancer() {
// random
Mockito.when(loadBalanceService.getLbPolicyOfCluster("outbound|8080||serviceA.default.svc.cluster.local"))
Mockito.when(loadBalanceService.getLbPolicyOfCluster("serviceA",
"outbound|8080||serviceA.default.svc.cluster.local"))
.thenReturn(XdsLbPolicy.RANDOM);
XdsLoadBalancer loadBalancer = XdsLoadBalancerFactory
.getLoadBalancer("outbound|8080||serviceA.default.svc.cluster.local");
.getLoadBalancer("serviceA", "outbound|8080||serviceA.default.svc.cluster.local");
Assert.assertEquals("io.sermant.router.common.xds.lb.XdsRandomLoadBalancer",
loadBalancer.getClass().getCanonicalName());

// round robin
Mockito.when(loadBalanceService.getLbPolicyOfCluster("outbound|8080||serviceB.default.svc.cluster.local"))
Mockito.when(loadBalanceService.getLbPolicyOfCluster("serviceB",
"outbound|8080||serviceB.default.svc.cluster.local"))
.thenReturn(XdsLbPolicy.ROUND_ROBIN);
loadBalancer = XdsLoadBalancerFactory
.getLoadBalancer("outbound|8080||serviceB.default.svc.cluster.local");
.getLoadBalancer("serviceB", "outbound|8080||serviceB.default.svc.cluster.local");
Assert.assertEquals("io.sermant.router.common.xds.lb.XdsRoundRobinLoadBalancer",
loadBalancer.getClass().getCanonicalName());
Assert.assertEquals(loadBalancer, XdsLoadBalancerFactory
.getLoadBalancer("outbound|8080||serviceB.default.svc.cluster.local"));
.getLoadBalancer("serviceB", "outbound|8080||serviceB.default.svc.cluster.local"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@
public class HttpAsyncClient4xInterceptor extends MarkInterceptor {
private static final Logger LOGGER = LoggerFactory.getLogger();

private static final int HTTPCONTEXT_INDEX = 2;
private static final int HTTP_CONTEXT_INDEX = 2;

private RouterConfig routerConfig = PluginConfigManager.getPluginConfig(RouterConfig.class);
private final RouterConfig routerConfig = PluginConfigManager.getPluginConfig(RouterConfig.class);

/**
* Pre trigger point
Expand All @@ -77,9 +77,11 @@ public ExecuteContext doBefore(ExecuteContext context) throws Exception {
}
HttpAsyncRequestProducer httpAsyncRequestProducer
= (HttpAsyncRequestProducer) httpAsyncRequestProducerArgument;
HttpRequest httpRequest = httpAsyncRequestProducer.generateRequest();
handleXdsRouterAndUpdateHttpRequest(httpRequest, context);
Object argument = context.getArguments()[HTTPCONTEXT_INDEX];
HttpRequestBase httpRequest = (HttpRequestBase) httpAsyncRequestProducer.generateRequest();
if (handleXdsRouterAndUpdateHttpRequest(httpRequest, context)) {
return context;
}
Object argument = context.getArguments()[HTTP_CONTEXT_INDEX];
if (!(argument instanceof HttpContext)) {
return context;
}
Expand All @@ -95,7 +97,7 @@ private void parseTags(HttpContext httpContext, HttpRequest httpRequest) {
Object attribute = httpContext.getAttribute(FlowContextUtils.getTagName());
if (attribute != null) {
Map<String, List<String>> map = FlowContextUtils.decodeTags(String.valueOf(attribute));
if (map != null && map.size() > 0) {
if (map != null && !map.isEmpty()) {
ThreadLocalUtils.setRequestData(new RequestData(
map, httpRequest.getRequestLine().getUri(), httpRequest.getRequestLine().getMethod()));
}
Expand Down Expand Up @@ -133,29 +135,31 @@ private Map<String, String> getHeaders(HttpRequest httpRequest) {
return headerMap;
}

private void handleXdsRouterAndUpdateHttpRequest(HttpRequest httpRequest, ExecuteContext context) {
private boolean handleXdsRouterAndUpdateHttpRequest(HttpRequestBase httpRequest, ExecuteContext context) {
if (!routerConfig.isEnabledXdsRoute()) {
return;
return false;
}
URI uri = URI.create(httpRequest.getRequestLine().getUri());
URI uri = httpRequest.getURI();
String host = uri.getHost();
if (!BaseHttpRouterUtils.isXdsRouteRequired(host)) {
return;
String serviceName = host.split(RouterConstant.ESCAPED_POINT)[0];
if (!BaseHttpRouterUtils.isXdsRouteRequired(serviceName)) {
return false;
}

// use xds route to find a service instance, and modify url by it
Optional<ServiceInstance> serviceInstanceOptional = BaseHttpRouterUtils
.chooseServiceInstanceByXds(host.split(RouterConstant.ESCAPED_POINT)[0], uri.getPath(),
getHeaders(httpRequest));
.chooseServiceInstanceByXds(serviceName, uri.getPath(), getHeaders(httpRequest));
if (!serviceInstanceOptional.isPresent()) {
return;
return false;
}
ServiceInstance instance = serviceInstanceOptional.get();
try {
context.getArguments()[0] = rebuildProducer(context,
new URI(BaseHttpRouterUtils.rebuildUrlByXdsServiceInstance(uri, instance)));
return true;
} catch (URISyntaxException e) {
LOGGER.log(Level.WARNING, "Create uri using xds service instance failed.", e.getMessage());
return false;
}
}

Expand Down
Loading

0 comments on commit 1fb95ec

Please sign in to comment.