Skip to content

Commit

Permalink
Add AbstractEventMeshTCPSubHandler (#624)
Browse files Browse the repository at this point in the history
* Add AbstractEventMeshTCPSubHandler
  • Loading branch information
ruanwenjun authored Dec 5, 2021
1 parent 35c4da2 commit c6b700d
Show file tree
Hide file tree
Showing 34 changed files with 491 additions and 507 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,46 +49,6 @@ public Header(int code, String desc, String seq, Map<String, Object> properties)
}


public Command getCommand() {
return cmd;
}

public void setCommand(Command cmd) {
this.cmd = cmd;
}

public int getCode() {
return code;
}

public void setCode(int code) {
this.code = code;
}

public String getDesc() {
return desc;
}

public void setDesc(String desc) {
this.desc = desc;
}

public String getSeq() {
return seq;
}

public void setSeq(String seq) {
this.seq = seq;
}

public Map<String, Object> getProperties() {
return properties;
}

public void setProperties(Map<String, Object> properties) {
this.properties = properties;
}

public void putProperty(final String name, final Object value) {
if (null == this.properties) {
this.properties = new HashMap<>();
Expand All @@ -99,9 +59,17 @@ public void putProperty(final String name, final Object value) {

public Object getProperty(final String name) {
if (null == this.properties) {
this.properties = new HashMap<>();
return null;
}
return this.properties.get(name);
}

public String getStringProperty(final String name) {
Object property = getProperty(name);
if (null == property) {
return null;
}
return property.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.eventmesh.common.protocol.tcp.RedirectInfo;
import org.apache.eventmesh.common.protocol.tcp.Subscription;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.common.utils.JsonUtils;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -38,6 +39,7 @@
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.common.base.Preconditions;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
Expand Down Expand Up @@ -73,24 +75,20 @@ public class Codec {
public static class Encoder extends MessageToByteEncoder<Package> {
@Override
public void encode(ChannelHandlerContext ctx, Package pkg, ByteBuf out) throws Exception {
final String headerJson = pkg != null ? OBJECT_MAPPER.writeValueAsString(pkg.getHeader()) : null;
final String bodyJson = pkg != null ? OBJECT_MAPPER.writeValueAsString(pkg.getBody()) : null;

final byte[] headerData = serializeBytes(headerJson);
// final byte[] bodyData = serializeBytes(bodyJson);

byte[] bodyData = serializeBytes(bodyJson);

String protocolType = "";
if (pkg.getHeader().getProperty(Constants.PROTOCOL_TYPE) != null) {
protocolType = pkg.getHeader().getProperty(Constants.PROTOCOL_TYPE).toString();
if (StringUtils.equals(CLOUD_EVENTS_PROTOCOL_NAME, protocolType)) {
bodyData = (byte[]) pkg.getBody();
}
Preconditions.checkNotNull(pkg, "TcpPackage cannot be null");
final Header header = pkg.getHeader();
Preconditions.checkNotNull(header, "TcpPackage header cannot be null", header);
if (log.isDebugEnabled()) {
log.debug("Encoder pkg={}", JsonUtils.serialize(pkg));
}

if (log.isDebugEnabled()) {
log.debug("Encoder headerJson={}|bodyJson={}", headerJson, bodyJson);
final byte[] headerData = serializeBytes(OBJECT_MAPPER.writeValueAsString(header));
final byte[] bodyData;

if (StringUtils.equals(CLOUD_EVENTS_PROTOCOL_NAME, header.getStringProperty(Constants.PROTOCOL_TYPE))) {
bodyData = (byte[]) pkg.getBody();
} else {
bodyData = serializeBytes(OBJECT_MAPPER.writeValueAsString(pkg.getBody()));
}

int headerLength = ArrayUtils.getLength(headerData);
Expand Down Expand Up @@ -188,7 +186,7 @@ private void validateFlag(byte[] flagBytes, byte[] versionBytes, ChannelHandlerC
}

private static Object deserializeBody(String bodyJsonString, Header header) throws JsonProcessingException {
Command command = header.getCommand();
Command command = header.getCmd();
switch (command) {
case HELLO_REQUEST:
case RECOMMEND_REQUEST:
Expand All @@ -208,7 +206,8 @@ private static Object deserializeBody(String bodyJsonString, Header header) thro
case RESPONSE_TO_CLIENT_ACK:
case ASYNC_MESSAGE_TO_CLIENT_ACK:
case BROADCAST_MESSAGE_TO_CLIENT_ACK:
// The message json will be deserialized by protocol plugin
// The message string will be deserialized by protocol plugin, if the event is cloudevents, the body is
// just a string.
return bodyJsonString;
case REDIRECT_TO_CLIENT:
return OBJECT_MAPPER.readValue(bodyJsonString, RedirectInfo.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@
import org.apache.eventmesh.client.tcp.EventMeshTCPClient;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.client.tcp.EventMeshTCPClientFactory;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.tcp.common.EventMeshTestUtils;
import org.apache.eventmesh.util.Utils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.eventmesh.client.tcp.EventMeshTCPClient;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory;
import org.apache.eventmesh.client.tcp.EventMeshTCPClientFactory;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.tcp.common.EventMeshTestUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.eventmesh.client.tcp.EventMeshTCPClient;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory;
import org.apache.eventmesh.client.tcp.EventMeshTCPClientFactory;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.tcp.common.EventMeshTestUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.eventmesh.client.tcp.EventMeshTCPClient;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory;
import org.apache.eventmesh.client.tcp.EventMeshTCPClientFactory;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.eventmesh.client.tcp.EventMeshTCPClient;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory;
import org.apache.eventmesh.client.tcp.EventMeshTCPClientFactory;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@
import org.apache.eventmesh.client.tcp.EventMeshTCPClient;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory;
import org.apache.eventmesh.client.tcp.EventMeshTCPClientFactory;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.tcp.common.EventMeshTestCaseTopicSet;
import org.apache.eventmesh.tcp.common.EventMeshTestUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@
import org.apache.eventmesh.client.tcp.EventMeshTCPClient;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory;
import org.apache.eventmesh.client.tcp.EventMeshTCPClientFactory;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.tcp.common.EventMeshTestUtils;
import org.apache.eventmesh.util.Utils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.eventmesh.client.tcp.EventMeshTCPClient;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory;
import org.apache.eventmesh.client.tcp.EventMeshTCPClientFactory;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
dependencies {
compileOnly project(":eventmesh-protocol-plugin:eventmesh-protocol-api")
implementation "io.cloudevents:cloudevents-core"
implementation "com.google.guava:guava"
implementation "io.cloudevents:cloudevents-json-jackson"

testImplementation project(":eventmesh-protocol-plugin:eventmesh-protocol-api")
testImplementation "io.cloudevents:cloudevents-core"
testImplementation "junit:junit"
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,34 @@

package org.apache.eventmesh.protocol.cloudevents;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.jackson.JsonFormat;

import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.ProtocolTransportObject;
import org.apache.eventmesh.common.protocol.http.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.Body;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.protocol.api.ProtocolAdaptor;
import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException;
import org.apache.eventmesh.protocol.cloudevents.resolver.http.SendMessageBatchProtocolResolver;
import org.apache.eventmesh.protocol.cloudevents.resolver.http.SendMessageBatchV2ProtocolResolver;
import org.apache.eventmesh.protocol.cloudevents.resolver.http.SendMessageRequestProtocolResolver;
import org.apache.eventmesh.protocol.cloudevents.resolver.tcp.TcpMessageProtocolResolver;

import org.apache.commons.lang3.StringUtils;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.google.common.base.Preconditions;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.jackson.JsonFormat;

/**
* CloudEvents protocol adaptor, used to transform CloudEvents message to CloudEvents message.
*
Expand Down Expand Up @@ -75,7 +78,9 @@ private CloudEvent deserializeTcpProtocol(Header header, String cloudEventJson)
return TcpMessageProtocolResolver.buildEvent(header, cloudEventJson);
}

private CloudEvent deserializeHttpProtocol(String requestCode, org.apache.eventmesh.common.protocol.http.header.Header header, Body body) throws ProtocolHandleException {
private CloudEvent deserializeHttpProtocol(String requestCode,
org.apache.eventmesh.common.protocol.http.header.Header header,
Body body) throws ProtocolHandleException {

if (String.valueOf(RequestCode.MSG_BATCH_SEND.getRequestCode()).equals(requestCode)) {
return SendMessageBatchProtocolResolver.buildEvent(header, body);
Expand Down Expand Up @@ -104,9 +109,11 @@ public ProtocolTransportObject fromCloudEvent(CloudEvent cloudEvent) throws Prot
HttpCommand httpCommand = new HttpCommand();
Body body = new Body() {
final Map<String, Object> map = new HashMap<>();

@Override
public Map<String, Object> toMap() {
byte[] eventByte = EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE).serialize(cloudEvent);
byte[] eventByte =
EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE).serialize(cloudEvent);
map.put("content", new String(eventByte, StandardCharsets.UTF_8));
return map;
}
Expand All @@ -116,9 +123,12 @@ public Map<String, Object> toMap() {
return httpCommand;
} else if (StringUtils.equals("tcp", protocolDesc)) {
Package pkg = new Package();
byte[] bodyByte = EventFormatProvider.getInstance().resolveFormat(cloudEvent.getDataContentType())
.serialize(cloudEvent);
pkg.setBody(bodyByte);
String dataContentType = cloudEvent.getDataContentType();
Preconditions.checkNotNull(dataContentType, "DateContentType cannot be null");
EventFormat eventFormat = EventFormatProvider.getInstance().resolveFormat(dataContentType);
Preconditions.checkNotNull(eventFormat,
String.format("DateContentType:%s is not supported", dataContentType));
pkg.setBody(eventFormat.serialize(cloudEvent));
return pkg;
} else {
throw new ProtocolHandleException(String.format("Unsupported protocolDesc: %s", protocolDesc));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,17 @@

import org.apache.commons.lang3.StringUtils;

import java.nio.charset.StandardCharsets;

import com.google.common.base.Preconditions;

import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.jackson.JsonFormat;

import java.nio.charset.StandardCharsets;

public class TcpMessageProtocolResolver {

public static CloudEvent buildEvent(Header header, String cloudEventJson)
Expand All @@ -43,9 +46,10 @@ public static CloudEvent buildEvent(Header header, String cloudEventJson)
String protocolDesc = header.getProperty(Constants.PROTOCOL_DESC).toString();

if (StringUtils.isBlank(protocolType)
|| StringUtils.isBlank(protocolVersion)
|| StringUtils.isBlank(protocolDesc)) {
throw new ProtocolHandleException(String.format("invalid protocol params protocolType %s|protocolVersion %s|protocolDesc %s",
|| StringUtils.isBlank(protocolVersion)
|| StringUtils.isBlank(protocolDesc)) {
throw new ProtocolHandleException(
String.format("invalid protocol params protocolType %s|protocolVersion %s|protocolDesc %s",
protocolType, protocolVersion, protocolDesc));
}

Expand All @@ -55,8 +59,10 @@ public static CloudEvent buildEvent(Header header, String cloudEventJson)

if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) {
// todo:resolve different format
CloudEvent event = EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE)
.deserialize(cloudEventJson.getBytes(StandardCharsets.UTF_8));
EventFormat eventFormat = EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE);
Preconditions
.checkNotNull(eventFormat, String.format("EventFormat: %s is not supported", JsonFormat.CONTENT_TYPE));
CloudEvent event = eventFormat.deserialize(cloudEventJson.getBytes(StandardCharsets.UTF_8));
cloudEventBuilder = CloudEventBuilder.v1(event);
for (String propKey : header.getProperties().keySet()) {
cloudEventBuilder.withExtension(propKey, header.getProperty(propKey).toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ protected void channelRead0(ChannelHandlerContext ctx, Package pkg) throws Excep
Command cmd = null;
try {
Runnable task;
cmd = pkg.getHeader().getCommand();
cmd = pkg.getHeader().getCmd();
if (cmd.equals(Command.RECOMMEND_REQUEST)) {
messageLogger.info("pkg|c2eventMesh|cmd={}|pkg={}", cmd, pkg);
task = new RecommendTask(pkg, ctx, startTime, eventMeshTCPServer);
Expand Down Expand Up @@ -134,7 +134,7 @@ private void validateMsg(Package pkg) throws Exception {
logger.error("the incoming message does not have a header|pkg={}", pkg);
throw new Exception("the incoming message does not have a header.");
}
if (pkg.getHeader().getCommand() == null) {
if (pkg.getHeader().getCmd() == null) {
logger.error("the incoming message does not have a command type|pkg={}", pkg);
throw new Exception("the incoming message does not have a command type.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public EventMeshTcpSendResult send(Header header, CloudEvent event, SendCallback
if (upstreamBuff.tryAcquire(TRY_PERMIT_TIME_OUT, TimeUnit.MILLISECONDS)) {
upMsgs.incrementAndGet();
UpStreamMsgContext upStreamMsgContext = null;
Command cmd = header.getCommand();
Command cmd = header.getCmd();
long ttl = EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS;
if (Command.REQUEST_TO_SERVER == cmd) {
if (event.getExtension(EventMeshConstants.PROPERTY_MESSAGE_TTL) != null){
Expand Down Expand Up @@ -148,7 +148,7 @@ public void onSuccess(CloudEvent event) {
session.getClientGroupWrapper().get().getEventMeshTcpMonitor().getMq2EventMeshMsgNum().incrementAndGet();

Command cmd;
if (header.getCommand().equals(Command.REQUEST_TO_SERVER)) {
if (header.getCmd().equals(Command.REQUEST_TO_SERVER)) {
cmd = Command.RESPONSE_TO_CLIENT;
} else {
messageLogger.error("invalid message|messageHeader={}|event={}", header, event);
Expand Down
Loading

0 comments on commit c6b700d

Please sign in to comment.