Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds client and util #1522

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,12 @@ gateway.nettyPort=6888
# Specify retreat algorithm maximum connection interval (s)
#gateway.maxReconnectInternalTime=180
#=============================xds configuration===============================#
# istio control plane address
# istiod control plane address, security.enable=false with 15010 port, and security.enable=true with 15012 port
xds.config.control.plane.address=istiod.istio-system.svc:15010
# Whether to use secure communication with the control plane
xds.config.security.enable=false
# Certificates used for secure communication with the control plane
xds.config.certificate.path=
# Private key used for secure communication with the control plane
xds.config.private.key.path=
# service account token used for secure communication with the control plane
xds.config.service.account.token.path=/var/run/secrets/kubernetes.io/serviceaccount/token
#=============================Metadata===============================#
# Service name for host service instance
service.meta.service=default
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public boolean checkServiceEnable(String serviceName) {
if (ServiceManager.HTTP_SERVER_SERVICE_IMPL.equals(serviceName)) {
return isHttpserverEnable();
}
if (ServiceManager.XDS_SERVICE_IMPL.equals(serviceName)) {
if (ServiceManager.XDS_CORE_SERVICE_IMPL.equals(serviceName)) {
return isXdsServiceEnable();
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ public class ServiceManager {
/**
* xDS Service Discover
*/
public static final String XDS_SERVICE_IMPL =
"io.sermant.implement.service.xds.XdsServiceImpl";
public static final String XDS_CORE_SERVICE_IMPL =
"io.sermant.implement.service.xds.XdsCoreServiceImpl";

/**
* logger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,16 @@ public class XdsConfig implements BaseConfig {
@ConfigFieldKey("security.enable")
private boolean securityEnable = false;

@ConfigFieldKey("certificate.path")
private String certificatePath;
@ConfigFieldKey("service.account.token.path")
private String tokenPath;

@ConfigFieldKey("private.key.path")
private String privateKeyPath;
public String getTokenPath() {
return tokenPath;
}

public void setTokenPath(String tokenPath) {
this.tokenPath = tokenPath;
}

public String getControlPlaneAddress() {
return controlPlaneAddress;
Expand All @@ -55,20 +60,4 @@ public boolean isSecurityEnable() {
public void setSecurityEnable(boolean securityEnable) {
this.securityEnable = securityEnable;
}

public String getCertificatePath() {
return certificatePath;
}

public void setCertificatePath(String certificatePath) {
this.certificatePath = certificatePath;
}

public String getPrivateKeyPath() {
return privateKeyPath;
}

public void setPrivateKeyPath(String privateKeyPath) {
this.privateKeyPath = privateKeyPath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.sermant.core.service.xds.listener.XdsServiceDiscoveryListener;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -54,7 +55,7 @@ public class XdsDataCache {
/**
* key:service name value:cluster map
*/
private static Map<String, List<String>> serviceNameMapping;
private static Map<String, Set<String>> serviceNameMapping = new HashMap<>();

private XdsDataCache() {
}
Expand All @@ -64,17 +65,29 @@ private XdsDataCache() {
*
* @param mapping the mapping between service and cluster
*/
public static void updateServiceNameMapping(Map<String, List<String>> mapping) {
public static void updateServiceNameMapping(Map<String, Set<String>> mapping) {
if (mapping == null) {
serviceNameMapping = new HashMap<>();
}
serviceNameMapping = mapping;
}

/**
* get cluster list for service
* get cluster set for service
*
* @param serviceName
* @return cluster list for service
* @return cluster set for service
*/
public static List<String> getClustersByServiceName(String serviceName) {
return serviceNameMapping.getOrDefault(serviceName, Collections.EMPTY_LIST);
provenceee marked this conversation as resolved.
Show resolved Hide resolved
public static Set<String> getClustersByServiceName(String serviceName) {
return serviceNameMapping.getOrDefault(serviceName, Collections.EMPTY_SET);
}

/**
* get serviceNameMapping
*
* @return serviceNameMapping
*/
public static Map<String, Set<String>> getServiceNameMapping() {
return serviceNameMapping;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.implement.service.xds.client;

import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Metadata.Key;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.sermant.core.common.LoggerFactory;
import io.sermant.core.config.ConfigManager;
import io.sermant.core.service.xds.config.XdsConfig;
import io.sermant.core.utils.FileUtils;

import java.io.Closeable;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.net.ssl.SSLException;

/**
* XdsClient
*
* @author daizhenyu
* @since 2024-05-09
**/
public class XdsClient implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger();

private XdsConfig config = ConfigManager.getConfig(XdsConfig.class);

private ManagedChannel channel;

/**
* construction method
*/
public XdsClient() {
createChannel();
provenceee marked this conversation as resolved.
Show resolved Hide resolved
}

private void createChannel() {
NettyChannelBuilder nettyChannelBuilder = NettyChannelBuilder.forTarget(config.getControlPlaneAddress());
if (config.isSecurityEnable()) {
try {
SslContext context = GrpcSslContexts.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build();
nettyChannelBuilder.sslContext(context);
} catch (SSLException e) {
LOGGER.log(Level.SEVERE, "SSLException occurred when creating gRPC channel", e);
}
} else {
lilai23 marked this conversation as resolved.
Show resolved Hide resolved
nettyChannelBuilder.usePlaintext();
}
channel = nettyChannelBuilder.build();
}

/**
* update channel
*/
public void updateChannel() {
if (channel == null || channel.isShutdown() || channel.isTerminated()) {
synchronized (this) {
if (channel == null || channel.isShutdown() || channel.isTerminated()) {
createChannel();
}
}
}
}

/**
* get ADS grpc request stream observer
*
* @param observer DiscoveryResponse observer
* @return StreamObserver
*/
public StreamObserver<DiscoveryRequest> getDiscoveryRequestObserver(StreamObserver<DiscoveryResponse> observer) {
AggregatedDiscoveryServiceStub stub = AggregatedDiscoveryServiceGrpc.newStub(channel);
if (config.isSecurityEnable()) {
Metadata header = new Metadata();
Key<String> authorization = Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER);
header.put(authorization, "Bearer " + FileUtils.readFileToString(config.getTokenPath()));
stub = MetadataUtils.attachHeaders(stub, header);
}
return stub.streamAggregatedResources(observer);
}

@Override
public void close() throws IOException {
if (channel != null) {
channel.shutdown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class XdsServiceInstance implements ServiceInstance {

private String service;

private String address;
private String host;

private int port;

Expand All @@ -52,7 +52,7 @@ public String getServiceName() {

@Override
public String getHost() {
return service;
return host;
}

@Override
Expand Down Expand Up @@ -82,13 +82,13 @@ public boolean equals(Object obj) {
return port == instance.port && healthStatus == instance.healthStatus
&& Objects.equals(cluster, instance.cluster)
&& Objects.equals(service, instance.service)
&& Objects.equals(address, instance.address)
&& Objects.equals(host, instance.host)
&& Objects.equals(metadata, instance.metadata);
}

@Override
public int hashCode() {
return Objects.hash(cluster, service, address, port, healthStatus, metadata);
return Objects.hash(cluster, service, host, port, healthStatus, metadata);
}

public void setCluster(String cluster) {
Expand All @@ -99,8 +99,8 @@ public void setService(String service) {
this.service = service;
}

public void setAddress(String address) {
this.address = address;
public void setHost(String host) {
this.host = host;
}

public void setPort(int port) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ public class XdsConstant {
/**
* eds resource type
*/
public static final String EDS_RESOURCE_TYPE = "envoy.config.endpoint.v3.ClusterLoadAssignment";
public static final String EDS_RESOURCE_TYPE = "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment";

/**
* cds resource type
*/
public static final String CDS_RESOURCE_TYPE = "envoy.config.cluster.v3.Cluster";
public static final String CDS_RESOURCE_TYPE = "type.googleapis.com/envoy.config.cluster.v3.Cluster";

/**
* sidecar string
Expand Down
Loading
Loading