Skip to content

Commit

Permalink
Add AbstractEventMeshTCPSubHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Dec 3, 2021
1 parent 963a69e commit 3673ada
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,46 +49,6 @@ public Header(int code, String desc, String seq, Map<String, Object> properties)
}


public Command getCommand() {
return cmd;
}

public void setCommand(Command cmd) {
this.cmd = cmd;
}

public int getCode() {
return code;
}

public void setCode(int code) {
this.code = code;
}

public String getDesc() {
return desc;
}

public void setDesc(String desc) {
this.desc = desc;
}

public String getSeq() {
return seq;
}

public void setSeq(String seq) {
this.seq = seq;
}

public Map<String, Object> getProperties() {
return properties;
}

public void setProperties(Map<String, Object> properties) {
this.properties = properties;
}

public void putProperty(final String name, final Object value) {
if (null == this.properties) {
this.properties = new HashMap<>();
Expand All @@ -99,9 +59,17 @@ public void putProperty(final String name, final Object value) {

public Object getProperty(final String name) {
if (null == this.properties) {
this.properties = new HashMap<>();
return null;
}
return this.properties.get(name);
}

public String getStringProperty(final String name) {
Object property = getProperty(name);
if (null == property) {
return null;
}
return property.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.eventmesh.common.protocol.tcp.RedirectInfo;
import org.apache.eventmesh.common.protocol.tcp.Subscription;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.common.utils.JsonUtils;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -38,6 +39,7 @@
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.common.base.Preconditions;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
Expand Down Expand Up @@ -73,24 +75,20 @@ public class Codec {
public static class Encoder extends MessageToByteEncoder<Package> {
@Override
public void encode(ChannelHandlerContext ctx, Package pkg, ByteBuf out) throws Exception {
final String headerJson = pkg != null ? OBJECT_MAPPER.writeValueAsString(pkg.getHeader()) : null;
final String bodyJson = pkg != null ? OBJECT_MAPPER.writeValueAsString(pkg.getBody()) : null;

final byte[] headerData = serializeBytes(headerJson);
// final byte[] bodyData = serializeBytes(bodyJson);

byte[] bodyData = serializeBytes(bodyJson);

String protocolType = "";
if (pkg.getHeader().getProperty(Constants.PROTOCOL_TYPE) != null) {
protocolType = pkg.getHeader().getProperty(Constants.PROTOCOL_TYPE).toString();
if (StringUtils.equals(CLOUD_EVENTS_PROTOCOL_NAME, protocolType)) {
bodyData = (byte[]) pkg.getBody();
}
Preconditions.checkNotNull(pkg, "TcpPackage cannot be null");
final Header header = pkg.getHeader();
Preconditions.checkNotNull(header, "TcpPackage header cannot be null", header);
if (log.isDebugEnabled()) {
log.debug("Encoder pkg={}", JsonUtils.serialize(pkg));
}

if (log.isDebugEnabled()) {
log.debug("Encoder headerJson={}|bodyJson={}", headerJson, bodyJson);
final byte[] headerData = serializeBytes(OBJECT_MAPPER.writeValueAsString(header));
final byte[] bodyData;

if (StringUtils.equals(CLOUD_EVENTS_PROTOCOL_NAME, header.getStringProperty(Constants.PROTOCOL_TYPE))) {
bodyData = (byte[]) pkg.getBody();
} else {
bodyData = serializeBytes(OBJECT_MAPPER.writeValueAsString(pkg.getBody()));
}

int headerLength = ArrayUtils.getLength(headerData);
Expand Down Expand Up @@ -188,7 +186,7 @@ private void validateFlag(byte[] flagBytes, byte[] versionBytes, ChannelHandlerC
}

private static Object deserializeBody(String bodyJsonString, Header header) throws JsonProcessingException {
Command command = header.getCommand();
Command command = header.getCmd();
switch (command) {
case HELLO_REQUEST:
case RECOMMEND_REQUEST:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,13 @@ public AbstractEventMeshTCPSubHandler(ConcurrentHashMap<Object, RequestContext>

@Override
protected void channelRead0(ChannelHandlerContext ctx, Package msg) throws Exception {
Command cmd = msg.getHeader().getCommand();
if (msg == null) {
throw new IllegalArgumentException("TCP package cannot be null");
}
if (msg.getHeader() == null) {
throw new IllegalArgumentException("TCP package header cannot be null");
}
Command cmd = msg.getHeader().getCmd();
log.info("|receive|type={}|msg={}", cmd, msg);
switch (cmd) {
case REQUEST_TO_CLIENT:
Expand Down
2 changes: 0 additions & 2 deletions style/checkStyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,6 @@
</module>
<module name="Indentation">
<property name="basicOffset" value="4"/>
<property name="braceAdjustment" value="2"/>
<property name="caseIndent" value="2"/>
<property name="throwsIndent" value="4"/>
<property name="lineWrappingIndentation" value="4"/>
<property name="arrayInitIndent" value="2"/>
Expand Down

0 comments on commit 3673ada

Please sign in to comment.