diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java index 56fe0534a9..2ca1e0845a 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java +++ b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java @@ -26,6 +26,7 @@ import org.apache.eventmesh.common.protocol.http.common.RequestCode; import org.apache.eventmesh.common.protocol.tcp.Header; import org.apache.eventmesh.common.protocol.tcp.Package; +import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.protocol.api.ProtocolAdaptor; import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException; import org.apache.eventmesh.protocol.cloudevents.resolver.http.SendMessageBatchProtocolResolver; @@ -33,7 +34,9 @@ import org.apache.eventmesh.protocol.cloudevents.resolver.http.SendMessageRequestProtocolResolver; import org.apache.eventmesh.protocol.cloudevents.resolver.tcp.TcpMessageProtocolResolver; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * CloudEvents protocol adaptor, used to transform CloudEvents message to CloudEvents message. @@ -93,9 +96,18 @@ public List toBatchCloudEvent(ProtocolTransportObject protocol) public ProtocolTransportObject fromCloudEvent(CloudEvent cloudEvent) throws ProtocolHandleException { String protocolDesc = cloudEvent.getExtension(Constants.PROTOCOL_DESC).toString(); if (StringUtils.equals("http", protocolDesc)) { - // todo: return command, set cloudEvent.getData() to content? - return null; -// return new String(cloudEvent.getData().toBytes(), StandardCharsets.UTF_8); + HttpCommand httpCommand = new HttpCommand(); + Body body = new Body() { + final Map map = new HashMap<>(); + @Override + public Map toMap() { + map.put("content", JsonUtils.serialize(cloudEvent)); + return map; + } + }; + body.toMap(); + httpCommand.setBody(body); + return httpCommand; } else if (StringUtils.equals("tcp", protocolDesc)) { Package pkg = new Package(); pkg.setBody(cloudEvent); diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/MeshMessageProtocolAdaptor.java b/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/MeshMessageProtocolAdaptor.java index c1e636e734..4ef4f90b48 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/MeshMessageProtocolAdaptor.java +++ b/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/MeshMessageProtocolAdaptor.java @@ -24,6 +24,7 @@ import org.apache.eventmesh.common.protocol.http.common.RequestCode; import org.apache.eventmesh.common.protocol.tcp.Header; import org.apache.eventmesh.common.protocol.tcp.Package; +import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.protocol.api.ProtocolAdaptor; import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException; import org.apache.eventmesh.protocol.meshmessage.resolver.http.SendMessageBatchProtocolResolver; @@ -33,7 +34,10 @@ import org.apache.commons.lang3.StringUtils; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; import java.util.List; +import java.util.Map; import io.cloudevents.CloudEvent; @@ -90,9 +94,18 @@ public ProtocolTransportObject fromCloudEvent(CloudEvent cloudEvent) throws Prot String protocolDesc = cloudEvent.getExtension(Constants.PROTOCOL_DESC).toString(); if (StringUtils.equals("http", protocolDesc)) { - // todo: return command, set cloudEvent.getData() to content? - return null; -// return new String(cloudEvent.getData().toBytes(), StandardCharsets.UTF_8); + HttpCommand httpCommand = new HttpCommand(); + Body body = new Body() { + final Map map = new HashMap<>(); + @Override + public Map toMap() { + map.put("content", new String(cloudEvent.getData().toBytes(), StandardCharsets.UTF_8)); + return map; + } + }; + body.toMap(); + httpCommand.setBody(body); + return httpCommand; } else if (StringUtils.equals("tcp", protocolDesc)) { return TcpMessageProtocolResolver.buildEventMeshMessage(cloudEvent); } else { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java index 67253d624e..5fa56ef70f 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java @@ -20,6 +20,7 @@ import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; import org.apache.eventmesh.common.Constants; +import org.apache.eventmesh.common.protocol.http.HttpCommand; import org.apache.eventmesh.common.utils.IPUtils; import org.apache.eventmesh.common.protocol.ProtocolTransportObject; import org.apache.eventmesh.common.utils.RandomStringUtils; @@ -120,12 +121,9 @@ public void tryHTTPRequest() { ProtocolAdaptor protocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType); - // todo ProtocolTransportObject protocolTransportObject = protocolAdaptor.fromCloudEvent(handleMsgContext.getEvent()); - - // content = -// new String(handleMsgContext.getEvent().getData().toBytes(), EventMeshConstants.DEFAULT_CHARSET); + content = ((HttpCommand) protocolTransportObject).getBody().toMap().get("content").toString(); } catch (Exception ex) { return; }