Skip to content

Commit

Permalink
[ISSUE #4166] Fix grpc AsyncPublishInstance has no push messages. (#4167
Browse files Browse the repository at this point in the history
)

* Create .lift.toml

* Update .lift.toml

* Update .lift.toml

* Update .lift.toml

* Update .lift.toml

* Update .lift.toml

* Update .lift.toml

* Update .lift.toml

* Update .lift.toml

* delete lift

* push message optimization methods and correction of topic.

* push message optimization methods and correction of topic.
  • Loading branch information
Alonexc authored Jun 30, 2023
1 parent 525a8dd commit bae1307
Show file tree
Hide file tree
Showing 8 changed files with 18 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ protected static EventMeshGrpcClientConfig initEventMeshGrpcClientConfig(final S
.build();
}

protected static CloudEvent buildCloudEvent(final Map<String, String> content) {
protected static CloudEvent buildCloudEvent(final Map<String, String> content, String topic) {
return CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSubject(ExampleConstants.EVENTMESH_GRPC_ASYNC_TEST_TOPIC)
.withSubject(topic)
.withSource(URI.create("/"))
.withDataContentType(ExampleConstants.CLOUDEVENT_CONTENT_TYPE)
.withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
Expand All @@ -66,10 +66,10 @@ protected static CloudEvent buildCloudEvent(final Map<String, String> content) {

}

protected static EventMeshMessage buildEventMeshMessage(final Map<String, String> content) {
protected static EventMeshMessage buildEventMeshMessage(final Map<String, String> content, String topic) {
return EventMeshMessage.builder()
.content(JsonUtils.toJSONString(content))
.topic(ExampleConstants.EVENTMESH_GRPC_BROADCAT_TEST_TOPIC)
.topic(topic)
.uniqueId(RandomStringUtils.generateNum(30))
.bizSeqNo(RandomStringUtils.generateNum(30))
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public static void main(String[] args) throws Exception {

final List<CloudEvent> cloudEventList = new ArrayList<>();
for (int i = 0; i < 5; i++) {
cloudEventList.add(buildCloudEvent(content));
cloudEventList.add(buildCloudEvent(content,
ExampleConstants.EVENTMESH_GRPC_BROADCAT_TEST_TOPIC));
}
eventMeshGrpcProducer.publish(cloudEventList);
ThreadUtils.sleep(10, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ public static void main(String[] args) throws Exception {
content.put("content", "testAsyncMessage");

for (int i = 0; i < MESSAGE_SIZE; i++) {
eventMeshGrpcProducer.publish(buildCloudEvent(content));
eventMeshGrpcProducer.publish(buildCloudEvent(content,
ExampleConstants.EVENTMESH_GRPC_ASYNC_TEST_TOPIC));
ThreadUtils.sleep(1, TimeUnit.SECONDS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public static void main(String[] args) throws Exception {
content.put("content", "testRequestReplyMessage");

for (int i = 0; i < MESSAGE_SIZE; i++) {
eventMeshGrpcProducer.requestReply(buildCloudEvent(content), EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
eventMeshGrpcProducer.requestReply(buildCloudEvent(content,
ExampleConstants.EVENTMESH_GRPC_RR_TEST_TOPIC), EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
ThreadUtils.sleep(1, TimeUnit.SECONDS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public static void main(String[] args) throws Exception {
content.put("content", "testAsyncMessage");

for (int i = 0; i < MESSAGE_SIZE; i++) {
eventMeshGrpcProducer.publish(buildEventMeshMessage(content));
eventMeshGrpcProducer.publish(buildEventMeshMessage(content,
ExampleConstants.EVENTMESH_GRPC_BROADCAT_TEST_TOPIC));
ThreadUtils.sleep(1, TimeUnit.SECONDS);
}
ThreadUtils.sleep(30, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public static void main(String[] args) throws Exception {
content.put("content", "testAsyncMessage");

for (int i = 0; i < MESSAGE_SIZE; i++) {
buildEventMeshMessage(content);
eventMeshGrpcProducer.publish(buildEventMeshMessage(content,
ExampleConstants.EVENTMESH_GRPC_ASYNC_TEST_TOPIC));
ThreadUtils.sleep(1, TimeUnit.SECONDS);
}
ThreadUtils.sleep(30, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public static void main(String[] args) throws Exception {

List<EventMeshMessage> messageList = new ArrayList<>();
for (int i = 0; i < 5; i++) {
messageList.add(buildEventMeshMessage(content));
messageList.add(buildEventMeshMessage(content,
ExampleConstants.EVENTMESH_GRPC_BROADCAT_TEST_TOPIC));
}

eventMeshGrpcProducer.publish(messageList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public static void main(String[] args) throws Exception {
content.put("content", "testRequestReplyMessage");

for (int i = 0; i < MESSAGE_SIZE; i++) {
eventMeshGrpcProducer.requestReply(buildEventMeshMessage(content),
eventMeshGrpcProducer.requestReply(buildEventMeshMessage(content,
ExampleConstants.EVENTMESH_GRPC_RR_TEST_TOPIC),
EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
ThreadUtils.sleep(1, TimeUnit.SECONDS);
}
Expand Down

0 comments on commit bae1307

Please sign in to comment.