diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java index 88f5016e9d..854eb5b58c 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java @@ -24,7 +24,6 @@ import java.util.concurrent.ThreadLocalRandom; -import org.apache.eventmesh.client.tcp.common.EventMeshCommon; import org.apache.eventmesh.common.protocol.tcp.Command; import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; import org.apache.eventmesh.common.protocol.tcp.Header; @@ -35,37 +34,37 @@ public class EventMeshTestUtils { private static final int seqLength = 10; public static UserAgent generateClient1() { - UserAgent user = new UserAgent(); - user.setEnv("test"); - user.setHost("127.0.0.1"); - user.setPassword(generateRandomString(8)); - user.setUsername("PU4283"); - user.setProducerGroup("EventmeshTest-ProducerGroup"); - user.setConsumerGroup("EventmeshTest-ConsumerGroup"); - user.setPath("/data/app/umg_proxy"); - user.setPort(8362); - user.setSubsystem("5023"); - user.setPid(32893); - user.setVersion("2.0.11"); - user.setIdc("FT"); - return user; + return UserAgent.builder() + .env("test") + .host("127.0.0.1") + .password(generateRandomString(8)) + .username("PU4283") + .producerGroup("EventmeshTest-ProducerGroup") + .consumerGroup("EventmeshTest-ConsumerGroup") + .path("/data/app/umg_proxy") + .port(8362) + .subsystem("5023") + .pid(32893) + .version("2.0.11") + .idc("FT") + .build(); } public static UserAgent generateClient2() { - UserAgent user = new UserAgent(); - user.setEnv("test"); - user.setHost("127.0.0.1"); - user.setPassword(generateRandomString(8)); - user.setUsername("PU4283"); - user.setConsumerGroup("EventmeshTest-ConsumerGroup"); - user.setProducerGroup("EventmeshTest-ProducerGroup"); - user.setPath("/data/app/umg_proxy"); - user.setPort(9362); - user.setSubsystem("5017"); - user.setPid(42893); - user.setVersion("2.0.11"); - user.setIdc("FT"); - return user; + return UserAgent.builder() + .env("test") + .host("127.0.0.1") + .password(generateRandomString(8)) + .username("PU4283") + .producerGroup("EventmeshTest-ProducerGroup") + .consumerGroup("EventmeshTest-ConsumerGroup") + .path("/data/app/umg_proxy") + .port(9362) + .subsystem("5017") + .pid(42893) + .version("2.0.11") + .idc("FT") + .build(); } public static Package syncRR() { diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/CloudEventProducer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/CloudEventProducer.java index 6d6ff2f63d..49905ad1ac 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/CloudEventProducer.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/CloudEventProducer.java @@ -29,6 +29,8 @@ class CloudEventProducer extends AbstractHttpClient implements EventMeshProtocol private static final String PROTOCOL_TYPE = "cloudevents"; + private static final String PROTOCOL_DESC = "http"; + public CloudEventProducer(EventMeshHttpClientConfig eventMeshHttpClientConfig) throws EventMeshException { super(eventMeshHttpClientConfig); } @@ -105,6 +107,8 @@ private RequestParam buildCommonPostParam(CloudEvent cloudEvent) { .addHeader(ProtocolKey.ClientInstanceKey.PASSWD, eventMeshHttpClientConfig.getPassword()) .addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA) .addHeader(ProtocolKey.PROTOCOL_TYPE, PROTOCOL_TYPE) + .addHeader(ProtocolKey.PROTOCOL_DESC, PROTOCOL_DESC) + .addHeader(ProtocolKey.PROTOCOL_VERSION, cloudEvent.getSpecVersion().toString()) // todo: move producerGroup tp header .addBody(SendMessageRequestBody.PRODUCERGROUP, eventMeshHttpClientConfig.getProducerGroup()) .addBody(SendMessageRequestBody.CONTENT, JsonUtils.serialize(cloudEvent)); diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/EventMeshMessageProducer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/EventMeshMessageProducer.java index 85f6153953..e9c79f92c6 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/EventMeshMessageProducer.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/EventMeshMessageProducer.java @@ -1,5 +1,6 @@ package org.apache.eventmesh.client.http.producer; +import io.cloudevents.SpecVersion; import org.apache.eventmesh.client.http.AbstractHttpClient; import org.apache.eventmesh.client.http.EventMeshRetObj; import org.apache.eventmesh.client.http.conf.EventMeshHttpClientConfig; @@ -26,6 +27,10 @@ @Slf4j class EventMeshMessageProducer extends AbstractHttpClient implements EventMeshProtocolProducer { + private static final String PROTOCOL_TYPE = "eventmeshmessage"; + + private static final String PROTOCOL_DESC = "http"; + public EventMeshMessageProducer(EventMeshHttpClientConfig eventMeshHttpClientConfig) throws EventMeshException { super(eventMeshHttpClientConfig); } @@ -106,6 +111,10 @@ private RequestParam buildCommonPostParam(EventMeshMessage message) { .addHeader(ProtocolKey.ClientInstanceKey.USERNAME, eventMeshHttpClientConfig.getUserName()) .addHeader(ProtocolKey.ClientInstanceKey.PASSWD, eventMeshHttpClientConfig.getPassword()) .addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion()) + .addHeader(ProtocolKey.PROTOCOL_TYPE, PROTOCOL_TYPE) + .addHeader(ProtocolKey.PROTOCOL_DESC, PROTOCOL_DESC) + //default ce version is 1.0 + .addHeader(ProtocolKey.PROTOCOL_VERSION, SpecVersion.V1.toString()) .addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA) .addBody(SendMessageRequestBody.PRODUCERGROUP, eventMeshHttpClientConfig.getProducerGroup()) // todo: set message to content is better diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/EventMeshCommon.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/EventMeshCommon.java index 8238f52ceb..ca56422ff1 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/EventMeshCommon.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/EventMeshCommon.java @@ -122,4 +122,6 @@ public class EventMeshCommon { public static String PREFIX_SESSION_TPS_STAT_EVENTREV = "event_rev_tps_"; public static String CLOUD_EVENTS_PROTOCOL_NAME = "cloudevents"; + + public static String EM_MESSAGE_PROTOCOL_NAME = "eventmeshmessage"; } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java index a0c1ba85bc..e799fb726d 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java @@ -22,6 +22,7 @@ import java.util.concurrent.ThreadLocalRandom; import io.cloudevents.CloudEvent; +import io.cloudevents.SpecVersion; import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.tcp.Subscription; @@ -78,16 +79,26 @@ public static Package asyncMessageAck(Package in) { return msg; } - public static Package asyncCloudEvent(CloudEvent cloudEvent) { + public static Package buildPackage(Object message, Command command) { Package msg = new Package(); - msg.setHeader(new Header(Command.ASYNC_MESSAGE_TO_SERVER, 0, - null, generateRandomString(seqLength))); - msg.getHeader().putProperty(Constants.PROTOCOL_TYPE, - EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME); - msg.getHeader().putProperty(Constants.PROTOCOL_VERSION, - cloudEvent.getSpecVersion().toString()); + msg.setHeader(new Header(command, 0, + null, generateRandomString(seqLength))); + if (message instanceof CloudEvent) { + msg.getHeader().putProperty(Constants.PROTOCOL_TYPE, + EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME); + msg.getHeader().putProperty(Constants.PROTOCOL_VERSION, + ((CloudEvent) message).getSpecVersion().toString()); + } else if (message instanceof EventMeshMessage) { + msg.getHeader().putProperty(Constants.PROTOCOL_TYPE, + EventMeshCommon.EM_MESSAGE_PROTOCOL_NAME); + msg.getHeader().putProperty(Constants.PROTOCOL_VERSION, + SpecVersion.V1.toString()); + } else { + // unsupported protocol for server + return msg; + } msg.getHeader().putProperty(Constants.PROTOCOL_DESC, "tcp"); - msg.setBody(cloudEvent); + msg.setBody(message); return msg; } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java index 2b9bfaf38b..46cf370e23 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java @@ -106,14 +106,13 @@ public void initChannel(SocketChannel ch) { } @Override - public void close() throws EventMeshException { + public void close() { try { channel.disconnect().sync(); workers.shutdownGracefully(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.warn("close tcp client failed.|remote address={}", channel.remoteAddress(), e); - throw new EventMeshException(e); } } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java index 51341e16e0..7a1d03b1bc 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java @@ -1,54 +1,132 @@ package org.apache.eventmesh.client.tcp.impl.cloudevent; + + import org.apache.eventmesh.client.tcp.EventMeshTCPPubClient; -import org.apache.eventmesh.client.tcp.common.AsyncRRCallback; -import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; +import org.apache.eventmesh.client.tcp.common.*; +import org.apache.eventmesh.client.tcp.conf.EventMeshTcpClientConfig; +import org.apache.eventmesh.client.tcp.impl.eventmeshmessage.EventMeshMessageTCPPubClient; +import org.apache.eventmesh.common.EventMeshMessage; import org.apache.eventmesh.common.exception.EventMeshException; +import org.apache.eventmesh.common.protocol.tcp.Command; import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.common.protocol.tcp.UserAgent; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; import io.cloudevents.CloudEvent; import lombok.extern.slf4j.Slf4j; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + /** * A CloudEvent TCP publish client implementation. */ @Slf4j -public class CloudEventTCPPubClient implements EventMeshTCPPubClient { +public class CloudEventTCPPubClient extends TcpClient implements EventMeshTCPPubClient { + + private final UserAgent userAgent; + + private ReceiveMsgHook callback; + + private final ConcurrentHashMap callbackConcurrentHashMap = new ConcurrentHashMap<>(); + private ScheduledFuture task; + + public CloudEventTCPPubClient(EventMeshTcpClientConfig eventMeshTcpClientConfig) { + super(eventMeshTcpClientConfig); + this.userAgent = eventMeshTcpClientConfig.getUserAgent(); + } @Override public void init() throws EventMeshException { - + try { + open(new Handler()); + hello(); + } catch (Exception ex) { + throw new EventMeshException("Initialize EventMeshMessageTCPPubClient error", ex); + } } @Override public void heartbeat() throws EventMeshException { - + if (task != null) { + synchronized (EventMeshMessageTCPPubClient.class) { + task = scheduler.scheduleAtFixedRate(() -> { + try { + if (!isActive()) { + reconnect(); + } + Package msg = MessageUtils.heartBeat(); + io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); + } catch (Exception ignore) { + // ignore + } + }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, TimeUnit.MILLISECONDS); + } + } } @Override public void reconnect() throws EventMeshException { - + try { + super.reconnect(); + hello(); + } catch (Exception ex) { + throw new EventMeshException("reconnect error", ex); + } } @Override - public Package rr(CloudEvent msg, long timeout) throws EventMeshException { - return null; + public Package rr(CloudEvent event, long timeout) throws EventMeshException { + try { + Package msg = MessageUtils.buildPackage(event, Command.REQUEST_TO_SERVER); + log.info("{}|rr|send|type={}|msg={}", clientNo, msg, msg); + return io(msg, timeout); + } catch (Exception ex) { + throw new EventMeshException("rr error"); + } } @Override - public void asyncRR(CloudEvent msg, AsyncRRCallback callback, long timeout) throws EventMeshException { - + public void asyncRR(CloudEvent event, AsyncRRCallback callback, long timeout) throws EventMeshException { + try { + Package msg = MessageUtils.buildPackage(event, Command.REQUEST_TO_SERVER); + super.send(msg); + this.callbackConcurrentHashMap.put((String) RequestContext._key(msg), callback); + } catch (Exception ex) { + // should trigger callback? + throw new EventMeshException("asyncRR error", ex); + } } @Override public Package publish(CloudEvent cloudEvent, long timeout) throws EventMeshException { - return null; + try { + // todo: transform EventMeshMessage to Package + Package msg = MessageUtils.buildPackage(cloudEvent, Command.ASYNC_MESSAGE_TO_SERVER); + log.info("SimplePubClientImpl cloud event|{}|publish|send|type={}|protocol={}|msg={}", + clientNo, msg.getHeader().getCommand(), + msg.getHeader().getProperty(PropertyConst.PROPERTY_MESSAGE_PROTOCOL), msg); + return io(msg, timeout); + } catch (Exception ex) { + throw new EventMeshException("publish error", ex); + } } @Override public void broadcast(CloudEvent cloudEvent, long timeout) throws EventMeshException { - + try { + // todo: transform EventMeshMessage to Package + Package msg = MessageUtils.buildPackage(cloudEvent, Command.BROADCAST_MESSAGE_TO_SERVER); + log.info("{}|publish|send|type={}|protocol={}|msg={}", clientNo, msg.getHeader().getCommand(), + msg.getHeader().getProperty(PropertyConst.PROPERTY_MESSAGE_PROTOCOL), msg); + super.send(msg); + } catch (Exception ex) { + throw new EventMeshException("Broadcast message error", ex); + } } @Override @@ -57,7 +135,45 @@ public void registerBusiHandler(ReceiveMsgHook handler) throws Event } @Override - public void close() throws EventMeshException { + public void close() { + + } + + // todo: move to abstract class + @ChannelHandler.Sharable + private class Handler extends SimpleChannelInboundHandler { + @Override + protected void channelRead0(ChannelHandlerContext ctx, Package msg) throws Exception { + log.info("SimplePubClientImpl|{}|receive|type={}|msg={}", clientNo, msg.getHeader(), msg); + + Command cmd = msg.getHeader().getCommand(); + if (cmd == Command.RESPONSE_TO_CLIENT) { + if (callback != null) { + callback.handle(msg, ctx); + } + Package pkg = MessageUtils.responseToClientAck(msg); + send(pkg); + } else if (cmd == Command.SERVER_GOODBYE_REQUEST) { + //TODO + } + + RequestContext context = contexts.get(RequestContext._key(msg)); + if (context != null) { + contexts.remove(context.getKey()); + context.finish(msg); + } + } + } + + // todo: remove hello + private void hello() throws Exception { + Package msg = MessageUtils.hello(userAgent); + this.io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); + } + // todo: remove goodbye + private void goodbye() throws Exception { + Package msg = MessageUtils.goodbye(); + this.io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); } } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java index d726c4b90c..716feae3cb 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java @@ -1,59 +1,191 @@ package org.apache.eventmesh.client.tcp.impl.cloudevent; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.apache.commons.collections4.CollectionUtils; import org.apache.eventmesh.client.tcp.EventMeshTCPSubClient; -import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; +import org.apache.eventmesh.client.tcp.common.*; +import org.apache.eventmesh.client.tcp.conf.EventMeshTcpClientConfig; +import org.apache.eventmesh.client.tcp.impl.eventmeshmessage.EventMeshMessageTCPSubClient; +import org.apache.eventmesh.common.EventMeshMessage; import org.apache.eventmesh.common.exception.EventMeshException; +import org.apache.eventmesh.common.protocol.SubscriptionItem; import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.SubscriptionType; +import org.apache.eventmesh.common.protocol.tcp.Command; +import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.common.protocol.tcp.UserAgent; import io.cloudevents.CloudEvent; import lombok.extern.slf4j.Slf4j; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + /** * CloudEvent TCP subscribe client implementation. */ @Slf4j -public class CloudEventTCPSubClient implements EventMeshTCPSubClient { +public class CloudEventTCPSubClient extends TcpClient implements EventMeshTCPSubClient { + + private final UserAgent userAgent; + private final List subscriptionItems = Collections.synchronizedList(new ArrayList<>()); + private ReceiveMsgHook callback; + private ScheduledFuture task; + + public CloudEventTCPSubClient(EventMeshTcpClientConfig eventMeshTcpClientConfig) { + super(eventMeshTcpClientConfig); + this.userAgent = eventMeshTcpClientConfig.getUserAgent(); + } @Override public void init() throws EventMeshException { - + try { + open(new Handler()); + hello(); + log.info("SimpleSubClientImpl|{}|started!", clientNo); + } catch (Exception ex) { + throw new EventMeshException("Initialize EventMeshMessageTcpSubClient error", ex); + } } @Override public void heartbeat() throws EventMeshException { - + if (task == null) { + synchronized (EventMeshMessageTCPSubClient.class) { + task = scheduler.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + if (!isActive()) { + reconnect(); + } + Package msg = MessageUtils.heartBeat(); + io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); + } catch (Exception ignore) { + // + } + } + }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, TimeUnit.MILLISECONDS); + } + } } @Override public void reconnect() throws EventMeshException { - + try { + super.reconnect(); + hello(); + if (!CollectionUtils.isEmpty(subscriptionItems)) { + for (SubscriptionItem item : subscriptionItems) { + Package request = MessageUtils.subscribe(item.getTopic(), item.getMode(), item.getType()); + this.io(request, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); + } + } + listen(); + } catch (Exception ex) { + // + } } @Override public void subscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) throws EventMeshException { - + try { + subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subscriptionType)); + Package request = MessageUtils.subscribe(topic, subscriptionMode, subscriptionType); + io(request, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); + } catch (Exception ex) { + throw new EventMeshException("Subscribe error", ex); + } } @Override public void unsubscribe() throws EventMeshException { - + try { + Package request = MessageUtils.unsubscribe(); + io(request, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); + } catch (Exception ex) { + throw new EventMeshException("Unsubscribe error", ex); + } } @Override public void listen() throws EventMeshException { + try { + Package request = MessageUtils.listen(); + io(request, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); + } catch (Exception ex) { + throw new EventMeshException("Listen error", ex); + } + } + + private void goodbye() throws Exception { + Package msg = MessageUtils.goodbye(); + this.io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); + } + private void hello() throws Exception { + Package msg = MessageUtils.hello(userAgent); + this.io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); } @Override public void registerBusiHandler(ReceiveMsgHook handler) throws EventMeshException { - + this.callback = handler; } @Override - public void close() throws EventMeshException{ + public void close() { + try { + task.cancel(false); + goodbye(); + super.close(); + } catch (Exception ex) { + ex.printStackTrace(); + } + } + private class Handler extends SimpleChannelInboundHandler { + @SuppressWarnings("Duplicates") + @Override + protected void channelRead0(ChannelHandlerContext ctx, Package msg) throws Exception { + Command cmd = msg.getHeader().getCommand(); + log.info("|receive|type={}|msg={}", cmd, msg); + if (cmd == Command.REQUEST_TO_CLIENT) { + if (callback != null) { + callback.handle(msg, ctx); + } + Package pkg = MessageUtils.requestToClientAck(msg); + send(pkg); + } else if (cmd == Command.ASYNC_MESSAGE_TO_CLIENT) { + Package pkg = MessageUtils.asyncMessageAck(msg); + if (callback != null) { + callback.handle(msg, ctx); + } + send(pkg); + } else if (cmd == Command.BROADCAST_MESSAGE_TO_CLIENT) { + Package pkg = MessageUtils.broadcastMessageAck(msg); + if (callback != null) { + callback.handle(msg, ctx); + } + send(pkg); + } else if (cmd == Command.SERVER_GOODBYE_REQUEST) { + //TODO + } else { + log.error("msg ignored|{}|{}", cmd, msg); + } + RequestContext context = contexts.get(RequestContext._key(msg)); + if (context != null) { + contexts.remove(context.getKey()); + context.finish(msg); + } else { + log.error("msg ignored,context not found.|{}|{}", cmd, msg); + } + } } + } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java index ba9e0afda4..1f4e774507 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java @@ -85,7 +85,7 @@ public void reconnect() throws EventMeshException { @Override public Package rr(EventMeshMessage eventMeshMessage, long timeout) throws EventMeshException { try { - Package msg = MessageUtils.asyncCloudEvent(eventMeshMessage); + Package msg = MessageUtils.buildPackage(eventMeshMessage, Command.REQUEST_TO_SERVER); log.info("{}|rr|send|type={}|msg={}", clientNo, msg, msg); return io(msg, timeout); } catch (Exception ex) { @@ -96,7 +96,7 @@ public Package rr(EventMeshMessage eventMeshMessage, long timeout) throws EventM @Override public void asyncRR(EventMeshMessage eventMeshMessage, AsyncRRCallback callback, long timeout) throws EventMeshException { try { - Package msg = MessageUtils.asyncCloudEvent(eventMeshMessage); + Package msg = MessageUtils.buildPackage(eventMeshMessage, Command.REQUEST_TO_SERVER); super.send(msg); this.callbackConcurrentHashMap.put((String) RequestContext._key(msg), callback); } catch (Exception ex) { @@ -109,7 +109,7 @@ public void asyncRR(EventMeshMessage eventMeshMessage, AsyncRRCallback callback, public Package publish(EventMeshMessage eventMeshMessage, long timeout) throws EventMeshException { try { // todo: transform EventMeshMessage to Package - Package msg = MessageUtils.asyncCloudEvent(eventMeshMessage); + Package msg = MessageUtils.buildPackage(eventMeshMessage, Command.ASYNC_MESSAGE_TO_SERVER); log.info("SimplePubClientImpl cloud event|{}|publish|send|type={}|protocol={}|msg={}", clientNo, msg.getHeader().getCommand(), msg.getHeader().getProperty(PropertyConst.PROPERTY_MESSAGE_PROTOCOL), msg); @@ -123,7 +123,7 @@ public Package publish(EventMeshMessage eventMeshMessage, long timeout) throws E public void broadcast(EventMeshMessage eventMeshMessage, long timeout) throws EventMeshException { try { // todo: transform EventMeshMessage to Package - Package msg = MessageUtils.asyncCloudEvent(eventMeshMessage); + Package msg = MessageUtils.buildPackage(eventMeshMessage, Command.BROADCAST_MESSAGE_TO_SERVER); log.info("{}|publish|send|type={}|protocol={}|msg={}", clientNo, msg.getHeader().getCommand(), msg.getHeader().getProperty(PropertyConst.PROPERTY_MESSAGE_PROTOCOL), msg); super.send(msg); @@ -138,7 +138,7 @@ public void registerBusiHandler(ReceiveMsgHook receiveMsgHook) } @Override - public void close() throws EventMeshException { + public void close() { try { task.cancel(false); goodbye(); diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java index 08aa170083..cdce845a96 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java @@ -113,6 +113,25 @@ public void unsubscribe() throws EventMeshException { } } + private void goodbye() throws Exception { + Package msg = MessageUtils.goodbye(); + this.io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); + } + + private void hello() throws Exception { + Package msg = MessageUtils.hello(userAgent); + this.io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); + } + + public void listen() throws EventMeshException { + try { + Package request = MessageUtils.listen(); + io(request, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); + } catch (Exception ex) { + throw new EventMeshException("Listen error", ex); + } + } + @Override public void registerBusiHandler(ReceiveMsgHook receiveMsgHook) throws EventMeshException { @@ -120,13 +139,13 @@ public void registerBusiHandler(ReceiveMsgHook receiveMsgHook) } @Override - public void close() throws EventMeshException { + public void close() { try { task.cancel(false); goodbye(); super.close(); } catch (Exception ex) { - throw new EventMeshException("Close EventMeshMessageTcpSubClient error", ex); + ex.printStackTrace(); } } @@ -169,22 +188,4 @@ protected void channelRead0(ChannelHandlerContext ctx, Package msg) throws Excep } } - private void goodbye() throws Exception { - Package msg = MessageUtils.goodbye(); - this.io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); - } - - private void hello() throws Exception { - Package msg = MessageUtils.hello(userAgent); - this.io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); - } - - public void listen() throws EventMeshException { - try { - Package request = MessageUtils.listen(); - io(request, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); - } catch (Exception ex) { - throw new EventMeshException("Listen error", ex); - } - } }