From 031d27087988bc69711a60c26ad0c0be48d42b39 Mon Sep 17 00:00:00 2001 From: daizhenyu <1449308021@qq.com> Date: Thu, 5 Sep 2024 10:14:55 +0800 Subject: [PATCH] xds route and lb service Signed-off-by: daizhenyu <1449308021@qq.com> --- .../core/service/xds/XdsCoreService.java | 14 ++ .../service/xds/XdsLoadBalanceService.java | 4 +- .../core/service/xds/XdsServiceDiscovery.java | 15 +- .../xds/entity/XdsClusterLoadAssigment.java | 20 ++ .../core/service/xds/entity/XdsLocality.java | 23 +++ .../XdsServiceClusterLoadAssigment.java | 18 ++ .../service/xds/XdsCoreServiceImpl.java | 40 +++- .../service/xds/cache/XdsDataCache.java | 193 ++++++++++++++++-- .../discovery/XdsServiceDiscoveryImpl.java | 58 +++++- .../service/xds/env/XdsConstant.java | 20 ++ .../service/xds/handler/CdsHandler.java | 54 ++--- .../service/xds/handler/EdsHandler.java | 42 ++-- .../service/xds/handler/LdsHandler.java | 91 +++++++++ .../service/xds/handler/RdsHandler.java | 78 +++++++ .../service/xds/handler/XdsHandler.java | 31 ++- .../service/xds/handler/XdsServiceAction.java | 9 +- .../XdsLoadBalanceServiceImpl.java | 52 +++++ .../xds/route/XdsRouteServiceImpl.java | 50 +++++ ...ormer.java => EdsProtocolTransformer.java} | 108 +++++----- .../service/xds/CommonDataGenerator.java | 113 ++++++++++ .../service/xds/cache/XdsDataCacheTest.java | 87 ++++++-- .../XdsServiceDiscoveryImplTest.java | 49 +++-- .../{CdsXdsTest.java => CdsHandlerTest.java} | 11 +- .../{EdsXdsTest.java => EdsHandlerTest.java} | 8 +- .../service/xds/handler/LdsHandlerTest.java | 113 ++++++++++ .../service/xds/handler/RdsHandlerTest.java | 101 +++++++++ .../XdsLoadBalanceServiceImplTest.java | 78 +++++++ .../xds/route/XdsRouteServiceImplTest.java | 81 ++++++++ ...t.java => EdsProtocolTransformerTest.java} | 64 +++--- 29 files changed, 1407 insertions(+), 218 deletions(-) create mode 100644 sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/handler/LdsHandler.java create mode 100644 sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/handler/RdsHandler.java create mode 100644 sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/loadbalance/XdsLoadBalanceServiceImpl.java create mode 100644 sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/route/XdsRouteServiceImpl.java rename sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/utils/{XdsProtocolTransformer.java => EdsProtocolTransformer.java} (56%) create mode 100644 sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/CommonDataGenerator.java rename sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/handler/{CdsXdsTest.java => CdsHandlerTest.java} (92%) rename sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/handler/{EdsXdsTest.java => EdsHandlerTest.java} (98%) create mode 100644 sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/handler/LdsHandlerTest.java create mode 100644 sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/handler/RdsHandlerTest.java create mode 100644 sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/loadbalance/XdsLoadBalanceServiceImplTest.java create mode 100644 sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/route/XdsRouteServiceImplTest.java rename sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/utils/{XdsProtocolTransformerTest.java => EdsProtocolTransformerTest.java} (50%) diff --git a/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/XdsCoreService.java b/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/XdsCoreService.java index bd6aee232d..39f78b45ac 100644 --- a/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/XdsCoreService.java +++ b/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/XdsCoreService.java @@ -31,4 +31,18 @@ public interface XdsCoreService extends BaseService { * @return XdsServiceDiscovery */ XdsServiceDiscovery getXdsServiceDiscovery(); + + /** + * get xDS route service + * + * @return XdsRoute + */ + XdsRouteService getXdsRouteService(); + + /** + * get xDS lb service + * + * @return XdsRoute + */ + XdsLoadBalanceService getLoadBalanceService(); } diff --git a/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/XdsLoadBalanceService.java b/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/XdsLoadBalanceService.java index a7eb4295be..de1ba0b5c1 100644 --- a/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/XdsLoadBalanceService.java +++ b/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/XdsLoadBalanceService.java @@ -31,7 +31,7 @@ public interface XdsLoadBalanceService { * @param clusterName cluster name * @return route rules */ - XdsLbPolicy getClusterLbPolicy(String clusterName); + XdsLbPolicy getLbPolicyOfCluster(String clusterName); /** * get lb policy of service (base cluster) @@ -39,5 +39,5 @@ public interface XdsLoadBalanceService { * @param serviceName service name * @return route rules */ - XdsLbPolicy getServiceLbPolicy(String serviceName); + XdsLbPolicy getBaseLbPolicyOfService(String serviceName); } diff --git a/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/XdsServiceDiscovery.java b/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/XdsServiceDiscovery.java index e65f204f57..d6fcb0e82e 100644 --- a/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/XdsServiceDiscovery.java +++ b/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/XdsServiceDiscovery.java @@ -17,8 +17,10 @@ package io.sermant.core.service.xds; import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.core.service.xds.entity.XdsClusterLoadAssigment; import io.sermant.core.service.xds.listener.XdsServiceDiscoveryListener; +import java.util.Optional; import java.util.Set; /** @@ -29,7 +31,7 @@ **/ public interface XdsServiceDiscovery { /** - * get service instance by service name + * get all the service instance by service name with base cluster * * @param serviceName service name * @return service instances @@ -37,7 +39,16 @@ public interface XdsServiceDiscovery { Set getServiceInstance(String serviceName); /** - * subscribe service instance by service name, the listener will be triggered when the service instance changes + * get service instance of service cluster + * + * @param clusterName cluster name + * @return XdsClusterInstance + */ + Optional getClusterServiceInstance(String clusterName); + + /** + * subscribe service instance without tag by service name, the listener will be triggered when the service instance + * changes * * @param serviceName service name * @param listener listener diff --git a/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/entity/XdsClusterLoadAssigment.java b/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/entity/XdsClusterLoadAssigment.java index e4b1450e9c..4a4823e19a 100644 --- a/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/entity/XdsClusterLoadAssigment.java +++ b/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/entity/XdsClusterLoadAssigment.java @@ -32,6 +32,26 @@ public class XdsClusterLoadAssigment { private Map> localityInstances; + /** + * constructor + */ + public XdsClusterLoadAssigment() { + } + + /** + * parameterized constructor + * + * @param serviceName service name + * @param clusterName cluster name + * @param localityInstances service instances sorted by locality + */ + public XdsClusterLoadAssigment(String serviceName, String clusterName, + Map> localityInstances) { + this.serviceName = serviceName; + this.clusterName = clusterName; + this.localityInstances = localityInstances; + } + public String getServiceName() { return serviceName; } diff --git a/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/entity/XdsLocality.java b/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/entity/XdsLocality.java index 52cb856def..dc26ff861c 100644 --- a/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/entity/XdsLocality.java +++ b/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/entity/XdsLocality.java @@ -35,6 +35,29 @@ public class XdsLocality { private int localityPriority; + /** + * constructor + */ + public XdsLocality() { + } + + /** + * parameterized constructor + * + * @param region region + * @param zone zone + * @param subZone sub zone + * @param loadBalanceWeight load balance weigh + * @param localityPriority locality priority + */ + public XdsLocality(String region, String zone, String subZone, int loadBalanceWeight, int localityPriority) { + this.region = region; + this.zone = zone; + this.subZone = subZone; + this.loadBalanceWeight = loadBalanceWeight; + this.localityPriority = localityPriority; + } + public String getRegion() { return region; } diff --git a/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/entity/XdsServiceClusterLoadAssigment.java b/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/entity/XdsServiceClusterLoadAssigment.java index 87ffb96983..2cd02b3833 100644 --- a/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/entity/XdsServiceClusterLoadAssigment.java +++ b/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/entity/XdsServiceClusterLoadAssigment.java @@ -35,6 +35,24 @@ public class XdsServiceClusterLoadAssigment { private String baseClusterName; + /** + * constructor + */ + public XdsServiceClusterLoadAssigment() { + } + + /** + * parameterized constructor + * + * @param clusterLoadAssigments cluster load assigments + * @param baseClusterName base cluster name + */ + public XdsServiceClusterLoadAssigment( + Map clusterLoadAssigments, String baseClusterName) { + this.clusterLoadAssigments = clusterLoadAssigments; + this.baseClusterName = baseClusterName; + } + public Map getClusterLoadAssigments() { return clusterLoadAssigments; } diff --git a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/XdsCoreServiceImpl.java b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/XdsCoreServiceImpl.java index 5da9eee44b..ce2476689c 100644 --- a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/XdsCoreServiceImpl.java +++ b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/XdsCoreServiceImpl.java @@ -18,9 +18,18 @@ import io.sermant.core.common.LoggerFactory; import io.sermant.core.service.xds.XdsCoreService; +import io.sermant.core.service.xds.XdsLoadBalanceService; +import io.sermant.core.service.xds.XdsRouteService; import io.sermant.core.service.xds.XdsServiceDiscovery; import io.sermant.implement.service.xds.client.XdsClient; import io.sermant.implement.service.xds.discovery.XdsServiceDiscoveryImpl; +import io.sermant.implement.service.xds.env.XdsConstant; +import io.sermant.implement.service.xds.handler.CdsHandler; +import io.sermant.implement.service.xds.handler.EdsHandler; +import io.sermant.implement.service.xds.handler.LdsHandler; +import io.sermant.implement.service.xds.handler.RdsHandler; +import io.sermant.implement.service.xds.loadbalance.XdsLoadBalanceServiceImpl; +import io.sermant.implement.service.xds.route.XdsRouteServiceImpl; import java.io.IOException; import java.util.logging.Level; @@ -37,12 +46,31 @@ public class XdsCoreServiceImpl implements XdsCoreService { private XdsServiceDiscovery xdsServiceDiscovery; + private XdsRouteService xdsRouteService; + + private XdsLoadBalanceService xdsLoadBalanceService; + private XdsClient client; @Override public void start() { client = new XdsClient(); - xdsServiceDiscovery = new XdsServiceDiscoveryImpl(client); + + // create xdsHandler + RdsHandler rdsHandler = new RdsHandler(client); + LdsHandler ldsHandler = new LdsHandler(client); + CdsHandler cdsHandler = new CdsHandler(client); + + // start to subscribe lds、rds、cds + rdsHandler.subscribe(XdsConstant.RDS_ALL_RESOURCE); + ldsHandler.subscribe(XdsConstant.LDS_ALL_RESOURCE); + cdsHandler.subscribe(XdsConstant.CDS_ALL_RESOURCE); + + // create xds service + EdsHandler edsHandler = new EdsHandler(client); + xdsServiceDiscovery = new XdsServiceDiscoveryImpl(edsHandler); + xdsRouteService = new XdsRouteServiceImpl(); + xdsLoadBalanceService = new XdsLoadBalanceServiceImpl(); } @Override @@ -58,4 +86,14 @@ public void stop() { public XdsServiceDiscovery getXdsServiceDiscovery() { return xdsServiceDiscovery; } + + @Override + public XdsRouteService getXdsRouteService() { + return xdsRouteService; + } + + @Override + public XdsLoadBalanceService getLoadBalanceService() { + return xdsLoadBalanceService; + } } diff --git a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/cache/XdsDataCache.java b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/cache/XdsDataCache.java index 777ae595b3..8b9c6bb587 100644 --- a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/cache/XdsDataCache.java +++ b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/cache/XdsDataCache.java @@ -19,6 +19,14 @@ import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; import io.grpc.stub.StreamObserver; import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.core.service.xds.entity.XdsClusterLoadAssigment; +import io.sermant.core.service.xds.entity.XdsHttpConnectionManager; +import io.sermant.core.service.xds.entity.XdsLbPolicy; +import io.sermant.core.service.xds.entity.XdsRoute; +import io.sermant.core.service.xds.entity.XdsRouteConfiguration; +import io.sermant.core.service.xds.entity.XdsServiceCluster; +import io.sermant.core.service.xds.entity.XdsServiceClusterLoadAssigment; +import io.sermant.core.service.xds.entity.XdsVirtualHost; import io.sermant.core.service.xds.listener.XdsServiceDiscoveryListener; import java.util.ArrayList; @@ -27,8 +35,10 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; /** * xDS data cache @@ -40,7 +50,7 @@ public class XdsDataCache { /** * key:service name value:instances */ - private static final Map> SERVICE_INSTANCES = + private static final Map SERVICE_INSTANCES = new ConcurrentHashMap<>(); /** @@ -55,9 +65,19 @@ public class XdsDataCache { private static final Map> REQUEST_OBSERVERS = new ConcurrentHashMap<>(); /** - * key:service name value:cluster map + * key:service name value:XdsServiceCluster */ - private static Map> serviceNameMapping = new HashMap<>(); + private static Map serviceClusterMap = new HashMap<>(); + + /** + * HttpConnectionManager + */ + private static List httpConnectionManagers = new ArrayList<>(); + + /** + * XdsRouteConfiguration + */ + private static List routeConfigurations = new ArrayList<>(); private XdsDataCache() { } @@ -66,10 +86,11 @@ private XdsDataCache() { * update ServiceInstance * * @param serviceName service name - * @param instances service instance + * @param serviceClusterInstance all cluster instance of service */ - public static void updateServiceInstance(String serviceName, Set instances) { - SERVICE_INSTANCES.put(serviceName, instances); + public static void updateServiceInstance(String serviceName, + XdsServiceClusterLoadAssigment serviceClusterInstance) { + SERVICE_INSTANCES.put(serviceName, serviceClusterInstance); } /** @@ -79,7 +100,27 @@ public static void updateServiceInstance(String serviceName, Set getServiceInstance(String serviceName) { - return SERVICE_INSTANCES.getOrDefault(serviceName, Collections.EMPTY_SET); + XdsServiceClusterLoadAssigment serviceClusterInstance = SERVICE_INSTANCES.get(serviceName); + if (serviceClusterInstance == null) { + return Collections.EMPTY_SET; + } + return serviceClusterInstance.getServiceInstance(); + } + + /** + * get ServiceInstance of cluster + * + * @param serviceName service name + * @param clusterName cluster name + * @return ServiceInstance + */ + public static Optional getClusterServiceInstance(String serviceName, + String clusterName) { + XdsServiceClusterLoadAssigment serviceClusterInstance = SERVICE_INSTANCES.get(serviceName); + if (serviceClusterInstance == null) { + return Optional.empty(); + } + return Optional.ofNullable(serviceClusterInstance.getXdsClusterLoadAssigment(clusterName)); } /** @@ -151,13 +192,13 @@ public static Set>> getRequestObs } /** - * get request observer by service name + * get request observer by request key * - * @param serviceName service name + * @param requestKey request key * @return request observer */ - public static StreamObserver getRequestObserver(String serviceName) { - return REQUEST_OBSERVERS.get(serviceName); + public static StreamObserver getRequestObserver(String requestKey) { + return REQUEST_OBSERVERS.get(requestKey); } /** @@ -172,31 +213,139 @@ public static void removeRequestObserver(String serviceName) { /** * update the mapping between service and cluster * - * @param mapping the mapping between service and cluster + * @param clusterMap the map between service and cluster */ - public static void updateServiceNameMapping(Map> mapping) { - if (mapping == null) { - serviceNameMapping = new HashMap<>(); + public static void updateServiceClusterMap(Map clusterMap) { + if (clusterMap == null) { + serviceClusterMap = new HashMap<>(); + return; } - serviceNameMapping = mapping; + serviceClusterMap = clusterMap; } /** * get cluster set for service * - * @param serviceName + * @param serviceName service name * @return cluster set for service */ public static Set getClustersByServiceName(String serviceName) { - return serviceNameMapping.getOrDefault(serviceName, Collections.EMPTY_SET); + XdsServiceCluster xdsServiceCluster = serviceClusterMap.get(serviceName); + if (xdsServiceCluster == null) { + return Collections.EMPTY_SET; + } + return xdsServiceCluster.getClusterResources(); + } + + /** + * get serviceClusterMap + * + * @return serviceClusterMap + */ + public static Map getServiceClusterMap() { + return serviceClusterMap; + } + + /** + * update HttpConnectionManager + * + * @param hcms httpConnectionManager list + */ + public static void updateHttpConnectionManagers(List hcms) { + if (hcms == null) { + httpConnectionManagers = new ArrayList<>(); + return; + } + httpConnectionManagers = hcms; + } + + /** + * get RouteResources + * + * @return RouteConfig names + */ + public static Set getRouteResources() { + return httpConnectionManagers.stream() + .map(XdsHttpConnectionManager::getRouteConfigName) + .collect(Collectors.toSet()); + } + + /** + * update XdsRouteConfiguration + * + * @param configurations XdsRouteConfiguration list + */ + public static void updateRouteConfigurations(List configurations) { + if (configurations == null) { + routeConfigurations = new ArrayList<>(); + return; + } + routeConfigurations = configurations; + } + + /** + * get service route rule + * + * @param serviceName service name + * @return xds route + */ + public static List getServiceRoute(String serviceName) { + for (XdsRouteConfiguration routeConfiguration : routeConfigurations) { + Map virtualHosts = routeConfiguration.getVirtualHosts(); + if (virtualHosts.containsKey(serviceName)) { + return virtualHosts.get(serviceName).getRoutes(); + } + } + return Collections.EMPTY_LIST; + } + + /** + * cluster locality lb policy + * + * @param serviceName service name + * @param clusterName cluster name + * @return boolean + */ + public static boolean isLocalityLb(String serviceName, String clusterName) { + XdsServiceCluster serviceCluster = serviceClusterMap.get(serviceName); + return serviceCluster != null && serviceCluster.isClusterLocalityLb(clusterName); + } + + /** + * cluster lb policy + * + * @param serviceName service name + * @param clusterName cluster name + * @return boolean + */ + public static XdsLbPolicy getLbPolicyOfCluster(String serviceName, String clusterName) { + XdsServiceCluster serviceCluster = serviceClusterMap.get(serviceName); + if (serviceCluster == null) { + return XdsLbPolicy.UNRECOGNIZED; + } + return serviceCluster.getLbPolicyOfCluster(clusterName); + } + + /** + * get service(base cluster) lb policy + * + * @param serviceName service name + * @return boolean + */ + public static XdsLbPolicy getBaseLbPolicyOfService(String serviceName) { + XdsServiceCluster serviceCluster = serviceClusterMap.get(serviceName); + if (serviceCluster == null) { + return XdsLbPolicy.UNRECOGNIZED; + } + return serviceCluster.getBaseLbPolicyOfService(); } /** - * get serviceNameMapping + * getRouteConfigurations * - * @return serviceNameMapping + * @return XdsRouteConfiguration list */ - public static Map> getServiceNameMapping() { - return serviceNameMapping; + public static List getRouteConfigurations() { + return routeConfigurations; } } \ No newline at end of file diff --git a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/discovery/XdsServiceDiscoveryImpl.java b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/discovery/XdsServiceDiscoveryImpl.java index c8fcb252fb..15da0514e7 100644 --- a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/discovery/XdsServiceDiscoveryImpl.java +++ b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/discovery/XdsServiceDiscoveryImpl.java @@ -19,13 +19,13 @@ import io.sermant.core.common.LoggerFactory; import io.sermant.core.service.xds.XdsServiceDiscovery; import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.core.service.xds.entity.XdsClusterLoadAssigment; import io.sermant.core.service.xds.listener.XdsServiceDiscoveryListener; import io.sermant.implement.service.xds.cache.XdsDataCache; -import io.sermant.implement.service.xds.client.XdsClient; -import io.sermant.implement.service.xds.env.XdsConstant; -import io.sermant.implement.service.xds.handler.CdsHandler; import io.sermant.implement.service.xds.handler.EdsHandler; +import io.sermant.implement.service.xds.utils.XdsCommonUtils; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -51,16 +51,14 @@ public class XdsServiceDiscoveryImpl implements XdsServiceDiscovery { /** * constructor * - * @param client xds client + * @param edsHandler eds handler */ - public XdsServiceDiscoveryImpl(XdsClient client) { - CdsHandler cdsHandler = new CdsHandler(client); - edsHandler = new EdsHandler(client); - cdsHandler.subscribe(XdsConstant.CDS_ALL_RESOURCE, null); + public XdsServiceDiscoveryImpl(EdsHandler edsHandler) { + this.edsHandler = edsHandler; } /** - * subscribe service instance by service name, the listener will be triggered when the service instance changes + * get service all instance * * @param serviceName service name * @return service instances @@ -93,6 +91,46 @@ public Set getServiceInstance(String serviceName) { return XdsDataCache.getServiceInstance(serviceName); } + /** + * get service instance of service cluster + * + * @param clusterName cluster name + * @return XdsClusterInstance + */ + @Override + public Optional getClusterServiceInstance(String clusterName) { + Optional serviceNameOptional = XdsCommonUtils.getServiceNameFromCluster(clusterName); + if (!serviceNameOptional.isPresent()) { + return Optional.empty(); + } + + String serviceName = serviceNameOptional.get(); + + // first check the cache and return if service instance exists + if (XdsDataCache.isContainsRequestObserver(serviceName)) { + return XdsDataCache.getClusterServiceInstance(serviceName, clusterName); + } + + // locking ensures that a service only creates one stream + LOCK.lock(); + CountDownLatch countDownLatch = new CountDownLatch(1); + try { + // check the cache again after locking and return if service instance exists + if (XdsDataCache.isContainsRequestObserver(serviceName)) { + return XdsDataCache.getClusterServiceInstance(serviceName, clusterName); + } + edsHandler.subscribe(serviceName, countDownLatch); + } finally { + LOCK.unlock(); + } + try { + countDownLatch.await(TIMEOUT, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.log(Level.WARNING, "Occur InterruptedException when wait server send message.", e); + } + return XdsDataCache.getClusterServiceInstance(serviceName, clusterName); + } + /** * subscribe service instance by service name, the listener will be triggered when the service instance changes * @@ -120,7 +158,7 @@ public void subscribeServiceInstance(String serviceName, XdsServiceDiscoveryList } // subscribe service instance - edsHandler.subscribe(serviceName, null); + edsHandler.subscribe(serviceName); } finally { LOCK.unlock(); } diff --git a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/env/XdsConstant.java b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/env/XdsConstant.java index 2e92dbec75..7d9f3ab574 100644 --- a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/env/XdsConstant.java +++ b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/env/XdsConstant.java @@ -38,6 +38,16 @@ public class XdsConstant { */ public static final String CDS_RESOURCE_TYPE = "type.googleapis.com/envoy.config.cluster.v3.Cluster"; + /** + * lds resource type + */ + public static final String LDS_RESOURCE_TYPE = "type.googleapis.com/envoy.config.listener.v3.Listener"; + + /** + * rds resource type + */ + public static final String RDS_RESOURCE_TYPE = "type.googleapis.com/envoy.config.route.v3.RouteConfiguration"; + /** * sidecar string */ @@ -63,6 +73,16 @@ public class XdsConstant { */ public static final String CDS_ALL_RESOURCE = "CLUSTER_ALL"; + /** + * lds request cache key for subscribe all resource + */ + public static final String LDS_ALL_RESOURCE = "LISTENER_ALL"; + + /** + * rds request cache key for subscribe all route resource + */ + public static final String RDS_ALL_RESOURCE = "ROUTE_CONFIG_ALL"; + /** * the namespace file path of k8s pod */ diff --git a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/handler/CdsHandler.java b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/handler/CdsHandler.java index e2a93e03ab..2e5b075f90 100644 --- a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/handler/CdsHandler.java +++ b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/handler/CdsHandler.java @@ -22,26 +22,21 @@ package io.sermant.implement.service.xds.handler; -import com.google.protobuf.Any; -import com.google.protobuf.InvalidProtocolBufferException; - import io.envoyproxy.envoy.config.cluster.v3.Cluster; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; import io.grpc.stub.StreamObserver; +import io.sermant.core.service.xds.entity.XdsServiceCluster; import io.sermant.implement.service.xds.cache.XdsDataCache; import io.sermant.implement.service.xds.client.XdsClient; import io.sermant.implement.service.xds.env.XdsConstant; -import io.sermant.implement.service.xds.utils.XdsProtocolTransformer; +import io.sermant.implement.service.xds.utils.CdsProtocolTransformer; -import java.util.ArrayList; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.CountDownLatch; -import java.util.logging.Level; /** * CdsHandler @@ -49,7 +44,7 @@ * @author daizhenyu * @since 2024-05-13 **/ -public class CdsHandler extends XdsHandler { +public class CdsHandler extends XdsHandler { /** * construction method * @@ -62,19 +57,23 @@ public CdsHandler(XdsClient client) { @Override protected void handleResponse(String requestKey, DiscoveryResponse response) { - Map> oldMapping = XdsDataCache.getServiceNameMapping(); + Map oldServiceClusterMap = XdsDataCache.getServiceClusterMap(); // The contents of the Cds protocol were resolved, // and the mapping between the service name and the cluster was updated - Map> newMapping = XdsProtocolTransformer - .getService2ClusterMapping(decodeResource2Cluster(response)); - XdsDataCache.updateServiceNameMapping(newMapping); + Map newServiceClusterMap = CdsProtocolTransformer + .getServiceClusters(decodeResources(response, Cluster.class)); + XdsDataCache.updateServiceClusterMap(newServiceClusterMap); // send ack StreamObserver requestObserver = XdsDataCache.getRequestObserver(requestKey); requestObserver.onNext(builtAckDiscoveryRequest(response, Collections.EMPTY_SET)); - // Eds is updated based on the new mapping relationship + updateEdsSubscription(oldServiceClusterMap); + } + + private void updateEdsSubscription(Map oldServiceClusterMap) { + // Eds is updated based on the new service and cluster mapping relationship for (Entry> entry : XdsDataCache.getRequestObserversEntry()) { String key = entry.getKey(); if (XdsConstant.CDS_ALL_RESOURCE.equals(key)) { @@ -82,8 +81,16 @@ protected void handleResponse(String requestKey, DiscoveryResponse response) { } // There is no need to resubscribe when the cluster resources corresponding to the service have not changed. - if (newMapping.getOrDefault(key, Collections.EMPTY_SET) - .equals(oldMapping.getOrDefault(key, Collections.EMPTY_SET))) { + Set newServiceClusterResource = XdsDataCache.getClustersByServiceName(key); + Set oldServiceClusterResource; + XdsServiceCluster oldServiceCluster = oldServiceClusterMap.get(key); + if (oldServiceCluster == null) { + oldServiceClusterResource = Collections.EMPTY_SET; + } else { + oldServiceClusterResource = oldServiceCluster.getClusterResources(); + } + if (newServiceClusterResource + .equals(oldServiceClusterResource)) { continue; } StreamObserver requestStreamObserver = entry.getValue(); @@ -92,18 +99,6 @@ protected void handleResponse(String requestKey, DiscoveryResponse response) { } } - private List decodeResource2Cluster(DiscoveryResponse response) { - List clusters = new ArrayList<>(); - for (Any any : response.getResourcesList()) { - try { - clusters.add(any.unpack(Cluster.class)); - } catch (InvalidProtocolBufferException e) { - LOGGER.log(Level.SEVERE, "Decode resource to cluster failed.", e); - } - } - return clusters; - } - @Override public void subscribe(String requestKey, CountDownLatch countDownLatch) { StreamObserver requestStreamObserver = client @@ -111,4 +106,9 @@ public void subscribe(String requestKey, CountDownLatch countDownLatch) { requestStreamObserver.onNext(buildDiscoveryRequest(resourceType, null, null, Collections.EMPTY_SET)); XdsDataCache.updateRequestObserver(requestKey, requestStreamObserver); } + + @Override + public void subscribe(String requestKey) { + subscribe(requestKey, null); + } } \ No newline at end of file diff --git a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/handler/EdsHandler.java b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/handler/EdsHandler.java index cdc79ce7e3..dbc704bff9 100644 --- a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/handler/EdsHandler.java +++ b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/handler/EdsHandler.java @@ -22,26 +22,22 @@ package io.sermant.implement.service.xds.handler; -import com.google.protobuf.Any; -import com.google.protobuf.InvalidProtocolBufferException; - import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; import io.grpc.stub.StreamObserver; import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.core.service.xds.entity.XdsServiceClusterLoadAssigment; import io.sermant.core.service.xds.listener.XdsServiceDiscoveryListener; import io.sermant.core.utils.CollectionUtils; import io.sermant.implement.service.xds.cache.XdsDataCache; import io.sermant.implement.service.xds.client.XdsClient; import io.sermant.implement.service.xds.env.XdsConstant; -import io.sermant.implement.service.xds.utils.XdsProtocolTransformer; +import io.sermant.implement.service.xds.utils.EdsProtocolTransformer; -import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; -import java.util.logging.Level; /** * EdsHandler @@ -49,7 +45,7 @@ * @author daizhenyu * @since 2024-05-13 **/ -public class EdsHandler extends XdsHandler { +public class EdsHandler extends XdsHandler { /** * construction method * @@ -62,22 +58,23 @@ public EdsHandler(XdsClient client) { @Override protected void handleResponse(String requestKey, DiscoveryResponse response) { - // The contents of the Cds protocol were resolved, - // and the mapping between the service name and the cluster was updated - Set newInstances = XdsProtocolTransformer - .getServiceInstances(decodeResource2ClusterLoadAssignment(response)); + // The contents of the eds protocol were resolved, + // and get the new instance of service + XdsServiceClusterLoadAssigment clusterInstance = EdsProtocolTransformer + .getServiceInstances(decodeResources(response, ClusterLoadAssignment.class)); // send ack StreamObserver requestObserver = XdsDataCache.getRequestObserver(requestKey); requestObserver.onNext(builtAckDiscoveryRequest(response, XdsDataCache.getClustersByServiceName(requestKey))); // check whether the service instance has changed - if (!isInstanceChanged(XdsDataCache.getServiceInstance(requestKey), newInstances)) { + Set newInstances = clusterInstance.getServiceInstance(); + Set oldInstances = XdsDataCache.getServiceInstance(requestKey); + XdsDataCache.updateServiceInstance(requestKey, clusterInstance); + if (!isInstanceChanged(oldInstances, newInstances)) { return; } - XdsDataCache.updateServiceInstance(requestKey, newInstances); - // invoke the listener corresponding to service List listeners = XdsDataCache.getServiceDiscoveryListeners(requestKey); for (XdsServiceDiscoveryListener listener : listeners) { @@ -85,18 +82,6 @@ protected void handleResponse(String requestKey, DiscoveryResponse response) { } } - private List decodeResource2ClusterLoadAssignment(DiscoveryResponse response) { - List assignments = new ArrayList<>(); - for (Any any : response.getResourcesList()) { - try { - assignments.add(any.unpack(ClusterLoadAssignment.class)); - } catch (InvalidProtocolBufferException e) { - LOGGER.log(Level.SEVERE, "Decode resource to ClusterLoadAssignment failed.", e); - } - } - return assignments; - } - @Override public void subscribe(String resourceKey, CountDownLatch countDownLatch) { StreamObserver requestStreamObserver = client @@ -106,6 +91,11 @@ public void subscribe(String resourceKey, CountDownLatch countDownLatch) { XdsDataCache.updateRequestObserver(resourceKey, requestStreamObserver); } + @Override + public void subscribe(String requestKey) { + subscribe(requestKey, null); + } + private boolean isInstanceChanged(Set oldInstances, Set newInstances) { if (CollectionUtils.isEmpty(oldInstances) && CollectionUtils.isEmpty(newInstances)) { return false; diff --git a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/handler/LdsHandler.java b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/handler/LdsHandler.java new file mode 100644 index 0000000000..577ab39c57 --- /dev/null +++ b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/handler/LdsHandler.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Based on dubbo-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/impl/LdsProtocol.java + * from the Apache dubbo project. + */ + +package io.sermant.implement.service.xds.handler; + +import io.envoyproxy.envoy.config.listener.v3.Listener; +import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; +import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; +import io.grpc.stub.StreamObserver; +import io.sermant.implement.service.xds.cache.XdsDataCache; +import io.sermant.implement.service.xds.client.XdsClient; +import io.sermant.implement.service.xds.env.XdsConstant; +import io.sermant.implement.service.xds.utils.LdsProtocolTransformer; + +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.CountDownLatch; + +/** + * LdsHandler + * + * @author daizhenyu + * @since 2024-07-30 + **/ +public class LdsHandler extends XdsHandler { + /** + * construction method + * + * @param client xds client + */ + public LdsHandler(XdsClient client) { + super(client); + this.resourceType = XdsConstant.LDS_RESOURCE_TYPE; + } + + @Override + protected void handleResponse(String requestKey, DiscoveryResponse response) { + Set oldRouteResources = XdsDataCache.getRouteResources(); + XdsDataCache.updateHttpConnectionManagers(LdsProtocolTransformer + .getHttpConnectionManager(decodeResources(response, Listener.class))); + + // send ack + StreamObserver requestObserver = XdsDataCache.getRequestObserver(requestKey); + requestObserver.onNext(builtAckDiscoveryRequest(response, Collections.EMPTY_SET)); + + updateRdsSubscription(oldRouteResources); + } + + private void updateRdsSubscription(Set oldRouteResources) { + // check if the route resources have changed + Set newRouteResources = XdsDataCache.getRouteResources(); + if (oldRouteResources.equals(newRouteResources)) { + return; + } + StreamObserver rdsRequestObserver = XdsDataCache + .getRequestObserver(XdsConstant.RDS_ALL_RESOURCE); + rdsRequestObserver.onNext(buildDiscoveryRequest(XdsConstant.RDS_RESOURCE_TYPE, null, null, newRouteResources)); + } + + @Override + public void subscribe(String requestKey, CountDownLatch countDownLatch) { + StreamObserver requestStreamObserver = client + .getDiscoveryRequestObserver(getResponseStreamObserver(requestKey, countDownLatch)); + requestStreamObserver.onNext(buildDiscoveryRequest(resourceType, null, null, Collections.EMPTY_SET)); + XdsDataCache.updateRequestObserver(requestKey, requestStreamObserver); + } + + @Override + public void subscribe(String requestKey) { + subscribe(requestKey, null); + } +} diff --git a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/handler/RdsHandler.java b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/handler/RdsHandler.java new file mode 100644 index 0000000000..7bcd6645f9 --- /dev/null +++ b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/handler/RdsHandler.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Based on dubbo-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/impl/RdsProtocol.java + * from the Apache dubbo project. + */ + +package io.sermant.implement.service.xds.handler; + +import io.envoyproxy.envoy.config.route.v3.RouteConfiguration; +import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; +import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; +import io.grpc.stub.StreamObserver; +import io.sermant.core.service.xds.entity.XdsRouteConfiguration; +import io.sermant.implement.service.xds.cache.XdsDataCache; +import io.sermant.implement.service.xds.client.XdsClient; +import io.sermant.implement.service.xds.env.XdsConstant; +import io.sermant.implement.service.xds.utils.RdsProtocolTransformer; + +import java.util.List; +import java.util.concurrent.CountDownLatch; + +/** + * RdsHandler + * + * @author daizhenyu + * @since 2024-07-30 + **/ +public class RdsHandler extends XdsHandler { + /** + * construction method + * + * @param client xds client + */ + public RdsHandler(XdsClient client) { + super(client); + this.resourceType = XdsConstant.RDS_RESOURCE_TYPE; + } + + @Override + protected void handleResponse(String requestKey, DiscoveryResponse response) { + List routeConfigurations = RdsProtocolTransformer + .getRouteConfigurations(decodeResources(response, RouteConfiguration.class)); + XdsDataCache.updateRouteConfigurations(routeConfigurations); + + // send ack + StreamObserver requestObserver = XdsDataCache.getRequestObserver(requestKey); + requestObserver.onNext(builtAckDiscoveryRequest(response, XdsDataCache.getRouteResources())); + } + + @Override + public void subscribe(String requestKey, CountDownLatch countDownLatch) { + StreamObserver requestStreamObserver = client + .getDiscoveryRequestObserver(getResponseStreamObserver(requestKey, countDownLatch)); + requestStreamObserver.onNext(buildDiscoveryRequest(resourceType, null, null, XdsDataCache.getRouteResources())); + XdsDataCache.updateRequestObserver(requestKey, requestStreamObserver); + } + + @Override + public void subscribe(String requestKey) { + subscribe(requestKey, null); + } +} diff --git a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/handler/XdsHandler.java b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/handler/XdsHandler.java index 15a6f88dac..70b4a46bc5 100644 --- a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/handler/XdsHandler.java +++ b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/handler/XdsHandler.java @@ -22,6 +22,9 @@ package io.sermant.implement.service.xds.handler; +import com.google.protobuf.Any; +import com.google.protobuf.InvalidProtocolBufferException; + import io.envoyproxy.envoy.config.core.v3.Node; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; @@ -33,6 +36,8 @@ import io.sermant.implement.service.xds.client.XdsClient; import io.sermant.implement.service.xds.env.XdsConstant; +import java.util.ArrayList; +import java.util.List; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CountDownLatch; @@ -46,10 +51,11 @@ /** * abstract xds protocol handler * + * @param xds resource type * @author daizhenyu * @since 2024-05-13 **/ -public abstract class XdsHandler implements XdsServiceAction { +public abstract class XdsHandler implements XdsServiceAction { protected static final Logger LOGGER = LoggerFactory.getLogger(); private static final int DELAY_TIME = 3000; @@ -104,6 +110,29 @@ protected DiscoveryRequest buildDiscoveryRequest(String type, String version, St return builder.build(); } + /** + * decode xds resource + * + * @param response response + * @param clazz resource type + * @return decoded xds resource + */ + protected List decodeResources(DiscoveryResponse response, Class clazz) { + List resources = new ArrayList<>(); + for (Any any : response.getResourcesList()) { + try { + if (any == null) { + continue; + } + resources.add((T) any.unpack(clazz)); + } catch (InvalidProtocolBufferException e) { + LOGGER.log(Level.SEVERE, "Decode {0} resource failed, the error message is {1}", + new Object[]{clazz.getSimpleName(), e}); + } + } + return resources; + } + /** * built DiscoveryRequest for ack * diff --git a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/handler/XdsServiceAction.java b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/handler/XdsServiceAction.java index f090a990f6..da9186b469 100644 --- a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/handler/XdsServiceAction.java +++ b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/handler/XdsServiceAction.java @@ -26,10 +26,17 @@ **/ public interface XdsServiceAction { /** - * subscribe + * when countDownLatch is null, it is an asynchronous subscription; otherwise, it is a synchronous subscription * * @param requestKey request key to get the xds data from cache * @param countDownLatch Used to notify the xds requesting thread to obtain data */ void subscribe(String requestKey, CountDownLatch countDownLatch); + + /** + * async subscribe + * + * @param requestKey request key to get the xds data from cache + */ + void subscribe(String requestKey); } diff --git a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/loadbalance/XdsLoadBalanceServiceImpl.java b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/loadbalance/XdsLoadBalanceServiceImpl.java new file mode 100644 index 0000000000..61fe6817ff --- /dev/null +++ b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/loadbalance/XdsLoadBalanceServiceImpl.java @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.implement.service.xds.loadbalance; + +import io.sermant.core.service.xds.XdsLoadBalanceService; +import io.sermant.core.service.xds.entity.XdsLbPolicy; +import io.sermant.implement.service.xds.cache.XdsDataCache; +import io.sermant.implement.service.xds.utils.XdsCommonUtils; + +import java.util.Optional; + +/** + * XdsLoadBalanceService impl + * + * @author daizhenyu + * @since 2024-08-22 + **/ +public class XdsLoadBalanceServiceImpl implements XdsLoadBalanceService { + /** + * constructor + */ + public XdsLoadBalanceServiceImpl() { + } + + @Override + public XdsLbPolicy getLbPolicyOfCluster(String clusterName) { + Optional serviceName = XdsCommonUtils.getServiceNameFromCluster(clusterName); + if (serviceName.isPresent()) { + return XdsDataCache.getLbPolicyOfCluster(serviceName.get(), clusterName); + } + return XdsLbPolicy.UNRECOGNIZED; + } + + @Override + public XdsLbPolicy getBaseLbPolicyOfService(String serviceName) { + return XdsDataCache.getBaseLbPolicyOfService(serviceName); + } +} diff --git a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/route/XdsRouteServiceImpl.java b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/route/XdsRouteServiceImpl.java new file mode 100644 index 0000000000..42be2fd3b5 --- /dev/null +++ b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/route/XdsRouteServiceImpl.java @@ -0,0 +1,50 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.implement.service.xds.route; + +import io.sermant.core.service.xds.XdsRouteService; +import io.sermant.core.service.xds.entity.XdsRoute; +import io.sermant.implement.service.xds.cache.XdsDataCache; +import io.sermant.implement.service.xds.utils.XdsCommonUtils; + +import java.util.List; + +/** + * XdsRouteService impl + * + * @author daizhenyu + * @since 2024-05-08 + **/ +public class XdsRouteServiceImpl implements XdsRouteService { + /** + * constructor + */ + public XdsRouteServiceImpl() { + } + + @Override + public List getServiceRoute(String serviceName) { + return XdsDataCache.getServiceRoute(serviceName); + } + + @Override + public boolean isLocalityRoute(String clusterName) { + return XdsCommonUtils.getServiceNameFromCluster(clusterName) + .map(serviceName -> XdsDataCache.isLocalityLb(serviceName, clusterName)) + .orElse(false); + } +} \ No newline at end of file diff --git a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/utils/XdsProtocolTransformer.java b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/utils/EdsProtocolTransformer.java similarity index 56% rename from sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/utils/XdsProtocolTransformer.java rename to sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/utils/EdsProtocolTransformer.java index 5a9addc5c1..a8450a6af8 100644 --- a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/utils/XdsProtocolTransformer.java +++ b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/utils/EdsProtocolTransformer.java @@ -16,7 +16,6 @@ package io.sermant.implement.service.xds.utils; -import io.envoyproxy.envoy.config.cluster.v3.Cluster; import io.envoyproxy.envoy.config.core.v3.HealthStatus; import io.envoyproxy.envoy.config.core.v3.Locality; import io.envoyproxy.envoy.config.core.v3.SocketAddress; @@ -24,96 +23,85 @@ import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint; import io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints; import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.core.service.xds.entity.XdsClusterLoadAssigment; +import io.sermant.core.service.xds.entity.XdsLocality; +import io.sermant.core.service.xds.entity.XdsServiceClusterLoadAssigment; import io.sermant.core.utils.CollectionUtils; -import io.sermant.core.utils.StringUtils; import io.sermant.implement.service.xds.entity.XdsServiceInstance; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; /** - * Convert xDS protocol data to Sermant data model + * Convert eds protocol data to Sermant data model * * @author daizhenyu * @since 2024-05-10 **/ -public class XdsProtocolTransformer { - private static final int SERVICE_HOST_INDEX = 3; +public class EdsProtocolTransformer { + private static final int CLUSTER_SPLIT_LENGTH = 4; - private static final int SERVICE_NAME_INDEX = 0; + private static final int CLUSTER_SUBSET_INDEX = 2; - private static final int EXPECT_LENGTH = 4; + private static final String VERTICAL_LINE_SEPARATOR = "\\|"; - private XdsProtocolTransformer() { + private EdsProtocolTransformer() { } /** - * get the mapping between service name of k8s and cluster of istio - * - * @param clusters clusters - * @return mapping - */ - public static Map> getService2ClusterMapping(List clusters) { - Map> nameMapping = new HashMap<>(); - for (Cluster cluster : clusters) { - if (cluster == null) { - continue; - } - Optional serviceNameFromCluster = getServiceNameFromCluster(cluster.getName()); - if (!serviceNameFromCluster.isPresent()) { - continue; - } - String serviceName = serviceNameFromCluster.get(); - nameMapping.computeIfAbsent(serviceName, key -> new HashSet<>()).add(cluster.getName()); - } - return nameMapping; - } - - /** - * get the instance of one service by xds protocol + * get service instances by xds protocol * * @param loadAssignments eds data - * @return instance of service + * @return instances of service */ - public static Set getServiceInstances( + public static XdsServiceClusterLoadAssigment getServiceInstances( List loadAssignments) { - return loadAssignments.stream() + Map clusterLoadAssigmentMap = loadAssignments.stream() .filter(Objects::nonNull) - .flatMap(loadAssignment -> getServiceInstancesFromLoadAssignment(loadAssignment).stream()) - .collect(Collectors.toSet()); + .map(EdsProtocolTransformer::parseClusterLoadAssignment) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toMap(XdsClusterLoadAssigment::getClusterName, + clusterInstance -> clusterInstance)); + return new XdsServiceClusterLoadAssigment(clusterLoadAssigmentMap, + getServiceBaseClusterName(clusterLoadAssigmentMap)); } - private static Set getServiceInstancesFromLoadAssignment(ClusterLoadAssignment loadAssignment) { + private static Optional parseClusterLoadAssignment(ClusterLoadAssignment loadAssignment) { String clusterName = loadAssignment.getClusterName(); - Optional serviceNameOptional = getServiceNameFromCluster(clusterName); + Optional serviceNameOptional = XdsCommonUtils.getServiceNameFromCluster(clusterName); if (!serviceNameOptional.isPresent()) { - return Collections.EMPTY_SET; + return Optional.empty(); } String serviceName = serviceNameOptional.get(); - return processClusterLoadAssignment(loadAssignment, serviceName, clusterName); + return Optional.of(new XdsClusterLoadAssigment(serviceName, clusterName, + parseLocalityLbEndpointsList(loadAssignment, serviceName, clusterName))); } - private static Set processClusterLoadAssignment(ClusterLoadAssignment loadAssignment, + private static Map> parseLocalityLbEndpointsList( + ClusterLoadAssignment loadAssignment, String serviceName, String clusterName) { List localityLbEndpointList = loadAssignment.getEndpointsList(); if (CollectionUtils.isEmpty(localityLbEndpointList)) { - return Collections.EMPTY_SET; + return Collections.EMPTY_MAP; } return localityLbEndpointList.stream() .filter(Objects::nonNull) - .flatMap(localityLbEndpoints -> processLocalityLbEndpoints(localityLbEndpoints, serviceName, - clusterName).stream()) - .collect(Collectors.toSet()); + .collect(Collectors.toMap( + localityLbEndpoints -> + parseLocality(localityLbEndpoints), + localityLbEndpoints -> parseLocalityLbEndpoints(localityLbEndpoints, serviceName, clusterName) + )); } - private static Set processLocalityLbEndpoints(LocalityLbEndpoints localityLbEndpoints, + private static Set parseLocalityLbEndpoints(LocalityLbEndpoints localityLbEndpoints, String serviceName, String clusterName) { List lbEndpointsList = localityLbEndpoints.getLbEndpointsList(); if (CollectionUtils.isEmpty(lbEndpointsList)) { @@ -121,7 +109,7 @@ private static Set processLocalityLbEndpoints(LocalityLbEndpoin } return lbEndpointsList.stream() .filter(Objects::nonNull) - .map(lbEndpoint -> transformEndpoint2Instance(lbEndpoint, serviceName, clusterName, + .map(lbEndpoint -> parseLbEndpoint(lbEndpoint, serviceName, clusterName, getInitializedMetadata(localityLbEndpoints))) .collect(Collectors.toSet()); } @@ -137,7 +125,7 @@ private static Map getInitializedMetadata(LocalityLbEndpoints lo return metadata; } - private static ServiceInstance transformEndpoint2Instance(LbEndpoint endpoint, String serviceName, + private static ServiceInstance parseLbEndpoint(LbEndpoint endpoint, String serviceName, String clusterName, Map metadata) { XdsServiceInstance instance = new XdsServiceInstance(); SocketAddress socketAddress = endpoint.getEndpoint().getAddress().getSocketAddress(); @@ -158,16 +146,20 @@ private static ServiceInstance transformEndpoint2Instance(LbEndpoint endpoint, S return instance; } - private static Optional getServiceNameFromCluster(String clusterName) { - if (StringUtils.isEmpty(clusterName)) { - return Optional.empty(); + private static String getServiceBaseClusterName(Map instanceMap) { + for (Entry instanceEntry : instanceMap.entrySet()) { + String cluster = instanceEntry.getKey(); + String[] splitCluster = cluster.split(VERTICAL_LINE_SEPARATOR); + if (splitCluster.length == CLUSTER_SPLIT_LENGTH && splitCluster[CLUSTER_SUBSET_INDEX].equals("")) { + return cluster; + } } + return ""; + } - // cluster name format: "outbound|8080||xds-service.default.svc.cluster.local", xds-service is service name - String[] clusterSplit = clusterName.split("\\|"); - if (clusterSplit.length != EXPECT_LENGTH) { - return Optional.empty(); - } - return Optional.of(clusterSplit[SERVICE_HOST_INDEX].split("\\.")[SERVICE_NAME_INDEX]); + private static XdsLocality parseLocality(LocalityLbEndpoints localityLbEndpoints) { + Locality locality = localityLbEndpoints.getLocality(); + return new XdsLocality(locality.getRegion(), locality.getZone(), locality.getSubZone(), + localityLbEndpoints.getLoadBalancingWeight().getValue(), localityLbEndpoints.getPriority()); } } diff --git a/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/CommonDataGenerator.java b/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/CommonDataGenerator.java new file mode 100644 index 0000000000..3519804254 --- /dev/null +++ b/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/CommonDataGenerator.java @@ -0,0 +1,113 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.implement.service.xds; + +import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.core.service.xds.entity.XdsCluster; +import io.sermant.core.service.xds.entity.XdsClusterLoadAssigment; +import io.sermant.core.service.xds.entity.XdsHttpConnectionManager; +import io.sermant.core.service.xds.entity.XdsLbPolicy; +import io.sermant.core.service.xds.entity.XdsLocality; +import io.sermant.core.service.xds.entity.XdsRoute; +import io.sermant.core.service.xds.entity.XdsRouteConfiguration; +import io.sermant.core.service.xds.entity.XdsServiceCluster; +import io.sermant.core.service.xds.entity.XdsServiceClusterLoadAssigment; +import io.sermant.core.service.xds.entity.XdsVirtualHost; +import io.sermant.implement.service.xds.entity.XdsServiceInstance; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * @author daizhenyu + * @since 2024-08-26 + **/ +public class CommonDataGenerator { + public static List createRouteConfigurations() { + XdsRouteConfiguration routeConfiguration = new XdsRouteConfiguration(); + XdsVirtualHost virtualHost1 = new XdsVirtualHost(); + virtualHost1.setName("serviceA.name"); + XdsVirtualHost virtualHost2 = new XdsVirtualHost(); + virtualHost2.setName("serviceB.name"); + + XdsRoute route = new XdsRoute(); + route.setName("test-route"); + virtualHost1.setRoutes(Arrays.asList(route)); + virtualHost2.setRoutes(Collections.emptyList()); + Map virtualHosts = new HashMap<>(); + virtualHosts.put("serviceA", virtualHost1); + virtualHosts.put("serviceB", virtualHost2); + routeConfiguration.setVirtualHosts(virtualHosts); + return Arrays.asList(routeConfiguration); + } + + public static Map createServiceClusterMap(String serviceName, String clusterName) { + XdsCluster cluster = new XdsCluster(); + cluster.setClusterName(clusterName); + cluster.setLocalityLb(true); + cluster.setLbPolicy(XdsLbPolicy.RANDOM); + Map clusters = new HashMap<>(); + clusters.put(clusterName, cluster); + + XdsServiceCluster serviceCluster = new XdsServiceCluster(); + serviceCluster.setClusters(clusters); + serviceCluster.setBaseClusterName(clusterName); + + Map serviceClusterMap = new HashMap<>(); + serviceClusterMap.put(serviceName, serviceCluster); + return serviceClusterMap; + } + + public static XdsServiceClusterLoadAssigment createXdsServiceClusterInstance(List clusterNames, + String baseClusterName) { + Map clusterInstances = new HashMap<>(); + for (String clusterName : clusterNames) { + Set instances = new HashSet<>(); + instances.add(new XdsServiceInstance()); + + XdsLocality locality = new XdsLocality(); + Map> localityInstances = new HashMap<>(); + localityInstances.put(locality, instances); + + XdsClusterLoadAssigment clusterInstance = new XdsClusterLoadAssigment(); + clusterInstance.setClusterName(clusterName); + clusterInstance.setLocalityInstances(localityInstances); + clusterInstances.put(clusterName, clusterInstance); + } + + XdsServiceClusterLoadAssigment serviceClusterInstance = new XdsServiceClusterLoadAssigment(); + serviceClusterInstance.setBaseClusterName(baseClusterName); + serviceClusterInstance.setClusterLoadAssigments(clusterInstances); + return serviceClusterInstance; + } + + public static List createHttpConnectionManagers(List routerConfigs) { + List managers = new ArrayList<>(); + for (String routerConfig : routerConfigs) { + XdsHttpConnectionManager manager = new XdsHttpConnectionManager(); + manager.setRouteConfigName("route-test"); + managers.add(manager); + } + return managers; + } +} diff --git a/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/cache/XdsDataCacheTest.java b/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/cache/XdsDataCacheTest.java index 01871f3d56..4048c2caa1 100644 --- a/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/cache/XdsDataCacheTest.java +++ b/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/cache/XdsDataCacheTest.java @@ -19,7 +19,11 @@ import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; import io.grpc.stub.StreamObserver; import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.core.service.xds.entity.XdsClusterLoadAssigment; +import io.sermant.core.service.xds.entity.XdsLbPolicy; +import io.sermant.core.service.xds.entity.XdsLocality; import io.sermant.core.service.xds.listener.XdsServiceDiscoveryListener; +import io.sermant.implement.service.xds.CommonDataGenerator; import io.sermant.implement.service.xds.entity.XdsServiceInstance; import io.sermant.implement.service.xds.handler.StreamObserverRequestImpl; import io.sermant.implement.service.xds.handler.XdsServiceDiscoveryListenerImpl; @@ -27,10 +31,14 @@ import org.junit.Assert; import org.junit.Test; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; /** @@ -40,16 +48,28 @@ * @since 2024-05-24 **/ public class XdsDataCacheTest { + private static String BASE_CLUSTER_NAME = "outbound|8080||serviceA.default.svc.cluster.local"; + @Test public void testServiceInstance() { XdsServiceInstance instance = new XdsServiceInstance(); Set instanceSet = new HashSet<>(); instanceSet.add(instance); - XdsDataCache.updateServiceInstance("serviceA", instanceSet); + XdsDataCache.updateServiceInstance("serviceA", + CommonDataGenerator + .createXdsServiceClusterInstance(Arrays.asList(BASE_CLUSTER_NAME), BASE_CLUSTER_NAME)); Set serviceA = XdsDataCache.getServiceInstance("serviceA"); Set serviceB = XdsDataCache.getServiceInstance("serviceB"); Assert.assertEquals(1, serviceA.size()); Assert.assertEquals(0, serviceB.size()); + + Optional clusterServiceInstance = XdsDataCache + .getClusterServiceInstance("serviceA", BASE_CLUSTER_NAME); + Assert.assertTrue(clusterServiceInstance.isPresent()); + Map> localityInstances = clusterServiceInstance.get().getLocalityInstances(); + Assert.assertEquals(1, localityInstances.size()); + Assert.assertEquals(1, localityInstances.get(new XdsLocality()).size()); + XdsDataCache.removeServiceInstance("serviceA"); serviceA = XdsDataCache.getServiceInstance("serviceA"); Assert.assertEquals(0, serviceA.size()); @@ -66,6 +86,7 @@ public void testServiceListener() { List listenerB = XdsDataCache.getServiceDiscoveryListeners("serviceB"); Assert.assertEquals(1, listenerA.size()); Assert.assertEquals(0, listenerB.size()); + XdsDataCache.removeServiceDiscoveryListeners("serviceA"); listenerA = XdsDataCache.getServiceDiscoveryListeners("serviceA"); Assert.assertEquals(0, listenerA.size()); @@ -76,33 +97,73 @@ public void testRequestObserver() { StreamObserver requestObserver = new StreamObserverRequestImpl(); XdsDataCache.updateRequestObserver("serviceA", requestObserver); Assert.assertTrue(XdsDataCache.isContainsRequestObserver("serviceA")); + Assert.assertNotNull(XdsDataCache.getRequestObserver("serviceA")); + + XdsDataCache.removeRequestObserver("serviceA"); + Set>> requestObserversEntry = XdsDataCache + .getRequestObserversEntry(); + Assert.assertEquals(0, requestObserversEntry.size()); } @Test - public void testGetClustersByServiceName() { - Map> mapping = new HashMap<>(); - Set clusters = new HashSet<>(); - clusters.add("cluster"); - mapping.put("serviceA", clusters); + public void testServiceClusterMap() { Set result; - // serviceNameMapping is empty - XdsDataCache.updateServiceNameMapping(new HashMap<>()); + // serviceClusterMap is empty + XdsDataCache.updateServiceClusterMap(new HashMap<>()); result = XdsDataCache.getClustersByServiceName("serviceA"); Assert.assertNotNull(result); Assert.assertEquals(0, result.size()); - // serviceNameMapping is not empty, get un cached service - XdsDataCache.updateServiceNameMapping(mapping); + // serviceClusterMap is not empty, get un cached service + XdsDataCache.updateServiceClusterMap(CommonDataGenerator + .createServiceClusterMap("serviceA", BASE_CLUSTER_NAME)); result = XdsDataCache.getClustersByServiceName("serviceB"); Assert.assertNotNull(result); Assert.assertEquals(0, result.size()); - // serviceNameMapping is not null, get cached service - XdsDataCache.updateServiceNameMapping(mapping); + // serviceClusterMap is not null, get cached service result = XdsDataCache.getClustersByServiceName("serviceA"); Assert.assertNotNull(result); Assert.assertEquals(1, result.size()); - Assert.assertTrue(result.contains("cluster")); + Assert.assertTrue(result.contains(BASE_CLUSTER_NAME)); + + // test lb + Assert.assertEquals(XdsLbPolicy.RANDOM, XdsDataCache.getBaseLbPolicyOfService("serviceA")); + Assert.assertEquals(XdsLbPolicy.RANDOM, XdsDataCache.getLbPolicyOfCluster("serviceA", BASE_CLUSTER_NAME)); + Assert.assertEquals(true, XdsDataCache.isLocalityLb("serviceA", BASE_CLUSTER_NAME)); + + // clear data + XdsDataCache.updateServiceClusterMap(new HashMap<>()); + } + + @Test + public void testHttpConnectionManagers() { + XdsDataCache.updateHttpConnectionManagers( + CommonDataGenerator.createHttpConnectionManagers(Arrays.asList("route-test"))); + Set routeResources = XdsDataCache.getRouteResources(); + Assert.assertEquals(1, routeResources.size()); + Assert.assertTrue(routeResources.contains("route-test")); + + // clear data + XdsDataCache.updateHttpConnectionManagers(new ArrayList<>()); + } + + @Test + public void testRouteConfigurations() { + XdsDataCache.updateRouteConfigurations(CommonDataGenerator.createRouteConfigurations()); + + // get service route not in cache + Assert.assertEquals(0, XdsDataCache.getServiceRoute("serviceC").size()); + + // get service route in cache, but route is empty + Assert.assertEquals(0, XdsDataCache.getServiceRoute("serviceB").size()); + + // get service route in cache + Assert.assertEquals(1, XdsDataCache.getServiceRoute("serviceA").size()); + Assert.assertEquals("test-route", XdsDataCache.getServiceRoute("serviceA").get(0).getName()); + + // clear data + XdsDataCache.updateRouteConfigurations(new ArrayList<>()); } } \ No newline at end of file diff --git a/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/discovery/XdsServiceDiscoveryImplTest.java b/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/discovery/XdsServiceDiscoveryImplTest.java index 10b1b807d4..c5f5fd8620 100644 --- a/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/discovery/XdsServiceDiscoveryImplTest.java +++ b/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/discovery/XdsServiceDiscoveryImplTest.java @@ -17,18 +17,20 @@ package io.sermant.implement.service.xds.discovery; import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.core.service.xds.entity.XdsClusterLoadAssigment; import io.sermant.implement.service.xds.BaseXdsTest; +import io.sermant.implement.service.xds.CommonDataGenerator; import io.sermant.implement.service.xds.cache.XdsDataCache; -import io.sermant.implement.service.xds.entity.XdsServiceInstance; +import io.sermant.implement.service.xds.handler.EdsHandler; import io.sermant.implement.service.xds.handler.XdsServiceDiscoveryListenerImpl; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import org.mockito.Mockito; -import java.util.HashSet; +import java.util.Arrays; +import java.util.Optional; import java.util.Set; /** @@ -40,11 +42,14 @@ public class XdsServiceDiscoveryImplTest extends BaseXdsTest { private static String serviceName = "serviceA"; + private static String BASE_CLUSTER_NAME = "outbound|8080||serviceA.default.svc.cluster.local"; + private static XdsServiceDiscoveryImpl xdsServiceDiscovery; @BeforeClass public static void setUp() { - xdsServiceDiscovery = new XdsServiceDiscoveryImpl(client); + EdsHandler edsHandler = new EdsHandler(client); + xdsServiceDiscovery = new XdsServiceDiscoveryImpl(edsHandler); } @AfterClass @@ -55,7 +60,7 @@ public static void tearDown() { } @Test - public void getServiceInstance() { + public void testGetServiceInstance() { // clear data XdsDataCache.removeServiceInstance(serviceName); XdsDataCache.removeRequestObserver(serviceName); @@ -63,17 +68,17 @@ public void getServiceInstance() { // no service instance in cache Set result = xdsServiceDiscovery.getServiceInstance(serviceName); Assert.assertNotNull(XdsDataCache.getRequestObserver(serviceName)); + Assert.assertEquals(0, result.size()); // service instance in cache - Set instances = new HashSet<>(); - instances.add(new XdsServiceInstance()); - XdsDataCache.updateServiceInstance(serviceName, instances); + XdsDataCache.updateServiceInstance(serviceName, CommonDataGenerator + .createXdsServiceClusterInstance(Arrays.asList(BASE_CLUSTER_NAME), BASE_CLUSTER_NAME)); result = xdsServiceDiscovery.getServiceInstance(serviceName); Assert.assertEquals(1, result.size()); } @Test - public void subscribeServiceInstance() { + public void testSubscribeServiceInstance() { // clear data XdsDataCache.removeServiceInstance(serviceName); XdsDataCache.removeServiceDiscoveryListeners(serviceName); @@ -86,10 +91,30 @@ public void subscribeServiceInstance() { Assert.assertEquals(0, xdsServiceDiscoveryListener.getCount()); // service instance in cache - Set instances = new HashSet<>(); - instances.add(new XdsServiceInstance()); - XdsDataCache.updateServiceInstance(serviceName, instances); + XdsDataCache.updateServiceInstance(serviceName, CommonDataGenerator + .createXdsServiceClusterInstance(Arrays.asList(BASE_CLUSTER_NAME), BASE_CLUSTER_NAME)); xdsServiceDiscovery.subscribeServiceInstance(serviceName, xdsServiceDiscoveryListener); Assert.assertEquals(1, xdsServiceDiscoveryListener.getCount()); } + + @Test + public void testGetClusterServiceInstance() { + // clear data + XdsDataCache.removeServiceInstance(serviceName); + XdsDataCache.removeRequestObserver(serviceName); + + // no cluster service instance in cache + XdsServiceDiscoveryListenerImpl xdsServiceDiscoveryListener = new XdsServiceDiscoveryListenerImpl(); + Optional clusterServiceInstance = xdsServiceDiscovery + .getClusterServiceInstance(BASE_CLUSTER_NAME); + Assert.assertFalse(clusterServiceInstance.isPresent()); + + // service instance in cache + XdsDataCache.updateServiceInstance(serviceName, CommonDataGenerator + .createXdsServiceClusterInstance(Arrays.asList(BASE_CLUSTER_NAME), BASE_CLUSTER_NAME)); + clusterServiceInstance = xdsServiceDiscovery + .getClusterServiceInstance(BASE_CLUSTER_NAME); + Assert.assertTrue(clusterServiceInstance.isPresent()); + Assert.assertEquals(BASE_CLUSTER_NAME, clusterServiceInstance.get().getClusterName()); + } } \ No newline at end of file diff --git a/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/handler/CdsXdsTest.java b/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/handler/CdsHandlerTest.java similarity index 92% rename from sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/handler/CdsXdsTest.java rename to sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/handler/CdsHandlerTest.java index c761e6a4f4..ea4a49ebdf 100644 --- a/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/handler/CdsXdsTest.java +++ b/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/handler/CdsHandlerTest.java @@ -32,6 +32,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Set; @@ -39,7 +40,7 @@ * @author daizhenyu * @since 2024-05-24 **/ -public class CdsXdsTest extends BaseXdsTest { +public class CdsHandlerTest extends BaseXdsTest { private static CdsHandler handler; private static String serviceName = "serviceA"; @@ -49,8 +50,6 @@ public static void setUp() { handler = new CdsHandler(client); Mockito.doReturn(requestStreamObserver).when(client).getDiscoveryRequestObserver(handler .getResponseStreamObserver(XdsConstant.CDS_ALL_RESOURCE, null)); - - handler.subscribe(XdsConstant.CDS_ALL_RESOURCE, null); XdsDataCache.updateRequestObserver(serviceName, requestStreamObserver); } @@ -58,14 +57,18 @@ public static void setUp() { public static void tearDown() { Mockito.clearAllCaches(); XdsDataCache.removeRequestObserver(serviceName); + XdsDataCache.removeRequestObserver(XdsConstant.CDS_ALL_RESOURCE); + XdsDataCache.updateServiceClusterMap(new HashMap<>()); } @Test public void testHandleResponse() { + handler.subscribe(XdsConstant.CDS_ALL_RESOURCE, null); + // cluster is empty handler.handleResponse(XdsConstant.CDS_ALL_RESOURCE, buildDiscoveryResponse(new ArrayList<>())); - Assert.assertEquals(0, XdsDataCache.getServiceNameMapping().size()); + Assert.assertEquals(0, XdsDataCache.getServiceClusterMap().size()); // service with one cluster handler.handleResponse(XdsConstant.CDS_ALL_RESOURCE, diff --git a/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/handler/EdsXdsTest.java b/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/handler/EdsHandlerTest.java similarity index 98% rename from sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/handler/EdsXdsTest.java rename to sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/handler/EdsHandlerTest.java index a41c9215a3..318fca0cbc 100644 --- a/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/handler/EdsXdsTest.java +++ b/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/handler/EdsHandlerTest.java @@ -29,7 +29,6 @@ import io.sermant.core.service.xds.listener.XdsServiceDiscoveryListener; import io.sermant.implement.service.xds.BaseXdsTest; import io.sermant.implement.service.xds.cache.XdsDataCache; -import io.sermant.implement.service.xds.env.XdsConstant; import org.junit.AfterClass; import org.junit.Assert; @@ -48,7 +47,7 @@ * @author daizhenyu * @since 2024-05-25 **/ -public class EdsXdsTest extends BaseXdsTest { +public class EdsHandlerTest extends BaseXdsTest { private static String serviceName = "serviceA"; private String clusterName = "outbound|8080||serviceA.default.svc.cluster.local"; @@ -60,8 +59,6 @@ public static void setUp() throws Exception { handler = new EdsHandler(client); Mockito.doReturn(requestStreamObserver).when(client) .getDiscoveryRequestObserver(handler.getResponseStreamObserver(serviceName, null)); - - handler.subscribe(serviceName, null); XdsDataCache.addServiceDiscoveryListener(serviceName, new XdsServiceDiscoveryListenerImpl()); } @@ -70,10 +67,13 @@ public static void tearDown() throws Exception { Mockito.clearAllCaches(); XdsDataCache.removeRequestObserver(serviceName); XdsDataCache.removeServiceDiscoveryListeners(serviceName); + XdsDataCache.removeServiceInstance(serviceName); } @Test public void testHandleResponse() { + handler.subscribe(serviceName, null); + // first update service instance of serviceA handler.handleResponse(serviceName, buildDiscoveryResponse("127.0.0.1", 8080)); assertHandleResponseLogic(1, 1, "127.0.0.1", 8080); diff --git a/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/handler/LdsHandlerTest.java b/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/handler/LdsHandlerTest.java new file mode 100644 index 0000000000..7ae7957b9b --- /dev/null +++ b/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/handler/LdsHandlerTest.java @@ -0,0 +1,113 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.implement.service.xds.handler; + +import com.google.protobuf.Any; + +import io.envoyproxy.envoy.config.listener.v3.Filter; +import io.envoyproxy.envoy.config.listener.v3.FilterChain; +import io.envoyproxy.envoy.config.listener.v3.Listener; +import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager; +import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.Rds; +import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; +import io.sermant.implement.service.xds.BaseXdsTest; +import io.sermant.implement.service.xds.cache.XdsDataCache; +import io.sermant.implement.service.xds.env.XdsConstant; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +/** + * LdsHandlerTest + * + * @author daizhenyu + * @since 2024-08-24 + **/ +public class LdsHandlerTest extends BaseXdsTest { + private static LdsHandler handler; + + @BeforeClass + public static void setUp() { + handler = new LdsHandler(client); + Mockito.doReturn(requestStreamObserver).when(client).getDiscoveryRequestObserver(handler + .getResponseStreamObserver(XdsConstant.LDS_ALL_RESOURCE, null)); + XdsDataCache.updateRequestObserver(XdsConstant.RDS_ALL_RESOURCE, requestStreamObserver); + } + + @AfterClass + public static void tearDown() { + Mockito.clearAllCaches(); + XdsDataCache.removeRequestObserver(XdsConstant.LDS_ALL_RESOURCE); + XdsDataCache.removeRequestObserver(XdsConstant.RDS_ALL_RESOURCE); + XdsDataCache.updateHttpConnectionManagers(new ArrayList<>()); + } + + @Test + public void testHandleResponse() { + handler.subscribe(XdsConstant.LDS_ALL_RESOURCE, null); + + // listener is empty + handler.handleResponse(XdsConstant.LDS_ALL_RESOURCE, + DiscoveryResponse.newBuilder().addAllResources(new ArrayList<>()).build()); + Assert.assertEquals(0, XdsDataCache.getRouteResources().size()); + + // listener is not empty + handler.handleResponse(XdsConstant.LDS_ALL_RESOURCE, + buildDiscoveryResponse("test-listener", "test-routeConfig")); + Set routeResources = XdsDataCache.getRouteResources(); + Assert.assertEquals(1, routeResources.size()); + Assert.assertTrue(routeResources.contains("test-routeConfig")); + } + + private Listener createListener(String listenerName, String routeConfigName) { + HttpConnectionManager httpConnectionManager = createHttpConnectionManager(routeConfigName); + Any httpConnectionManagerAny = Any.pack(httpConnectionManager); + Filter httpFilter = Filter.newBuilder() + .setTypedConfig(httpConnectionManagerAny) + .build(); + FilterChain filterChain = FilterChain.newBuilder() + .addFilters(httpFilter) + .build(); + + return Listener.newBuilder() + .setName(listenerName) + .addFilterChains(filterChain) + .build(); + } + + private HttpConnectionManager createHttpConnectionManager(String routeConfigName) { + HttpConnectionManager.Builder managerBuilder = HttpConnectionManager.newBuilder(); + Rds.Builder rdsBuilder = Rds.newBuilder(); + rdsBuilder.setRouteConfigName(routeConfigName); + managerBuilder.setRds(rdsBuilder.build()); + return managerBuilder.build(); + } + + private DiscoveryResponse buildDiscoveryResponse(String listenerName, String routeConfigName) { + Listener listener = createListener(listenerName, routeConfigName); + List resources = new ArrayList<>(); + resources.add(Any.pack(listener)); + return DiscoveryResponse.newBuilder().addAllResources(resources).build(); + } +} \ No newline at end of file diff --git a/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/handler/RdsHandlerTest.java b/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/handler/RdsHandlerTest.java new file mode 100644 index 0000000000..bd7daf088b --- /dev/null +++ b/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/handler/RdsHandlerTest.java @@ -0,0 +1,101 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.implement.service.xds.handler; + +import com.google.protobuf.Any; + +import io.envoyproxy.envoy.config.route.v3.Route; +import io.envoyproxy.envoy.config.route.v3.RouteConfiguration; +import io.envoyproxy.envoy.config.route.v3.VirtualHost; +import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; +import io.sermant.core.service.xds.entity.XdsRoute; +import io.sermant.implement.service.xds.BaseXdsTest; +import io.sermant.implement.service.xds.cache.XdsDataCache; +import io.sermant.implement.service.xds.env.XdsConstant; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.List; + +/** + * RdsHandlerTest + * + * @author daizhenyu + * @since 2024-08-26 + **/ +public class RdsHandlerTest extends BaseXdsTest { + private static RdsHandler handler; + + @Before + public void setUp() throws Exception { + handler = new RdsHandler(client); + Mockito.doReturn(requestStreamObserver).when(client).getDiscoveryRequestObserver(handler + .getResponseStreamObserver(XdsConstant.RDS_ALL_RESOURCE, null)); + } + + @After + public void tearDown() throws Exception { + Mockito.clearAllCaches(); + XdsDataCache.removeRequestObserver(XdsConstant.RDS_ALL_RESOURCE); + XdsDataCache.updateRouteConfigurations(new ArrayList<>()); + } + + @Test + public void testHandleResponse() { + handler.subscribe(XdsConstant.RDS_ALL_RESOURCE, null); + + // routeConfiguration is empty + handler.handleResponse(XdsConstant.RDS_ALL_RESOURCE, + DiscoveryResponse.newBuilder().addAllResources(new ArrayList<>()).build()); + Assert.assertEquals(0, XdsDataCache.getRouteConfigurations().size()); + + // routeConfiguration is not empty + handler.handleResponse(XdsConstant.RDS_ALL_RESOURCE, + buildDiscoveryResponse("serviceA.example.com", "test-route", + "test-routeConfig")); + List route = XdsDataCache.getServiceRoute("serviceA"); + Assert.assertEquals(1, route.size()); + Assert.assertEquals("test-route", route.get(0).getName()); + } + + private DiscoveryResponse buildDiscoveryResponse(String virtualHostName, String routeName, String routeConfigName) { + RouteConfiguration configuration = createRouteConfiguration(virtualHostName, routeName, routeConfigName); + List resources = new ArrayList<>(); + resources.add(Any.pack(configuration)); + return DiscoveryResponse.newBuilder().addAllResources(resources).build(); + } + + private RouteConfiguration createRouteConfiguration(String virtualHostName, String routeName, + String routeConfigName) { + return RouteConfiguration.newBuilder() + .setName(routeConfigName) + .addVirtualHosts(createVirtualHost(virtualHostName, routeName)) + .build(); + } + + private VirtualHost createVirtualHost(String virtualHostName, String routeName) { + return VirtualHost.newBuilder() + .setName(virtualHostName) + .addRoutes(Route.newBuilder().setName(routeName).build()) + .build(); + } +} \ No newline at end of file diff --git a/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/loadbalance/XdsLoadBalanceServiceImplTest.java b/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/loadbalance/XdsLoadBalanceServiceImplTest.java new file mode 100644 index 0000000000..7fb4ec16eb --- /dev/null +++ b/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/loadbalance/XdsLoadBalanceServiceImplTest.java @@ -0,0 +1,78 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.implement.service.xds.loadbalance; + +import io.sermant.core.service.xds.XdsLoadBalanceService; +import io.sermant.core.service.xds.entity.XdsLbPolicy; +import io.sermant.implement.service.xds.BaseXdsTest; +import io.sermant.implement.service.xds.CommonDataGenerator; +import io.sermant.implement.service.xds.cache.XdsDataCache; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.HashMap; + +/** + * XdsLoadBalanceServiceImplTest + * + * @author daizhenyu + * @since 2024-08-26 + **/ +public class XdsLoadBalanceServiceImplTest extends BaseXdsTest { + private static XdsLoadBalanceService loadBalanceService; + + @BeforeClass + public static void setUp() { + loadBalanceService = new XdsLoadBalanceServiceImpl(); + XdsDataCache.updateServiceClusterMap(CommonDataGenerator + .createServiceClusterMap("serviceA", "outbound|8080||serviceA.default.svc.cluster.local")); + } + + @AfterClass + public static void tearDown() { + XdsDataCache.updateServiceClusterMap(new HashMap<>()); + } + + @Test + public void testGetLbPolicyOfCluster() { + // clusterName is invalid + Assert.assertEquals(XdsLbPolicy.UNRECOGNIZED, + loadBalanceService.getLbPolicyOfCluster("outbound|8080|serviceA.default.svc.cluster.local")); + + // clusterName is valid, but cluster not cached + Assert.assertEquals(XdsLbPolicy.UNRECOGNIZED, + loadBalanceService.getLbPolicyOfCluster("outbound|8080||serviceB.default.svc.cluster.local")); + + // clusterName is valid, and cluster cached + Assert.assertEquals(XdsLbPolicy.RANDOM, + loadBalanceService.getLbPolicyOfCluster("outbound|8080||serviceA.default.svc.cluster.local")); + } + + @Test + public void testGetBaseLbPolicyOfService() { + // service not cached + Assert.assertEquals(XdsLbPolicy.UNRECOGNIZED, + loadBalanceService.getBaseLbPolicyOfService("serviceB")); + + // service cached + Assert.assertEquals(XdsLbPolicy.RANDOM, + loadBalanceService.getBaseLbPolicyOfService("serviceA")); + } +} \ No newline at end of file diff --git a/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/route/XdsRouteServiceImplTest.java b/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/route/XdsRouteServiceImplTest.java new file mode 100644 index 0000000000..eae714705e --- /dev/null +++ b/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/route/XdsRouteServiceImplTest.java @@ -0,0 +1,81 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.implement.service.xds.route; + +import io.sermant.core.service.xds.XdsRouteService; +import io.sermant.core.service.xds.entity.XdsRoute; +import io.sermant.implement.service.xds.BaseXdsTest; +import io.sermant.implement.service.xds.CommonDataGenerator; +import io.sermant.implement.service.xds.cache.XdsDataCache; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +/** + * XdsRouteServiceImplTest + * + * @author daizhenyu + * @since 2024-08-26 + **/ +public class XdsRouteServiceImplTest extends BaseXdsTest { + private static XdsRouteService routeService; + + @BeforeClass + public static void setUp() { + routeService = new XdsRouteServiceImpl(); + XdsDataCache.updateRouteConfigurations(CommonDataGenerator.createRouteConfigurations()); + XdsDataCache.updateServiceClusterMap(CommonDataGenerator + .createServiceClusterMap("serviceA", "outbound|8080||serviceA.default.svc.cluster.local")); + } + + @AfterClass + public static void tearDown() { + XdsDataCache.updateRouteConfigurations(new ArrayList<>()); + XdsDataCache.updateServiceClusterMap(new HashMap<>()); + } + + @Test + public void testGetServiceRoute() { + // service not in cache + List result = routeService.getServiceRoute("serviceC"); + Assert.assertEquals(0, result.size()); + + // service in cache, but route is empty + result = routeService.getServiceRoute("serviceB"); + Assert.assertEquals(0, result.size()); + + // service in cache, and route is not empty + result = routeService.getServiceRoute("serviceA"); + Assert.assertEquals(1, result.size()); + Assert.assertEquals("test-route", result.get(0).getName()); + } + + @Test + public void testIsLocalityRoute() { + // cluster not in cache + Assert.assertFalse(routeService.isLocalityRoute("outbound|8080||serviceB.default.svc.cluster.local")); + + // cluster in cache + Assert.assertTrue(routeService.isLocalityRoute("outbound|8080||serviceA.default.svc.cluster.local")); + } +} \ No newline at end of file diff --git a/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/utils/XdsProtocolTransformerTest.java b/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/utils/EdsProtocolTransformerTest.java similarity index 50% rename from sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/utils/XdsProtocolTransformerTest.java rename to sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/utils/EdsProtocolTransformerTest.java index b0861fd7c9..5e40435955 100644 --- a/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/utils/XdsProtocolTransformerTest.java +++ b/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/utils/EdsProtocolTransformerTest.java @@ -16,11 +16,13 @@ package io.sermant.implement.service.xds.utils; -import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.core.v3.Locality; import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint; import io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints; import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.core.service.xds.entity.XdsLocality; +import io.sermant.core.service.xds.entity.XdsServiceClusterLoadAssigment; import org.junit.Assert; import org.junit.Test; @@ -29,6 +31,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; /** @@ -37,27 +40,7 @@ * @author daizhenyu * @since 2024-05-11 **/ -public class XdsProtocolTransformerTest { - - @Test - public void testGetService2ClusterMapping() { - List clusters = Arrays.asList( - null, - createCluster("outbound|8080||serviceA.default.svc.cluster.local"), - createCluster("outbound|8080|subset1|serviceB.default.svc.cluster.local"), - createCluster("outbound|8080|subset2|serviceB.default.svc.cluster.local"), - createCluster("outbound|8080|serviceC.default.svc.cluster.local"), - createCluster(null) - ); - - Map> result = XdsProtocolTransformer.getService2ClusterMapping(clusters); - Assert.assertEquals(2, result.size()); - Assert.assertTrue(result.containsKey("serviceA")); - Assert.assertTrue(result.containsKey("serviceB")); - Assert.assertEquals(1, result.get("serviceA").size()); - Assert.assertEquals(2, result.get("serviceB").size()); - } - +public class EdsProtocolTransformerTest { @Test public void testGetServiceInstances() { List assignments = Arrays.asList( @@ -67,31 +50,42 @@ public void testGetServiceInstances() { createLoadAssignment("outbound|8080|serviceB.default.svc.cluster.local") ); - Set result = XdsProtocolTransformer.getServiceInstances(assignments); - Assert.assertEquals(2, result.size()); - Iterator iterator = result.iterator(); + XdsServiceClusterLoadAssigment result = EdsProtocolTransformer.getServiceInstances(assignments); + Assert.assertEquals(2, result.getClusterLoadAssigments().size()); + Iterator iterator = result.getServiceInstance().iterator(); while (iterator.hasNext()) { ServiceInstance next = iterator.next(); Assert.assertEquals("serviceB", next.getServiceName()); } - } + Assert.assertEquals(2, result.getClusterLoadAssigments().size()); + Assert.assertEquals("outbound|8080|subset1|serviceB.default.svc.cluster.local", + result.getXdsClusterLoadAssigment("outbound|8080|subset1|serviceB.default.svc.cluster.local") + .getClusterName()); + Assert.assertEquals("serviceB", + result.getXdsClusterLoadAssigment("outbound|8080|subset1|serviceB.default.svc.cluster.local") + .getServiceName()); - private Cluster createCluster(String name) { - Cluster.Builder builder = Cluster.newBuilder(); - if (name != null) { - builder.setName(name); + Map> localityInstances = result + .getXdsClusterLoadAssigment("outbound|8080|subset1|serviceB.default.svc.cluster.local") + .getLocalityInstances(); + Assert.assertEquals(1, + localityInstances.size()); + for (Entry> xdsLocalitySetEntry : localityInstances.entrySet()) { + Assert.assertEquals("test-region", xdsLocalitySetEntry.getKey().getRegion()); + Assert.assertEquals("test-zone", xdsLocalitySetEntry.getKey().getZone()); + Assert.assertEquals("test-subzone", xdsLocalitySetEntry.getKey().getSubZone()); } - return builder.build(); } private ClusterLoadAssignment createLoadAssignment(String clusterName) { ClusterLoadAssignment.Builder assignmentBuilder = ClusterLoadAssignment.newBuilder(); - - LocalityLbEndpoints.Builder localityBuilder = LocalityLbEndpoints.newBuilder(); LbEndpoint.Builder endpointBuilder = LbEndpoint.newBuilder(); - localityBuilder.addLbEndpoints(endpointBuilder.build()); + Locality.Builder localityBuilder = Locality.newBuilder(); + localityBuilder.setRegion("test-region").setZone("test-zone").setSubZone("test-subzone"); + LocalityLbEndpoints.Builder localityEndpointBuilder = LocalityLbEndpoints.newBuilder(); + localityEndpointBuilder.addLbEndpoints(endpointBuilder.build()).setLocality(localityBuilder.build()); assignmentBuilder.setClusterName(clusterName); - assignmentBuilder.addEndpoints(localityBuilder.build()); + assignmentBuilder.addEndpoints(localityEndpointBuilder.build()); return assignmentBuilder.build(); }