diff --git a/docs/cn/instructions/eventmesh-runtime-protocol.md b/docs/cn/instructions/eventmesh-runtime-protocol.md index 378c7fc942..d0933d4278 100644 --- a/docs/cn/instructions/eventmesh-runtime-protocol.md +++ b/docs/cn/instructions/eventmesh-runtime-protocol.md @@ -290,7 +290,7 @@ message RequestHeader { string protocolDesc = 12; } -message EventMeshMessage { +message SimpleMessage { RequestHeader header = 1; string producerGroup = 2; string topic = 3; @@ -388,10 +388,10 @@ message Heartbeat { ``` service PublisherService { # 异步事件生产 - rpc publish(EventMeshMessage) returns (Response); + rpc publish(SimpleMessage) returns (Response); # 同步事件生产 - rpc requestReply(EventMeshMessage) returns (Response); + rpc requestReply(SimpleMessage) returns (Response); # 批量事件生产 rpc batchPublish(BatchMessage) returns (Response); @@ -406,7 +406,7 @@ service ConsumerService { rpc subscribe(Subscription) returns (Response); # 所消费事件通过 TCP stream推送事件 - rpc subscribeStream(Subscription) returns (stream EventMeshMessage); + rpc subscribeStream(Subscription) returns (stream SimpleMessage); rpc unsubscribe(Subscription) returns (Response); } diff --git a/docs/en/instructions/eventmesh-runtime-protocol.md b/docs/en/instructions/eventmesh-runtime-protocol.md index 9b705d3382..b584412b9d 100644 --- a/docs/en/instructions/eventmesh-runtime-protocol.md +++ b/docs/en/instructions/eventmesh-runtime-protocol.md @@ -293,7 +293,7 @@ message RequestHeader { string protocolDesc = 12; } -message EventMeshMessage { +message SimpleMessage { RequestHeader header = 1; string producerGroup = 2; string topic = 3; @@ -391,10 +391,10 @@ message Heartbeat { ``` service PublisherService { # Async event publish - rpc publish(EventMeshMessage) returns (Response); + rpc publish(SimpleMessage) returns (Response); # Sync event publish - rpc requestReply(EventMeshMessage) returns (Response); + rpc requestReply(SimpleMessage) returns (Response); # Batch event publish rpc batchPublish(BatchMessage) returns (Response); @@ -409,7 +409,7 @@ service ConsumerService { rpc subscribe(Subscription) returns (Response); # The subscribed event will be delivered through stream of Message - rpc subscribeStream(Subscription) returns (stream EventMeshMessage); + rpc subscribeStream(Subscription) returns (stream SimpleMessage); rpc unsubscribe(Subscription) returns (Response); } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/EventMeshMessageWrapper.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/SimpleMessageWrapper.java similarity index 70% rename from eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/EventMeshMessageWrapper.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/SimpleMessageWrapper.java index 07bd66f968..d16ac5d8f6 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/EventMeshMessageWrapper.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/SimpleMessageWrapper.java @@ -18,17 +18,17 @@ package org.apache.eventmesh.common.protocol.grpc.common; import org.apache.eventmesh.common.protocol.ProtocolTransportObject; -import org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage; +import org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage; -public class EventMeshMessageWrapper implements ProtocolTransportObject { +public class SimpleMessageWrapper implements ProtocolTransportObject { - private EventMeshMessage eventMeshMessage; + private SimpleMessage simpleMessage; - public EventMeshMessageWrapper(EventMeshMessage eventMeshMessage) { - this.eventMeshMessage = eventMeshMessage; + public SimpleMessageWrapper(SimpleMessage simpleMessage) { + this.simpleMessage = simpleMessage; } - public EventMeshMessage getMessage() { - return eventMeshMessage; + public SimpleMessage getMessage() { + return simpleMessage; } } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/protos/ConsumerServiceGrpc.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/protos/ConsumerServiceGrpc.java index e643a3baf2..859e602da3 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/protos/ConsumerServiceGrpc.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/protos/ConsumerServiceGrpc.java @@ -72,21 +72,21 @@ Response> getSubscribeMethod() { } private static volatile io.grpc.MethodDescriptor getSubscribeStreamMethod; + org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage> getSubscribeStreamMethod; @io.grpc.stub.annotations.RpcMethod( fullMethodName = SERVICE_NAME + '/' + "subscribeStream", requestType = Subscription.class, - responseType = EventMeshMessage.class, + responseType = org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage.class, methodType = io.grpc.MethodDescriptor.MethodType.SERVER_STREAMING) public static io.grpc.MethodDescriptor getSubscribeStreamMethod() { - io.grpc.MethodDescriptor getSubscribeStreamMethod; + org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage> getSubscribeStreamMethod() { + io.grpc.MethodDescriptor getSubscribeStreamMethod; if ((getSubscribeStreamMethod = ConsumerServiceGrpc.getSubscribeStreamMethod) == null) { synchronized (ConsumerServiceGrpc.class) { if ((getSubscribeStreamMethod = ConsumerServiceGrpc.getSubscribeStreamMethod) == null) { ConsumerServiceGrpc.getSubscribeStreamMethod = getSubscribeStreamMethod = - io.grpc.MethodDescriptor.newBuilder() + io.grpc.MethodDescriptor.newBuilder() .setType(io.grpc.MethodDescriptor.MethodType.SERVER_STREAMING) .setFullMethodName(generateFullMethodName( "eventmesh.common.protocol.grpc.ConsumerService", "subscribeStream")) @@ -94,7 +94,7 @@ EventMeshMessage> getSubscribeStreamMethod() { .setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller( Subscription.getDefaultInstance())) .setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller( - EventMeshMessage.getDefaultInstance())) + org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage.getDefaultInstance())) .setSchemaDescriptor(new ConsumerServiceMethodDescriptorSupplier("subscribeStream")) .build(); } @@ -172,7 +172,7 @@ public void subscribe(Subscription request, /** */ public void subscribeStream(Subscription request, - io.grpc.stub.StreamObserver responseObserver) { + io.grpc.stub.StreamObserver responseObserver) { asyncUnimplementedUnaryCall(getSubscribeStreamMethod(), responseObserver); } @@ -197,7 +197,7 @@ public void unsubscribe(Subscription request, asyncServerStreamingCall( new MethodHandlers< Subscription, - EventMeshMessage>( + org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage>( this, METHODID_SUBSCRIBE_STREAM))) .addMethod( getUnsubscribeMethod(), @@ -239,7 +239,7 @@ public void subscribe(Subscription request, /** */ public void subscribeStream(Subscription request, - io.grpc.stub.StreamObserver responseObserver) { + io.grpc.stub.StreamObserver responseObserver) { asyncServerStreamingCall( getChannel().newCall(getSubscribeStreamMethod(), getCallOptions()), request, responseObserver); } @@ -280,7 +280,7 @@ public Response subscribe(Subscription request) { /** */ - public java.util.Iterator subscribeStream( + public java.util.Iterator subscribeStream( Subscription request) { return blockingServerStreamingCall( getChannel(), getSubscribeStreamMethod(), getCallOptions(), request); @@ -356,7 +356,7 @@ public void invoke(Req request, io.grpc.stub.StreamObserver responseObserv break; case METHODID_SUBSCRIBE_STREAM: serviceImpl.subscribeStream((Subscription) request, - (io.grpc.stub.StreamObserver) responseObserver); + (io.grpc.stub.StreamObserver) responseObserver); break; case METHODID_UNSUBSCRIBE: serviceImpl.unsubscribe((Subscription) request, diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/protos/EventmeshGrpc.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/protos/EventmeshGrpc.java index 406be5d652..dee8e102cd 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/protos/EventmeshGrpc.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/protos/EventmeshGrpc.java @@ -37,15 +37,15 @@ public static void registerAllExtensions( com.google.protobuf.GeneratedMessageV3.FieldAccessorTable internal_static_eventmesh_common_protocol_grpc_RequestHeader_fieldAccessorTable; static final com.google.protobuf.Descriptors.Descriptor - internal_static_eventmesh_common_protocol_grpc_EventMeshMessage_descriptor; + internal_static_eventmesh_common_protocol_grpc_SimpleMessage_descriptor; static final com.google.protobuf.GeneratedMessageV3.FieldAccessorTable - internal_static_eventmesh_common_protocol_grpc_EventMeshMessage_fieldAccessorTable; + internal_static_eventmesh_common_protocol_grpc_SimpleMessage_fieldAccessorTable; static final com.google.protobuf.Descriptors.Descriptor - internal_static_eventmesh_common_protocol_grpc_EventMeshMessage_PropertiesEntry_descriptor; + internal_static_eventmesh_common_protocol_grpc_SimpleMessage_PropertiesEntry_descriptor; static final com.google.protobuf.GeneratedMessageV3.FieldAccessorTable - internal_static_eventmesh_common_protocol_grpc_EventMeshMessage_PropertiesEntry_fieldAccessorTable; + internal_static_eventmesh_common_protocol_grpc_SimpleMessage_PropertiesEntry_fieldAccessorTable; static final com.google.protobuf.Descriptors.Descriptor internal_static_eventmesh_common_protocol_grpc_BatchMessage_descriptor; static final @@ -101,72 +101,72 @@ public static void registerAllExtensions( "ip\030\004 \001(\t\022\013\n\003pid\030\005 \001(\t\022\013\n\003sys\030\006 \001(\t\022\020\n\010us" + "ername\030\007 \001(\t\022\020\n\010password\030\010 \001(\t\022\020\n\010langua" + "ge\030\t \001(\t\022\024\n\014protocolType\030\n \001(\t\022\027\n\017protoc" + - "olVersion\030\013 \001(\t\022\024\n\014protocolDesc\030\014 \001(\t\"\315\002" + - "\n\020EventMeshMessage\022=\n\006header\030\001 \001(\0132-.eve" + - "ntmesh.common.protocol.grpc.RequestHeade" + - "r\022\025\n\rproducerGroup\030\002 \001(\t\022\r\n\005topic\030\003 \001(\t\022" + - "\017\n\007content\030\004 \001(\t\022\013\n\003ttl\030\005 \001(\t\022\020\n\010uniqueI" + - "d\030\006 \001(\t\022\016\n\006seqNum\030\007 \001(\t\022\013\n\003tag\030\010 \001(\t\022T\n\n" + - "properties\030\t \003(\0132@.eventmesh.common.prot" + - "ocol.grpc.EventMeshMessage.PropertiesEnt" + - "ry\0321\n\017PropertiesEntry\022\013\n\003key\030\001 \001(\t\022\r\n\005va" + - "lue\030\002 \001(\t:\0028\001\"\260\003\n\014BatchMessage\022=\n\006header" + - "\030\001 \001(\0132-.eventmesh.common.protocol.grpc." + - "RequestHeader\022\025\n\rproducerGroup\030\002 \001(\t\022\r\n\005" + - "topic\030\003 \001(\t\022M\n\013messageItem\030\004 \003(\01328.event" + - "mesh.common.protocol.grpc.BatchMessage.M" + - "essageItem\032\353\001\n\013MessageItem\022\017\n\007content\030\001 " + - "\001(\t\022\013\n\003ttl\030\002 \001(\t\022\020\n\010uniqueId\030\003 \001(\t\022\016\n\006se" + - "qNum\030\004 \001(\t\022\013\n\003tag\030\005 \001(\t\022\\\n\nproperties\030\006 " + - "\003(\0132H.eventmesh.common.protocol.grpc.Bat" + - "chMessage.MessageItem.PropertiesEntry\0321\n" + - "\017PropertiesEntry\022\013\n\003key\030\001 \001(\t\022\r\n\005value\030\002" + - " \001(\t:\0028\001\"?\n\010Response\022\020\n\010respCode\030\001 \001(\t\022\017" + - "\n\007respMsg\030\002 \001(\t\022\020\n\010respTime\030\003 \001(\t\"\212\004\n\014Su" + - "bscription\022=\n\006header\030\001 \001(\0132-.eventmesh.c" + - "ommon.protocol.grpc.RequestHeader\022\025\n\rcon" + - "sumerGroup\030\002 \001(\t\022X\n\021subscriptionItems\030\003 " + - "\003(\0132=.eventmesh.common.protocol.grpc.Sub" + - "scription.SubscriptionItem\022\013\n\003url\030\004 \001(\t\032" + - "\274\002\n\020SubscriptionItem\022\r\n\005topic\030\001 \001(\t\022\\\n\004m" + - "ode\030\002 \001(\0162N.eventmesh.common.protocol.gr" + - "pc.Subscription.SubscriptionItem.Subscri" + - "ptionMode\022\\\n\004type\030\003 \001(\0162N.eventmesh.comm" + - "on.protocol.grpc.Subscription.Subscripti" + - "onItem.SubscriptionType\"4\n\020SubscriptionM" + - "ode\022\016\n\nCLUSTERING\020\000\022\020\n\014BROADCASTING\020\001\"\'\n" + - "\020SubscriptionType\022\t\n\005ASYNC\020\000\022\010\n\004SYNC\020\001\"\340" + - "\002\n\tHeartbeat\022=\n\006header\030\001 \001(\0132-.eventmesh" + - ".common.protocol.grpc.RequestHeader\022H\n\nc" + - "lientType\030\002 \001(\01624.eventmesh.common.proto" + - "col.grpc.Heartbeat.ClientType\022\025\n\rproduce" + - "rGroup\030\003 \001(\t\022\025\n\rconsumerGroup\030\004 \001(\t\022O\n\016h" + - "eartbeatItems\030\005 \003(\01327.eventmesh.common.p" + - "rotocol.grpc.Heartbeat.HeartbeatItem\032+\n\r" + - "HeartbeatItem\022\r\n\005topic\030\001 \001(\t\022\013\n\003url\030\002 \001(" + - "\t\"\036\n\nClientType\022\007\n\003PUB\020\000\022\007\n\003SUB\020\0012\315\002\n\020Pu" + - "blisherService\022e\n\007publish\0220.eventmesh.co" + - "mmon.protocol.grpc.EventMeshMessage\032(.ev" + - "entmesh.common.protocol.grpc.Response\022j\n" + - "\014requestReply\0220.eventmesh.common.protoco" + - "l.grpc.EventMeshMessage\032(.eventmesh.comm" + - "on.protocol.grpc.Response\022f\n\014batchPublis" + - "h\022,.eventmesh.common.protocol.grpc.Batch" + - "Message\032(.eventmesh.common.protocol.grpc" + - ".Response2\322\002\n\017ConsumerService\022c\n\tsubscri" + - "be\022,.eventmesh.common.protocol.grpc.Subs" + - "cription\032(.eventmesh.common.protocol.grp" + - "c.Response\022s\n\017subscribeStream\022,.eventmes" + - "h.common.protocol.grpc.Subscription\0320.ev" + - "entmesh.common.protocol.grpc.EventMeshMe" + - "ssage0\001\022e\n\013unsubscribe\022,.eventmesh.commo" + - "n.protocol.grpc.Subscription\032(.eventmesh" + - ".common.protocol.grpc.Response2t\n\020Heartb" + - "eatService\022`\n\theartbeat\022).eventmesh.comm" + - "on.protocol.grpc.Heartbeat\032(.eventmesh.c" + - "ommon.protocol.grpc.ResponseBC\n0org.apac" + - "he.eventmesh.common.protocol.grpc.protos" + - "B\rEventmeshGrpcP\001b\006proto3" + "olVersion\030\013 \001(\t\022\024\n\014protocolDesc\030\014 \001(\t\"\307\002" + + "\n\rSimpleMessage\022=\n\006header\030\001 \001(\0132-.eventm" + + "esh.common.protocol.grpc.RequestHeader\022\025" + + "\n\rproducerGroup\030\002 \001(\t\022\r\n\005topic\030\003 \001(\t\022\017\n\007" + + "content\030\004 \001(\t\022\013\n\003ttl\030\005 \001(\t\022\020\n\010uniqueId\030\006" + + " \001(\t\022\016\n\006seqNum\030\007 \001(\t\022\013\n\003tag\030\010 \001(\t\022Q\n\npro" + + "perties\030\t \003(\0132=.eventmesh.common.protoco" + + "l.grpc.SimpleMessage.PropertiesEntry\0321\n\017" + + "PropertiesEntry\022\013\n\003key\030\001 \001(\t\022\r\n\005value\030\002 " + + "\001(\t:\0028\001\"\260\003\n\014BatchMessage\022=\n\006header\030\001 \001(\013" + + "2-.eventmesh.common.protocol.grpc.Reques" + + "tHeader\022\025\n\rproducerGroup\030\002 \001(\t\022\r\n\005topic\030" + + "\003 \001(\t\022M\n\013messageItem\030\004 \003(\01328.eventmesh.c" + + "ommon.protocol.grpc.BatchMessage.Message" + + "Item\032\353\001\n\013MessageItem\022\017\n\007content\030\001 \001(\t\022\013\n" + + "\003ttl\030\002 \001(\t\022\020\n\010uniqueId\030\003 \001(\t\022\016\n\006seqNum\030\004" + + " \001(\t\022\013\n\003tag\030\005 \001(\t\022\\\n\nproperties\030\006 \003(\0132H." + + "eventmesh.common.protocol.grpc.BatchMess" + + "age.MessageItem.PropertiesEntry\0321\n\017Prope" + + "rtiesEntry\022\013\n\003key\030\001 \001(\t\022\r\n\005value\030\002 \001(\t:\002" + + "8\001\"?\n\010Response\022\020\n\010respCode\030\001 \001(\t\022\017\n\007resp" + + "Msg\030\002 \001(\t\022\020\n\010respTime\030\003 \001(\t\"\212\004\n\014Subscrip" + + "tion\022=\n\006header\030\001 \001(\0132-.eventmesh.common." + + "protocol.grpc.RequestHeader\022\025\n\rconsumerG" + + "roup\030\002 \001(\t\022X\n\021subscriptionItems\030\003 \003(\0132=." + + "eventmesh.common.protocol.grpc.Subscript" + + "ion.SubscriptionItem\022\013\n\003url\030\004 \001(\t\032\274\002\n\020Su" + + "bscriptionItem\022\r\n\005topic\030\001 \001(\t\022\\\n\004mode\030\002 " + + "\001(\0162N.eventmesh.common.protocol.grpc.Sub" + + "scription.SubscriptionItem.SubscriptionM" + + "ode\022\\\n\004type\030\003 \001(\0162N.eventmesh.common.pro" + + "tocol.grpc.Subscription.SubscriptionItem" + + ".SubscriptionType\"4\n\020SubscriptionMode\022\016\n" + + "\nCLUSTERING\020\000\022\020\n\014BROADCASTING\020\001\"\'\n\020Subsc" + + "riptionType\022\t\n\005ASYNC\020\000\022\010\n\004SYNC\020\001\"\340\002\n\tHea" + + "rtbeat\022=\n\006header\030\001 \001(\0132-.eventmesh.commo" + + "n.protocol.grpc.RequestHeader\022H\n\nclientT" + + "ype\030\002 \001(\01624.eventmesh.common.protocol.gr" + + "pc.Heartbeat.ClientType\022\025\n\rproducerGroup" + + "\030\003 \001(\t\022\025\n\rconsumerGroup\030\004 \001(\t\022O\n\016heartbe" + + "atItems\030\005 \003(\01327.eventmesh.common.protoco" + + "l.grpc.Heartbeat.HeartbeatItem\032+\n\rHeartb" + + "eatItem\022\r\n\005topic\030\001 \001(\t\022\013\n\003url\030\002 \001(\t\"\036\n\nC" + + "lientType\022\007\n\003PUB\020\000\022\007\n\003SUB\020\0012\307\002\n\020Publishe" + + "rService\022b\n\007publish\022-.eventmesh.common.p" + + "rotocol.grpc.SimpleMessage\032(.eventmesh.c" + + "ommon.protocol.grpc.Response\022g\n\014requestR" + + "eply\022-.eventmesh.common.protocol.grpc.Si" + + "mpleMessage\032(.eventmesh.common.protocol." + + "grpc.Response\022f\n\014batchPublish\022,.eventmes" + + "h.common.protocol.grpc.BatchMessage\032(.ev" + + "entmesh.common.protocol.grpc.Response2\317\002" + + "\n\017ConsumerService\022c\n\tsubscribe\022,.eventme" + + "sh.common.protocol.grpc.Subscription\032(.e" + + "ventmesh.common.protocol.grpc.Response\022p" + + "\n\017subscribeStream\022,.eventmesh.common.pro" + + "tocol.grpc.Subscription\032-.eventmesh.comm" + + "on.protocol.grpc.SimpleMessage0\001\022e\n\013unsu" + + "bscribe\022,.eventmesh.common.protocol.grpc" + + ".Subscription\032(.eventmesh.common.protoco" + + "l.grpc.Response2t\n\020HeartbeatService\022`\n\th" + + "eartbeat\022).eventmesh.common.protocol.grp" + + "c.Heartbeat\032(.eventmesh.common.protocol." + + "grpc.ResponseBC\n0org.apache.eventmesh.co" + + "mmon.protocol.grpc.protosB\rEventmeshGrpc" + + "P\001b\006proto3" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor. InternalDescriptorAssigner() { @@ -186,17 +186,17 @@ public com.google.protobuf.ExtensionRegistry assignDescriptors( com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( internal_static_eventmesh_common_protocol_grpc_RequestHeader_descriptor, new String[] { "Env", "Region", "Idc", "Ip", "Pid", "Sys", "Username", "Password", "Language", "ProtocolType", "ProtocolVersion", "ProtocolDesc", }); - internal_static_eventmesh_common_protocol_grpc_EventMeshMessage_descriptor = + internal_static_eventmesh_common_protocol_grpc_SimpleMessage_descriptor = getDescriptor().getMessageTypes().get(1); - internal_static_eventmesh_common_protocol_grpc_EventMeshMessage_fieldAccessorTable = new + internal_static_eventmesh_common_protocol_grpc_SimpleMessage_fieldAccessorTable = new com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( - internal_static_eventmesh_common_protocol_grpc_EventMeshMessage_descriptor, + internal_static_eventmesh_common_protocol_grpc_SimpleMessage_descriptor, new String[] { "Header", "ProducerGroup", "Topic", "Content", "Ttl", "UniqueId", "SeqNum", "Tag", "Properties", }); - internal_static_eventmesh_common_protocol_grpc_EventMeshMessage_PropertiesEntry_descriptor = - internal_static_eventmesh_common_protocol_grpc_EventMeshMessage_descriptor.getNestedTypes().get(0); - internal_static_eventmesh_common_protocol_grpc_EventMeshMessage_PropertiesEntry_fieldAccessorTable = new + internal_static_eventmesh_common_protocol_grpc_SimpleMessage_PropertiesEntry_descriptor = + internal_static_eventmesh_common_protocol_grpc_SimpleMessage_descriptor.getNestedTypes().get(0); + internal_static_eventmesh_common_protocol_grpc_SimpleMessage_PropertiesEntry_fieldAccessorTable = new com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( - internal_static_eventmesh_common_protocol_grpc_EventMeshMessage_PropertiesEntry_descriptor, + internal_static_eventmesh_common_protocol_grpc_SimpleMessage_PropertiesEntry_descriptor, new String[] { "Key", "Value", }); internal_static_eventmesh_common_protocol_grpc_BatchMessage_descriptor = getDescriptor().getMessageTypes().get(2); diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/protos/PublisherServiceGrpc.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/protos/PublisherServiceGrpc.java index 48dea0c797..ee9971cca5 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/protos/PublisherServiceGrpc.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/protos/PublisherServiceGrpc.java @@ -36,28 +36,28 @@ private PublisherServiceGrpc() {} public static final String SERVICE_NAME = "eventmesh.common.protocol.grpc.PublisherService"; // Static method descriptors that strictly reflect the proto. - private static volatile io.grpc.MethodDescriptor getPublishMethod; @io.grpc.stub.annotations.RpcMethod( fullMethodName = SERVICE_NAME + '/' + "publish", - requestType = EventMeshMessage.class, + requestType = org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage.class, responseType = Response.class, methodType = io.grpc.MethodDescriptor.MethodType.UNARY) - public static io.grpc.MethodDescriptor getPublishMethod() { - io.grpc.MethodDescriptor getPublishMethod; + io.grpc.MethodDescriptor getPublishMethod; if ((getPublishMethod = PublisherServiceGrpc.getPublishMethod) == null) { synchronized (PublisherServiceGrpc.class) { if ((getPublishMethod = PublisherServiceGrpc.getPublishMethod) == null) { PublisherServiceGrpc.getPublishMethod = getPublishMethod = - io.grpc.MethodDescriptor.newBuilder() + io.grpc.MethodDescriptor.newBuilder() .setType(io.grpc.MethodDescriptor.MethodType.UNARY) .setFullMethodName(generateFullMethodName( "eventmesh.common.protocol.grpc.PublisherService", "publish")) .setSampledToLocalTracing(true) .setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller( - EventMeshMessage.getDefaultInstance())) + org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage.getDefaultInstance())) .setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller( Response.getDefaultInstance())) .setSchemaDescriptor(new PublisherServiceMethodDescriptorSupplier("publish")) @@ -68,28 +68,28 @@ Response> getPublishMethod() { return getPublishMethod; } - private static volatile io.grpc.MethodDescriptor getRequestReplyMethod; @io.grpc.stub.annotations.RpcMethod( fullMethodName = SERVICE_NAME + '/' + "requestReply", - requestType = EventMeshMessage.class, + requestType = org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage.class, responseType = Response.class, methodType = io.grpc.MethodDescriptor.MethodType.UNARY) - public static io.grpc.MethodDescriptor getRequestReplyMethod() { - io.grpc.MethodDescriptor getRequestReplyMethod; + io.grpc.MethodDescriptor getRequestReplyMethod; if ((getRequestReplyMethod = PublisherServiceGrpc.getRequestReplyMethod) == null) { synchronized (PublisherServiceGrpc.class) { if ((getRequestReplyMethod = PublisherServiceGrpc.getRequestReplyMethod) == null) { PublisherServiceGrpc.getRequestReplyMethod = getRequestReplyMethod = - io.grpc.MethodDescriptor.newBuilder() + io.grpc.MethodDescriptor.newBuilder() .setType(io.grpc.MethodDescriptor.MethodType.UNARY) .setFullMethodName(generateFullMethodName( "eventmesh.common.protocol.grpc.PublisherService", "requestReply")) .setSampledToLocalTracing(true) .setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller( - EventMeshMessage.getDefaultInstance())) + org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage.getDefaultInstance())) .setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller( Response.getDefaultInstance())) .setSchemaDescriptor(new PublisherServiceMethodDescriptorSupplier("requestReply")) @@ -161,15 +161,15 @@ public static abstract class PublisherServiceImplBase implements io.grpc.Bindabl /** */ - public void publish(EventMeshMessage request, - io.grpc.stub.StreamObserver responseObserver) { + public void publish(org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage request, + io.grpc.stub.StreamObserver responseObserver) { asyncUnimplementedUnaryCall(getPublishMethod(), responseObserver); } /** */ - public void requestReply(EventMeshMessage request, - io.grpc.stub.StreamObserver responseObserver) { + public void requestReply(org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage request, + io.grpc.stub.StreamObserver responseObserver) { asyncUnimplementedUnaryCall(getRequestReplyMethod(), responseObserver); } @@ -186,14 +186,14 @@ public void batchPublish(BatchMessage request, getPublishMethod(), asyncUnaryCall( new MethodHandlers< - EventMeshMessage, + org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage, Response>( this, METHODID_PUBLISH))) .addMethod( getRequestReplyMethod(), asyncUnaryCall( new MethodHandlers< - EventMeshMessage, + org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage, Response>( this, METHODID_REQUEST_REPLY))) .addMethod( @@ -227,16 +227,16 @@ protected PublisherServiceStub build(io.grpc.Channel channel, /** */ - public void publish(EventMeshMessage request, - io.grpc.stub.StreamObserver responseObserver) { + public void publish(org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage request, + io.grpc.stub.StreamObserver responseObserver) { asyncUnaryCall( getChannel().newCall(getPublishMethod(), getCallOptions()), request, responseObserver); } /** */ - public void requestReply(EventMeshMessage request, - io.grpc.stub.StreamObserver responseObserver) { + public void requestReply(org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage request, + io.grpc.stub.StreamObserver responseObserver) { asyncUnaryCall( getChannel().newCall(getRequestReplyMethod(), getCallOptions()), request, responseObserver); } @@ -270,14 +270,14 @@ protected PublisherServiceBlockingStub build(io.grpc.Channel channel, /** */ - public Response publish(EventMeshMessage request) { + public Response publish(org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage request) { return blockingUnaryCall( getChannel(), getPublishMethod(), getCallOptions(), request); } /** */ - public Response requestReply(EventMeshMessage request) { + public Response requestReply(org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage request) { return blockingUnaryCall( getChannel(), getRequestReplyMethod(), getCallOptions(), request); } @@ -311,7 +311,7 @@ protected PublisherServiceFutureStub build(io.grpc.Channel channel, /** */ public com.google.common.util.concurrent.ListenableFuture publish( - EventMeshMessage request) { + org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage request) { return futureUnaryCall( getChannel().newCall(getPublishMethod(), getCallOptions()), request); } @@ -319,7 +319,7 @@ public com.google.common.util.concurrent.ListenableFuture publish( /** */ public com.google.common.util.concurrent.ListenableFuture requestReply( - EventMeshMessage request) { + org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage request) { return futureUnaryCall( getChannel().newCall(getRequestReplyMethod(), getCallOptions()), request); } @@ -355,11 +355,11 @@ private static final class MethodHandlers implements public void invoke(Req request, io.grpc.stub.StreamObserver responseObserver) { switch (methodId) { case METHODID_PUBLISH: - serviceImpl.publish((EventMeshMessage) request, + serviceImpl.publish((org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage) request, (io.grpc.stub.StreamObserver) responseObserver); break; case METHODID_REQUEST_REPLY: - serviceImpl.requestReply((EventMeshMessage) request, + serviceImpl.requestReply((org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage) request, (io.grpc.stub.StreamObserver) responseObserver); break; case METHODID_BATCH_PUBLISH: diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/protos/EventMeshMessage.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/protos/SimpleMessage.java similarity index 93% rename from eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/protos/EventMeshMessage.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/protos/SimpleMessage.java index 8adb1191df..a359ee4951 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/protos/EventMeshMessage.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/protos/SimpleMessage.java @@ -21,18 +21,18 @@ package org.apache.eventmesh.common.protocol.grpc.protos; /** - * Protobuf type {@code eventmesh.common.protocol.grpc.EventMeshMessage} + * Protobuf type {@code eventmesh.common.protocol.grpc.SimpleMessage} */ -public final class EventMeshMessage extends +public final class SimpleMessage extends com.google.protobuf.GeneratedMessageV3 implements - // @@protoc_insertion_point(message_implements:eventmesh.common.protocol.grpc.EventMeshMessage) - EventMeshMessageOrBuilder { + // @@protoc_insertion_point(message_implements:eventmesh.common.protocol.grpc.SimpleMessage) + SimpleMessageOrBuilder { private static final long serialVersionUID = 0L; - // Use EventMeshMessage.newBuilder() to construct. - private EventMeshMessage(com.google.protobuf.GeneratedMessageV3.Builder builder) { + // Use SimpleMessage.newBuilder() to construct. + private SimpleMessage(com.google.protobuf.GeneratedMessageV3.Builder builder) { super(builder); } - private EventMeshMessage() { + private SimpleMessage() { producerGroup_ = ""; topic_ = ""; content_ = ""; @@ -47,7 +47,7 @@ private EventMeshMessage() { getUnknownFields() { return this.unknownFields; } - private EventMeshMessage( + private SimpleMessage( com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { @@ -155,7 +155,7 @@ private EventMeshMessage( } public static final com.google.protobuf.Descriptors.Descriptor getDescriptor() { - return EventmeshGrpc.internal_static_eventmesh_common_protocol_grpc_EventMeshMessage_descriptor; + return EventmeshGrpc.internal_static_eventmesh_common_protocol_grpc_SimpleMessage_descriptor; } @SuppressWarnings({"rawtypes"}) @@ -171,9 +171,9 @@ protected com.google.protobuf.MapField internalGetMapField( } protected FieldAccessorTable internalGetFieldAccessorTable() { - return EventmeshGrpc.internal_static_eventmesh_common_protocol_grpc_EventMeshMessage_fieldAccessorTable + return EventmeshGrpc.internal_static_eventmesh_common_protocol_grpc_SimpleMessage_fieldAccessorTable .ensureFieldAccessorsInitialized( - EventMeshMessage.class, Builder.class); + SimpleMessage.class, Builder.class); } private int bitField0_; @@ -442,7 +442,7 @@ private static final class PropertiesDefaultEntryHolder { String, String> defaultEntry = com.google.protobuf.MapEntry .newDefaultInstance( - EventmeshGrpc.internal_static_eventmesh_common_protocol_grpc_EventMeshMessage_PropertiesEntry_descriptor, + EventmeshGrpc.internal_static_eventmesh_common_protocol_grpc_SimpleMessage_PropertiesEntry_descriptor, com.google.protobuf.WireFormat.FieldType.STRING, "", com.google.protobuf.WireFormat.FieldType.STRING, @@ -607,10 +607,10 @@ public boolean equals(final Object obj) { if (obj == this) { return true; } - if (!(obj instanceof EventMeshMessage)) { + if (!(obj instanceof SimpleMessage)) { return super.equals(obj); } - EventMeshMessage other = (EventMeshMessage) obj; + SimpleMessage other = (SimpleMessage) obj; boolean result = true; result = result && (hasHeader() == other.hasHeader()); @@ -672,69 +672,69 @@ public int hashCode() { return hash; } - public static EventMeshMessage parseFrom( + public static SimpleMessage parseFrom( java.nio.ByteBuffer data) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data); } - public static EventMeshMessage parseFrom( + public static SimpleMessage parseFrom( java.nio.ByteBuffer data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data, extensionRegistry); } - public static EventMeshMessage parseFrom( + public static SimpleMessage parseFrom( com.google.protobuf.ByteString data) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data); } - public static EventMeshMessage parseFrom( + public static SimpleMessage parseFrom( com.google.protobuf.ByteString data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data, extensionRegistry); } - public static EventMeshMessage parseFrom(byte[] data) + public static SimpleMessage parseFrom(byte[] data) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data); } - public static EventMeshMessage parseFrom( + public static SimpleMessage parseFrom( byte[] data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data, extensionRegistry); } - public static EventMeshMessage parseFrom(java.io.InputStream input) + public static SimpleMessage parseFrom(java.io.InputStream input) throws java.io.IOException { return com.google.protobuf.GeneratedMessageV3 .parseWithIOException(PARSER, input); } - public static EventMeshMessage parseFrom( + public static SimpleMessage parseFrom( java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return com.google.protobuf.GeneratedMessageV3 .parseWithIOException(PARSER, input, extensionRegistry); } - public static EventMeshMessage parseDelimitedFrom(java.io.InputStream input) + public static SimpleMessage parseDelimitedFrom(java.io.InputStream input) throws java.io.IOException { return com.google.protobuf.GeneratedMessageV3 .parseDelimitedWithIOException(PARSER, input); } - public static EventMeshMessage parseDelimitedFrom( + public static SimpleMessage parseDelimitedFrom( java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return com.google.protobuf.GeneratedMessageV3 .parseDelimitedWithIOException(PARSER, input, extensionRegistry); } - public static EventMeshMessage parseFrom( + public static SimpleMessage parseFrom( com.google.protobuf.CodedInputStream input) throws java.io.IOException { return com.google.protobuf.GeneratedMessageV3 .parseWithIOException(PARSER, input); } - public static EventMeshMessage parseFrom( + public static SimpleMessage parseFrom( com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { @@ -746,7 +746,7 @@ public static EventMeshMessage parseFrom( public static Builder newBuilder() { return DEFAULT_INSTANCE.toBuilder(); } - public static Builder newBuilder(EventMeshMessage prototype) { + public static Builder newBuilder(SimpleMessage prototype) { return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype); } public Builder toBuilder() { @@ -761,15 +761,15 @@ protected Builder newBuilderForType( return builder; } /** - * Protobuf type {@code eventmesh.common.protocol.grpc.EventMeshMessage} + * Protobuf type {@code eventmesh.common.protocol.grpc.SimpleMessage} */ public static final class Builder extends com.google.protobuf.GeneratedMessageV3.Builder implements - // @@protoc_insertion_point(builder_implements:eventmesh.common.protocol.grpc.EventMeshMessage) - EventMeshMessageOrBuilder { + // @@protoc_insertion_point(builder_implements:eventmesh.common.protocol.grpc.SimpleMessage) + SimpleMessageOrBuilder { public static final com.google.protobuf.Descriptors.Descriptor getDescriptor() { - return EventmeshGrpc.internal_static_eventmesh_common_protocol_grpc_EventMeshMessage_descriptor; + return EventmeshGrpc.internal_static_eventmesh_common_protocol_grpc_SimpleMessage_descriptor; } @SuppressWarnings({"rawtypes"}) @@ -796,12 +796,12 @@ protected com.google.protobuf.MapField internalGetMutableMapField( } protected FieldAccessorTable internalGetFieldAccessorTable() { - return EventmeshGrpc.internal_static_eventmesh_common_protocol_grpc_EventMeshMessage_fieldAccessorTable + return EventmeshGrpc.internal_static_eventmesh_common_protocol_grpc_SimpleMessage_fieldAccessorTable .ensureFieldAccessorsInitialized( - EventMeshMessage.class, Builder.class); + SimpleMessage.class, Builder.class); } - // Construct using org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage.newBuilder() + // Construct using org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage.newBuilder() private Builder() { maybeForceBuilderInitialization(); } @@ -844,23 +844,23 @@ public Builder clear() { public com.google.protobuf.Descriptors.Descriptor getDescriptorForType() { - return EventmeshGrpc.internal_static_eventmesh_common_protocol_grpc_EventMeshMessage_descriptor; + return EventmeshGrpc.internal_static_eventmesh_common_protocol_grpc_SimpleMessage_descriptor; } - public EventMeshMessage getDefaultInstanceForType() { - return EventMeshMessage.getDefaultInstance(); + public SimpleMessage getDefaultInstanceForType() { + return SimpleMessage.getDefaultInstance(); } - public EventMeshMessage build() { - EventMeshMessage result = buildPartial(); + public SimpleMessage build() { + SimpleMessage result = buildPartial(); if (!result.isInitialized()) { throw newUninitializedMessageException(result); } return result; } - public EventMeshMessage buildPartial() { - EventMeshMessage result = new EventMeshMessage(this); + public SimpleMessage buildPartial() { + SimpleMessage result = new SimpleMessage(this); int from_bitField0_ = bitField0_; int to_bitField0_ = 0; if (headerBuilder_ == null) { @@ -909,16 +909,16 @@ public Builder addRepeatedField( return (Builder) super.addRepeatedField(field, value); } public Builder mergeFrom(com.google.protobuf.Message other) { - if (other instanceof EventMeshMessage) { - return mergeFrom((EventMeshMessage)other); + if (other instanceof SimpleMessage) { + return mergeFrom((SimpleMessage)other); } else { super.mergeFrom(other); return this; } } - public Builder mergeFrom(EventMeshMessage other) { - if (other == EventMeshMessage.getDefaultInstance()) return this; + public Builder mergeFrom(SimpleMessage other) { + if (other == SimpleMessage.getDefaultInstance()) return this; if (other.hasHeader()) { mergeHeader(other.getHeader()); } @@ -965,11 +965,11 @@ public Builder mergeFrom( com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { - EventMeshMessage parsedMessage = null; + SimpleMessage parsedMessage = null; try { parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); } catch (com.google.protobuf.InvalidProtocolBufferException e) { - parsedMessage = (EventMeshMessage) e.getUnfinishedMessage(); + parsedMessage = (SimpleMessage) e.getUnfinishedMessage(); throw e.unwrapIOException(); } finally { if (parsedMessage != null) { @@ -1713,39 +1713,39 @@ public final Builder mergeUnknownFields( } - // @@protoc_insertion_point(builder_scope:eventmesh.common.protocol.grpc.EventMeshMessage) + // @@protoc_insertion_point(builder_scope:eventmesh.common.protocol.grpc.SimpleMessage) } - // @@protoc_insertion_point(class_scope:eventmesh.common.protocol.grpc.EventMeshMessage) - private static final EventMeshMessage DEFAULT_INSTANCE; + // @@protoc_insertion_point(class_scope:eventmesh.common.protocol.grpc.SimpleMessage) + private static final SimpleMessage DEFAULT_INSTANCE; static { - DEFAULT_INSTANCE = new EventMeshMessage(); + DEFAULT_INSTANCE = new SimpleMessage(); } - public static EventMeshMessage getDefaultInstance() { + public static SimpleMessage getDefaultInstance() { return DEFAULT_INSTANCE; } - private static final com.google.protobuf.Parser - PARSER = new com.google.protobuf.AbstractParser() { - public EventMeshMessage parsePartialFrom( + private static final com.google.protobuf.Parser + PARSER = new com.google.protobuf.AbstractParser() { + public SimpleMessage parsePartialFrom( com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { - return new EventMeshMessage(input, extensionRegistry); + return new SimpleMessage(input, extensionRegistry); } }; - public static com.google.protobuf.Parser parser() { + public static com.google.protobuf.Parser parser() { return PARSER; } @Override - public com.google.protobuf.Parser getParserForType() { + public com.google.protobuf.Parser getParserForType() { return PARSER; } - public EventMeshMessage getDefaultInstanceForType() { + public SimpleMessage getDefaultInstanceForType() { return DEFAULT_INSTANCE; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/protos/EventMeshMessageOrBuilder.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/protos/SimpleMessageOrBuilder.java similarity index 97% rename from eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/protos/EventMeshMessageOrBuilder.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/protos/SimpleMessageOrBuilder.java index 5439ad1fca..f73874c57d 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/protos/EventMeshMessageOrBuilder.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/protos/SimpleMessageOrBuilder.java @@ -20,8 +20,8 @@ package org.apache.eventmesh.common.protocol.grpc.protos; -public interface EventMeshMessageOrBuilder extends - // @@protoc_insertion_point(interface_extends:eventmesh.common.protocol.grpc.EventMeshMessage) +public interface SimpleMessageOrBuilder extends + // @@protoc_insertion_point(interface_extends:eventmesh.common.protocol.grpc.SimpleMessage) com.google.protobuf.MessageOrBuilder { /** diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsPublishInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsPublishInstance.java index ec0dd5ed01..a4b631a2b2 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsPublishInstance.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsPublishInstance.java @@ -24,9 +24,7 @@ import org.apache.eventmesh.client.grpc.config.EventMeshGrpcClientConfig; import org.apache.eventmesh.client.tcp.common.EventMeshCommon; import org.apache.eventmesh.common.Constants; -import org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage; import org.apache.eventmesh.common.utils.JsonUtils; -import org.apache.eventmesh.common.utils.RandomStringUtils; import org.apache.eventmesh.util.Utils; import java.net.URI; diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsRequestReplyInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsRequestReplyInstance.java index dd7203beaf..ed5c1c9cea 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsRequestReplyInstance.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsRequestReplyInstance.java @@ -24,9 +24,7 @@ import org.apache.eventmesh.client.grpc.config.EventMeshGrpcClientConfig; import org.apache.eventmesh.client.tcp.common.EventMeshCommon; import org.apache.eventmesh.common.Constants; -import org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage; import org.apache.eventmesh.common.utils.JsonUtils; -import org.apache.eventmesh.common.utils.RandomStringUtils; import org.apache.eventmesh.util.Utils; import java.net.URI; diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/AsyncPublishInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/AsyncPublishInstance.java index 552273b284..6c9351c3f5 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/AsyncPublishInstance.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/AsyncPublishInstance.java @@ -20,11 +20,9 @@ import lombok.extern.slf4j.Slf4j; import org.apache.eventmesh.client.grpc.EventMeshGrpcProducer; import org.apache.eventmesh.client.grpc.config.EventMeshGrpcClientConfig; -import org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage; -import org.apache.eventmesh.common.utils.IPUtils; +import org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage; import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.common.utils.RandomStringUtils; -import org.apache.eventmesh.common.utils.ThreadUtils; import org.apache.eventmesh.util.Utils; import java.util.HashMap; @@ -60,7 +58,7 @@ public static void main(String[] args) throws Exception { content.put("content", "testAsyncMessage"); for (int i = 0; i < messageSize; i++) { - EventMeshMessage message = EventMeshMessage.newBuilder() + SimpleMessage message = SimpleMessage.newBuilder() .setContent(JsonUtils.serialize(content)) .setTopic(topic) .setUniqueId(RandomStringUtils.generateNum(30)) diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/BatchPublishInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/BatchPublishInstance.java index d45105bc68..7a4d8b45ea 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/BatchPublishInstance.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/BatchPublishInstance.java @@ -21,7 +21,6 @@ import org.apache.eventmesh.client.grpc.EventMeshGrpcProducer; import org.apache.eventmesh.client.grpc.config.EventMeshGrpcClientConfig; import org.apache.eventmesh.common.protocol.grpc.protos.BatchMessage; -import org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage; import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.common.utils.RandomStringUtils; import org.apache.eventmesh.util.Utils; diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/RequestReplyInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/RequestReplyInstance.java index 8273ec2d2b..eb41370924 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/RequestReplyInstance.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/RequestReplyInstance.java @@ -20,7 +20,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.eventmesh.client.grpc.EventMeshGrpcProducer; import org.apache.eventmesh.client.grpc.config.EventMeshGrpcClientConfig; -import org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage; +import org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage; import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.common.utils.RandomStringUtils; import org.apache.eventmesh.util.Utils; @@ -58,7 +58,7 @@ public static void main(String[] args) throws Exception { content.put("content", "testRequestReplyMessage"); for (int i = 0; i < messageSize; i++) { - EventMeshMessage message = EventMeshMessage.newBuilder() + SimpleMessage message = SimpleMessage.newBuilder() .setContent(JsonUtils.serialize(content)) .setTopic(topic) .setUniqueId(RandomStringUtils.generateNum(30)) diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshAsyncSubscribe.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshAsyncSubscribe.java index 939969e02c..647ba65a7c 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshAsyncSubscribe.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshAsyncSubscribe.java @@ -5,7 +5,7 @@ import org.apache.eventmesh.client.grpc.ReceiveMsgHook; import org.apache.eventmesh.client.grpc.config.EventMeshGrpcClientConfig; import org.apache.eventmesh.client.tcp.common.EventMeshCommon; -import org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage; +import org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage; import org.apache.eventmesh.common.protocol.grpc.protos.Subscription; import org.apache.eventmesh.common.protocol.grpc.protos.Subscription.SubscriptionItem; import org.apache.eventmesh.util.Utils; @@ -14,7 +14,7 @@ import java.util.Properties; @Slf4j -public class EventmeshAsyncSubscribe implements ReceiveMsgHook { +public class EventmeshAsyncSubscribe implements ReceiveMsgHook { public static EventmeshAsyncSubscribe handler = new EventmeshAsyncSubscribe(); @@ -56,7 +56,7 @@ public static void main(String[] args) { } @Override - public Optional handle(EventMeshMessage msg) { + public Optional handle(SimpleMessage msg) { log.info("receive async msg====================={}", msg); return Optional.empty(); } diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java index 86babadc31..056814a173 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java +++ b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java @@ -18,11 +18,11 @@ package org.apache.eventmesh.protocol.cloudevents; import org.apache.eventmesh.common.Constants; -import org.apache.eventmesh.common.EventMeshMessage; import org.apache.eventmesh.common.protocol.ProtocolTransportObject; import org.apache.eventmesh.common.protocol.grpc.common.BatchMessageWrapper; -import org.apache.eventmesh.common.protocol.grpc.common.EventMeshMessageWrapper; +import org.apache.eventmesh.common.protocol.grpc.common.SimpleMessageWrapper; import org.apache.eventmesh.common.protocol.grpc.protos.BatchMessage; +import org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage; import org.apache.eventmesh.common.protocol.http.HttpCommand; import org.apache.eventmesh.common.protocol.http.body.Body; import org.apache.eventmesh.common.protocol.http.common.RequestCode; @@ -74,10 +74,9 @@ public CloudEvent toCloudEvent(ProtocolTransportObject cloudEvent) throws Protoc String requestCode = ((HttpCommand) cloudEvent).getRequestCode(); return deserializeHttpProtocol(requestCode, header, body); - } else if (cloudEvent instanceof EventMeshMessageWrapper) { - org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage eventMeshMessage - = ((EventMeshMessageWrapper) cloudEvent).getMessage(); - return GrpcMessageProtocolResolver.buildEvent(eventMeshMessage); + } else if (cloudEvent instanceof SimpleMessageWrapper) { + SimpleMessage simpleMessage = ((SimpleMessageWrapper) cloudEvent).getMessage(); + return GrpcMessageProtocolResolver.buildEvent(simpleMessage); } else { throw new ProtocolHandleException(String.format("protocol class: %s", cloudEvent.getClass())); } @@ -145,7 +144,7 @@ public Map toMap() { pkg.setBody(eventFormat.serialize(cloudEvent)); return pkg; } else if (StringUtils.equals("grpc", protocolDesc)){ - return GrpcMessageProtocolResolver.buildEventMeshMessage(cloudEvent); + return GrpcMessageProtocolResolver.buildSimpleMessage(cloudEvent); } else { throw new ProtocolHandleException(String.format("Unsupported protocolDesc: %s", protocolDesc)); } diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/grpc/GrpcMessageProtocolResolver.java b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/grpc/GrpcMessageProtocolResolver.java index 3a2d3def4b..b13c44f33f 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/grpc/GrpcMessageProtocolResolver.java +++ b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/grpc/GrpcMessageProtocolResolver.java @@ -5,11 +5,11 @@ import io.cloudevents.core.format.EventFormat; import io.cloudevents.core.provider.EventFormatProvider; import org.apache.commons.lang3.StringUtils; -import org.apache.eventmesh.common.protocol.grpc.common.EventMeshMessageWrapper; +import org.apache.eventmesh.common.protocol.grpc.common.SimpleMessageWrapper; import org.apache.eventmesh.common.protocol.grpc.common.ProtocolKey; import org.apache.eventmesh.common.protocol.grpc.protos.BatchMessage; -import org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage; import org.apache.eventmesh.common.protocol.grpc.protos.RequestHeader; +import org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -17,7 +17,7 @@ public class GrpcMessageProtocolResolver { - public static CloudEvent buildEvent(EventMeshMessage message) { + public static CloudEvent buildEvent(SimpleMessage message) { String cloudEventJson = message.getContent(); String contentType = message.getPropertiesOrDefault(ProtocolKey.CONTENT_TYPE, "application/cloudevents+json"); @@ -74,7 +74,7 @@ public static CloudEvent buildEvent(EventMeshMessage message) { .withExtension(ProtocolKey.TTL, ttl).build(); } - public static EventMeshMessageWrapper buildEventMeshMessage(CloudEvent cloudEvent) { + public static SimpleMessageWrapper buildSimpleMessage(CloudEvent cloudEvent) { String env = cloudEvent.getExtension(ProtocolKey.ENV) == null ? null : cloudEvent.getExtension(ProtocolKey.ENV).toString(); String idc = cloudEvent.getExtension(ProtocolKey.IDC) == null ? null : cloudEvent.getExtension(ProtocolKey.IDC).toString(); String ip = cloudEvent.getExtension(ProtocolKey.IP) == null ? null : cloudEvent.getExtension(ProtocolKey.IP).toString(); @@ -106,7 +106,7 @@ public static EventMeshMessageWrapper buildEventMeshMessage(CloudEvent cloudEven String contentType = cloudEvent.getDataContentType(); EventFormat eventFormat = EventFormatProvider.getInstance().resolveFormat(contentType); - EventMeshMessage.Builder messageBuilder = EventMeshMessage.newBuilder() + SimpleMessage.Builder messageBuilder = SimpleMessage.newBuilder() .setHeader(header) .setContent(new String( eventFormat.serialize(cloudEvent), StandardCharsets.UTF_8)) .setProducerGroup(producerGroup) @@ -120,9 +120,9 @@ public static EventMeshMessageWrapper buildEventMeshMessage(CloudEvent cloudEven messageBuilder.putProperties(key, cloudEvent.getExtension(key).toString()); } - EventMeshMessage eventMeshMessage = messageBuilder.build(); + SimpleMessage simpleMessage = messageBuilder.build(); - return new EventMeshMessageWrapper(eventMeshMessage); + return new SimpleMessageWrapper(simpleMessage); } public static List buildBatchEvents(BatchMessage batchMessage) { diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-grpc/src/main/proto/eventmesh-client.proto b/eventmesh-protocol-plugin/eventmesh-protocol-grpc/src/main/proto/eventmesh-client.proto index 911fc11cd7..c9703f9820 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-grpc/src/main/proto/eventmesh-client.proto +++ b/eventmesh-protocol-plugin/eventmesh-protocol-grpc/src/main/proto/eventmesh-client.proto @@ -38,7 +38,7 @@ message RequestHeader { string protocolDesc = 12; } -message EventMeshMessage { +message SimpleMessage { RequestHeader header = 1; string producerGroup = 2; string topic = 3; @@ -117,9 +117,9 @@ message Heartbeat { } service PublisherService { - rpc publish(EventMeshMessage) returns (Response); + rpc publish(SimpleMessage) returns (Response); - rpc requestReply(EventMeshMessage) returns (Response); + rpc requestReply(SimpleMessage) returns (Response); rpc batchPublish(BatchMessage) returns (Response); } @@ -127,7 +127,7 @@ service PublisherService { service ConsumerService { rpc subscribe(Subscription) returns (Response); - rpc subscribeStream(Subscription) returns (stream EventMeshMessage); + rpc subscribeStream(Subscription) returns (stream SimpleMessage); rpc unsubscribe(Subscription) returns (Response); } diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/MeshMessageProtocolAdaptor.java b/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/MeshMessageProtocolAdaptor.java index 34498ae063..65a4ef9c7a 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/MeshMessageProtocolAdaptor.java +++ b/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/MeshMessageProtocolAdaptor.java @@ -20,8 +20,9 @@ import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.protocol.ProtocolTransportObject; import org.apache.eventmesh.common.protocol.grpc.common.BatchMessageWrapper; -import org.apache.eventmesh.common.protocol.grpc.common.EventMeshMessageWrapper; +import org.apache.eventmesh.common.protocol.grpc.common.SimpleMessageWrapper; import org.apache.eventmesh.common.protocol.grpc.protos.BatchMessage; +import org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage; import org.apache.eventmesh.common.protocol.http.HttpCommand; import org.apache.eventmesh.common.protocol.http.body.Body; import org.apache.eventmesh.common.protocol.http.common.RequestCode; @@ -63,15 +64,15 @@ public CloudEvent toCloudEvent(ProtocolTransportObject protocol) throws Protocol String requestCode = ((HttpCommand) protocol).getRequestCode(); return deserializeHttpProtocol(requestCode, header, body); - } else if (protocol instanceof EventMeshMessageWrapper) { - org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage message = ((EventMeshMessageWrapper)protocol).getMessage(); + } else if (protocol instanceof SimpleMessageWrapper) { + SimpleMessage message = ((SimpleMessageWrapper)protocol).getMessage(); return deserializeGrpcProtocol(message); } else { throw new ProtocolHandleException(String.format("protocol class: %s", protocol.getClass())); } } - private CloudEvent deserializeGrpcProtocol(org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage message) + private CloudEvent deserializeGrpcProtocol(SimpleMessage message) throws ProtocolHandleException { return GrpcMessageProtocolResolver.buildEvent(message); } @@ -127,7 +128,7 @@ public Map toMap() { httpCommand.setBody(body); return httpCommand; } else if (StringUtils.equals("grpc", protocolDesc)) { - return GrpcMessageProtocolResolver.buildEventMeshMessage(cloudEvent); + return GrpcMessageProtocolResolver.buildSimpleMessage(cloudEvent); } else if (StringUtils.equals("tcp", protocolDesc)) { return TcpMessageProtocolResolver.buildEventMeshMessage(cloudEvent); } else { diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/resolver/grpc/GrpcMessageProtocolResolver.java b/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/resolver/grpc/GrpcMessageProtocolResolver.java index 03a1e69b54..3484599aa6 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/resolver/grpc/GrpcMessageProtocolResolver.java +++ b/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/resolver/grpc/GrpcMessageProtocolResolver.java @@ -4,12 +4,12 @@ import io.cloudevents.SpecVersion; import io.cloudevents.core.builder.CloudEventBuilder; import org.apache.commons.lang3.StringUtils; -import org.apache.eventmesh.common.protocol.grpc.common.EventMeshMessageWrapper; +import org.apache.eventmesh.common.protocol.grpc.common.SimpleMessageWrapper; import org.apache.eventmesh.common.protocol.grpc.common.ProtocolKey; import org.apache.eventmesh.common.protocol.grpc.protos.BatchMessage; import org.apache.eventmesh.common.protocol.grpc.protos.BatchMessage.MessageItem; -import org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage; import org.apache.eventmesh.common.protocol.grpc.protos.RequestHeader; +import org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage; import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException; import java.net.URI; @@ -20,7 +20,7 @@ public class GrpcMessageProtocolResolver { - public static CloudEvent buildEvent(EventMeshMessage message) throws ProtocolHandleException { + public static CloudEvent buildEvent(SimpleMessage message) throws ProtocolHandleException { try { RequestHeader requestHeader = message.getHeader(); @@ -200,7 +200,7 @@ public static List buildBatchEvents(BatchMessage message) { return events; } - public static EventMeshMessageWrapper buildEventMeshMessage(CloudEvent cloudEvent) { + public static SimpleMessageWrapper buildSimpleMessage(CloudEvent cloudEvent) { String env = cloudEvent.getExtension(ProtocolKey.ENV) == null ? null : cloudEvent.getExtension(ProtocolKey.ENV).toString(); String idc = cloudEvent.getExtension(ProtocolKey.IDC) == null ? null : cloudEvent.getExtension(ProtocolKey.IDC).toString(); String ip = cloudEvent.getExtension(ProtocolKey.IP) == null ? null : cloudEvent.getExtension(ProtocolKey.IP).toString(); @@ -229,7 +229,7 @@ public static EventMeshMessageWrapper buildEventMeshMessage(CloudEvent cloudEven .setProtocolDesc(protocolDesc).setProtocolVersion(protocolVersion) .build(); - EventMeshMessage.Builder messageBuilder = EventMeshMessage.newBuilder() + SimpleMessage.Builder messageBuilder = SimpleMessage.newBuilder() .setHeader(header) .setContent(new String(cloudEvent.getData().toBytes(), StandardCharsets.UTF_8)) .setProducerGroup(producerGroup) @@ -242,8 +242,8 @@ public static EventMeshMessageWrapper buildEventMeshMessage(CloudEvent cloudEven messageBuilder.putProperties(key, cloudEvent.getExtension(key).toString()); } - EventMeshMessage eventMeshMessage = messageBuilder.build(); + SimpleMessage simpleMessage = messageBuilder.build(); - return new EventMeshMessageWrapper(eventMeshMessage); + return new SimpleMessageWrapper(simpleMessage); } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java index bb9b75958a..8b8223c997 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java @@ -2,7 +2,6 @@ import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; -import io.grpc.stub.StreamObserver; import org.apache.eventmesh.api.AbstractContext; import org.apache.eventmesh.api.EventListener; import org.apache.eventmesh.api.EventMeshAction; @@ -11,7 +10,6 @@ import org.apache.eventmesh.api.SendResult; import org.apache.eventmesh.api.exception.OnExceptionContext; import org.apache.eventmesh.common.Constants; -import org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage; import org.apache.eventmesh.common.protocol.grpc.protos.Subscription; import org.apache.eventmesh.common.protocol.grpc.protos.Subscription.SubscriptionItem.SubscriptionMode; import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer; @@ -22,8 +20,6 @@ import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.ConsumerGroupClient; import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.ConsumerGroupTopicConfig; import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.GrpcType; -import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.StreamTopicConfig; -import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.WebhookTopicConfig; import org.apache.eventmesh.runtime.core.protocol.grpc.producer.EventMeshProducer; import org.apache.eventmesh.runtime.core.protocol.grpc.producer.SendMessageContext; import org.apache.eventmesh.runtime.core.protocol.grpc.push.HandleMsgContext; @@ -32,11 +28,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; public class EventMeshConsumer { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/ConsumerGroupClient.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/ConsumerGroupClient.java index 74894d5619..95d256bc69 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/ConsumerGroupClient.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/ConsumerGroupClient.java @@ -20,7 +20,6 @@ package org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup; import lombok.Getter; -import org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage; import org.apache.eventmesh.common.protocol.grpc.protos.Subscription.SubscriptionItem.SubscriptionMode; import java.util.Date; import lombok.Builder; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/StreamTopicConfig.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/StreamTopicConfig.java index 0d4d8565ac..b95f4f682b 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/StreamTopicConfig.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/StreamTopicConfig.java @@ -1,6 +1,5 @@ package org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup; -import org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage; import org.apache.eventmesh.common.protocol.grpc.protos.Subscription.SubscriptionItem.SubscriptionMode; import org.apache.eventmesh.runtime.core.protocol.grpc.service.EventEmitter; import org.slf4j.Logger; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/WebhookTopicConfig.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/WebhookTopicConfig.java index 3e7b7c4993..77a70648fb 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/WebhookTopicConfig.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/WebhookTopicConfig.java @@ -1,11 +1,8 @@ package org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup; -import io.grpc.stub.StreamObserver; -import org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage; import org.apache.eventmesh.common.protocol.grpc.protos.Subscription.SubscriptionItem.SubscriptionMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import sun.awt.image.ImageWatched; import java.util.ArrayList; import java.util.HashSet; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/RequestReplyMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/RequestReplyMessageProcessor.java index 6272f79dde..0dc3991b52 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/RequestReplyMessageProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/RequestReplyMessageProcessor.java @@ -19,16 +19,13 @@ import io.cloudevents.CloudEvent; import org.apache.eventmesh.api.RequestReplyCallback; -import org.apache.eventmesh.api.SendCallback; -import org.apache.eventmesh.api.SendResult; import org.apache.eventmesh.api.exception.AclException; -import org.apache.eventmesh.api.exception.OnExceptionContext; import org.apache.eventmesh.common.protocol.ProtocolTransportObject; -import org.apache.eventmesh.common.protocol.grpc.common.EventMeshMessageWrapper; +import org.apache.eventmesh.common.protocol.grpc.common.SimpleMessageWrapper; import org.apache.eventmesh.common.protocol.grpc.common.StatusCode; -import org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage; import org.apache.eventmesh.common.protocol.grpc.protos.RequestHeader; import org.apache.eventmesh.common.protocol.grpc.protos.Response; +import org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage; import org.apache.eventmesh.common.protocol.http.common.RequestCode; import org.apache.eventmesh.protocol.api.ProtocolAdaptor; import org.apache.eventmesh.protocol.api.ProtocolPluginFactory; @@ -58,7 +55,7 @@ public RequestReplyMessageProcessor(EventMeshGrpcServer eventMeshGrpcServer) { this.eventMeshGrpcServer = eventMeshGrpcServer; } - public void process(EventMeshMessage message, EventEmitter emitter) throws Exception { + public void process(SimpleMessage message, EventEmitter emitter) throws Exception { RequestHeader requestHeader = message.getHeader(); if (!ServiceUtils.validateHeader(requestHeader)) { @@ -95,7 +92,7 @@ public void process(EventMeshMessage message, EventEmitter emitter) th String protocolType = requestHeader.getProtocolType(); ProtocolAdaptor grpcCommandProtocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType); - CloudEvent cloudEvent = grpcCommandProtocolAdaptor.toCloudEvent(new EventMeshMessageWrapper(message)); + CloudEvent cloudEvent = grpcCommandProtocolAdaptor.toCloudEvent(new SimpleMessageWrapper(message)); ProducerManager producerManager = eventMeshGrpcServer.getProducerManager(); EventMeshProducer eventMeshProducer = producerManager.getEventMeshProducer(producerGroup); @@ -123,7 +120,7 @@ public void onException(Throwable e) { }, ttl); } - private void doAclCheck(EventMeshMessage message) throws AclException { + private void doAclCheck(SimpleMessage message) throws AclException { RequestHeader requestHeader = message.getHeader(); if (eventMeshGrpcServer.getEventMeshGrpcConfiguration().eventMeshServerSecurityEnable) { String remoteAdd = requestHeader.getIp(); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/SendAsyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/SendAsyncMessageProcessor.java index 7d49c57949..93456f62c5 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/SendAsyncMessageProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/SendAsyncMessageProcessor.java @@ -23,16 +23,12 @@ import org.apache.eventmesh.api.exception.AclException; import org.apache.eventmesh.api.exception.OnExceptionContext; import org.apache.eventmesh.common.protocol.ProtocolTransportObject; -import org.apache.eventmesh.common.protocol.grpc.common.EventMeshMessageWrapper; +import org.apache.eventmesh.common.protocol.grpc.common.SimpleMessageWrapper; import org.apache.eventmesh.common.protocol.grpc.common.StatusCode; -import org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage; import org.apache.eventmesh.common.protocol.grpc.protos.RequestHeader; import org.apache.eventmesh.common.protocol.grpc.protos.Response; -import org.apache.eventmesh.common.protocol.http.body.message.SendMessageResponseBody; -import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode; -import org.apache.eventmesh.common.protocol.http.common.ProtocolKey; +import org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage; import org.apache.eventmesh.common.protocol.http.common.RequestCode; -import org.apache.eventmesh.common.utils.IPUtils; import org.apache.eventmesh.protocol.api.ProtocolAdaptor; import org.apache.eventmesh.protocol.api.ProtocolPluginFactory; import org.apache.eventmesh.runtime.acl.Acl; @@ -45,7 +41,6 @@ import org.apache.eventmesh.runtime.core.protocol.grpc.service.ServiceUtils; import org.apache.eventmesh.runtime.util.EventMeshUtil; -import org.apache.eventmesh.runtime.util.RemotingHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,7 +58,7 @@ public SendAsyncMessageProcessor(EventMeshGrpcServer eventMeshGrpcServer) { this.eventMeshGrpcServer = eventMeshGrpcServer; } - public void process(EventMeshMessage message, EventEmitter emitter) throws Exception { + public void process(SimpleMessage message, EventEmitter emitter) throws Exception { RequestHeader requestHeader = message.getHeader(); if (!ServiceUtils.validateHeader(requestHeader)) { @@ -99,7 +94,7 @@ public void process(EventMeshMessage message, EventEmitter emitter) th String protocolType = requestHeader.getProtocolType(); ProtocolAdaptor grpcCommandProtocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType); - CloudEvent cloudEvent = grpcCommandProtocolAdaptor.toCloudEvent(new EventMeshMessageWrapper(message)); + CloudEvent cloudEvent = grpcCommandProtocolAdaptor.toCloudEvent(new SimpleMessageWrapper(message)); ProducerManager producerManager = eventMeshGrpcServer.getProducerManager(); EventMeshProducer eventMeshProducer = producerManager.getEventMeshProducer(producerGroup); @@ -127,7 +122,7 @@ public void onException(OnExceptionContext context) { }); } - private void doAclCheck(EventMeshMessage message) throws AclException { + private void doAclCheck(SimpleMessage message) throws AclException { RequestHeader requestHeader = message.getHeader(); if (eventMeshGrpcServer.getEventMeshGrpcConfiguration().eventMeshServerSecurityEnable) { String remoteAdd = requestHeader.getIp(); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/SubscribeStreamProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/SubscribeStreamProcessor.java index 35c855d336..aebffb5599 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/SubscribeStreamProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/SubscribeStreamProcessor.java @@ -2,7 +2,6 @@ import org.apache.eventmesh.api.exception.AclException; import org.apache.eventmesh.common.protocol.grpc.common.StatusCode; -import org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage; import org.apache.eventmesh.common.protocol.grpc.protos.RequestHeader; import org.apache.eventmesh.common.protocol.grpc.protos.Subscription; import org.apache.eventmesh.common.protocol.http.common.RequestCode; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/AbstractPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/AbstractPushRequest.java index d280280fb5..50ac802ee1 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/AbstractPushRequest.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/AbstractPushRequest.java @@ -19,15 +19,11 @@ import com.google.common.collect.Sets; import io.cloudevents.CloudEvent; -import io.cloudevents.core.builder.CloudEventBuilder; -import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.api.AbstractContext; import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.protocol.ProtocolTransportObject; -import org.apache.eventmesh.common.protocol.grpc.common.EventMeshMessageWrapper; -import org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage; +import org.apache.eventmesh.common.protocol.grpc.common.SimpleMessageWrapper; import org.apache.eventmesh.common.protocol.grpc.protos.Subscription.SubscriptionItem.SubscriptionMode; -import org.apache.eventmesh.common.utils.RandomStringUtils; import org.apache.eventmesh.protocol.api.ProtocolAdaptor; import org.apache.eventmesh.protocol.api.ProtocolPluginFactory; import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer; @@ -84,7 +80,7 @@ private EventMeshMessage getEventMeshMessage(CloudEvent cloudEvent) { String protocolType = Objects.requireNonNull(cloudEvent.getExtension(Constants.PROTOCOL_TYPE)).toString(); ProtocolAdaptor protocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType); ProtocolTransportObject protocolTransportObject = protocolAdaptor.fromCloudEvent(cloudEvent); - return ((EventMeshMessageWrapper) protocolTransportObject).getMessage(); + return ((SimpleMessageWrapper) protocolTransportObject).getMessage(); } catch (Exception e) { logger.error("Error in getting EventMeshMessage from CloudEvent", e); return null; @@ -95,7 +91,7 @@ private CloudEvent getCloudEvent(EventMeshMessage eventMeshMessage) { try { String protocolType = Objects.requireNonNull(eventMeshMessage.getHeader().getProtocolType()); ProtocolAdaptor protocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType); - return protocolAdaptor.toCloudEvent(new EventMeshMessageWrapper(eventMeshMessage)); + return protocolAdaptor.toCloudEvent(new SimpleMessageWrapper(eventMeshMessage)); } catch (Exception e) { logger.error("Error in getting CloudEvent from EventMeshMessage", e); return null; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/StreamPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/StreamPushRequest.java index c5e67f4808..82a0e492ee 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/StreamPushRequest.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/StreamPushRequest.java @@ -3,7 +3,6 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.RandomUtils; -import org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage; import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.StreamTopicConfig; import org.apache.eventmesh.runtime.core.protocol.grpc.service.EventEmitter; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/WebhookPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/WebhookPushRequest.java index 8dd613b8c6..25e9993c4a 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/WebhookPushRequest.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/WebhookPushRequest.java @@ -24,7 +24,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.eventmesh.common.Constants; -import org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage; import org.apache.eventmesh.common.protocol.grpc.protos.RequestHeader; import org.apache.eventmesh.common.protocol.http.body.message.PushMessageRequestBody; import org.apache.eventmesh.common.protocol.http.common.ClientRetCode; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ConsumerService.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ConsumerService.java index fe987aabcc..2841066d1d 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ConsumerService.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ConsumerService.java @@ -3,19 +3,15 @@ import io.grpc.stub.StreamObserver; import org.apache.eventmesh.common.protocol.grpc.common.StatusCode; import org.apache.eventmesh.common.protocol.grpc.protos.ConsumerServiceGrpc; -import org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage; import org.apache.eventmesh.common.protocol.grpc.protos.RequestHeader; import org.apache.eventmesh.common.protocol.grpc.protos.Response; import org.apache.eventmesh.common.protocol.grpc.protos.Subscription; -import org.apache.eventmesh.common.protocol.http.common.RequestCode; -import org.apache.eventmesh.common.utils.IPUtils; import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer; import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.protocol.grpc.processor.SubscribeProcessor; import org.apache.eventmesh.runtime.core.protocol.grpc.processor.SubscribeStreamProcessor; import org.apache.eventmesh.runtime.core.protocol.grpc.processor.UnsubscribeProcessor; -import org.apache.eventmesh.runtime.util.RemotingHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ProducerService.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ProducerService.java index 6e71a6274b..510b6f8d26 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ProducerService.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ProducerService.java @@ -3,9 +3,9 @@ import io.grpc.stub.StreamObserver; import org.apache.eventmesh.common.protocol.grpc.common.StatusCode; import org.apache.eventmesh.common.protocol.grpc.protos.BatchMessage; -import org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage; import org.apache.eventmesh.common.protocol.grpc.protos.PublisherServiceGrpc; import org.apache.eventmesh.common.protocol.grpc.protos.Response; +import org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage; import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer; import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.protocol.grpc.processor.BatchPublishMessageProcessor; @@ -32,7 +32,7 @@ public ProducerService(EventMeshGrpcServer eventMeshGrpcServer, this.threadPoolExecutor = threadPoolExecutor; } - public void publish(EventMeshMessage request, StreamObserver responseObserver) { + public void publish(SimpleMessage request, StreamObserver responseObserver) { cmdLogger.info("cmd={}|{}|client2eventMesh|from={}|to={}", "AsyncPublish", EventMeshConstants.PROTOCOL_GRPC, request.getHeader().getIp(), eventMeshGrpcServer.getEventMeshGrpcConfiguration().eventMeshIp); @@ -50,7 +50,7 @@ public void publish(EventMeshMessage request, StreamObserver responseO }); } - public void requestReply(EventMeshMessage request, StreamObserver responseObserver) { + public void requestReply(SimpleMessage request, StreamObserver responseObserver) { cmdLogger.info("cmd={}|{}|client2eventMesh|from={}|to={}", "RequestReply", EventMeshConstants.PROTOCOL_GRPC, request.getHeader().getIp(), eventMeshGrpcServer.getEventMeshGrpcConfiguration().eventMeshIp); @@ -86,6 +86,4 @@ public void batchPublish(BatchMessage request, StreamObserver response }); } - public void broadcast(EventMeshMessage request, StreamObserver responseObserver) { - } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ServiceUtils.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ServiceUtils.java index ff4b2d0382..458b3c0e30 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ServiceUtils.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ServiceUtils.java @@ -1,15 +1,14 @@ package org.apache.eventmesh.runtime.core.protocol.grpc.service; -import io.grpc.stub.StreamObserver; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.common.protocol.grpc.common.StatusCode; import org.apache.eventmesh.common.protocol.grpc.protos.BatchMessage; -import org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage; import org.apache.eventmesh.common.protocol.grpc.protos.Heartbeat; import org.apache.eventmesh.common.protocol.grpc.protos.Heartbeat.ClientType; import org.apache.eventmesh.common.protocol.grpc.protos.RequestHeader; import org.apache.eventmesh.common.protocol.grpc.protos.Response; +import org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage; import org.apache.eventmesh.common.protocol.grpc.protos.Subscription; import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.GrpcType; @@ -27,7 +26,7 @@ public static boolean validateHeader(RequestHeader header) { && StringUtils.isNotEmpty(header.getLanguage()); } - public static boolean validateMessage(EventMeshMessage message) { + public static boolean validateMessage(SimpleMessage message) { return StringUtils.isNotEmpty(message.getUniqueId()) && StringUtils.isNotEmpty(message.getProducerGroup()) && StringUtils.isNotEmpty(message.getTopic()) diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/EventMeshGrpcConsumer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/EventMeshGrpcConsumer.java index 367cd23c2e..7b45bcae75 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/EventMeshGrpcConsumer.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/EventMeshGrpcConsumer.java @@ -1,7 +1,6 @@ package org.apache.eventmesh.client.grpc; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import io.cloudevents.CloudEvent; import io.cloudevents.core.provider.EventFormatProvider; import io.cloudevents.jackson.JsonFormat; import io.grpc.ManagedChannel; @@ -12,18 +11,17 @@ import org.apache.eventmesh.client.tcp.common.EventMeshCommon; import org.apache.eventmesh.common.protocol.grpc.protos.ConsumerServiceGrpc; import org.apache.eventmesh.common.protocol.grpc.protos.ConsumerServiceGrpc.ConsumerServiceBlockingStub; -import org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage; import org.apache.eventmesh.common.protocol.grpc.protos.Heartbeat; import org.apache.eventmesh.common.protocol.grpc.protos.HeartbeatServiceGrpc; import org.apache.eventmesh.common.protocol.grpc.protos.HeartbeatServiceGrpc.HeartbeatServiceBlockingStub; import org.apache.eventmesh.common.protocol.grpc.protos.RequestHeader; import org.apache.eventmesh.common.protocol.grpc.protos.Response; +import org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage; import org.apache.eventmesh.common.protocol.grpc.protos.Subscription; import org.apache.eventmesh.common.protocol.http.common.ProtocolKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.awt.Event; import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.LinkedList; @@ -96,7 +94,7 @@ public void subscribeStream(Subscription subscription) { .setHeader(EventMeshClientUtil.buildHeader(clientConfig, EventMeshCommon.EM_MESSAGE_PROTOCOL_NAME)) .setConsumerGroup(clientConfig.getConsumerGroup()) .build(); - Iterator msgIterator; + Iterator msgIterator; try { msgIterator = consumerClient.subscribeStream(enhancedSubscription); } catch (Exception e) { @@ -191,13 +189,13 @@ public void close() { } static class ListenerThread extends Thread { - private final Iterator msgIterator; + private final Iterator msgIterator; private final ReceiveMsgHook listener; private String protocolType; - ListenerThread(Iterator msgIterator, ReceiveMsgHook listener, String protocolType) { + ListenerThread(Iterator msgIterator, ReceiveMsgHook listener, String protocolType) { this.msgIterator = msgIterator; this.listener = listener; this.protocolType = protocolType; @@ -209,8 +207,8 @@ public void run() { while (msgIterator.hasNext()) { logger.info("sdk received message "); - EventMeshMessage eventMeshMessage = msgIterator.next(); - T msg = buildMessage(eventMeshMessage); + SimpleMessage simpleMessage = msgIterator.next(); + T msg = buildMessage(simpleMessage); if (msg != null) { listener.handle(msg); } @@ -220,14 +218,14 @@ public void run() { } } - private T buildMessage(EventMeshMessage eventMeshMessage) { + private T buildMessage(SimpleMessage simpleMessage) { try { if (EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME.equals(protocolType)) { - String contentType = eventMeshMessage.getPropertiesOrDefault(ProtocolKey.CONTENT_TYPE, JsonFormat.CONTENT_TYPE); + String contentType = simpleMessage.getPropertiesOrDefault(ProtocolKey.CONTENT_TYPE, JsonFormat.CONTENT_TYPE); return (T) EventFormatProvider.getInstance().resolveFormat(contentType) - .deserialize(eventMeshMessage.getContent().getBytes(StandardCharsets.UTF_8)); + .deserialize(simpleMessage.getContent().getBytes(StandardCharsets.UTF_8)); } - return (T) eventMeshMessage; + return (T) simpleMessage; } catch (Throwable t) { logger.warn("Error in building message. {}", t.getMessage()); return null; diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/EventMeshGrpcProducer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/EventMeshGrpcProducer.java index a521fb890c..e71bea2eef 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/EventMeshGrpcProducer.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/EventMeshGrpcProducer.java @@ -7,10 +7,10 @@ import org.apache.eventmesh.client.grpc.producer.CloudEventProducer; import org.apache.eventmesh.client.grpc.util.EventMeshClientUtil; import org.apache.eventmesh.common.protocol.grpc.protos.BatchMessage; -import org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage; import org.apache.eventmesh.common.protocol.grpc.protos.PublisherServiceGrpc; import org.apache.eventmesh.common.protocol.grpc.protos.PublisherServiceGrpc.PublisherServiceBlockingStub; import org.apache.eventmesh.common.protocol.grpc.protos.Response; +import org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,10 +54,10 @@ public Response requestReply(CloudEvent cloudEvent, int timeout) { return cloudEventProducer.requestReply(cloudEvent, timeout); } - public Response publish(EventMeshMessage message) { + public Response publish(SimpleMessage message) { logger.info("Publish message " + message.toString()); - EventMeshMessage enhancedMessage = EventMeshMessage.newBuilder(message) + SimpleMessage enhancedMessage = SimpleMessage.newBuilder(message) .setHeader(EventMeshClientUtil.buildHeader(clientConfig, PROTOCOL_TYPE)) .setProducerGroup(clientConfig.getProducerGroup()) .build(); @@ -71,10 +71,10 @@ public Response publish(EventMeshMessage message) { } } - public Response requestReply(EventMeshMessage message, int timeout) { + public Response requestReply(SimpleMessage message, int timeout) { logger.info("RequestReply message " + message.toString()); - EventMeshMessage enhancedMessage = EventMeshMessage.newBuilder(message) + SimpleMessage enhancedMessage = SimpleMessage.newBuilder(message) .setHeader(EventMeshClientUtil.buildHeader(clientConfig, PROTOCOL_TYPE)) .setProducerGroup(clientConfig.getProducerGroup()) .setTtl(String.valueOf(timeout)) diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/producer/CloudEventProducer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/producer/CloudEventProducer.java index 3941d94c9f..5733a58f54 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/producer/CloudEventProducer.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/producer/CloudEventProducer.java @@ -10,10 +10,10 @@ import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.protocol.grpc.protos.BatchMessage; import org.apache.eventmesh.common.protocol.grpc.protos.BatchMessage.MessageItem; -import org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage; import org.apache.eventmesh.common.protocol.grpc.protos.PublisherServiceGrpc.PublisherServiceBlockingStub; import org.apache.eventmesh.common.protocol.grpc.protos.Response; import org.apache.eventmesh.common.protocol.grpc.common.ProtocolKey; +import org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage; import org.apache.eventmesh.common.utils.IPUtils; import org.apache.eventmesh.common.utils.RandomStringUtils; import org.apache.eventmesh.common.utils.ThreadUtils; @@ -42,7 +42,7 @@ public Response publish(CloudEvent cloudEvent) { logger.info("Publish message " + cloudEvent.toString()); CloudEvent enhanceEvent = enhanceCloudEvent(cloudEvent, null); - EventMeshMessage enhancedMessage = buildEventMeshMessage(enhanceEvent); + SimpleMessage enhancedMessage = buildSimpleMessage(enhanceEvent); try { Response response = publisherClient.publish(enhancedMessage); @@ -58,7 +58,7 @@ public Response requestReply(CloudEvent cloudEvent, int timeout) { logger.info("RequestReply message " + cloudEvent.toString()); CloudEvent enhanceEvent = enhanceCloudEvent(cloudEvent, String.valueOf(timeout)); - EventMeshMessage enhancedMessage = buildEventMeshMessage(enhanceEvent); + SimpleMessage enhancedMessage = buildSimpleMessage(enhanceEvent); try { Response response = publisherClient.requestReply(enhancedMessage); logger.info("Received response " + response.toString()); @@ -113,7 +113,7 @@ private CloudEvent enhanceCloudEvent(final CloudEvent cloudEvent, String timeout return builder.build(); } - private EventMeshMessage buildEventMeshMessage(CloudEvent cloudEvent) { + private SimpleMessage buildSimpleMessage(CloudEvent cloudEvent) { String contentType = StringUtils.isEmpty(cloudEvent.getDataContentType()) ? "application/cloudevents+json" : cloudEvent.getDataContentType(); byte[] bodyByte = EventFormatProvider.getInstance().resolveFormat(contentType) @@ -122,7 +122,7 @@ private EventMeshMessage buildEventMeshMessage(CloudEvent cloudEvent) { String ttl = cloudEvent.getExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL) == null ? "4000" : cloudEvent.getExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL).toString(); - return EventMeshMessage.newBuilder() + return SimpleMessage.newBuilder() .setHeader(EventMeshClientUtil.buildHeader(clientConfig, PROTOCOL_TYPE)) .setProducerGroup(clientConfig.getProducerGroup()) .setTopic(cloudEvent.getSubject())