diff --git a/sermant-agentcore/sermant-agentcore-config/config/config.properties b/sermant-agentcore/sermant-agentcore-config/config/config.properties index 50ad9c2b08..73ada350b2 100644 --- a/sermant-agentcore/sermant-agentcore-config/config/config.properties +++ b/sermant-agentcore/sermant-agentcore-config/config/config.properties @@ -88,14 +88,12 @@ gateway.nettyPort=6888 # Specify retreat algorithm maximum connection interval (s) #gateway.maxReconnectInternalTime=180 #=============================xds configuration===============================# -# istio control plane address +# istiod control plane address, security.enable=false with 15010 port, and security.enable=true with 15012 port xds.config.control.plane.address=istiod.istio-system.svc:15010 # Whether to use secure communication with the control plane xds.config.security.enable=false -# Certificates used for secure communication with the control plane -xds.config.certificate.path= -# Private key used for secure communication with the control plane -xds.config.private.key.path= +# service account token used for secure communication with the control plane +xds.config.service.account.token.path=/var/run/secrets/kubernetes.io/serviceaccount/token #=============================Metadata===============================# # Service name for host service instance service.meta.service=default diff --git a/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/ServiceConfig.java b/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/ServiceConfig.java index 087c1d31e0..4e83d045a7 100644 --- a/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/ServiceConfig.java +++ b/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/ServiceConfig.java @@ -130,7 +130,7 @@ public boolean checkServiceEnable(String serviceName) { if (ServiceManager.HTTP_SERVER_SERVICE_IMPL.equals(serviceName)) { return isHttpserverEnable(); } - if (ServiceManager.XDS_SERVICE_IMPL.equals(serviceName)) { + if (ServiceManager.XDS_CORE_SERVICE_IMPL.equals(serviceName)) { return isXdsServiceEnable(); } return false; diff --git a/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/ServiceManager.java b/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/ServiceManager.java index bbd565bb50..e5f8d193d4 100644 --- a/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/ServiceManager.java +++ b/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/ServiceManager.java @@ -87,8 +87,8 @@ public class ServiceManager { /** * xDS Service Discover */ - public static final String XDS_SERVICE_IMPL = - "io.sermant.implement.service.xds.XdsServiceImpl"; + public static final String XDS_CORE_SERVICE_IMPL = + "io.sermant.implement.service.xds.XdsCoreServiceImpl"; /** * logger diff --git a/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/config/XdsConfig.java b/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/config/XdsConfig.java index 86f3370b4a..3a9f36dc4b 100644 --- a/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/config/XdsConfig.java +++ b/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/config/XdsConfig.java @@ -34,11 +34,16 @@ public class XdsConfig implements BaseConfig { @ConfigFieldKey("security.enable") private boolean securityEnable = false; - @ConfigFieldKey("certificate.path") - private String certificatePath; + @ConfigFieldKey("service.account.token.path") + private String tokenPath; - @ConfigFieldKey("private.key.path") - private String privateKeyPath; + public String getTokenPath() { + return tokenPath; + } + + public void setTokenPath(String tokenPath) { + this.tokenPath = tokenPath; + } public String getControlPlaneAddress() { return controlPlaneAddress; @@ -55,20 +60,4 @@ public boolean isSecurityEnable() { public void setSecurityEnable(boolean securityEnable) { this.securityEnable = securityEnable; } - - public String getCertificatePath() { - return certificatePath; - } - - public void setCertificatePath(String certificatePath) { - this.certificatePath = certificatePath; - } - - public String getPrivateKeyPath() { - return privateKeyPath; - } - - public void setPrivateKeyPath(String privateKeyPath) { - this.privateKeyPath = privateKeyPath; - } } diff --git a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/cache/XdsDataCache.java b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/cache/XdsDataCache.java index 5747ee0a69..a9fb84dad6 100644 --- a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/cache/XdsDataCache.java +++ b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/cache/XdsDataCache.java @@ -22,6 +22,7 @@ import io.sermant.core.service.xds.listener.XdsServiceDiscoveryListener; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -54,7 +55,7 @@ public class XdsDataCache { /** * key:service name value:cluster map */ - private static Map> serviceNameMapping; + private static Map> serviceNameMapping = new HashMap<>(); private XdsDataCache() { } @@ -64,17 +65,29 @@ private XdsDataCache() { * * @param mapping the mapping between service and cluster */ - public static void updateServiceNameMapping(Map> mapping) { + public static void updateServiceNameMapping(Map> mapping) { serviceNameMapping = mapping; } /** - * get cluster list for service + * get cluster set for service * * @param serviceName - * @return cluster list for service + * @return cluster set for service */ - public static List getClustersByServiceName(String serviceName) { - return serviceNameMapping.getOrDefault(serviceName, Collections.EMPTY_LIST); + public static Set getClustersByServiceName(String serviceName) { + if (serviceNameMapping == null) { + return Collections.EMPTY_SET; + } + return serviceNameMapping.getOrDefault(serviceName, Collections.EMPTY_SET); + } + + /** + * get serviceNameMapping + * + * @return serviceNameMapping + */ + public static Map> getServiceNameMapping() { + return serviceNameMapping; } } \ No newline at end of file diff --git a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/client/XdsClient.java b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/client/XdsClient.java new file mode 100644 index 0000000000..31aeafc3e0 --- /dev/null +++ b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/client/XdsClient.java @@ -0,0 +1,117 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.implement.service.xds.client; + +import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc; +import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub; +import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; +import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; +import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.Metadata.Key; +import io.grpc.netty.GrpcSslContexts; +import io.grpc.netty.NettyChannelBuilder; +import io.grpc.stub.MetadataUtils; +import io.grpc.stub.StreamObserver; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.sermant.core.common.LoggerFactory; +import io.sermant.core.config.ConfigManager; +import io.sermant.core.service.xds.config.XdsConfig; +import io.sermant.core.utils.FileUtils; + +import java.io.Closeable; +import java.io.IOException; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.net.ssl.SSLException; + +/** + * XdsClient + * + * @author daizhenyu + * @since 2024-05-09 + **/ +public class XdsClient implements Closeable { + private static final Logger LOGGER = LoggerFactory.getLogger(); + + private XdsConfig config = ConfigManager.getConfig(XdsConfig.class); + + private ManagedChannel channel; + + /** + * construction method + */ + public XdsClient() { + createChannel(); + } + + private void createChannel() { + NettyChannelBuilder nettyChannelBuilder = NettyChannelBuilder.forTarget(config.getControlPlaneAddress()); + if (config.isSecurityEnable()) { + try { + SslContext context = GrpcSslContexts.forClient() + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .build(); + nettyChannelBuilder.sslContext(context); + } catch (SSLException e) { + LOGGER.log(Level.SEVERE, "SSLException occurred when creating gRPC channel", e); + } + } else { + nettyChannelBuilder.usePlaintext(); + } + channel = nettyChannelBuilder.build(); + } + + /** + * update channel + */ + public void updateChannel() { + if (channel == null || channel.isShutdown() || channel.isTerminated()) { + synchronized (this) { + if (channel == null || channel.isShutdown() || channel.isTerminated()) { + createChannel(); + } + } + } + } + + /** + * get ADS grpc request stream observer + * + * @param observer DiscoveryResponse observer + * @return StreamObserver + */ + public StreamObserver getDiscoveryRequestObserver(StreamObserver observer) { + AggregatedDiscoveryServiceStub stub = AggregatedDiscoveryServiceGrpc.newStub(channel); + if (config.isSecurityEnable()) { + Metadata header = new Metadata(); + Key authorization = Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER); + header.put(authorization, "Bearer " + FileUtils.readFileToString(config.getTokenPath())); + stub = MetadataUtils.attachHeaders(stub, header); + } + return stub.streamAggregatedResources(observer); + } + + @Override + public void close() throws IOException { + if (channel != null) { + channel.shutdown(); + } + } +} diff --git a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/entity/XdsServiceInstance.java b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/entity/XdsServiceInstance.java index a5dd96b564..9e18ee1f8e 100644 --- a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/entity/XdsServiceInstance.java +++ b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/entity/XdsServiceInstance.java @@ -32,7 +32,7 @@ public class XdsServiceInstance implements ServiceInstance { private String service; - private String address; + private String host; private int port; @@ -52,7 +52,7 @@ public String getServiceName() { @Override public String getHost() { - return service; + return host; } @Override @@ -82,13 +82,13 @@ public boolean equals(Object obj) { return port == instance.port && healthStatus == instance.healthStatus && Objects.equals(cluster, instance.cluster) && Objects.equals(service, instance.service) - && Objects.equals(address, instance.address) + && Objects.equals(host, instance.host) && Objects.equals(metadata, instance.metadata); } @Override public int hashCode() { - return Objects.hash(cluster, service, address, port, healthStatus, metadata); + return Objects.hash(cluster, service, host, port, healthStatus, metadata); } public void setCluster(String cluster) { @@ -99,8 +99,8 @@ public void setService(String service) { this.service = service; } - public void setAddress(String address) { - this.address = address; + public void setHost(String host) { + this.host = host; } public void setPort(int port) { diff --git a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/env/XdsConstant.java b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/env/XdsConstant.java index 4a76b1d41e..bf95a13330 100644 --- a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/env/XdsConstant.java +++ b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/env/XdsConstant.java @@ -36,12 +36,12 @@ public class XdsConstant { /** * eds resource type */ - public static final String EDS_RESOURCE_TYPE = "envoy.config.endpoint.v3.ClusterLoadAssignment"; + public static final String EDS_RESOURCE_TYPE = "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment"; /** * cds resource type */ - public static final String CDS_RESOURCE_TYPE = "envoy.config.cluster.v3.Cluster"; + public static final String CDS_RESOURCE_TYPE = "type.googleapis.com/envoy.config.cluster.v3.Cluster"; /** * sidecar string diff --git a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/handler/XdsHandler.java b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/handler/XdsHandler.java new file mode 100644 index 0000000000..1ac43210f0 --- /dev/null +++ b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/handler/XdsHandler.java @@ -0,0 +1,165 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.implement.service.xds.handler; + +import io.envoyproxy.envoy.config.core.v3.Node; +import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; +import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; +import io.grpc.stub.StreamObserver; +import io.sermant.core.common.LoggerFactory; +import io.sermant.core.config.ConfigManager; +import io.sermant.core.plugin.config.ServiceMeta; +import io.sermant.core.utils.NetworkUtils; +import io.sermant.implement.service.xds.client.XdsClient; +import io.sermant.implement.service.xds.env.XdsConstant; + +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * abstract xds protocol handler + * + * @author daizhenyu + * @since 2024-05-13 + **/ +public abstract class XdsHandler implements XdsServiceAction { + protected static final Logger LOGGER = LoggerFactory.getLogger(); + + protected final XdsClient client; + + protected final String resourceType; + + protected Node node; + + /** + * construction method + * + * @param client xds client + * @param resourceType xds resource type + */ + public XdsHandler(XdsClient client, String resourceType) { + this.client = client; + this.resourceType = resourceType; + createNode(); + } + + /** + * built DiscoveryRequest + * + * @param type resource type + * @param version resource version + * @param nonce response nonce + * @param resourceName resource name + * @return DiscoveryRequest + */ + protected DiscoveryRequest builtDiscoveryRequest(String type, String version, String nonce, + Set resourceName) { + DiscoveryRequest.Builder builder = DiscoveryRequest.newBuilder() + .setNode(node) + .setTypeUrl(type); + if (version != null) { + builder.setVersionInfo(version); + } + if (nonce != null) { + builder.setResponseNonce(nonce); + } + builder.addAllResourceNames(resourceName); + return builder.build(); + } + + /** + * built DiscoveryRequest for ack + * + * @param response xds response + * @param resourceName resource name + * @return DiscoveryRequest + */ + protected DiscoveryRequest builtAckDiscoveryRequest(DiscoveryResponse response, Set resourceName) { + return builtDiscoveryRequest(response.getTypeUrl(), response.getVersionInfo(), response.getNonce(), + resourceName); + } + + private void createNode() { + String namespace = ConfigManager.getConfig(ServiceMeta.class).getProject(); + StringBuilder nodeIdBuilder = new StringBuilder(); + + // nodeId:sidecar~{pod_ip}~{pod_name}.{namespace}~{namespace}.svc.cluster.local + nodeIdBuilder.append(XdsConstant.SIDECAR) + .append(XdsConstant.WAVY_LINE) + .append(NetworkUtils.getMachineIp()) + .append(XdsConstant.WAVY_LINE) + .append(System.getenv(XdsConstant.POD_NAME_ENV)) + .append(XdsConstant.POINT) + .append(namespace) + .append(XdsConstant.WAVY_LINE) + .append(namespace) + .append(XdsConstant.POINT) + .append(XdsConstant.HOST_SUFFIX); + this.node = Node.newBuilder() + .setId(nodeIdBuilder.toString()) + .build(); + } + + /** + * get response StreamObserver + * + * @param requestKey requestStreamObserver cache key + * @param countDownLatch Used to notify the xds requesting thread to obtain data + * @return StreamObserver + */ + protected StreamObserver getResponseStreamObserver(String requestKey, + CountDownLatch countDownLatch) { + return new StreamObserver() { + @Override + public void onNext(DiscoveryResponse response) { + handleResponse(requestKey, response); + if (countDownLatch != null && countDownLatch.getCount() == 1) { + countDownLatch.countDown(); + } + } + + @Override + public void onError(Throwable throwable) { + if (countDownLatch != null && countDownLatch.getCount() == 1) { + countDownLatch.countDown(); + } + client.updateChannel(); + subscribe(requestKey, null); + LOGGER.log(Level.SEVERE, "An error occurred in Xds communication with istiod.", throwable); + } + + @Override + public void onCompleted() { + if (countDownLatch != null && countDownLatch.getCount() == 1) { + countDownLatch.countDown(); + } + subscribe(requestKey, null); + LOGGER.log(Level.WARNING, "Xds stream is closed, new stream has been created for communication."); + } + }; + } + + /** + * handle response from istiod + * + * @param requestKey resource key to get the request observer from cache + * @param response + */ + protected abstract void handleResponse(String requestKey, DiscoveryResponse response); +} \ No newline at end of file diff --git a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/handler/XdsServiceAction.java b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/handler/XdsServiceAction.java index 602d3083b8..f090a990f6 100644 --- a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/handler/XdsServiceAction.java +++ b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/handler/XdsServiceAction.java @@ -28,8 +28,8 @@ public interface XdsServiceAction { /** * subscribe * - * @param resourceKey resource key to get the xds data from cache + * @param requestKey request key to get the xds data from cache * @param countDownLatch Used to notify the xds requesting thread to obtain data */ - void subscribe(String resourceKey, CountDownLatch countDownLatch); + void subscribe(String requestKey, CountDownLatch countDownLatch); } diff --git a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/utils/XdsProtocolTransformer.java b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/utils/XdsProtocolTransformer.java new file mode 100644 index 0000000000..5a9addc5c1 --- /dev/null +++ b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/utils/XdsProtocolTransformer.java @@ -0,0 +1,173 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.implement.service.xds.utils; + +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.core.v3.HealthStatus; +import io.envoyproxy.envoy.config.core.v3.Locality; +import io.envoyproxy.envoy.config.core.v3.SocketAddress; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; +import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint; +import io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints; +import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.core.utils.CollectionUtils; +import io.sermant.core.utils.StringUtils; +import io.sermant.implement.service.xds.entity.XdsServiceInstance; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Convert xDS protocol data to Sermant data model + * + * @author daizhenyu + * @since 2024-05-10 + **/ +public class XdsProtocolTransformer { + private static final int SERVICE_HOST_INDEX = 3; + + private static final int SERVICE_NAME_INDEX = 0; + + private static final int EXPECT_LENGTH = 4; + + private XdsProtocolTransformer() { + } + + /** + * get the mapping between service name of k8s and cluster of istio + * + * @param clusters clusters + * @return mapping + */ + public static Map> getService2ClusterMapping(List clusters) { + Map> nameMapping = new HashMap<>(); + for (Cluster cluster : clusters) { + if (cluster == null) { + continue; + } + Optional serviceNameFromCluster = getServiceNameFromCluster(cluster.getName()); + if (!serviceNameFromCluster.isPresent()) { + continue; + } + String serviceName = serviceNameFromCluster.get(); + nameMapping.computeIfAbsent(serviceName, key -> new HashSet<>()).add(cluster.getName()); + } + return nameMapping; + } + + /** + * get the instance of one service by xds protocol + * + * @param loadAssignments eds data + * @return instance of service + */ + public static Set getServiceInstances( + List loadAssignments) { + return loadAssignments.stream() + .filter(Objects::nonNull) + .flatMap(loadAssignment -> getServiceInstancesFromLoadAssignment(loadAssignment).stream()) + .collect(Collectors.toSet()); + } + + private static Set getServiceInstancesFromLoadAssignment(ClusterLoadAssignment loadAssignment) { + String clusterName = loadAssignment.getClusterName(); + Optional serviceNameOptional = getServiceNameFromCluster(clusterName); + if (!serviceNameOptional.isPresent()) { + return Collections.EMPTY_SET; + } + String serviceName = serviceNameOptional.get(); + return processClusterLoadAssignment(loadAssignment, serviceName, clusterName); + } + + private static Set processClusterLoadAssignment(ClusterLoadAssignment loadAssignment, + String serviceName, String clusterName) { + List localityLbEndpointList = loadAssignment.getEndpointsList(); + if (CollectionUtils.isEmpty(localityLbEndpointList)) { + return Collections.EMPTY_SET; + } + return localityLbEndpointList.stream() + .filter(Objects::nonNull) + .flatMap(localityLbEndpoints -> processLocalityLbEndpoints(localityLbEndpoints, serviceName, + clusterName).stream()) + .collect(Collectors.toSet()); + } + + private static Set processLocalityLbEndpoints(LocalityLbEndpoints localityLbEndpoints, + String serviceName, String clusterName) { + List lbEndpointsList = localityLbEndpoints.getLbEndpointsList(); + if (CollectionUtils.isEmpty(lbEndpointsList)) { + return Collections.EMPTY_SET; + } + return lbEndpointsList.stream() + .filter(Objects::nonNull) + .map(lbEndpoint -> transformEndpoint2Instance(lbEndpoint, serviceName, clusterName, + getInitializedMetadata(localityLbEndpoints))) + .collect(Collectors.toSet()); + } + + private static Map getInitializedMetadata(LocalityLbEndpoints localityLbEndpoints) { + Map metadata = new HashMap<>(); + Locality locality = localityLbEndpoints.getLocality(); + if (locality != null) { + metadata.put("region", locality.getRegion()); + metadata.put("zone", locality.getZone()); + metadata.put("sub_zone", locality.getSubZone()); + } + return metadata; + } + + private static ServiceInstance transformEndpoint2Instance(LbEndpoint endpoint, String serviceName, + String clusterName, Map metadata) { + XdsServiceInstance instance = new XdsServiceInstance(); + SocketAddress socketAddress = endpoint.getEndpoint().getAddress().getSocketAddress(); + instance.setService(serviceName); + instance.setCluster(clusterName); + instance.setHost(socketAddress.getAddress()); + instance.setPort(socketAddress.getPortValue()); + endpoint.getMetadata().getFilterMetadataMap().values() + .forEach(struct -> struct.getFieldsMap() + .forEach((key, value) -> metadata.put(key, value.getStringValue()))); + instance.setMetadata(metadata); + if (HealthStatus.HEALTHY.equals(endpoint.getHealthStatus()) + || HealthStatus.UNKNOWN.equals(endpoint.getHealthStatus())) { + instance.setHealthStatus(true); + return instance; + } + instance.setHealthStatus(false); + return instance; + } + + private static Optional getServiceNameFromCluster(String clusterName) { + if (StringUtils.isEmpty(clusterName)) { + return Optional.empty(); + } + + // cluster name format: "outbound|8080||xds-service.default.svc.cluster.local", xds-service is service name + String[] clusterSplit = clusterName.split("\\|"); + if (clusterSplit.length != EXPECT_LENGTH) { + return Optional.empty(); + } + return Optional.of(clusterSplit[SERVICE_HOST_INDEX].split("\\.")[SERVICE_NAME_INDEX]); + } +} diff --git a/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/cache/XdsDataCacheTest.java b/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/cache/XdsDataCacheTest.java new file mode 100644 index 0000000000..32bfeb4302 --- /dev/null +++ b/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/cache/XdsDataCacheTest.java @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.implement.service.xds.cache; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * XdsDataCache UT + * + * @author daizhenyu + * @since 2024-05-24 + **/ +public class XdsDataCacheTest { + @Test + public void testGetClustersByServiceName() { + Map> mapping = new HashMap<>(); + Set clusters = new HashSet<>(); + clusters.add("cluster"); + mapping.put("service-A", clusters); + Set result; + + // serviceNameMapping is null + result = XdsDataCache.getClustersByServiceName("service-A"); + Assert.assertNotNull(result); + Assert.assertEquals(0, result.size()); + + // serviceNameMapping is not null, get un cached service + XdsDataCache.updateServiceNameMapping(mapping); + result = XdsDataCache.getClustersByServiceName("service-B"); + Assert.assertNotNull(result); + Assert.assertEquals(0, result.size()); + + // serviceNameMapping is not null, get cached service + XdsDataCache.updateServiceNameMapping(mapping); + result = XdsDataCache.getClustersByServiceName("service-A"); + Assert.assertNotNull(result); + Assert.assertEquals(1, result.size()); + Assert.assertTrue(result.contains("cluster")); + } +} \ No newline at end of file diff --git a/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/utils/XdsProtocolTransformerTest.java b/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/utils/XdsProtocolTransformerTest.java new file mode 100644 index 0000000000..b0861fd7c9 --- /dev/null +++ b/sermant-agentcore/sermant-agentcore-implement/src/test/java/io/sermant/implement/service/xds/utils/XdsProtocolTransformerTest.java @@ -0,0 +1,98 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.implement.service.xds.utils; + +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; +import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint; +import io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints; +import io.sermant.core.service.xds.entity.ServiceInstance; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * XdsProtocolTransformer UT + * + * @author daizhenyu + * @since 2024-05-11 + **/ +public class XdsProtocolTransformerTest { + + @Test + public void testGetService2ClusterMapping() { + List clusters = Arrays.asList( + null, + createCluster("outbound|8080||serviceA.default.svc.cluster.local"), + createCluster("outbound|8080|subset1|serviceB.default.svc.cluster.local"), + createCluster("outbound|8080|subset2|serviceB.default.svc.cluster.local"), + createCluster("outbound|8080|serviceC.default.svc.cluster.local"), + createCluster(null) + ); + + Map> result = XdsProtocolTransformer.getService2ClusterMapping(clusters); + Assert.assertEquals(2, result.size()); + Assert.assertTrue(result.containsKey("serviceA")); + Assert.assertTrue(result.containsKey("serviceB")); + Assert.assertEquals(1, result.get("serviceA").size()); + Assert.assertEquals(2, result.get("serviceB").size()); + } + + @Test + public void testGetServiceInstances() { + List assignments = Arrays.asList( + null, + createLoadAssignment("outbound|8080|subset1|serviceB.default.svc.cluster.local"), + createLoadAssignment("outbound|8080|subset2|serviceB.default.svc.cluster.local"), + createLoadAssignment("outbound|8080|serviceB.default.svc.cluster.local") + ); + + Set result = XdsProtocolTransformer.getServiceInstances(assignments); + Assert.assertEquals(2, result.size()); + Iterator iterator = result.iterator(); + while (iterator.hasNext()) { + ServiceInstance next = iterator.next(); + Assert.assertEquals("serviceB", next.getServiceName()); + } + } + + private Cluster createCluster(String name) { + Cluster.Builder builder = Cluster.newBuilder(); + if (name != null) { + builder.setName(name); + } + return builder.build(); + } + + private ClusterLoadAssignment createLoadAssignment(String clusterName) { + ClusterLoadAssignment.Builder assignmentBuilder = ClusterLoadAssignment.newBuilder(); + + LocalityLbEndpoints.Builder localityBuilder = LocalityLbEndpoints.newBuilder(); + LbEndpoint.Builder endpointBuilder = LbEndpoint.newBuilder(); + localityBuilder.addLbEndpoints(endpointBuilder.build()); + assignmentBuilder.setClusterName(clusterName); + assignmentBuilder.addEndpoints(localityBuilder.build()); + + return assignmentBuilder.build(); + } +}