diff --git a/README.md b/README.md index 6aecce8..241c20d 100644 --- a/README.md +++ b/README.md @@ -76,14 +76,14 @@ - [x] **集成 Spring 通过注解进行服务消费** 。参考: [PR#10](https://github.com/Snailclimb/guide-rpc-framework/pull/10) - [x] **增加服务版本号** :建议使用两位数字版本,如:1.0,通常在接口不兼容时版本号才需要升级。为什么要增加服务版本号?为后续不兼容升级提供可能,比如服务接口增加方法,或服务模型增加字段,可向后兼容,删除方法或删除字段,将不兼容,枚举类型新增字段也不兼容,需通过变更版本号升级。 - [x] **对 SPI 机制的运用** -- [x] 扩充 rpc 协议,增加序列化协议字段,服务端可以根据消息体中的序列化协议动态选择序列化器。参考: [PR#14](https://github.com/Snailclimb/guide-rpc-framework/pull/14) - [ ] **增加可配置比如序列化方式、注册中心的实现方式,避免硬编码** :通过 API 配置,后续集成 Spring 的话建议使用配置文件的方式进行配置 -- [ ] **客户端与服务端通信协议(数据包结构)重新设计** ,可以将原有的 `RpcRequest`和 `RpcReuqest` 对象作为消息体,然后增加如下字段(可以参考:《Netty 入门实战小册》和 Dubbo 框架对这块的设计): +- [x] **客户端与服务端通信协议(数据包结构)重新设计** ,可以将原有的 `RpcRequest`和 `RpcReuqest` 对象作为消息体,然后增加如下字段(可以参考:《Netty 入门实战小册》和 Dubbo 框架对这块的设计): - **魔数** : 通常是 4 个字节。这个魔数主要是为了筛选来到服务端的数据包,有了这个魔数之后,服务端首先取出前面四个字节进行比对,能够在第一时间识别出这个数据包并非是遵循自定义协议的,也就是无效数据包,为了安全考虑可以直接关闭连接以节省资源。 - **序列化器编号** :标识序列化的方式,比如是使用 Java 自带的序列化,还是 json,kyro 等序列化方式。 - **消息体长度** : 运行时计算出来。 - ...... - [ ] **编写测试为重构代码提供信心** +- [ ] **服务监控中心(类似dubbo admin)** ### 项目模块概览 diff --git a/example-client/src/main/java/github/javaguide/NettyClientMain.java b/example-client/src/main/java/github/javaguide/NettyClientMain.java index 7ab4c4f..6575706 100644 --- a/example-client/src/main/java/github/javaguide/NettyClientMain.java +++ b/example-client/src/main/java/github/javaguide/NettyClientMain.java @@ -1,13 +1,7 @@ package github.javaguide; import github.javaguide.annotation.RpcScan; -import github.javaguide.entity.RpcServiceProperties; -import github.javaguide.proxy.RpcClientProxy; -import github.javaguide.remoting.transport.ClientTransport; -import github.javaguide.remoting.transport.netty.client.NettyClientTransport; import org.springframework.context.annotation.AnnotationConfigApplicationContext; -import org.springframework.context.annotation.ComponentScan; -import org.springframework.context.annotation.Configuration; /** * @author shuang.kou diff --git a/example-server/src/main/java/RpcFrameworkSimpleServerMain.java b/example-server/src/main/java/RpcFrameworkSimpleServerMain.java index 73f3c7d..7bb6942 100644 --- a/example-server/src/main/java/RpcFrameworkSimpleServerMain.java +++ b/example-server/src/main/java/RpcFrameworkSimpleServerMain.java @@ -2,7 +2,6 @@ import github.javaguide.entity.RpcServiceProperties; import github.javaguide.remoting.transport.socket.SocketRpcServer; import github.javaguide.serviceimpl.HelloServiceImpl; -import github.javaguide.serviceimpl.HelloServiceImpl2; /** * @author shuang.kou diff --git a/rpc-framework-common/src/main/java/github/javaguide/enumeration/RpcMessageType.java b/rpc-framework-common/src/main/java/github/javaguide/enumeration/RpcMessageType.java deleted file mode 100644 index bea97b9..0000000 --- a/rpc-framework-common/src/main/java/github/javaguide/enumeration/RpcMessageType.java +++ /dev/null @@ -1,9 +0,0 @@ -package github.javaguide.enumeration; - -/** - * @author shuang.kou - * @createTime 2020年06月16日 20:34:00 - */ -public enum RpcMessageType { - HEART_BEAT -} diff --git a/rpc-framework-common/src/main/java/github/javaguide/enumeration/RpcConfigProperties.java b/rpc-framework-common/src/main/java/github/javaguide/enums/RpcConfigPropertiesEnum.java similarity index 67% rename from rpc-framework-common/src/main/java/github/javaguide/enumeration/RpcConfigProperties.java rename to rpc-framework-common/src/main/java/github/javaguide/enums/RpcConfigPropertiesEnum.java index c8b04ac..22dc39e 100644 --- a/rpc-framework-common/src/main/java/github/javaguide/enumeration/RpcConfigProperties.java +++ b/rpc-framework-common/src/main/java/github/javaguide/enums/RpcConfigPropertiesEnum.java @@ -1,6 +1,6 @@ -package github.javaguide.enumeration; +package github.javaguide.enums; -public enum RpcConfigProperties { +public enum RpcConfigPropertiesEnum { RPC_CONFIG_PATH("rpc.properties"), ZK_ADDRESS("rpc.zookeeper.address"); @@ -8,7 +8,7 @@ public enum RpcConfigProperties { private final String propertyValue; - RpcConfigProperties(String propertyValue) { + RpcConfigPropertiesEnum(String propertyValue) { this.propertyValue = propertyValue; } diff --git a/rpc-framework-common/src/main/java/github/javaguide/enumeration/RpcErrorMessage.java b/rpc-framework-common/src/main/java/github/javaguide/enums/RpcErrorMessageEnum.java similarity index 89% rename from rpc-framework-common/src/main/java/github/javaguide/enumeration/RpcErrorMessage.java rename to rpc-framework-common/src/main/java/github/javaguide/enums/RpcErrorMessageEnum.java index 31d118b..68435ec 100644 --- a/rpc-framework-common/src/main/java/github/javaguide/enumeration/RpcErrorMessage.java +++ b/rpc-framework-common/src/main/java/github/javaguide/enums/RpcErrorMessageEnum.java @@ -1,4 +1,4 @@ -package github.javaguide.enumeration; +package github.javaguide.enums; import lombok.AllArgsConstructor; import lombok.Getter; @@ -11,7 +11,7 @@ @AllArgsConstructor @Getter @ToString -public enum RpcErrorMessage { +public enum RpcErrorMessageEnum { CLIENT_CONNECT_SERVER_FAILURE("客户端连接服务端失败"), SERVICE_INVOCATION_FAILURE("服务调用失败"), SERVICE_CAN_NOT_BE_FOUND("没有找到指定的服务"), diff --git a/rpc-framework-common/src/main/java/github/javaguide/enumeration/RpcResponseCode.java b/rpc-framework-common/src/main/java/github/javaguide/enums/RpcResponseCodeEnum.java similarity index 83% rename from rpc-framework-common/src/main/java/github/javaguide/enumeration/RpcResponseCode.java rename to rpc-framework-common/src/main/java/github/javaguide/enums/RpcResponseCodeEnum.java index a4dc6c9..a200761 100644 --- a/rpc-framework-common/src/main/java/github/javaguide/enumeration/RpcResponseCode.java +++ b/rpc-framework-common/src/main/java/github/javaguide/enums/RpcResponseCodeEnum.java @@ -1,4 +1,4 @@ -package github.javaguide.enumeration; +package github.javaguide.enums; import lombok.AllArgsConstructor; import lombok.Getter; @@ -11,7 +11,7 @@ @AllArgsConstructor @Getter @ToString -public enum RpcResponseCode { +public enum RpcResponseCodeEnum { SUCCESS(200, "The remote call is successful"), FAIL(500, "The remote call is fail"); diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/codec/enums/MySerializableEnum.java b/rpc-framework-common/src/main/java/github/javaguide/enums/SerializationTypeEnum.java similarity index 71% rename from rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/codec/enums/MySerializableEnum.java rename to rpc-framework-common/src/main/java/github/javaguide/enums/SerializationTypeEnum.java index 26df913..a4a0884 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/codec/enums/MySerializableEnum.java +++ b/rpc-framework-common/src/main/java/github/javaguide/enums/SerializationTypeEnum.java @@ -1,4 +1,4 @@ -package github.javaguide.remoting.transport.netty.codec.enums; +package github.javaguide.enums; import lombok.AllArgsConstructor; import lombok.Getter; @@ -9,16 +9,15 @@ */ @AllArgsConstructor @Getter -public enum MySerializableEnum { +public enum SerializationTypeEnum { KYRO((byte) 0x01, "kyro"); - private final byte code; private final String name; public static String getName(byte code) { - for (MySerializableEnum c : MySerializableEnum.values()) { + for (SerializationTypeEnum c : SerializationTypeEnum.values()) { if (c.getCode() == code) { return c.name; } diff --git a/rpc-framework-common/src/main/java/github/javaguide/exception/RpcException.java b/rpc-framework-common/src/main/java/github/javaguide/exception/RpcException.java index cd2bd67..0e7521f 100644 --- a/rpc-framework-common/src/main/java/github/javaguide/exception/RpcException.java +++ b/rpc-framework-common/src/main/java/github/javaguide/exception/RpcException.java @@ -1,21 +1,21 @@ package github.javaguide.exception; -import github.javaguide.enumeration.RpcErrorMessage; +import github.javaguide.enums.RpcErrorMessageEnum; /** * @author shuang.kou * @createTime 2020年05月12日 16:48:00 */ public class RpcException extends RuntimeException { - public RpcException(RpcErrorMessage rpcErrorMessage, String detail) { - super(rpcErrorMessage.getMessage() + ":" + detail); + public RpcException(RpcErrorMessageEnum rpcErrorMessageEnum, String detail) { + super(rpcErrorMessageEnum.getMessage() + ":" + detail); } public RpcException(String message, Throwable cause) { super(message, cause); } - public RpcException(RpcErrorMessage rpcErrorMessage) { - super(rpcErrorMessage.getMessage()); + public RpcException(RpcErrorMessageEnum rpcErrorMessageEnum) { + super(rpcErrorMessageEnum.getMessage()); } } diff --git a/rpc-framework-common/src/main/java/github/javaguide/extension/SPI.java b/rpc-framework-common/src/main/java/github/javaguide/extension/SPI.java index 9f10d06..6f35719 100644 --- a/rpc-framework-common/src/main/java/github/javaguide/extension/SPI.java +++ b/rpc-framework-common/src/main/java/github/javaguide/extension/SPI.java @@ -1,6 +1,10 @@ package github.javaguide.extension; -import java.lang.annotation.*; +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; @Documented @Retention(RetentionPolicy.RUNTIME) diff --git a/rpc-framework-common/src/main/java/github/javaguide/factory/SingletonFactory.java b/rpc-framework-common/src/main/java/github/javaguide/factory/SingletonFactory.java index bfc6716..2a6e7f9 100644 --- a/rpc-framework-common/src/main/java/github/javaguide/factory/SingletonFactory.java +++ b/rpc-framework-common/src/main/java/github/javaguide/factory/SingletonFactory.java @@ -21,7 +21,7 @@ public static T getInstance(Class c) { Object instance = null; if (instance == null) { synchronized (SingletonFactory.class) { - instance = OBJECT_MAP.get(key); + instance = OBJECT_MAP.get(key); if (instance == null) { try { instance = c.getDeclaredConstructor().newInstance(); diff --git a/rpc-framework-common/src/main/java/github/javaguide/utils/RuntimeUtil.java b/rpc-framework-common/src/main/java/github/javaguide/utils/RuntimeUtil.java new file mode 100644 index 0000000..4e97176 --- /dev/null +++ b/rpc-framework-common/src/main/java/github/javaguide/utils/RuntimeUtil.java @@ -0,0 +1,15 @@ +package github.javaguide.utils; + +/** + * @author hty + * @createTime 2020年09月18日 15:43:00 + */ +public class RuntimeUtil { + /** + * 获取CPU的核心数 + * @return cpu的核心数 + */ + public static int cpus() { + return Runtime.getRuntime().availableProcessors(); + } +} diff --git a/rpc-framework-common/src/main/java/github/javaguide/utils/concurrent/threadpool/ThreadPoolFactoryUtils.java b/rpc-framework-common/src/main/java/github/javaguide/utils/concurrent/threadpool/ThreadPoolFactoryUtils.java index 810a138..2708f39 100644 --- a/rpc-framework-common/src/main/java/github/javaguide/utils/concurrent/threadpool/ThreadPoolFactoryUtils.java +++ b/rpc-framework-common/src/main/java/github/javaguide/utils/concurrent/threadpool/ThreadPoolFactoryUtils.java @@ -85,7 +85,7 @@ private static ExecutorService createThreadPool(CustomThreadPoolConfig customThr * @param daemon 指定是否为 Daemon Thread(守护线程) * @return ThreadFactory */ - private static ThreadFactory createThreadFactory(String threadNamePrefix, Boolean daemon) { + public static ThreadFactory createThreadFactory(String threadNamePrefix, Boolean daemon) { if (threadNamePrefix != null) { if (daemon != null) { return new ThreadFactoryBuilder() diff --git a/rpc-framework-simple/src/main/java/github/javaguide/annotation/RpcReference.java b/rpc-framework-simple/src/main/java/github/javaguide/annotation/RpcReference.java index 3352126..9fbdfd2 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/annotation/RpcReference.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/annotation/RpcReference.java @@ -12,6 +12,7 @@ * RPC reference annotation, autowire the service implementation class * * @author smile2coder + * @createTime 2020年09月16日 21:42:00 */ @Documented @Retention(RetentionPolicy.RUNTIME) diff --git a/rpc-framework-simple/src/main/java/github/javaguide/annotation/RpcService.java b/rpc-framework-simple/src/main/java/github/javaguide/annotation/RpcService.java index 3c292bf..40685d7 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/annotation/RpcService.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/annotation/RpcService.java @@ -1,8 +1,6 @@ package github.javaguide.annotation; -import org.springframework.stereotype.Component; - import java.lang.annotation.Documented; import java.lang.annotation.ElementType; import java.lang.annotation.Inherited; diff --git a/rpc-framework-simple/src/main/java/github/javaguide/provider/ServiceProviderImpl.java b/rpc-framework-simple/src/main/java/github/javaguide/provider/ServiceProviderImpl.java index 22d8a83..77da287 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/provider/ServiceProviderImpl.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/provider/ServiceProviderImpl.java @@ -1,7 +1,7 @@ package github.javaguide.provider; import github.javaguide.entity.RpcServiceProperties; -import github.javaguide.enumeration.RpcErrorMessage; +import github.javaguide.enums.RpcErrorMessageEnum; import github.javaguide.exception.RpcException; import github.javaguide.extension.ExtensionLoader; import github.javaguide.registry.ServiceRegistry; @@ -52,7 +52,7 @@ public void addService(Object service, Class serviceClass, RpcServiceProperti public Object getService(RpcServiceProperties rpcServiceProperties) { Object service = serviceMap.get(rpcServiceProperties.toRpcServiceName()); if (null == service) { - throw new RpcException(RpcErrorMessage.SERVICE_CAN_NOT_BE_FOUND); + throw new RpcException(RpcErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND); } return service; } diff --git a/rpc-framework-simple/src/main/java/github/javaguide/proxy/RpcClientProxy.java b/rpc-framework-simple/src/main/java/github/javaguide/proxy/RpcClientProxy.java index 919c466..dc4ee9d 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/proxy/RpcClientProxy.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/proxy/RpcClientProxy.java @@ -1,7 +1,9 @@ package github.javaguide.proxy; import github.javaguide.entity.RpcServiceProperties; -import github.javaguide.remoting.dto.RpcMessageChecker; +import github.javaguide.enums.RpcErrorMessageEnum; +import github.javaguide.enums.RpcResponseCodeEnum; +import github.javaguide.exception.RpcException; import github.javaguide.remoting.dto.RpcRequest; import github.javaguide.remoting.dto.RpcResponse; import github.javaguide.remoting.transport.ClientTransport; @@ -26,6 +28,9 @@ */ @Slf4j public class RpcClientProxy implements InvocationHandler { + + private static final String INTERFACE_NAME = "interfaceName"; + /** * Used to send requests to the server.And there are two implementations: socket and netty */ @@ -82,7 +87,21 @@ public Object invoke(Object proxy, Method method, Object[] args) { if (clientTransport instanceof SocketRpcClient) { rpcResponse = (RpcResponse) clientTransport.sendRpcRequest(rpcRequest); } - RpcMessageChecker.check(rpcResponse, rpcRequest); + this.check(rpcResponse, rpcRequest); return rpcResponse.getData(); } + + private void check(RpcResponse rpcResponse, RpcRequest rpcRequest) { + if (rpcResponse == null) { + throw new RpcException(RpcErrorMessageEnum.SERVICE_INVOCATION_FAILURE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName()); + } + + if (!rpcRequest.getRequestId().equals(rpcResponse.getRequestId())) { + throw new RpcException(RpcErrorMessageEnum.REQUEST_NOT_MATCH_RESPONSE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName()); + } + + if (rpcResponse.getCode() == null || !rpcResponse.getCode().equals(RpcResponseCodeEnum.SUCCESS.getCode())) { + throw new RpcException(RpcErrorMessageEnum.SERVICE_INVOCATION_FAILURE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName()); + } + } } diff --git a/rpc-framework-simple/src/main/java/github/javaguide/registry/zk/ZkServiceDiscovery.java b/rpc-framework-simple/src/main/java/github/javaguide/registry/zk/ZkServiceDiscovery.java index a0d5aa7..8601767 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/registry/zk/ZkServiceDiscovery.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/registry/zk/ZkServiceDiscovery.java @@ -1,6 +1,6 @@ package github.javaguide.registry.zk; -import github.javaguide.enumeration.RpcErrorMessage; +import github.javaguide.enums.RpcErrorMessageEnum; import github.javaguide.exception.RpcException; import github.javaguide.loadbalance.LoadBalance; import github.javaguide.loadbalance.RandomLoadBalance; @@ -31,7 +31,7 @@ public InetSocketAddress lookupService(String rpcServiceName) { CuratorFramework zkClient = CuratorUtils.getZkClient(); List serviceUrlList = CuratorUtils.getChildrenNodes(zkClient, rpcServiceName); if (serviceUrlList.size() == 0) { - throw new RpcException(RpcErrorMessage.SERVICE_CAN_NOT_BE_FOUND, rpcServiceName); + throw new RpcException(RpcErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND, rpcServiceName); } // load balancing String targetServiceUrl = loadBalance.selectServiceAddress(serviceUrlList); diff --git a/rpc-framework-simple/src/main/java/github/javaguide/registry/zk/util/CuratorUtils.java b/rpc-framework-simple/src/main/java/github/javaguide/registry/zk/util/CuratorUtils.java index 063a9d7..e8dfa9b 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/registry/zk/util/CuratorUtils.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/registry/zk/util/CuratorUtils.java @@ -1,6 +1,6 @@ package github.javaguide.registry.zk.util; -import github.javaguide.enumeration.RpcConfigProperties; +import github.javaguide.enums.RpcConfigPropertiesEnum; import github.javaguide.exception.RpcException; import github.javaguide.utils.file.PropertiesFileUtils; import lombok.extern.slf4j.Slf4j; @@ -97,9 +97,9 @@ public static void clearRegistry(CuratorFramework zkClient) { public static CuratorFramework getZkClient() { // check if user has set zk address - Properties properties = PropertiesFileUtils.readPropertiesFile(RpcConfigProperties.RPC_CONFIG_PATH.getPropertyValue()); + Properties properties = PropertiesFileUtils.readPropertiesFile(RpcConfigPropertiesEnum.RPC_CONFIG_PATH.getPropertyValue()); if (properties != null) { - defaultZookeeperAddress = properties.getProperty(RpcConfigProperties.ZK_ADDRESS.getPropertyValue()); + defaultZookeeperAddress = properties.getProperty(RpcConfigPropertiesEnum.ZK_ADDRESS.getPropertyValue()); } // if zkClient has been started, return directly if (zkClient != null && zkClient.getState() == CuratorFrameworkState.STARTED) { diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/constants/RpcConstants.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/constants/RpcConstants.java index dea0591..283338f 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/constants/RpcConstants.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/constants/RpcConstants.java @@ -7,44 +7,26 @@ * @author wangtao . * @createTime on 2020/10/2 */ - public class RpcConstants { /** - * 魔法数 检验 RpcMessage - * guide rpc + * Magic number. Verify RpcMessage */ public static final byte[] MAGIC_NUMBER = {(byte) 'g', (byte) 'r', (byte) 'p', (byte) 'c'}; - - public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8; - - //版本信息 + //version information public static final byte VERSION = 1; public static final byte TOTAL_LENGTH = 15; - - //请求 - public static final byte MSGTYPE_RESQUEST = 1; - - //相应 - public static final byte MSGTYPE_RESPONSE = 2; - + public static final byte REQUEST_TYPE = 1; + public static final byte RESPONSE_TYPE = 2; //ping - public static final byte MSGTYPE_HEARTBEAT_REQUEST = 3; - + public static final byte HEARTBEAT_REQUEST_TYPE = 3; //pong - public static final byte MSGTYPE_HEARTBEAT_RESPONSE = 4; - - + public static final byte HEARTBEAT_RESPONSE_TYPE = 4; public static final int HEAD_LENGTH = 15; - - public static final String PING = "ping"; - public static final String PONG = "pong"; - - public static final int MAX_FRAME_LENGTH = 8 * 1024 * 1024; } diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/dto/RpcMessage.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/dto/RpcMessage.java index ecdf784..eac9c4c 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/dto/RpcMessage.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/dto/RpcMessage.java @@ -1,7 +1,12 @@ package github.javaguide.remoting.dto; -import lombok.*; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; /** * @author wangtao @@ -15,17 +20,13 @@ @ToString public class RpcMessage { - //消息类型 + //rpc message type private byte messageType; - - //序列化类型 + //serialization type private byte codec; - - //请求id + //request id private int requestId; - - //数据内容 + //request data private Object data; - } diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/dto/RpcMessageChecker.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/dto/RpcMessageChecker.java deleted file mode 100644 index b163d2a..0000000 --- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/dto/RpcMessageChecker.java +++ /dev/null @@ -1,35 +0,0 @@ -package github.javaguide.remoting.dto; - - -import github.javaguide.enumeration.RpcErrorMessage; -import github.javaguide.enumeration.RpcResponseCode; -import github.javaguide.exception.RpcException; -import lombok.extern.slf4j.Slf4j; - -/** - * Verify RpcRequest and RpcRequest - * - * @author shuang.kou - * @createTime 2020年05月26日 18:03:00 - */ -@Slf4j -public final class RpcMessageChecker { - private static final String INTERFACE_NAME = "interfaceName"; - - private RpcMessageChecker() { - } - - public static void check(RpcResponse rpcResponse, RpcRequest rpcRequest) { - if (rpcResponse == null) { - throw new RpcException(RpcErrorMessage.SERVICE_INVOCATION_FAILURE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName()); - } - - if (!rpcRequest.getRequestId().equals(rpcResponse.getRequestId())) { - throw new RpcException(RpcErrorMessage.REQUEST_NOT_MATCH_RESPONSE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName()); - } - - if (rpcResponse.getCode() == null || !rpcResponse.getCode().equals(RpcResponseCode.SUCCESS.getCode())) { - throw new RpcException(RpcErrorMessage.SERVICE_INVOCATION_FAILURE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName()); - } - } -} diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/dto/RpcRequest.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/dto/RpcRequest.java index 1042022..daf0f37 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/dto/RpcRequest.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/dto/RpcRequest.java @@ -1,7 +1,6 @@ package github.javaguide.remoting.dto; import github.javaguide.entity.RpcServiceProperties; -import github.javaguide.enumeration.RpcMessageType; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Getter; @@ -26,7 +25,6 @@ public class RpcRequest implements Serializable { private String methodName; private Object[] parameters; private Class[] paramTypes; - private RpcMessageType rpcMessageType; private String version; private String group; diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/dto/RpcResponse.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/dto/RpcResponse.java index 4ef1cab..6f13d8b 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/dto/RpcResponse.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/dto/RpcResponse.java @@ -1,6 +1,6 @@ package github.javaguide.remoting.dto; -import github.javaguide.enumeration.RpcResponseCode; +import github.javaguide.enums.RpcResponseCodeEnum; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Getter; @@ -39,8 +39,8 @@ public class RpcResponse implements Serializable { public static RpcResponse success(T data, String requestId) { RpcResponse response = new RpcResponse<>(); - response.setCode(RpcResponseCode.SUCCESS.getCode()); - response.setMessage(RpcResponseCode.SUCCESS.getMessage()); + response.setCode(RpcResponseCodeEnum.SUCCESS.getCode()); + response.setMessage(RpcResponseCodeEnum.SUCCESS.getMessage()); response.setRequestId(requestId); if (null != data) { response.setData(data); @@ -48,10 +48,10 @@ public static RpcResponse success(T data, String requestId) { return response; } - public static RpcResponse fail(RpcResponseCode rpcResponseCode) { + public static RpcResponse fail(RpcResponseCodeEnum rpcResponseCodeEnum) { RpcResponse response = new RpcResponse<>(); - response.setCode(rpcResponseCode.getCode()); - response.setMessage(rpcResponseCode.getMessage()); + response.setCode(rpcResponseCodeEnum.getCode()); + response.setMessage(rpcResponseCodeEnum.getMessage()); return response; } diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyClient.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyClient.java index 9510c15..f948c9c 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyClient.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyClient.java @@ -4,7 +4,12 @@ import github.javaguide.remoting.transport.netty.codec.RpcMessageDecoder; import github.javaguide.remoting.transport.netty.codec.RpcMessageEncoder; import io.netty.bootstrap.Bootstrap; -import io.netty.channel.*; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyClientHandler.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyClientHandler.java index a797137..0ac9746 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyClientHandler.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyClientHandler.java @@ -4,8 +4,12 @@ import github.javaguide.remoting.constants.RpcConstants; import github.javaguide.remoting.dto.RpcMessage; import github.javaguide.remoting.dto.RpcResponse; -import github.javaguide.remoting.transport.netty.codec.enums.MySerializableEnum; -import io.netty.channel.*; +import github.javaguide.enums.SerializationTypeEnum; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.ReferenceCountUtil; @@ -43,9 +47,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { if (msg instanceof RpcMessage) { RpcMessage tmp = (RpcMessage) msg; byte messageType = tmp.getMessageType(); - if (messageType == RpcConstants.MSGTYPE_HEARTBEAT_RESPONSE) { + if (messageType == RpcConstants.HEARTBEAT_RESPONSE_TYPE) { log.info("heart [{}]", tmp.getData()); - } else if (messageType == RpcConstants.MSGTYPE_RESPONSE) { + } else if (messageType == RpcConstants.RESPONSE_TYPE) { RpcResponse rpcResponse = (RpcResponse) tmp.getData(); unprocessedRequests.complete(rpcResponse); } @@ -63,8 +67,8 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc log.info("write idle happen [{}]", ctx.channel().remoteAddress()); Channel channel = channelProvider.get((InetSocketAddress) ctx.channel().remoteAddress()); RpcMessage rpcMessage = new RpcMessage(); - rpcMessage.setCodec(MySerializableEnum.KYRO.getCode()); - rpcMessage.setMessageType(RpcConstants.MSGTYPE_HEARTBEAT_REQUEST); + rpcMessage.setCodec(SerializationTypeEnum.KYRO.getCode()); + rpcMessage.setMessageType(RpcConstants.HEARTBEAT_REQUEST_TYPE); rpcMessage.setData(RpcConstants.PING); channel.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyClientTransport.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyClientTransport.java index 3b4561d..81701f7 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyClientTransport.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyClientTransport.java @@ -8,7 +8,7 @@ import github.javaguide.remoting.dto.RpcRequest; import github.javaguide.remoting.dto.RpcResponse; import github.javaguide.remoting.transport.ClientTransport; -import github.javaguide.remoting.transport.netty.codec.enums.MySerializableEnum; +import github.javaguide.enums.SerializationTypeEnum; import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; import lombok.extern.slf4j.Slf4j; @@ -49,8 +49,8 @@ public CompletableFuture> sendRpcRequest(RpcRequest rpcReque unprocessedRequests.put(rpcRequest.getRequestId(), resultFuture); RpcMessage rpcMessage = new RpcMessage(); rpcMessage.setData(rpcRequest); - rpcMessage.setCodec(MySerializableEnum.KYRO.getCode()); - rpcMessage.setMessageType(RpcConstants.MSGTYPE_RESQUEST); + rpcMessage.setCodec(SerializationTypeEnum.KYRO.getCode()); + rpcMessage.setMessageType(RpcConstants.REQUEST_TYPE); channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { log.info("client send message: [{}]", rpcMessage); diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/codec/RpcMessageDecoder.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/codec/RpcMessageDecoder.java index d6cf7a4..3ed4554 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/codec/RpcMessageDecoder.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/codec/RpcMessageDecoder.java @@ -1,11 +1,11 @@ package github.javaguide.remoting.transport.netty.codec; +import github.javaguide.enums.SerializationTypeEnum; import github.javaguide.extension.ExtensionLoader; import github.javaguide.remoting.constants.RpcConstants; import github.javaguide.remoting.dto.RpcMessage; import github.javaguide.remoting.dto.RpcRequest; import github.javaguide.remoting.dto.RpcResponse; -import github.javaguide.remoting.transport.netty.codec.enums.MySerializableEnum; import github.javaguide.serialize.Serializer; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; @@ -15,25 +15,52 @@ import java.util.Arrays; /** - * @author wangtao . + * custom protocol decoder + *
+ *   0     1     2     3     4        5     6     7     8     9          10       11     12    13    14   15
+ *   +-----+-----+-----+-----+--------+----+----+----+------+-----------+-------+-----------+-----+-----+-----+
+ *   |   magic   code        |version | full length         | messageType| codec| RequestId                   |
+ *   +-----------------------+--------+---------------------+-----------+-----------+-----------+------------+
+ *   |                                                                                                       |
+ *   |                                         body                                                          |
+ *   |                                                                                                       |
+ *   |                                        ... ...                                                        |
+ *   +-------------------------------------------------------------------------------------------------------+
+ * 4B  magic code(魔法数)   1B version(版本)   4B full length(消息长度)    1B messageType(消息类型)
+ * 1B codec(序列化类型)    4B  requestId(请求的Id)
+ * body(object类型数据)
+ * 
+ *

+ * {@link LengthFieldBasedFrameDecoder} is a length-based decoder , used to solve TCP unpacking and sticking problems. + *

+ * + * @author wangtao * @createTime on 2020/10/2 + * @see LengthFieldBasedFrameDecoder解码器 */ @Slf4j public class RpcMessageDecoder extends LengthFieldBasedFrameDecoder { public RpcMessageDecoder() { - // default is 8M - this(RpcConstants.MAX_FRAME_LENGTH); + // lengthFieldOffset: magic code is 4B, and version is 1B, and then full length. so value is 5 + // lengthFieldLength: full length is 4B. so value is 4 + // lengthAdjustment: full length include all data and read 9 bytes before, so the left length is (fullLength-9). so values is -9 + // initialBytesToStrip: we will check magic code and version manually, so do not strip any bytes. so values is 0 + this(RpcConstants.MAX_FRAME_LENGTH, 5, 4, -9, 0); } - public RpcMessageDecoder(int maxFrameLength) { - /* - int maxFrameLength, - int lengthFieldOffset, magic code is 4B, and version is 1B, and then FullLength. so value is 5 - int lengthFieldLength, FullLength is int(4B). so values is 4 - int lengthAdjustment, FullLength include all data and read 9 bytes before, so the left length is (FullLength-9). so values is -9 - int initialBytesToStrip we will check magic code and version self, so do not strip any bytes. so values is 0 - */ - super(maxFrameLength, 5, 4, -9, 0); + /** + * @param maxFrameLength Maximum frame length. It decide the maximum length of data that can be received. + * If it exceeds, the data will be discarded. + * @param lengthFieldOffset Length field offset. The length field is the one that skips the specified length of byte. + * @param lengthFieldLength The number of bytes in the length field. + * @param lengthAdjustment The compensation value to add to the value of the length field + * @param initialBytesToStrip Number of bytes skipped. + * If you need to receive all of the header+body data, this value is 0 + * if you only want to receive the body data, then you need to skip the number of bytes consumed by the header. + */ + public RpcMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, + int lengthAdjustment, int initialBytesToStrip) { + super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip); } @Override @@ -57,10 +84,9 @@ protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception } - - private Object decodeFrame(ByteBuf in) - throws Exception { -// 读取前4个magic比对一下 + private Object decodeFrame(ByteBuf in) { + // note: must read ByteBuf in order + // read the first 4 bit, which is the magic number, and compare int len = RpcConstants.MAGIC_NUMBER.length; byte[] tmp = new byte[len]; in.readBytes(tmp); @@ -69,33 +95,33 @@ private Object decodeFrame(ByteBuf in) throw new IllegalArgumentException("Unknown magic code: " + Arrays.toString(tmp)); } } + // read the version and compare byte version = in.readByte(); if (version != RpcConstants.VERSION) { throw new RuntimeException("version isn't compatible" + version); } int fullLength = in.readInt(); - //消息类型 + // build RpcMessage object byte messageType = in.readByte(); - //读取序列化类型 byte codecType = in.readByte(); int requestId = in.readInt(); - RpcMessage rpcMessage = new RpcMessage(); - rpcMessage.setMessageType(messageType); - rpcMessage.setRequestId(requestId); - rpcMessage.setCodec(codecType); - if (messageType == RpcConstants.MSGTYPE_HEARTBEAT_REQUEST) { + RpcMessage rpcMessage = RpcMessage.builder() + .codec(codecType) + .requestId(requestId) + .messageType(messageType).build(); + if (messageType == RpcConstants.HEARTBEAT_REQUEST_TYPE) { rpcMessage.setData(RpcConstants.PING); - } else if (messageType == RpcConstants.MSGTYPE_HEARTBEAT_RESPONSE) { + } else if (messageType == RpcConstants.HEARTBEAT_RESPONSE_TYPE) { rpcMessage.setData(RpcConstants.PONG); } else { int bodyLength = fullLength - RpcConstants.HEAD_LENGTH; if (bodyLength > 0) { byte[] bs = new byte[bodyLength]; in.readBytes(bs); - String codecName = MySerializableEnum.getName(rpcMessage.getCodec()); + String codecName = SerializationTypeEnum.getName(rpcMessage.getCodec()); Serializer serializer = ExtensionLoader.getExtensionLoader(Serializer.class) .getExtension(codecName); - if (messageType == RpcConstants.MSGTYPE_RESQUEST) { + if (messageType == RpcConstants.REQUEST_TYPE) { RpcRequest tmpValue = serializer.deserialize(bs, RpcRequest.class); rpcMessage.setData(tmpValue); } else { diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/codec/RpcMessageEncoder.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/codec/RpcMessageEncoder.java index bfe9b95..63c2b17 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/codec/RpcMessageEncoder.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/codec/RpcMessageEncoder.java @@ -1,10 +1,10 @@ package github.javaguide.remoting.transport.netty.codec; +import github.javaguide.enums.SerializationTypeEnum; import github.javaguide.extension.ExtensionLoader; import github.javaguide.remoting.constants.RpcConstants; import github.javaguide.remoting.dto.RpcMessage; -import github.javaguide.remoting.transport.netty.codec.enums.MySerializableEnum; import github.javaguide.serialize.Serializer; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; @@ -15,58 +15,51 @@ /** + *

+ * custom protocol decoder + *

+ *

+ *   0     1     2     3     4        5     6     7     8     9          10       11     12    13    14   15
+ *   +-----+-----+-----+-----+--------+----+----+----+------+-----------+-------+-----------+-----+-----+-----+
+ *   |   magic   code        |version | full length         | messageType| codec| RequestId                   |
+ *   +-----------------------+--------+---------------------+-----------+-----------+-----------+------------+
+ *   |                                                                                                       |
+ *   |                                         body                                                          |
+ *   |                                                                                                       |
+ *   |                                        ... ...                                                        |
+ *   +-------------------------------------------------------------------------------------------------------+
+ * 4B  magic code(魔法数)   1B version(版本)   4B full length(消息长度)    1B messageType(消息类型)
+ * 1B codec(序列化类型)    4B  requestId(请求的Id)
+ * body(object类型数据)
+ * 
+ * * @author WangTao * @createTime on 2020/10/2 - * - * 自定义协议解码器 - * - * *
- *  * 0     1     2     3     4        5     6     7     8     9          10       11     12    13    14   15
- *  * +-----+-----+-----+-----+--------+----+----+----+------+-----------+-------+-----------+-----+-----+-----+
- *  * |   magic   code        |version | Full length         | messageType| codec| RequestId                   |
- *  * +-----------------------+--------+---------------------+-----------+-----------+-----------+------------+
- *  * |                                                                                                       |
- *  * |                                         body                                                          |
- *  * |                                                                                                       |
- *  * |                                        ... ...                                                        |
- *  * +-------------------------------------------------------------------------------------------------------+
- *
- *  自定义编码器
- *  4B  magic   code 魔法数  1B version 版本  4B full length  消息长度  1B messageType 消息类型
- *   1B codec 序列化   4B  requestId 请求的Id
- *   body object类型数据
- *
+ * @see LengthFieldBasedFrameDecoder解码器
  */
 
 @Slf4j
 public class RpcMessageEncoder extends MessageToByteEncoder {
     private static final AtomicInteger ATOMIC_INTEGER = new AtomicInteger(0);
 
-
-
     @Override
-    protected void encode(ChannelHandlerContext ctx, RpcMessage rpcMessage, ByteBuf out)
-            throws Exception {
+    protected void encode(ChannelHandlerContext ctx, RpcMessage rpcMessage, ByteBuf out) {
         try {
-            int fullLength = RpcConstants.HEAD_LENGTH;
-            byte messageType = rpcMessage.getMessageType();
-            //写入magic数字
             out.writeBytes(RpcConstants.MAGIC_NUMBER);
             out.writeByte(RpcConstants.VERSION);
-            // 留出位置写入数据包的长度
+            // leave a place to write the value of full length
             out.writerIndex(out.writerIndex() + 4);
-            //设置消息类型
-            out.writeByte(rpcMessage.getMessageType());
-            //设置序列化
+            byte messageType = rpcMessage.getMessageType();
+            out.writeByte(messageType);
             out.writeByte(rpcMessage.getCodec());
             out.writeInt(ATOMIC_INTEGER.getAndDecrement());
+            // build full length
             byte[] bodyBytes = null;
-
-            //不是心跳
-            if (messageType != RpcConstants.MSGTYPE_HEARTBEAT_REQUEST
-                    && messageType != RpcConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
-                //对象序列化
-                String codecName = MySerializableEnum.getName(rpcMessage.getCodec());
+            int fullLength = RpcConstants.HEAD_LENGTH;
+            // if messageType is not heartbeat message,fullLength = head length + body length
+            if (messageType != RpcConstants.HEARTBEAT_REQUEST_TYPE
+                    && messageType != RpcConstants.HEARTBEAT_RESPONSE_TYPE) {
+                String codecName = SerializationTypeEnum.getName(rpcMessage.getCodec());
                 Serializer serializer = ExtensionLoader.getExtensionLoader(Serializer.class)
                         .getExtension(codecName);
                 bodyBytes = serializer.serialize(rpcMessage.getData());
@@ -76,21 +69,16 @@ protected void encode(ChannelHandlerContext ctx, RpcMessage rpcMessage, ByteBuf
             if (bodyBytes != null) {
                 out.writeBytes(bodyBytes);
             }
-
             int writeIndex = out.writerIndex();
             out.writerIndex(writeIndex - fullLength + RpcConstants.MAGIC_NUMBER.length + 1);
-            //写入长度
             out.writeInt(fullLength);
-            //重置
             out.writerIndex(writeIndex);
         } catch (Exception e) {
             log.error("Encode request error!", e);
         }
 
-
     }
 
 
-
 }
 
diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/server/NettyServer.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/server/NettyServer.java
index 8aa92fa..f3e4ef0 100644
--- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/server/NettyServer.java
+++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/server/NettyServer.java
@@ -7,14 +7,21 @@
 import github.javaguide.provider.ServiceProviderImpl;
 import github.javaguide.remoting.transport.netty.codec.RpcMessageDecoder;
 import github.javaguide.remoting.transport.netty.codec.RpcMessageEncoder;
+import github.javaguide.utils.RuntimeUtil;
+import github.javaguide.utils.concurrent.threadpool.ThreadPoolFactoryUtils;
 import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.*;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.logging.LogLevel;
 import io.netty.handler.logging.LoggingHandler;
 import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
@@ -37,10 +44,6 @@ public class NettyServer {
 
     private final ServiceProvider serviceProvider = SingletonFactory.getInstance(ServiceProviderImpl.class);
 
-    public void registerService(Object service) {
-        serviceProvider.publishService(service);
-    }
-
     public void registerService(Object service, RpcServiceProperties rpcServiceProperties) {
         serviceProvider.publishService(service, rpcServiceProperties);
     }
@@ -51,6 +54,10 @@ public void start() {
         String host = InetAddress.getLocalHost().getHostAddress();
         EventLoopGroup bossGroup = new NioEventLoopGroup(1);
         EventLoopGroup workerGroup = new NioEventLoopGroup();
+        DefaultEventExecutorGroup serviceHandlerGroup = new DefaultEventExecutorGroup(
+                RuntimeUtil.cpus() * 2,
+                ThreadPoolFactoryUtils.createThreadFactory("service-handler-group", false)
+        );
         try {
             ServerBootstrap b = new ServerBootstrap();
             b.group(bossGroup, workerGroup)
@@ -71,7 +78,7 @@ protected void initChannel(SocketChannel ch) {
                             p.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
                             p.addLast(new RpcMessageEncoder());
                             p.addLast(new RpcMessageDecoder());
-                            p.addLast(new NettyServerHandler());
+                            p.addLast(serviceHandlerGroup, new NettyServerHandler());
                         }
                     });
 
@@ -85,6 +92,7 @@ protected void initChannel(SocketChannel ch) {
             log.error("shutdown bossGroup and workerGroup");
             bossGroup.shutdownGracefully();
             workerGroup.shutdownGracefully();
+            serviceHandlerGroup.shutdownGracefully();
         }
     }
 
diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/server/NettyServerHandler.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/server/NettyServerHandler.java
index 01c4f1b..587a781 100644
--- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/server/NettyServerHandler.java
+++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/server/NettyServerHandler.java
@@ -1,13 +1,13 @@
 package github.javaguide.remoting.transport.netty.server;
 
-import github.javaguide.enumeration.RpcResponseCode;
+import github.javaguide.enums.RpcResponseCodeEnum;
 import github.javaguide.factory.SingletonFactory;
 import github.javaguide.remoting.constants.RpcConstants;
 import github.javaguide.remoting.dto.RpcMessage;
 import github.javaguide.remoting.dto.RpcRequest;
 import github.javaguide.remoting.dto.RpcResponse;
 import github.javaguide.remoting.handler.RpcRequestHandler;
-import github.javaguide.remoting.transport.netty.codec.enums.MySerializableEnum;
+import github.javaguide.enums.SerializationTypeEnum;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -41,10 +41,10 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
             if (msg instanceof RpcMessage) {
                 log.info("server receive msg: [{}] ", msg);
                 byte messageType = ((RpcMessage) msg).getMessageType();
-                if (messageType == RpcConstants.MSGTYPE_HEARTBEAT_REQUEST) {
+                if (messageType == RpcConstants.HEARTBEAT_REQUEST_TYPE) {
                     RpcMessage rpcMessage = new RpcMessage();
-                    rpcMessage.setCodec(MySerializableEnum.KYRO.getCode());
-                    rpcMessage.setMessageType(RpcConstants.MSGTYPE_HEARTBEAT_RESPONSE);
+                    rpcMessage.setCodec(SerializationTypeEnum.KYRO.getCode());
+                    rpcMessage.setMessageType(RpcConstants.HEARTBEAT_RESPONSE_TYPE);
                     rpcMessage.setData(RpcConstants.PONG);
                     ctx.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                 } else {
@@ -55,15 +55,15 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
                     if (ctx.channel().isActive() && ctx.channel().isWritable()) {
                         RpcResponse rpcResponse = RpcResponse.success(result, rpcRequest.getRequestId());
                         RpcMessage rpcMessage = new RpcMessage();
-                        rpcMessage.setCodec(MySerializableEnum.KYRO.getCode());
-                        rpcMessage.setMessageType(RpcConstants.MSGTYPE_RESPONSE);
+                        rpcMessage.setCodec(SerializationTypeEnum.KYRO.getCode());
+                        rpcMessage.setMessageType(RpcConstants.RESPONSE_TYPE);
                         rpcMessage.setData(rpcResponse);
                         ctx.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                     } else {
-                        RpcResponse rpcResponse = RpcResponse.fail(RpcResponseCode.FAIL);
+                        RpcResponse rpcResponse = RpcResponse.fail(RpcResponseCodeEnum.FAIL);
                         RpcMessage rpcMessage = new RpcMessage();
-                        rpcMessage.setCodec(MySerializableEnum.KYRO.getCode());
-                        rpcMessage.setMessageType(RpcConstants.MSGTYPE_RESPONSE);
+                        rpcMessage.setCodec(SerializationTypeEnum.KYRO.getCode());
+                        rpcMessage.setMessageType(RpcConstants.RESPONSE_TYPE);
                         rpcMessage.setData(rpcResponse);
                         ctx.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                         log.error("not writable now, message dropped");