Skip to content

Commit

Permalink
xds client and util
Browse files Browse the repository at this point in the history
Signed-off-by: daizhenyu <[email protected]>
  • Loading branch information
daizhenyu committed May 25, 2024
1 parent e56c5ac commit af43889
Show file tree
Hide file tree
Showing 11 changed files with 432 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,12 @@ gateway.nettyPort=6888
# Specify retreat algorithm maximum connection interval (s)
#gateway.maxReconnectInternalTime=180
#=============================xds configuration===============================#
# istio control plane address
# istiod control plane address, security.enable=false with 15010 port, and security.enable=true with 15012 port
xds.config.control.plane.address=istiod.istio-system.svc:15010
# Whether to use secure communication with the control plane
xds.config.security.enable=false
# Certificates used for secure communication with the control plane
xds.config.certificate.path=
# Private key used for secure communication with the control plane
xds.config.private.key.path=
# service account token used for secure communication with the control plane
xds.config.service.account.token.path=/var/run/secrets/kubernetes.io/serviceaccount/token
#=============================Metadata===============================#
# Service name for host service instance
service.meta.service=default
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public boolean checkServiceEnable(String serviceName) {
if (ServiceManager.HTTP_SERVER_SERVICE_IMPL.equals(serviceName)) {
return isHttpserverEnable();
}
if (ServiceManager.XDS_SERVICE_IMPL.equals(serviceName)) {
if (ServiceManager.XDS_CORE_SERVICE_IMPL.equals(serviceName)) {
return isXdsServiceEnable();
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ public class ServiceManager {
/**
* xDS Service Discover
*/
public static final String XDS_SERVICE_IMPL =
"io.sermant.implement.service.xds.XdsServiceImpl";
public static final String XDS_CORE_SERVICE_IMPL =
"io.sermant.implement.service.xds.XdsCoreServiceImpl";

/**
* logger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,16 @@ public class XdsConfig implements BaseConfig {
@ConfigFieldKey("security.enable")
private boolean securityEnable = false;

@ConfigFieldKey("certificate.path")
private String certificatePath;
@ConfigFieldKey("service.account.token.path")
private String tokenPath;

@ConfigFieldKey("private.key.path")
private String privateKeyPath;
public String getTokenPath() {
return tokenPath;
}

public void setTokenPath(String tokenPath) {
this.tokenPath = tokenPath;
}

public String getControlPlaneAddress() {
return controlPlaneAddress;
Expand All @@ -55,20 +60,4 @@ public boolean isSecurityEnable() {
public void setSecurityEnable(boolean securityEnable) {
this.securityEnable = securityEnable;
}

public String getCertificatePath() {
return certificatePath;
}

public void setCertificatePath(String certificatePath) {
this.certificatePath = certificatePath;
}

public String getPrivateKeyPath() {
return privateKeyPath;
}

public void setPrivateKeyPath(String privateKeyPath) {
this.privateKeyPath = privateKeyPath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.sermant.core.service.xds.listener.XdsServiceDiscoveryListener;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -54,7 +55,7 @@ public class XdsDataCache {
/**
* key:service name value:cluster map
*/
private static Map<String, List<String>> serviceNameMapping;
private static Map<String, List<String>> serviceNameMapping = new HashMap<>();

private XdsDataCache() {
}
Expand All @@ -75,6 +76,19 @@ public static void updateServiceNameMapping(Map<String, List<String>> mapping) {
* @return cluster list for service
*/
public static List<String> getClustersByServiceName(String serviceName) {
return serviceNameMapping.getOrDefault(serviceName, Collections.EMPTY_LIST);
List emptyList = Collections.EMPTY_LIST;
if (serviceNameMapping == null) {
return emptyList;
}
return serviceNameMapping.getOrDefault(serviceName, emptyList);
}

/**
* get serviceNameMapping
*
* @return serviceNameMapping
*/
public static Map<String, List<String>> getServiceNameMapping() {
return serviceNameMapping;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.implement.service.xds.client;

import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Metadata.Key;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.sermant.core.common.LoggerFactory;
import io.sermant.core.config.ConfigManager;
import io.sermant.core.service.xds.config.XdsConfig;
import io.sermant.core.utils.FileUtils;

import java.io.Closeable;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.net.ssl.SSLException;

/**
* XdsClient
*
* @author daizhenyu
* @since 2024-05-09
**/
public class XdsClient implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger();

private XdsConfig config = ConfigManager.getConfig(XdsConfig.class);

private ManagedChannel channel;

/**
* construction method
*/
public XdsClient() {
createChannel();
}

private void createChannel() {
if (config.isSecurityEnable()) {
try {
SslContext context = GrpcSslContexts.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build();
channel = NettyChannelBuilder.forTarget(config.getControlPlaneAddress())
.sslContext(context)
.build();
} catch (SSLException e) {
LOGGER.log(Level.SEVERE, "SSLException occurred when creating gRPC channel", e);
}
} else {
channel = NettyChannelBuilder.forTarget(config.getControlPlaneAddress())
.usePlaintext()
.build();
}
}

/**
* update channel
*/
public void updateChannel() {
if (channel != null && !channel.isShutdown() && !channel.isTerminated()) {
return;
}
synchronized (this) {
if (channel != null && !channel.isShutdown() && !channel.isTerminated()) {
return;
}
createChannel();
}
}

/**
* get ADS grpc request stream observer
*
* @param observer DiscoveryResponse observer
* @return StreamObserver
*/
public StreamObserver<DiscoveryRequest> getDiscoveryRequestObserver(StreamObserver<DiscoveryResponse> observer) {
AggregatedDiscoveryServiceStub stub = AggregatedDiscoveryServiceGrpc.newStub(channel);
if (config.isSecurityEnable()) {
Metadata header = new Metadata();
Key<String> authorization = Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER);
header.put(authorization, "Bearer " + FileUtils.readFileToString(config.getTokenPath()));
stub = MetadataUtils.attachHeaders(stub, header);
}
return stub.streamAggregatedResources(observer);
}

@Override
public void close() throws IOException {
if (channel != null) {
channel.shutdown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class XdsServiceInstance implements ServiceInstance {

private String service;

private String address;
private String host;

private int port;

Expand All @@ -52,7 +52,7 @@ public String getServiceName() {

@Override
public String getHost() {
return service;
return host;
}

@Override
Expand Down Expand Up @@ -82,13 +82,13 @@ public boolean equals(Object obj) {
return port == instance.port && healthStatus == instance.healthStatus
&& Objects.equals(cluster, instance.cluster)
&& Objects.equals(service, instance.service)
&& Objects.equals(address, instance.address)
&& Objects.equals(host, instance.host)
&& Objects.equals(metadata, instance.metadata);
}

@Override
public int hashCode() {
return Objects.hash(cluster, service, address, port, healthStatus, metadata);
return Objects.hash(cluster, service, host, port, healthStatus, metadata);
}

public void setCluster(String cluster) {
Expand All @@ -99,8 +99,8 @@ public void setService(String service) {
this.service = service;
}

public void setAddress(String address) {
this.address = address;
public void setHost(String host) {
this.host = host;
}

public void setPort(int port) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ public class XdsConstant {
/**
* eds resource type
*/
public static final String EDS_RESOURCE_TYPE = "envoy.config.endpoint.v3.ClusterLoadAssignment";
public static final String EDS_RESOURCE_TYPE = "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment";

/**
* cds resource type
*/
public static final String CDS_RESOURCE_TYPE = "envoy.config.cluster.v3.Cluster";
public static final String CDS_RESOURCE_TYPE = "type.googleapis.com/envoy.config.cluster.v3.Cluster";

/**
* sidecar string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ public interface XdsServiceAction {
/**
* subscribe
*
* @param resourceKey resource key to get the xds data from cache
* @param requestKey resource key to get the xds data from cache
* @param countDownLatch Used to notify the xds requesting thread to obtain data
*/
void subscribe(String resourceKey, CountDownLatch countDownLatch);
void subscribe(String requestKey, CountDownLatch countDownLatch);
}
Loading

0 comments on commit af43889

Please sign in to comment.