Skip to content

Commit

Permalink
[ISSUE #476] Biz Exceptions occured in EventMesh cause connection clo…
Browse files Browse the repository at this point in the history
…se of client (#477)

* modify:optimize flow control in downstreaming msg

* modify:optimize stategy of selecting session in downstream msg

* modify:optimize msg downstream,msg store in session

* modify:fix bug:not a @sharable handler

* modify:downstream broadcast msg asynchronously

* modify:remove unneccessary interface in eventmesh-connector-api

* modify:fix conflict

* modify:add license in EventMeshAction

* modify:fix ack problem

* modify:fix exception handle when exception occured in EventMeshTcpMessageDispatcher

* modify:fix log print

close #476
  • Loading branch information
lrhkobe authored Aug 2, 2021
1 parent 9508a5a commit c9c0cf5
Showing 1 changed file with 41 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.*;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.SessionState;
Expand Down Expand Up @@ -76,9 +75,46 @@ protected void channelRead0(ChannelHandlerContext ctx, Package pkg) throws Excep

dispatch(ctx, pkg, startTime, cmd);
} catch (Exception e) {
logger.error("exception occurred while pkg|cmd={}|pkg={}|errMsg={}", cmd, pkg, e);
//throw new RuntimeException(e);
throw e;
logger.error("exception occurred while pkg|cmd={}|pkg={}", cmd, pkg, e);
writeToClient(cmd, pkg, ctx, e);
}
}

private void writeToClient(Command cmd, Package pkg, ChannelHandlerContext ctx, Exception e){
try{
Package res = new Package();
res.setHeader(new Header(getReplyCommand(cmd), OPStatus.FAIL.getCode(), e.toString(), pkg.getHeader()
.getSeq()));
ctx.writeAndFlush(res);
}catch (Exception ex){
logger.warn("writeToClient failed", ex);
}
}

private Command getReplyCommand(Command cmd){
switch (cmd) {
case HELLO_REQUEST:
return Command.HELLO_RESPONSE;
case RECOMMEND_REQUEST:
return Command.RECOMMEND_RESPONSE;
case HEARTBEAT_REQUEST:
return Command.HEARTBEAT_RESPONSE;
case SUBSCRIBE_REQUEST:
return Command.SUBSCRIBE_RESPONSE;
case UNSUBSCRIBE_REQUEST:
return Command.UNSUBSCRIBE_RESPONSE;
case LISTEN_REQUEST:
return Command.LISTEN_RESPONSE;
case CLIENT_GOODBYE_REQUEST:
return Command.CLIENT_GOODBYE_RESPONSE;
case REQUEST_TO_SERVER:
return Command.RESPONSE_TO_CLIENT;
case ASYNC_MESSAGE_TO_SERVER:
return Command.ASYNC_MESSAGE_TO_SERVER_ACK;
case BROADCAST_MESSAGE_TO_SERVER:
return Command.BROADCAST_MESSAGE_TO_SERVER_ACK;
default:
return cmd;
}
}

Expand Down

0 comments on commit c9c0cf5

Please sign in to comment.