Skip to content

Commit

Permalink
Add AbstractEventMeshTCPPubHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Dec 3, 2021
1 parent 3673ada commit 23c9592
Show file tree
Hide file tree
Showing 32 changed files with 331 additions and 249 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ private static Object deserializeBody(String bodyJsonString, Header header) thro
case RESPONSE_TO_CLIENT_ACK:
case ASYNC_MESSAGE_TO_CLIENT_ACK:
case BROADCAST_MESSAGE_TO_CLIENT_ACK:
// The message json will be deserialized by protocol plugin
// The message string will be deserialized by protocol plugin, if the event is cloudevents, the body is
// just a string.
return bodyJsonString;
case REDIRECT_TO_CLIENT:
return OBJECT_MAPPER.readValue(bodyJsonString, RedirectInfo.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@
import org.apache.eventmesh.client.tcp.EventMeshTCPClient;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.client.tcp.EventMeshTCPClientFactory;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.tcp.common.EventMeshTestUtils;
import org.apache.eventmesh.util.Utils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.eventmesh.client.tcp.EventMeshTCPClient;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory;
import org.apache.eventmesh.client.tcp.EventMeshTCPClientFactory;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.tcp.common.EventMeshTestUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.eventmesh.client.tcp.EventMeshTCPClient;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory;
import org.apache.eventmesh.client.tcp.EventMeshTCPClientFactory;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.tcp.common.EventMeshTestUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.eventmesh.client.tcp.EventMeshTCPClient;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory;
import org.apache.eventmesh.client.tcp.EventMeshTCPClientFactory;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.eventmesh.client.tcp.EventMeshTCPClient;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory;
import org.apache.eventmesh.client.tcp.EventMeshTCPClientFactory;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@
import org.apache.eventmesh.client.tcp.EventMeshTCPClient;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory;
import org.apache.eventmesh.client.tcp.EventMeshTCPClientFactory;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.tcp.common.EventMeshTestCaseTopicSet;
import org.apache.eventmesh.tcp.common.EventMeshTestUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@
import org.apache.eventmesh.client.tcp.EventMeshTCPClient;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory;
import org.apache.eventmesh.client.tcp.EventMeshTCPClientFactory;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.tcp.common.EventMeshTestUtils;
import org.apache.eventmesh.util.Utils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.eventmesh.client.tcp.EventMeshTCPClient;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory;
import org.apache.eventmesh.client.tcp.EventMeshTCPClientFactory;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
dependencies {
compileOnly project(":eventmesh-protocol-plugin:eventmesh-protocol-api")
implementation "io.cloudevents:cloudevents-core"
implementation "com.google.guava:guava"
implementation "io.cloudevents:cloudevents-json-jackson"

testImplementation project(":eventmesh-protocol-plugin:eventmesh-protocol-api")
testImplementation "io.cloudevents:cloudevents-core"
testImplementation "junit:junit"
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@

package org.apache.eventmesh.protocol.cloudevents;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.provider.EventFormatProvider;

import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.ProtocolTransportObject;
import org.apache.eventmesh.common.protocol.http.HttpCommand;
Expand All @@ -36,10 +32,18 @@
import org.apache.eventmesh.protocol.cloudevents.resolver.http.SendMessageRequestProtocolResolver;
import org.apache.eventmesh.protocol.cloudevents.resolver.tcp.TcpMessageProtocolResolver;

import org.apache.commons.lang3.StringUtils;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.google.common.base.Preconditions;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.provider.EventFormatProvider;

/**
* CloudEvents protocol adaptor, used to transform CloudEvents message to CloudEvents message.
*
Expand Down Expand Up @@ -73,7 +77,9 @@ private CloudEvent deserializeTcpProtocol(Header header, String cloudEventJson)
return TcpMessageProtocolResolver.buildEvent(header, cloudEventJson);
}

private CloudEvent deserializeHttpProtocol(String requestCode, org.apache.eventmesh.common.protocol.http.header.Header header, Body body) throws ProtocolHandleException {
private CloudEvent deserializeHttpProtocol(String requestCode,
org.apache.eventmesh.common.protocol.http.header.Header header,
Body body) throws ProtocolHandleException {

if (String.valueOf(RequestCode.MSG_BATCH_SEND.getRequestCode()).equals(requestCode)) {
return SendMessageBatchProtocolResolver.buildEvent(header, body);
Expand Down Expand Up @@ -102,6 +108,7 @@ public ProtocolTransportObject fromCloudEvent(CloudEvent cloudEvent) throws Prot
HttpCommand httpCommand = new HttpCommand();
Body body = new Body() {
final Map<String, Object> map = new HashMap<>();

@Override
public Map<String, Object> toMap() {
map.put("content", JsonUtils.serialize(cloudEvent));
Expand All @@ -113,9 +120,12 @@ public Map<String, Object> toMap() {
return httpCommand;
} else if (StringUtils.equals("tcp", protocolDesc)) {
Package pkg = new Package();
byte[] bodyByte = EventFormatProvider.getInstance().resolveFormat(cloudEvent.getDataContentType())
.serialize(cloudEvent);
pkg.setBody(bodyByte);
String dataContentType = cloudEvent.getDataContentType();
Preconditions.checkNotNull(dataContentType, "DateContentType cannot be null");
EventFormat eventFormat = EventFormatProvider.getInstance().resolveFormat(dataContentType);
Preconditions.checkNotNull(eventFormat,
String.format("DateContentType:%s is not supported", dataContentType));
pkg.setBody(eventFormat.serialize(cloudEvent));
return pkg;
} else {
throw new ProtocolHandleException(String.format("Unsupported protocolDesc: %s", protocolDesc));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,17 @@

import org.apache.commons.lang3.StringUtils;

import java.nio.charset.StandardCharsets;

import com.google.common.base.Preconditions;

import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.jackson.JsonFormat;

import java.nio.charset.StandardCharsets;

public class TcpMessageProtocolResolver {

public static CloudEvent buildEvent(Header header, String cloudEventJson)
Expand All @@ -43,9 +46,10 @@ public static CloudEvent buildEvent(Header header, String cloudEventJson)
String protocolDesc = header.getProperty(Constants.PROTOCOL_DESC).toString();

if (StringUtils.isBlank(protocolType)
|| StringUtils.isBlank(protocolVersion)
|| StringUtils.isBlank(protocolDesc)) {
throw new ProtocolHandleException(String.format("invalid protocol params protocolType %s|protocolVersion %s|protocolDesc %s",
|| StringUtils.isBlank(protocolVersion)
|| StringUtils.isBlank(protocolDesc)) {
throw new ProtocolHandleException(
String.format("invalid protocol params protocolType %s|protocolVersion %s|protocolDesc %s",
protocolType, protocolVersion, protocolDesc));
}

Expand All @@ -55,8 +59,10 @@ public static CloudEvent buildEvent(Header header, String cloudEventJson)

if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) {
// todo:resolve different format
CloudEvent event = EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE)
.deserialize(cloudEventJson.getBytes(StandardCharsets.UTF_8));
EventFormat eventFormat = EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE);
Preconditions
.checkNotNull(eventFormat, String.format("EventFormat: %s is not supported", JsonFormat.CONTENT_TYPE));
CloudEvent event = eventFormat.deserialize(cloudEventJson.getBytes(StandardCharsets.UTF_8));
cloudEventBuilder = CloudEventBuilder.v1(event);
for (String propKey : header.getProperties().keySet()) {
cloudEventBuilder.withExtension(propKey, header.getProperty(propKey).toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ protected void channelRead0(ChannelHandlerContext ctx, Package pkg) throws Excep
Command cmd = null;
try {
Runnable task;
cmd = pkg.getHeader().getCommand();
cmd = pkg.getHeader().getCmd();
if (cmd.equals(Command.RECOMMEND_REQUEST)) {
messageLogger.info("pkg|c2eventMesh|cmd={}|pkg={}", cmd, pkg);
task = new RecommendTask(pkg, ctx, startTime, eventMeshTCPServer);
Expand Down Expand Up @@ -134,7 +134,7 @@ private void validateMsg(Package pkg) throws Exception {
logger.error("the incoming message does not have a header|pkg={}", pkg);
throw new Exception("the incoming message does not have a header.");
}
if (pkg.getHeader().getCommand() == null) {
if (pkg.getHeader().getCmd() == null) {
logger.error("the incoming message does not have a command type|pkg={}", pkg);
throw new Exception("the incoming message does not have a command type.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public EventMeshTcpSendResult send(Header header, CloudEvent event, SendCallback
if (upstreamBuff.tryAcquire(TRY_PERMIT_TIME_OUT, TimeUnit.MILLISECONDS)) {
upMsgs.incrementAndGet();
UpStreamMsgContext upStreamMsgContext = null;
Command cmd = header.getCommand();
Command cmd = header.getCmd();
long ttl = EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS;
if (Command.REQUEST_TO_SERVER == cmd) {
if (event.getExtension(EventMeshConstants.PROPERTY_MESSAGE_TTL) != null){
Expand Down Expand Up @@ -148,7 +148,7 @@ public void onSuccess(CloudEvent event) {
session.getClientGroupWrapper().get().getEventMeshTcpMonitor().getMq2EventMeshMsgNum().incrementAndGet();

Command cmd;
if (header.getCommand().equals(Command.REQUEST_TO_SERVER)) {
if (header.getCmd().equals(Command.REQUEST_TO_SERVER)) {
cmd = Command.RESPONSE_TO_CLIENT;
} else {
messageLogger.error("invalid message|messageHeader={}|event={}", header, event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,27 @@

package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send;

import io.cloudevents.CloudEvent;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.OnExceptionContext;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.tcp.*;
import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.OPStatus;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry.RetryContext;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.Utils;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.cloudevents.CloudEvent;

public class UpStreamMsgContext extends RetryContext {

private final Logger logger = LoggerFactory.getLogger(this.getClass());
Expand Down Expand Up @@ -72,26 +76,28 @@ public long getCreateTime() {
@Override
public String toString() {
return "UpStreamMsgContext{seq=" + seq
+ ",topic=" + event.getSubject()
+ ",client=" + session.getClient()
+ ",retryTimes=" + retryTimes
+ ",createTime=" + DateFormatUtils.format(createTime, EventMeshConstants.DATE_FORMAT) + "}"
+ ",executeTime=" + DateFormatUtils.format(executeTime, EventMeshConstants.DATE_FORMAT);
+ ",topic=" + event.getSubject()
+ ",client=" + session.getClient()
+ ",retryTimes=" + retryTimes
+ ",createTime=" + DateFormatUtils.format(createTime, EventMeshConstants.DATE_FORMAT) + "}"
+ ",executeTime=" + DateFormatUtils.format(executeTime, EventMeshConstants.DATE_FORMAT);
}

@Override
public void retry() {
logger.info("retry upStream msg start,seq:{},retryTimes:{},bizSeq:{}", this.seq, this.retryTimes, EventMeshUtil.getMessageBizSeq(this.event));
logger.info("retry upStream msg start,seq:{},retryTimes:{},bizSeq:{}", this.seq, this.retryTimes,
EventMeshUtil.getMessageBizSeq(this.event));

try {
Command replyCmd = getReplyCmd(header.getCommand());
Command replyCmd = getReplyCmd(header.getCmd());
long sendTime = System.currentTimeMillis();

EventMeshTcpSendResult sendStatus = session.upstreamMsg(header, event,
createSendCallback(replyCmd, taskExecuteTime, event), startTime, taskExecuteTime);
EventMeshTcpSendResult sendStatus = session.upstreamMsg(header, event,
createSendCallback(replyCmd, taskExecuteTime, event), startTime, taskExecuteTime);

if (StringUtils.equals(EventMeshTcpSendStatus.SUCCESS.name(), sendStatus.getSendStatus().name())) {
logger.info("pkg|eventMesh2mq|cmd={}|event={}|user={}|wait={}ms|cost={}ms", header.getCommand(), event, session.getClient(), taskExecuteTime - startTime, sendTime - startTime);
logger.info("pkg|eventMesh2mq|cmd={}|event={}|user={}|wait={}ms|cost={}ms", header.getCmd(), event,
session.getClient(), taskExecuteTime - startTime, sendTime - startTime);
} else {
throw new Exception(sendStatus.getDetail());
}
Expand All @@ -109,9 +115,9 @@ protected SendCallback createSendCallback(Command replyCmd, long taskExecuteTime
public void onSuccess(SendResult sendResult) {
session.getSender().getUpstreamBuff().release();
logger.info("upstreamMsg message success|user={}|callback cost={}", session.getClient(),
String.valueOf(System.currentTimeMillis() - createTime));
String.valueOf(System.currentTimeMillis() - createTime));
if (replyCmd.equals(Command.BROADCAST_MESSAGE_TO_SERVER_ACK) || replyCmd.equals(Command
.ASYNC_MESSAGE_TO_SERVER_ACK)) {
.ASYNC_MESSAGE_TO_SERVER_ACK)) {
msg.setHeader(new Header(replyCmd, OPStatus.SUCCESS.getCode(), OPStatus.SUCCESS.getDesc(), seq));
msg.setBody(event);
Utils.writeAndFlush(msg, startTime, taskExecuteTime, session.getContext(), session);
Expand All @@ -124,12 +130,13 @@ public void onException(OnExceptionContext context) {

// retry
UpStreamMsgContext upStreamMsgContext = new UpStreamMsgContext(
session, event, header, startTime, taskExecuteTime);
session, event, header, startTime, taskExecuteTime);
upStreamMsgContext.delay(10000);
session.getClientGroupWrapper().get().getEventMeshTcpRetryer().pushRetry(upStreamMsgContext);

session.getSender().failMsgCount.incrementAndGet();
logger.error("upstreamMsg mq message error|user={}|callback cost={}, errMsg={}", session.getClient(), String.valueOf
logger.error("upstreamMsg mq message error|user={}|callback cost={}, errMsg={}", session.getClient(),
String.valueOf
(System.currentTimeMillis() - createTime), new Exception(context.getException()));
msg.setHeader(new Header(replyCmd, OPStatus.FAIL.getCode(), context.getException().toString(), seq));
msg.setBody(event);
Expand Down
Loading

0 comments on commit 23c9592

Please sign in to comment.