diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubscriptionItem.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubscriptionItem.java index abcc22c20e..0eda53ec99 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubscriptionItem.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubscriptionItem.java @@ -23,22 +23,22 @@ public class SubscriptionItem { private SubscriptionMode mode; - private SubcriptionType type; + private SubscriptionType type; public SubscriptionItem() { } - public SubscriptionItem(String topic, SubscriptionMode mode, SubcriptionType type) { + public SubscriptionItem(String topic, SubscriptionMode mode, SubscriptionType type) { this.topic = topic; this.mode = mode; this.type = type; } - public SubcriptionType getType() { + public SubscriptionType getType() { return type; } - public void setType(SubcriptionType type) { + public void setType(SubscriptionType type) { this.type = type; } diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java index 570b24d7e6..0772fca952 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java @@ -30,7 +30,7 @@ import org.apache.eventmesh.common.EventMeshException; import org.apache.eventmesh.common.IPUtil; import org.apache.eventmesh.common.ThreadUtil; -import org.apache.eventmesh.common.protocol.SubcriptionType; +import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.SubscriptionItem; import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.http.demo.AsyncPublishInstance; @@ -52,7 +52,7 @@ public class SubService implements InitializingBean { final Properties properties = Utils.readPropertiesFile("application.properties"); - final List topicList = Arrays.asList(new SubscriptionItem("TEST-TOPIC-HTTP-ASYNC", SubscriptionMode.CLUSTERING, SubcriptionType.ASYNC)); + final List topicList = Arrays.asList(new SubscriptionItem("TEST-TOPIC-HTTP-ASYNC", SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC)); final String localIp = IPUtil.getLocalAddress(); final String localPort = properties.getProperty("server.port"); final String eventMeshIp = properties.getProperty("eventmesh.ip"); diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribe.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribe.java index c44d6f090c..769fd22268 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribe.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribe.java @@ -24,7 +24,7 @@ import org.apache.eventmesh.client.tcp.EventMeshClient; import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient; -import org.apache.eventmesh.common.protocol.SubcriptionType; +import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.common.protocol.SubscriptionMode; @@ -52,7 +52,7 @@ public static void main(String[] agrs) throws Exception { client.init(); client.heartbeat(); - client.subscribe("TEST-TOPIC-TCP-ASYNC", SubscriptionMode.CLUSTERING, SubcriptionType.ASYNC); + client.subscribe("TEST-TOPIC-TCP-ASYNC", SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC); client.registerSubBusiHandler(handler); client.listen(); diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribeBroadcast.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribeBroadcast.java index 6c2cdf4d14..99735487bc 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribeBroadcast.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribeBroadcast.java @@ -24,7 +24,7 @@ import org.apache.eventmesh.client.tcp.EventMeshClient; import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient; -import org.apache.eventmesh.common.protocol.SubcriptionType; +import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.common.protocol.SubscriptionMode; @@ -52,7 +52,7 @@ public static void main(String[] agrs) throws Exception { client.init(); client.heartbeat(); - client.subscribe("TEST-TOPIC-TCP-BROADCAST", SubscriptionMode.BROADCASTING, SubcriptionType.ASYNC); + client.subscribe("TEST-TOPIC-TCP-BROADCAST", SubscriptionMode.BROADCASTING, SubscriptionType.ASYNC); client.registerSubBusiHandler(handler); client.listen(); diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/SyncResponse.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/SyncResponse.java index d8e6a85e11..033dfb43ef 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/SyncResponse.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/SyncResponse.java @@ -22,7 +22,7 @@ import org.apache.eventmesh.client.tcp.EventMeshClient; import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient; -import org.apache.eventmesh.common.protocol.SubcriptionType; +import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.tcp.UserAgent; @@ -45,7 +45,7 @@ public static void main(String[] agrs) throws Exception { client.init(); client.heartbeat(); - client.subscribe("TEST-TOPIC-TCP-SYNC", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC); + client.subscribe("TEST-TOPIC-TCP-SYNC", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC); // Synchronize RR messages client.registerSubBusiHandler(handler); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java index 47c9e5f36c..f50c3eeece 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java @@ -35,7 +35,7 @@ import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.IPUtil; import org.apache.eventmesh.common.RandomStringUtil; -import org.apache.eventmesh.common.protocol.SubcriptionType; +import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.http.body.message.PushMessageRequestBody; import org.apache.eventmesh.common.protocol.http.common.ClientRetCode; import org.apache.eventmesh.common.protocol.http.common.ProtocolKey; @@ -85,7 +85,7 @@ public void tryHTTPRequest() { String requestCode = ""; - if (SubcriptionType.SYNC.equals(handleMsgContext.getSubscriptionItem().getType())) { + if (SubscriptionType.SYNC.equals(handleMsgContext.getSubscriptionItem().getType())) { requestCode = String.valueOf(RequestCode.HTTP_PUSH_CLIENT_SYNC.getRequestCode()); } else { requestCode = String.valueOf(RequestCode.HTTP_PUSH_CLIENT_ASYNC.getRequestCode()); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java index a4d2867769..4b78332938 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java @@ -20,8 +20,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import org.apache.commons.collections4.CollectionUtils; -import org.apache.eventmesh.common.Constants; -import org.apache.eventmesh.common.protocol.SubcriptionType; +import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.tcp.*; import org.apache.eventmesh.common.protocol.tcp.Package; @@ -64,7 +63,7 @@ public void push(final DownStreamMsgContext downStreamMsgContext) { Command cmd; if (SubscriptionMode.BROADCASTING.equals(downStreamMsgContext.subscriptionItem.getMode())) { cmd = Command.BROADCAST_MESSAGE_TO_CLIENT; - } else if (SubcriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType())) { + } else if (SubscriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType())) { cmd = Command.REQUEST_TO_CLIENT; } else { cmd = Command.ASYNC_MESSAGE_TO_CLIENT; @@ -101,7 +100,7 @@ public void operationComplete(ChannelFuture future) throws Exception { logger.warn("isolate client:{},isolateTime:{}", session.getClient(), isolateTime); //retry - long delayTime = SubcriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType()) + long delayTime = SubscriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType()) ? session.getEventMeshTCPConfiguration().eventMeshTcpMsgRetrySyncDelayInMills : session.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryAsyncDelayInMills; downStreamMsgContext.delay(delayTime); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java index 8c29df64dd..cf4e3b7ab5 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java @@ -20,7 +20,7 @@ import io.openmessaging.api.Message; import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.common.Constants; -import org.apache.eventmesh.common.protocol.SubcriptionType; +import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.runtime.boot.EventMeshTCPServer; import org.apache.eventmesh.runtime.constants.EventMeshConstants; @@ -75,7 +75,7 @@ public void pushRetry(DownStreamMsgContext downStreamMsgContext) { return; } - int maxRetryTimes = SubcriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType()) + int maxRetryTimes = SubscriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType()) ? eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgSyncRetryTimes : eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgAsyncRetryTimes; if (downStreamMsgContext.retryTimes >= maxRetryTimes) { diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/client/api/EventMeshClient.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/client/api/EventMeshClient.java index fd5f6e8088..2ba50e7dbf 100644 --- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/client/api/EventMeshClient.java +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/client/api/EventMeshClient.java @@ -17,7 +17,7 @@ package org.apache.eventmesh.runtime.client.api; -import org.apache.eventmesh.common.protocol.SubcriptionType; +import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.runtime.client.hook.ReceiveMsgHook; @@ -39,9 +39,9 @@ public interface EventMeshClient { Package listen() throws Exception; - Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception; + Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) throws Exception; - Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception; + Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) throws Exception; void registerPubBusiHandler(ReceiveMsgHook handler) throws Exception; diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/client/api/SubClient.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/client/api/SubClient.java index 467bec59a3..03772d19d7 100644 --- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/client/api/SubClient.java +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/client/api/SubClient.java @@ -17,7 +17,7 @@ package org.apache.eventmesh.runtime.client.api; -import org.apache.eventmesh.common.protocol.SubcriptionType; +import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.tcp.UserAgent; @@ -34,9 +34,9 @@ public interface SubClient { void reconnect() throws Exception; - Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception; + Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) throws Exception; - Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception; + Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) throws Exception; Package listen() throws Exception; diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/client/common/MessageUtils.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/client/common/MessageUtils.java index 3d9fadeab5..d8d01f7f0c 100644 --- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/client/common/MessageUtils.java +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/client/common/MessageUtils.java @@ -21,7 +21,7 @@ import java.util.List; import java.util.concurrent.ThreadLocalRandom; -import org.apache.eventmesh.common.protocol.SubcriptionType; +import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.tcp.Subscription; import org.apache.eventmesh.common.protocol.SubscriptionItem; import org.apache.eventmesh.common.protocol.SubscriptionMode; @@ -63,10 +63,10 @@ public static Package subscribe() { return msg; } - public static Package subscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) { + public static Package subscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) { Package msg = new Package(); msg.setHeader(new Header(Command.SUBSCRIBE_REQUEST, 0, null, generateRandomString(seqLength))); - msg.setBody(generateSubscription(topic, subscriptionMode, subcriptionType)); + msg.setBody(generateSubscription(topic, subscriptionMode, subscriptionType)); return msg; } @@ -76,10 +76,10 @@ public static Package unsubscribe() { return msg; } - public static Package unsubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) { + public static Package unsubscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) { Package msg = new Package(); msg.setHeader(new Header(Command.UNSUBSCRIBE_REQUEST, 0, null, generateRandomString(seqLength))); - msg.setBody(generateSubscription(topic, subscriptionMode, subcriptionType)); + msg.setBody(generateSubscription(topic, subscriptionMode, subscriptionType)); return msg; } @@ -169,18 +169,18 @@ public static UserAgent generateSubServer() { public static Subscription generateSubscription() { Subscription subscription = new Subscription(); List subscriptionItems = new ArrayList<>(); - subscriptionItems.add(new SubscriptionItem("TEST-TOPIC-TCP-SYNC", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC)); - subscriptionItems.add(new SubscriptionItem("TEST-TOPIC-TCP-SYNC2", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC)); - subscriptionItems.add(new SubscriptionItem("TEST-TOPIC-TCP-SYNC3", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC)); - subscriptionItems.add(new SubscriptionItem("TEST-TOPIC-TCP-SYNC4", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC)); + subscriptionItems.add(new SubscriptionItem("TEST-TOPIC-TCP-SYNC", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC)); + subscriptionItems.add(new SubscriptionItem("TEST-TOPIC-TCP-SYNC2", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC)); + subscriptionItems.add(new SubscriptionItem("TEST-TOPIC-TCP-SYNC3", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC)); + subscriptionItems.add(new SubscriptionItem("TEST-TOPIC-TCP-SYNC4", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC)); subscription.setTopicList(subscriptionItems); return subscription; } - public static Subscription generateSubscription(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) { + public static Subscription generateSubscription(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) { Subscription subscription = new Subscription(); List subscriptionItems = new ArrayList<>(); - subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subcriptionType)); + subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subscriptionType)); subscription.setTopicList(subscriptionItems); return subscription; } diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/client/impl/EventMeshClientImpl.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/client/impl/EventMeshClientImpl.java index e6ac6ef01b..9d830f7912 100644 --- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/client/impl/EventMeshClientImpl.java +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/client/impl/EventMeshClientImpl.java @@ -17,7 +17,7 @@ package org.apache.eventmesh.runtime.client.impl; -import org.apache.eventmesh.common.protocol.SubcriptionType; +import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.tcp.UserAgent; @@ -84,14 +84,14 @@ public Package listen() throws Exception { @Override public Package justSubscribe(String topic, SubscriptionMode subscriptionMode, - SubcriptionType subcriptionType) throws Exception { - return this.subClient.justSubscribe(topic, subscriptionMode, subcriptionType); + SubscriptionType subscriptionType) throws Exception { + return this.subClient.justSubscribe(topic, subscriptionMode, subscriptionType); } @Override public Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode, - SubcriptionType subcriptionType) throws Exception { - return this.subClient.justUnsubscribe(topic, subscriptionMode, subcriptionType); + SubscriptionType subscriptionType) throws Exception { + return this.subClient.justUnsubscribe(topic, subscriptionMode, subscriptionType); } public void registerSubBusiHandler(ReceiveMsgHook handler) throws Exception { diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/client/impl/SubClientImpl.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/client/impl/SubClientImpl.java index 80e9bcb333..473a82d1fa 100644 --- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/client/impl/SubClientImpl.java +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/client/impl/SubClientImpl.java @@ -27,12 +27,11 @@ import io.netty.channel.SimpleChannelInboundHandler; import org.apache.commons.collections4.CollectionUtils; -import org.apache.eventmesh.common.protocol.SubcriptionType; +import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.SubscriptionItem; import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.tcp.*; import org.apache.eventmesh.common.protocol.tcp.Package; -import org.apache.eventmesh.runtime.demo.SyncPubClient; import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -119,9 +118,9 @@ private void hello() throws Exception { this.dispatcher(msg, ClientConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS); } - public Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception { - subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subcriptionType)); - Package msg = MessageUtils.subscribe(topic, subscriptionMode, subcriptionType); + public Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) throws Exception { + subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subscriptionType)); + Package msg = MessageUtils.subscribe(topic, subscriptionMode, subscriptionType); return this.dispatcher(msg, ClientConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS); } @@ -142,9 +141,9 @@ public Package listen() throws Exception { // } public Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode, - SubcriptionType subcriptionType) throws Exception { + SubscriptionType subscriptionType) throws Exception { subscriptionItems.remove(topic); - Package msg = MessageUtils.unsubscribe(topic, subscriptionMode, subcriptionType); + Package msg = MessageUtils.unsubscribe(topic, subscriptionMode, subscriptionType); return this.dispatcher(msg, ClientConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS); } diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/AsyncSubClient.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/AsyncSubClient.java index a615bd4fa7..c634d35eeb 100644 --- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/AsyncSubClient.java +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/AsyncSubClient.java @@ -19,7 +19,7 @@ import io.netty.channel.ChannelHandlerContext; -import org.apache.eventmesh.common.protocol.SubcriptionType; +import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; import org.apache.eventmesh.common.protocol.tcp.Package; @@ -39,7 +39,7 @@ public static void main(String[] args) throws Exception { SubClientImpl client = new SubClientImpl("127.0.0.1", 10002, MessageUtils.generateSubServer()); client.init(); client.heartbeat(); - client.justSubscribe(ClientConstants.ASYNC_TOPIC, SubscriptionMode.CLUSTERING, SubcriptionType.ASYNC); + client.justSubscribe(ClientConstants.ASYNC_TOPIC, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC); client.registerBusiHandler(new ReceiveMsgHook() { @Override public void handle(Package msg, ChannelHandlerContext ctx) { diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/BroadCastSubClient.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/BroadCastSubClient.java index 0ae07c3578..a61c562214 100644 --- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/BroadCastSubClient.java +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/BroadCastSubClient.java @@ -19,7 +19,7 @@ import io.netty.channel.ChannelHandlerContext; -import org.apache.eventmesh.common.protocol.SubcriptionType; +import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.tcp.Command; import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; import org.apache.eventmesh.common.protocol.tcp.Package; @@ -40,7 +40,7 @@ public static void main(String[] args) throws Exception { SubClientImpl client = new SubClientImpl("127.0.0.1", 10000, MessageUtils.generateSubServer()); client.init(); client.heartbeat(); - client.justSubscribe(ClientConstants.BROADCAST_TOPIC, SubscriptionMode.BROADCASTING, SubcriptionType.ASYNC); + client.justSubscribe(ClientConstants.BROADCAST_TOPIC, SubscriptionMode.BROADCASTING, SubscriptionType.ASYNC); client.registerBusiHandler(new ReceiveMsgHook() { @Override public void handle(Package msg, ChannelHandlerContext ctx) { diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CCSubClient.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CCSubClient.java index 17908b68e1..bb96c1db83 100644 --- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CCSubClient.java +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CCSubClient.java @@ -19,7 +19,7 @@ import io.netty.channel.ChannelHandlerContext; -import org.apache.eventmesh.common.protocol.SubcriptionType; +import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.tcp.Command; import org.apache.eventmesh.common.protocol.tcp.Package; @@ -40,7 +40,7 @@ public static void main(String[] args) throws Exception { subClient.init(); subClient.heartbeat(); subClient.listen(); - subClient.justSubscribe("TEST-TOPIC-TCP-SYNC", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC); + subClient.justSubscribe("TEST-TOPIC-TCP-SYNC", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC); subClient.registerBusiHandler(new ReceiveMsgHook() { @Override public void handle(Package msg, ChannelHandlerContext ctx) { diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CClientDemo.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CClientDemo.java index 5fd7aa4441..4f33595ee0 100644 --- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CClientDemo.java +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CClientDemo.java @@ -19,7 +19,7 @@ import io.netty.channel.ChannelHandlerContext; -import org.apache.eventmesh.common.protocol.SubcriptionType; +import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.tcp.Command; import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.common.protocol.SubscriptionMode; @@ -45,8 +45,8 @@ public static void main(String[] args) throws Exception { EventMeshClientImpl client = new EventMeshClientImpl("127.0.0.1", 10000); client.init(); client.heartbeat(); - client.justSubscribe(ASYNC_TOPIC, SubscriptionMode.CLUSTERING, SubcriptionType.ASYNC); - client.justSubscribe(BROADCAST_TOPIC, SubscriptionMode.BROADCASTING, SubcriptionType.ASYNC); + client.justSubscribe(ASYNC_TOPIC, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC); + client.justSubscribe(BROADCAST_TOPIC, SubscriptionMode.BROADCASTING, SubscriptionType.ASYNC); client.listen(); // for (int i = 0; i < 10000; i++) { // Package rr = null; diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/SyncSubClient.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/SyncSubClient.java index 5e4f2c8d90..6921f3dd20 100644 --- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/SyncSubClient.java +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/SyncSubClient.java @@ -19,7 +19,7 @@ import io.netty.channel.ChannelHandlerContext; -import org.apache.eventmesh.common.protocol.SubcriptionType; +import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.tcp.Command; import org.apache.eventmesh.common.protocol.tcp.Package; @@ -39,7 +39,7 @@ public static void main(String[] args) throws Exception { SubClientImpl client = new SubClientImpl("127.0.0.1", 10000, MessageUtils.generateSubServer()); client.init(); client.heartbeat(); - client.justSubscribe(ClientConstants.SYNC_TOPIC, SubscriptionMode.CLUSTERING, SubcriptionType.SYNC); + client.justSubscribe(ClientConstants.SYNC_TOPIC, SubscriptionMode.CLUSTERING, SubscriptionType.SYNC); client.registerBusiHandler(new ReceiveMsgHook() { @Override public void handle(Package msg, ChannelHandlerContext ctx) { diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshClient.java index afdb90ac5f..c98aa7a8b3 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshClient.java @@ -19,7 +19,7 @@ import org.apache.eventmesh.client.tcp.common.AsyncRRCallback; import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; -import org.apache.eventmesh.common.protocol.SubcriptionType; +import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.common.protocol.SubscriptionMode; @@ -41,7 +41,7 @@ public interface EventMeshClient { void listen() throws Exception; - void subscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception; + void subscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) throws Exception; void unsubscribe() throws Exception; diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/SimpleSubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/SimpleSubClient.java index 3803edcb07..3439f208f9 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/SimpleSubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/SimpleSubClient.java @@ -19,7 +19,7 @@ import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; -import org.apache.eventmesh.common.protocol.SubcriptionType; +import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.tcp.UserAgent; @@ -32,7 +32,7 @@ public interface SimpleSubClient { void reconnect() throws Exception; - void subscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception; + void subscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) throws Exception; void unsubscribe() throws Exception; diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java index 2d6c7ca834..5b9d5143d5 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java @@ -21,7 +21,7 @@ import java.util.List; import java.util.concurrent.ThreadLocalRandom; -import org.apache.eventmesh.common.protocol.SubcriptionType; +import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.tcp.Subscription; import org.apache.eventmesh.common.protocol.SubscriptionItem; import org.apache.eventmesh.common.protocol.SubscriptionMode; @@ -56,10 +56,10 @@ public static Package listen() { return msg; } - public static Package subscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) { + public static Package subscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) { Package msg = new Package(); msg.setHeader(new Header(Command.SUBSCRIBE_REQUEST, 0, null, generateRandomString(seqLength))); - msg.setBody(generateSubscription(topic, subscriptionMode, subcriptionType)); + msg.setBody(generateSubscription(topic, subscriptionMode, subscriptionType)); return msg; } @@ -132,10 +132,10 @@ public static UserAgent generatePubClient(UserAgent agent) { return user; } - private static Subscription generateSubscription(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) { + private static Subscription generateSubscription(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) { Subscription subscription = new Subscription(); List subscriptionItems = new ArrayList<>(); - subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subcriptionType)); + subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subscriptionType)); subscription.setTopicList(subscriptionItems); return subscription; } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/DefaultEventMeshClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/DefaultEventMeshClient.java index 8e645fa35f..c75922f688 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/DefaultEventMeshClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/DefaultEventMeshClient.java @@ -24,7 +24,7 @@ import org.apache.eventmesh.client.tcp.common.AsyncRRCallback; import org.apache.eventmesh.client.tcp.common.MessageUtils; import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; -import org.apache.eventmesh.common.protocol.SubcriptionType; +import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.tcp.UserAgent; @@ -97,8 +97,8 @@ public void listen() throws Exception { } @Override - public void subscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception { - this.subClient.subscribe(topic, subscriptionMode, subcriptionType); + public void subscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) throws Exception { + this.subClient.subscribe(topic, subscriptionMode, subscriptionType); } @Override diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimpleSubClientImpl.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimpleSubClientImpl.java index 7e341cadd4..68d607bde8 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimpleSubClientImpl.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimpleSubClientImpl.java @@ -33,7 +33,7 @@ import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; import org.apache.eventmesh.client.tcp.common.RequestContext; import org.apache.eventmesh.client.tcp.common.TcpClient; -import org.apache.eventmesh.common.protocol.SubcriptionType; +import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.SubscriptionItem; import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.tcp.*; @@ -123,9 +123,9 @@ public void listen() throws Exception { } - public void subscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception { - subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subcriptionType)); - Package request = MessageUtils.subscribe(topic, subscriptionMode, subcriptionType); + public void subscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) throws Exception { + subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subscriptionType)); + Package request = MessageUtils.subscribe(topic, subscriptionMode, subscriptionType); this.io(request, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); } diff --git a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribe.java b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribe.java index 0f5f040403..abd3ef8a87 100644 --- a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribe.java +++ b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribe.java @@ -24,7 +24,7 @@ import org.apache.eventmesh.client.tcp.common.EventMeshTestUtils; import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient; -import org.apache.eventmesh.common.protocol.SubcriptionType; +import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; import org.apache.eventmesh.common.protocol.tcp.UserAgent; @@ -47,7 +47,7 @@ public static void main(String[] agrs) throws Exception { client.init(); client.heartbeat(); - client.subscribe("TEST-TOPIC-TCP-ASYNC", SubscriptionMode.CLUSTERING, SubcriptionType.ASYNC); + client.subscribe("TEST-TOPIC-TCP-ASYNC", SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC); client.registerSubBusiHandler(handler); client.listen(); diff --git a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribeBroadcast.java b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribeBroadcast.java index 6a28d2634a..52706c176a 100644 --- a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribeBroadcast.java +++ b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribeBroadcast.java @@ -24,7 +24,7 @@ import org.apache.eventmesh.client.tcp.common.EventMeshTestUtils; import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient; -import org.apache.eventmesh.common.protocol.SubcriptionType; +import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; import org.apache.eventmesh.common.protocol.tcp.UserAgent; @@ -47,7 +47,7 @@ public static void main(String[] agrs) throws Exception { client.init(); client.heartbeat(); - client.subscribe("TEST-TOPIC-TCP-BROADCAST", SubscriptionMode.BROADCASTING, SubcriptionType.ASYNC); + client.subscribe("TEST-TOPIC-TCP-BROADCAST", SubscriptionMode.BROADCASTING, SubscriptionType.ASYNC); client.registerSubBusiHandler(handler); client.listen(); diff --git a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/SyncResponse.java b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/SyncResponse.java index c71067cd50..96b415ca59 100644 --- a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/SyncResponse.java +++ b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/SyncResponse.java @@ -23,7 +23,7 @@ import org.apache.eventmesh.client.tcp.common.EventMeshTestUtils; import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient; -import org.apache.eventmesh.common.protocol.SubcriptionType; +import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.tcp.UserAgent; import org.apache.eventmesh.common.protocol.tcp.Package; @@ -45,7 +45,7 @@ public static void main(String[] agrs) throws Exception { client.init(); client.heartbeat(); - client.subscribe("TEST-TOPIC-TCP-SYNC", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC); + client.subscribe("TEST-TOPIC-TCP-SYNC", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC); // Synchronize RR messages client.registerSubBusiHandler(handler);