Skip to content

Commit

Permalink
[ISSUE alibaba#11456]Add cluster server tls.
Browse files Browse the repository at this point in the history
  • Loading branch information
stone-98 committed Dec 23, 2023
1 parent 7ded5ba commit 57f9d40
Show file tree
Hide file tree
Showing 18 changed files with 613 additions and 211 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,12 @@ public void start() throws Exception {

startServer();

if (RpcServerSslContextRefresherHolder.getInstance() != null) {
RpcServerSslContextRefresherHolder.getInstance().refresh(this);
if (RpcServerSslContextRefresherHolder.getSdkInstance() != null) {
RpcServerSslContextRefresherHolder.getSdkInstance().refresh(this);
}

if (RpcServerSslContextRefresherHolder.getClusterInstance() != null) {
RpcServerSslContextRefresherHolder.getClusterInstance().refresh(this);
}

Loggers.REMOTE.info("Nacos {} Rpc server started at port {}", serverName, getServicePort());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.alibaba.nacos.core.remote;

/**
* Enum representing different types of communication.
*
* <p>CommunicationType includes:</p>
* <ul>
* <li>SDK: Communication between SDK and servers.</li>
* <li>CLUSTER: Communication between servers in a cluster.</li>
* </ul>
*
* @author stone-98
* @date 2023/12/23
*/
public enum CommunicationType {
/**
* Communication between SDK and servers
*/
SDK("sdk"),
/**
* Communication between servers in a cluster
*/
CLUSTER("cluster");

private final String type;

CommunicationType(String type) {
this.type = type;
}

public String getType() {
return type;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.core.remote.BaseRpcServer;
import com.alibaba.nacos.core.remote.ConnectionManager;
import com.alibaba.nacos.core.remote.grpc.negotiator.NacosGrpcProtocolNegotiator;
import com.alibaba.nacos.core.utils.Loggers;
import com.alibaba.nacos.sys.env.EnvUtil;
import io.grpc.CompressorRegistry;
Expand Down Expand Up @@ -53,6 +54,11 @@
*/
public abstract class BaseGrpcServer extends BaseRpcServer {

/**
* The ProtocolNegotiator instance used for communication.
*/
protected NacosGrpcProtocolNegotiator protocolNegotiator;

private Server server;

@Autowired
Expand Down Expand Up @@ -115,6 +121,15 @@ protected Optional<InternalProtocolNegotiator.ProtocolNegotiator> newProtocolNeg
* reload protocol negotiator If necessary.
*/
public void reloadProtocolNegotiator() {
if (protocolNegotiator != null) {
try {
protocolNegotiator.reloadNegotiator();
} catch (Throwable throwable) {
Loggers.REMOTE.info("Nacos {} Rpc server reload negotiator fail at port {}.",
this.getClass().getSimpleName(), getServicePort());
throw throwable;
}
}
}

protected long getPermitKeepAliveTime() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,23 @@
package com.alibaba.nacos.core.remote.grpc;

import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.core.remote.CommunicationType;
import com.alibaba.nacos.core.remote.grpc.filter.NacosGrpcServerTransportFilter;
import com.alibaba.nacos.core.remote.grpc.filter.NacosGrpcServerTransportFilterServiceLoader;
import com.alibaba.nacos.core.remote.grpc.interceptor.NacosGrpcServerInterceptor;
import com.alibaba.nacos.core.remote.grpc.interceptor.NacosGrpcServerInterceptorServiceLoader;
import com.alibaba.nacos.core.remote.grpc.negotiator.ProtocolNegotiatorBuilderManager;
import com.alibaba.nacos.core.utils.GlobalExecutor;
import com.alibaba.nacos.core.utils.Loggers;
import com.alibaba.nacos.sys.env.EnvUtil;
import io.grpc.ServerInterceptor;
import io.grpc.ServerTransportFilter;
import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiator;
import org.springframework.stereotype.Service;

import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;

/**
Expand All @@ -56,8 +60,8 @@ public ThreadPoolExecutor getRpcExecutor() {

@Override
protected long getKeepAliveTime() {
Long property = EnvUtil
.getProperty(GrpcServerConstants.GrpcConfig.CLUSTER_KEEP_ALIVE_TIME_PROPERTY, Long.class);
Long property = EnvUtil.getProperty(GrpcServerConstants.GrpcConfig.CLUSTER_KEEP_ALIVE_TIME_PROPERTY,
Long.class);
if (property != null) {
return property;
}
Expand All @@ -66,14 +70,20 @@ protected long getKeepAliveTime() {

@Override
protected long getKeepAliveTimeout() {
Long property = EnvUtil
.getProperty(GrpcServerConstants.GrpcConfig.CLUSTER_KEEP_ALIVE_TIMEOUT_PROPERTY, Long.class);
Long property = EnvUtil.getProperty(GrpcServerConstants.GrpcConfig.CLUSTER_KEEP_ALIVE_TIMEOUT_PROPERTY,
Long.class);
if (property != null) {
return property;
}
return super.getKeepAliveTimeout();
}

@Override
protected Optional<InternalProtocolNegotiator.ProtocolNegotiator> newProtocolNegotiator() {
protocolNegotiator = ProtocolNegotiatorBuilderManager.getInstance().get(CommunicationType.CLUSTER);
return Optional.ofNullable(protocolNegotiator);
}

@Override
protected long getPermitKeepAliveTime() {
Long property = EnvUtil.getProperty(GrpcServerConstants.GrpcConfig.CLUSTER_PERMIT_KEEP_ALIVE_TIME, Long.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
package com.alibaba.nacos.core.remote.grpc;

import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.core.remote.CommunicationType;
import com.alibaba.nacos.core.remote.grpc.filter.NacosGrpcServerTransportFilter;
import com.alibaba.nacos.core.remote.grpc.filter.NacosGrpcServerTransportFilterServiceLoader;
import com.alibaba.nacos.core.remote.grpc.interceptor.NacosGrpcServerInterceptor;
import com.alibaba.nacos.core.remote.grpc.interceptor.NacosGrpcServerInterceptorServiceLoader;
import com.alibaba.nacos.core.remote.grpc.negotiator.NacosGrpcProtocolNegotiator;
import com.alibaba.nacos.core.remote.grpc.negotiator.ProtocolNegotiatorBuilderSingleton;
import com.alibaba.nacos.core.remote.grpc.negotiator.ProtocolNegotiatorBuilderManager;
import com.alibaba.nacos.core.utils.GlobalExecutor;
import com.alibaba.nacos.core.utils.Loggers;
import com.alibaba.nacos.sys.env.EnvUtil;
Expand All @@ -45,8 +45,6 @@
@Service
public class GrpcSdkServer extends BaseGrpcServer {

private NacosGrpcProtocolNegotiator protocolNegotiator;

@Override
public int rpcPortOffset() {
return Constants.SDK_GRPC_PORT_DEFAULT_OFFSET;
Expand Down Expand Up @@ -106,7 +104,8 @@ protected long getPermitKeepAliveTime() {

@Override
protected Optional<InternalProtocolNegotiator.ProtocolNegotiator> newProtocolNegotiator() {
protocolNegotiator = ProtocolNegotiatorBuilderSingleton.getSingleton().build();
protocolNegotiator = ProtocolNegotiatorBuilderManager.getInstance()
.get(CommunicationType.SDK);
return Optional.ofNullable(protocolNegotiator);
}

Expand All @@ -128,19 +127,4 @@ protected List<ServerTransportFilter> getServerTransportFilters() {
return result;
}

/**
* reload ssl context.
*/
public void reloadProtocolNegotiator() {
if (protocolNegotiator != null) {
try {
protocolNegotiator.reloadNegotiator();
} catch (Throwable throwable) {
Loggers.REMOTE
.info("Nacos {} Rpc server reload negotiator fail at port {}.", this.getClass().getSimpleName(),
getServicePort());
throw throwable;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

package com.alibaba.nacos.core.remote.grpc.negotiator;

import com.alibaba.nacos.core.remote.CommunicationType;

import java.util.List;

/**
* Protocol negotiator builder.
*
Expand All @@ -36,4 +40,14 @@ public interface ProtocolNegotiatorBuilder {
* @return type
*/
String type();

/**
* Get a list of supported communication types by the ProtocolNegotiator.
*
* <p>The communication types represent the different types of communication
* scenarios that the ProtocolNegotiator can handle.</p>
*
* @return List of supported CommunicationType values.
*/
List<CommunicationType> supportCommunicationTypes();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package com.alibaba.nacos.core.remote.grpc.negotiator;

import com.alibaba.nacos.common.spi.NacosServiceLoader;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.core.remote.CommunicationType;
import com.alibaba.nacos.core.remote.grpc.negotiator.tls.ClusterDefaultTlsProtocolNegotiatorBuilder;
import com.alibaba.nacos.core.remote.grpc.negotiator.tls.SdkDefaultTlsProtocolNegotiatorBuilder;
import com.alibaba.nacos.core.utils.Loggers;
import com.alibaba.nacos.sys.env.EnvUtil;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import static com.alibaba.nacos.core.remote.grpc.negotiator.tls.ClusterDefaultTlsProtocolNegotiatorBuilder.CLUSTER_TYPE_DEFAULT_TLS;
import static com.alibaba.nacos.core.remote.grpc.negotiator.tls.SdkDefaultTlsProtocolNegotiatorBuilder.SDK_TYPE_DEFAULT_TLS;

/**
* Manager for ProtocolNegotiatorBuilder instances, responsible for loading, managing, and providing
* ProtocolNegotiatorBuilders.
*
* <p>{@code ProtocolNegotiatorBuilderManager} is a singleton class, and it initializes ProtocolNegotiatorBuilders
* using the SPI mechanism. It also provides default ProtocolNegotiatorBuilders in case loading from SPI fails.
* </p>
*
* <p>Usage:
* <pre>{@code
* ProtocolNegotiatorBuilderManager manager = ProtocolNegotiatorBuilderManager.getInstance();
* NacosGrpcProtocolNegotiator negotiator = manager.get(CommunicationType.SDK);
* }</pre>
* </p>
*
* @author stone-98
* @date 2023/12/23
*/
public class ProtocolNegotiatorBuilderManager {

/**
* Property key for configuring the ProtocolNegotiator type for cluster communication.
*/
private static final String CLUSTER_TYPE_PROPERTY_KEY = "nacos.remote.cluster.server.rpc.protocol.negotiator.type";

/**
* Property key for configuring the ProtocolNegotiator type for SDK communication.
*/
private static final String SDK_TYPE_PROPERTY_KEY = "nacos.remote.sdk.server.rpc.protocol.negotiator.type";

/**
* Singleton instance of ProtocolNegotiatorBuilderManager.
*/
private static final ProtocolNegotiatorBuilderManager INSTANCE = new ProtocolNegotiatorBuilderManager();

/**
* Map to store ProtocolNegotiatorBuilders by their types.
*/
private static final Map<String, ProtocolNegotiatorBuilder> BUILDER_MAP = new HashMap<>();

/**
* Map to store the actual ProtocolNegotiator types used for different CommunicationTypes.
*/
private static final Map<CommunicationType, String> ACTUAL_TYPE_MAP = new HashMap<>();

/**
* Private constructor to enforce singleton pattern.
*/
private ProtocolNegotiatorBuilderManager() {
initActualTypeMap();
try {
initAllBuilders();
} catch (Exception e) {
Loggers.REMOTE.warn("Load ProtocolNegotiatorBuilder failed, use default ProtocolNegotiatorBuilder", e);
initDefaultBuilder();
}
}

/**
* Initialize all ProtocolNegotiatorBuilders using the SPI mechanism.
*/
private void initAllBuilders() {
for (ProtocolNegotiatorBuilder each : NacosServiceLoader.load(ProtocolNegotiatorBuilder.class)) {
BUILDER_MAP.put(each.type(), each);
Loggers.REMOTE.info("Load ProtocolNegotiatorBuilder {} for type {}", each.getClass().getCanonicalName(),
each.type());
}
}

/**
* Initialize the mapping of CommunicationType to actual ProtocolNegotiator type from configuration properties.
*/
private void initActualTypeMap() {
ACTUAL_TYPE_MAP.put(CommunicationType.SDK, EnvUtil.getProperty(SDK_TYPE_PROPERTY_KEY, SDK_TYPE_DEFAULT_TLS));
ACTUAL_TYPE_MAP.put(CommunicationType.CLUSTER,
EnvUtil.getProperty(CLUSTER_TYPE_PROPERTY_KEY, CLUSTER_TYPE_DEFAULT_TLS));
}

/**
* Initialize default ProtocolNegotiatorBuilders in case loading from SPI fails.
*/
private void initDefaultBuilder() {
BUILDER_MAP.put(SDK_TYPE_DEFAULT_TLS, new SdkDefaultTlsProtocolNegotiatorBuilder());
BUILDER_MAP.put(CLUSTER_TYPE_PROPERTY_KEY, new ClusterDefaultTlsProtocolNegotiatorBuilder());
}

/**
* Get the singleton instance of ProtocolNegotiatorBuilderManager.
*
* @return The singleton instance.
*/
public static ProtocolNegotiatorBuilderManager getInstance() {
return INSTANCE;
}

/**
* Get the ProtocolNegotiator for the specified CommunicationType.
*
* @param communicationType The CommunicationType for which the ProtocolNegotiator is requested.
* @return The ProtocolNegotiator instance.
*/
public NacosGrpcProtocolNegotiator get(CommunicationType communicationType) {
String actualType = ACTUAL_TYPE_MAP.get(communicationType);
if (StringUtils.isBlank(actualType)) {
Loggers.REMOTE.warn("Not found actualType for communicationType {}.", communicationType);
return null;
}
ProtocolNegotiatorBuilder builder = BUILDER_MAP.get(actualType);
if (Objects.isNull(builder)) {
Loggers.REMOTE.warn("Not found ProtocolNegotiatorBuilder for actualType {}.", actualType);
return null;
}
return BUILDER_MAP.get(actualType).build();
}
}
Loading

0 comments on commit 57f9d40

Please sign in to comment.