From c9c0cf5cf4bb6811fb8f7951b939d90ac2187c8b Mon Sep 17 00:00:00 2001 From: lrhkobe <34571087+lrhkobe@users.noreply.github.com> Date: Mon, 2 Aug 2021 16:30:25 +0800 Subject: [PATCH] [ISSUE #476] Biz Exceptions occured in EventMesh cause connection close of client (#477) * modify:optimize flow control in downstreaming msg * modify:optimize stategy of selecting session in downstream msg * modify:optimize msg downstream,msg store in session * modify:fix bug:not a @Sharable handler * modify:downstream broadcast msg asynchronously * modify:remove unneccessary interface in eventmesh-connector-api * modify:fix conflict * modify:add license in EventMeshAction * modify:fix ack problem * modify:fix exception handle when exception occured in EventMeshTcpMessageDispatcher * modify:fix log print close #476 --- .../client/EventMeshTcpMessageDispatcher.java | 46 +++++++++++++++++-- 1 file changed, 41 insertions(+), 5 deletions(-) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpMessageDispatcher.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpMessageDispatcher.java index cbe74f4a2e..4d68ba8060 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpMessageDispatcher.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpMessageDispatcher.java @@ -20,8 +20,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; -import org.apache.eventmesh.common.protocol.tcp.Command; -import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; +import org.apache.eventmesh.common.protocol.tcp.*; import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.runtime.boot.EventMeshTCPServer; import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.SessionState; @@ -76,9 +75,46 @@ protected void channelRead0(ChannelHandlerContext ctx, Package pkg) throws Excep dispatch(ctx, pkg, startTime, cmd); } catch (Exception e) { - logger.error("exception occurred while pkg|cmd={}|pkg={}|errMsg={}", cmd, pkg, e); - //throw new RuntimeException(e); - throw e; + logger.error("exception occurred while pkg|cmd={}|pkg={}", cmd, pkg, e); + writeToClient(cmd, pkg, ctx, e); + } + } + + private void writeToClient(Command cmd, Package pkg, ChannelHandlerContext ctx, Exception e){ + try{ + Package res = new Package(); + res.setHeader(new Header(getReplyCommand(cmd), OPStatus.FAIL.getCode(), e.toString(), pkg.getHeader() + .getSeq())); + ctx.writeAndFlush(res); + }catch (Exception ex){ + logger.warn("writeToClient failed", ex); + } + } + + private Command getReplyCommand(Command cmd){ + switch (cmd) { + case HELLO_REQUEST: + return Command.HELLO_RESPONSE; + case RECOMMEND_REQUEST: + return Command.RECOMMEND_RESPONSE; + case HEARTBEAT_REQUEST: + return Command.HEARTBEAT_RESPONSE; + case SUBSCRIBE_REQUEST: + return Command.SUBSCRIBE_RESPONSE; + case UNSUBSCRIBE_REQUEST: + return Command.UNSUBSCRIBE_RESPONSE; + case LISTEN_REQUEST: + return Command.LISTEN_RESPONSE; + case CLIENT_GOODBYE_REQUEST: + return Command.CLIENT_GOODBYE_RESPONSE; + case REQUEST_TO_SERVER: + return Command.RESPONSE_TO_CLIENT; + case ASYNC_MESSAGE_TO_SERVER: + return Command.ASYNC_MESSAGE_TO_SERVER_ACK; + case BROADCAST_MESSAGE_TO_SERVER: + return Command.BROADCAST_MESSAGE_TO_SERVER_ACK; + default: + return cmd; } }