Skip to content

Commit

Permalink
xds client and util
Browse files Browse the repository at this point in the history
Signed-off-by: daizhenyu <[email protected]>
  • Loading branch information
daizhenyu committed May 29, 2024
1 parent e56c5ac commit a39631f
Show file tree
Hide file tree
Showing 13 changed files with 664 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,12 @@ gateway.nettyPort=6888
# Specify retreat algorithm maximum connection interval (s)
#gateway.maxReconnectInternalTime=180
#=============================xds configuration===============================#
# istio control plane address
# istiod control plane address, security.enable=false with 15010 port, and security.enable=true with 15012 port
xds.config.control.plane.address=istiod.istio-system.svc:15010
# Whether to use secure communication with the control plane
xds.config.security.enable=false
# Certificates used for secure communication with the control plane
xds.config.certificate.path=
# Private key used for secure communication with the control plane
xds.config.private.key.path=
# service account token used for secure communication with the control plane
xds.config.service.account.token.path=/var/run/secrets/kubernetes.io/serviceaccount/token
#=============================Metadata===============================#
# Service name for host service instance
service.meta.service=default
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public boolean checkServiceEnable(String serviceName) {
if (ServiceManager.HTTP_SERVER_SERVICE_IMPL.equals(serviceName)) {
return isHttpserverEnable();
}
if (ServiceManager.XDS_SERVICE_IMPL.equals(serviceName)) {
if (ServiceManager.XDS_CORE_SERVICE_IMPL.equals(serviceName)) {
return isXdsServiceEnable();
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ public class ServiceManager {
/**
* xDS Service Discover
*/
public static final String XDS_SERVICE_IMPL =
"io.sermant.implement.service.xds.XdsServiceImpl";
public static final String XDS_CORE_SERVICE_IMPL =
"io.sermant.implement.service.xds.XdsCoreServiceImpl";

/**
* logger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,16 @@ public class XdsConfig implements BaseConfig {
@ConfigFieldKey("security.enable")
private boolean securityEnable = false;

@ConfigFieldKey("certificate.path")
private String certificatePath;
@ConfigFieldKey("service.account.token.path")
private String tokenPath;

@ConfigFieldKey("private.key.path")
private String privateKeyPath;
public String getTokenPath() {
return tokenPath;
}

public void setTokenPath(String tokenPath) {
this.tokenPath = tokenPath;
}

public String getControlPlaneAddress() {
return controlPlaneAddress;
Expand All @@ -55,20 +60,4 @@ public boolean isSecurityEnable() {
public void setSecurityEnable(boolean securityEnable) {
this.securityEnable = securityEnable;
}

public String getCertificatePath() {
return certificatePath;
}

public void setCertificatePath(String certificatePath) {
this.certificatePath = certificatePath;
}

public String getPrivateKeyPath() {
return privateKeyPath;
}

public void setPrivateKeyPath(String privateKeyPath) {
this.privateKeyPath = privateKeyPath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.sermant.core.service.xds.listener.XdsServiceDiscoveryListener;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -54,7 +55,7 @@ public class XdsDataCache {
/**
* key:service name value:cluster map
*/
private static Map<String, List<String>> serviceNameMapping;
private static Map<String, Set<String>> serviceNameMapping = new HashMap<>();

private XdsDataCache() {
}
Expand All @@ -64,17 +65,29 @@ private XdsDataCache() {
*
* @param mapping the mapping between service and cluster
*/
public static void updateServiceNameMapping(Map<String, List<String>> mapping) {
public static void updateServiceNameMapping(Map<String, Set<String>> mapping) {
if (mapping == null) {
serviceNameMapping = new HashMap<>();
}
serviceNameMapping = mapping;
}

/**
* get cluster list for service
* get cluster set for service
*
* @param serviceName
* @return cluster list for service
* @return cluster set for service
*/
public static List<String> getClustersByServiceName(String serviceName) {
return serviceNameMapping.getOrDefault(serviceName, Collections.EMPTY_LIST);
public static Set<String> getClustersByServiceName(String serviceName) {
return serviceNameMapping.getOrDefault(serviceName, Collections.EMPTY_SET);
}

/**
* get serviceNameMapping
*
* @return serviceNameMapping
*/
public static Map<String, Set<String>> getServiceNameMapping() {
return serviceNameMapping;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.implement.service.xds.client;

import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Metadata.Key;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.sermant.core.common.LoggerFactory;
import io.sermant.core.config.ConfigManager;
import io.sermant.core.service.xds.config.XdsConfig;
import io.sermant.core.utils.FileUtils;

import java.io.Closeable;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.net.ssl.SSLException;

/**
* XdsClient
*
* @author daizhenyu
* @since 2024-05-09
**/
public class XdsClient implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger();

private XdsConfig config = ConfigManager.getConfig(XdsConfig.class);

private ManagedChannel channel;

/**
* construction method
*/
public XdsClient() {
createChannel();
}

private void createChannel() {
NettyChannelBuilder nettyChannelBuilder = NettyChannelBuilder.forTarget(config.getControlPlaneAddress());
if (config.isSecurityEnable()) {
try {
SslContext context = GrpcSslContexts.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build();
nettyChannelBuilder.sslContext(context);
} catch (SSLException e) {
LOGGER.log(Level.SEVERE, "SSLException occurred when creating gRPC channel", e);
}
} else {
nettyChannelBuilder.usePlaintext();
}
channel = nettyChannelBuilder.build();
}

/**
* update channel
*/
public void updateChannel() {
if (channel == null || channel.isShutdown() || channel.isTerminated()) {
synchronized (this) {
if (channel == null || channel.isShutdown() || channel.isTerminated()) {
createChannel();
}
}
}
}

/**
* get ADS grpc request stream observer
*
* @param observer DiscoveryResponse observer
* @return StreamObserver
*/
public StreamObserver<DiscoveryRequest> getDiscoveryRequestObserver(StreamObserver<DiscoveryResponse> observer) {
AggregatedDiscoveryServiceStub stub = AggregatedDiscoveryServiceGrpc.newStub(channel);
if (config.isSecurityEnable()) {
Metadata header = new Metadata();
Key<String> authorization = Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER);
header.put(authorization, "Bearer " + FileUtils.readFileToString(config.getTokenPath()));
stub = MetadataUtils.attachHeaders(stub, header);
}
return stub.streamAggregatedResources(observer);
}

@Override
public void close() throws IOException {
if (channel != null) {
channel.shutdown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class XdsServiceInstance implements ServiceInstance {

private String service;

private String address;
private String host;

private int port;

Expand All @@ -52,7 +52,7 @@ public String getServiceName() {

@Override
public String getHost() {
return service;
return host;
}

@Override
Expand Down Expand Up @@ -82,13 +82,13 @@ public boolean equals(Object obj) {
return port == instance.port && healthStatus == instance.healthStatus
&& Objects.equals(cluster, instance.cluster)
&& Objects.equals(service, instance.service)
&& Objects.equals(address, instance.address)
&& Objects.equals(host, instance.host)
&& Objects.equals(metadata, instance.metadata);
}

@Override
public int hashCode() {
return Objects.hash(cluster, service, address, port, healthStatus, metadata);
return Objects.hash(cluster, service, host, port, healthStatus, metadata);
}

public void setCluster(String cluster) {
Expand All @@ -99,8 +99,8 @@ public void setService(String service) {
this.service = service;
}

public void setAddress(String address) {
this.address = address;
public void setHost(String host) {
this.host = host;
}

public void setPort(int port) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ public class XdsConstant {
/**
* eds resource type
*/
public static final String EDS_RESOURCE_TYPE = "envoy.config.endpoint.v3.ClusterLoadAssignment";
public static final String EDS_RESOURCE_TYPE = "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment";

/**
* cds resource type
*/
public static final String CDS_RESOURCE_TYPE = "envoy.config.cluster.v3.Cluster";
public static final String CDS_RESOURCE_TYPE = "type.googleapis.com/envoy.config.cluster.v3.Cluster";

/**
* sidecar string
Expand Down
Loading

0 comments on commit a39631f

Please sign in to comment.