diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/ConsumerGroupClient.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/ConsumerGroupClient.java index 95d256bc69..3c5d644bad 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/ConsumerGroupClient.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/ConsumerGroupClient.java @@ -20,6 +20,7 @@ package org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup; import lombok.Getter; +import org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage; import org.apache.eventmesh.common.protocol.grpc.protos.Subscription.SubscriptionItem.SubscriptionMode; import java.util.Date; import lombok.Builder; @@ -41,7 +42,7 @@ public class ConsumerGroupClient { private String url; - private EventEmitter eventEmitter; + private EventEmitter eventEmitter; private final SubscriptionMode subscriptionMode; @@ -60,7 +61,7 @@ public class ConsumerGroupClient { public void setUrl(String url) { this.url = url; } - public void setEventEmitter(EventEmitter emitter) { + public void setEventEmitter(EventEmitter emitter) { this.eventEmitter = emitter; } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/StreamTopicConfig.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/StreamTopicConfig.java index b95f4f682b..a80a093a80 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/StreamTopicConfig.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/StreamTopicConfig.java @@ -1,5 +1,6 @@ package org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup; +import org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage; import org.apache.eventmesh.common.protocol.grpc.protos.Subscription.SubscriptionItem.SubscriptionMode; import org.apache.eventmesh.runtime.core.protocol.grpc.service.EventEmitter; import org.slf4j.Logger; @@ -18,15 +19,15 @@ public class StreamTopicConfig extends ConsumerGroupTopicConfig { * Key: IDC * Value: list of emitters with Client_IP:port */ - private final Map>> idcEmitterMap = new ConcurrentHashMap<>(); + private final Map>> idcEmitterMap = new ConcurrentHashMap<>(); /** * Key: IDC * Value: list of emitters */ - private Map>> idcEmitters = new ConcurrentHashMap<>(); + private Map>> idcEmitters = new ConcurrentHashMap<>(); - private List> totalEmitters = new LinkedList<>(); + private List> totalEmitters = new LinkedList<>(); public StreamTopicConfig(String consumerGroup, String topic, SubscriptionMode subscriptionMode) { super(consumerGroup, topic, subscriptionMode, GrpcType.STREAM); @@ -42,8 +43,8 @@ public synchronized void registerClient(ConsumerGroupClient client) { String idc = client.getIdc(); String clientIp = client.getIp(); String clientPid = client.getPid(); - EventEmitter emitter = client.getEventEmitter(); - Map> emitters = idcEmitterMap.computeIfAbsent(idc, k -> new HashMap<>()); + EventEmitter emitter = client.getEventEmitter(); + Map> emitters = idcEmitterMap.computeIfAbsent(idc, k -> new HashMap<>()); emitters.put(clientIp + ":" + clientPid, emitter); idcEmitters = buildIdcEmitter(); @@ -56,7 +57,7 @@ public void deregisterClient(ConsumerGroupClient client) { String clientIp = client.getIp(); String clientPid = client.getPid(); - Map> emitters = idcEmitterMap.get(idc); + Map> emitters = idcEmitterMap.get(idc); if (emitters == null) { return; } @@ -96,26 +97,26 @@ public GrpcType getGrpcType() { return grpcType; } - public Map>> getIdcEmitters() { + public Map>> getIdcEmitters() { return idcEmitters; } - public List> getTotalEmitters() { + public List> getTotalEmitters() { return totalEmitters; } - private Map>> buildIdcEmitter() { - Map>> result = new HashMap<>(); - for (Map.Entry>> entry : idcEmitterMap.entrySet()) { - List> emitterList = new LinkedList<>(entry.getValue().values()); + private Map>> buildIdcEmitter() { + Map>> result = new HashMap<>(); + for (Map.Entry>> entry : idcEmitterMap.entrySet()) { + List> emitterList = new LinkedList<>(entry.getValue().values()); result.put(entry.getKey(), emitterList); } return result; } - private List> buildTotalEmitter() { - List> emitterList = new LinkedList<>(); - for (List> emitters : idcEmitters.values()) { + private List> buildTotalEmitter() { + List> emitterList = new LinkedList<>(); + for (List> emitters : idcEmitters.values()) { emitterList.addAll(emitters); } return emitterList; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/SubscribeStreamProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/SubscribeStreamProcessor.java index aebffb5599..5df083f34b 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/SubscribeStreamProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/SubscribeStreamProcessor.java @@ -3,6 +3,7 @@ import org.apache.eventmesh.api.exception.AclException; import org.apache.eventmesh.common.protocol.grpc.common.StatusCode; import org.apache.eventmesh.common.protocol.grpc.protos.RequestHeader; +import org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage; import org.apache.eventmesh.common.protocol.grpc.protos.Subscription; import org.apache.eventmesh.common.protocol.http.common.RequestCode; import org.apache.eventmesh.common.utils.JsonUtils; @@ -37,7 +38,7 @@ public SubscribeStreamProcessor(EventMeshGrpcServer eventMeshGrpcServer) { this.eventMeshGrpcServer = eventMeshGrpcServer; } - public void process(Subscription subscription, EventEmitter emitter) throws Exception { + public void process(Subscription subscription, EventEmitter emitter) throws Exception { RequestHeader header = subscription.getHeader(); @@ -109,38 +110,38 @@ public void process(Subscription subscription, EventEmitter em sendResp(subscription, StatusCode.SUCCESS, "subscribe success", emitter); } - private void sendRespAndComplete(Subscription subscription, StatusCode code, EventEmitter emitter) { + private void sendRespAndComplete(Subscription subscription, StatusCode code, EventEmitter emitter) { Map resp = new HashMap<>(); resp.put("respCode", code.getRetCode()); resp.put("respMsg", code.getErrMsg()); RequestHeader header = subscription.getHeader(); - EventMeshMessage eventMeshMessage = EventMeshMessage.newBuilder() + SimpleMessage simpleMessage = SimpleMessage.newBuilder() .setHeader(header) .setContent(JsonUtils.serialize(resp)) .build(); - emitter.onNext(eventMeshMessage); + emitter.onNext(simpleMessage); emitter.onCompleted(); } - private void sendRespAndComplete(Subscription subscription, StatusCode code, String message, EventEmitter emitter) { + private void sendRespAndComplete(Subscription subscription, StatusCode code, String message, EventEmitter emitter) { sendResp(subscription, code, message, emitter); emitter.onCompleted(); } - private void sendResp(Subscription subscription, StatusCode code, String message, EventEmitter emitter) { + private void sendResp(Subscription subscription, StatusCode code, String message, EventEmitter emitter) { Map resp = new HashMap<>(); resp.put("respCode", code.getRetCode()); resp.put("respMsg", code.getErrMsg() + " " + message); RequestHeader header = subscription.getHeader(); - EventMeshMessage eventMeshMessage = EventMeshMessage.newBuilder() + SimpleMessage simpleMessage = SimpleMessage.newBuilder() .setHeader(header) .setContent(JsonUtils.serialize(resp)) .build(); - emitter.onNext(eventMeshMessage); + emitter.onNext(simpleMessage); } private void doAclCheck(Subscription subscription) throws AclException { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/AbstractPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/AbstractPushRequest.java index 50ac802ee1..4c5d54110c 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/AbstractPushRequest.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/AbstractPushRequest.java @@ -23,6 +23,7 @@ import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.protocol.ProtocolTransportObject; import org.apache.eventmesh.common.protocol.grpc.common.SimpleMessageWrapper; +import org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage; import org.apache.eventmesh.common.protocol.grpc.protos.Subscription.SubscriptionItem.SubscriptionMode; import org.apache.eventmesh.protocol.api.ProtocolAdaptor; import org.apache.eventmesh.protocol.api.ProtocolPluginFactory; @@ -58,7 +59,7 @@ public abstract class AbstractPushRequest extends RetryContext { protected HandleMsgContext handleMsgContext; // protected CloudEvent event; - protected EventMeshMessage eventMeshMessage; + protected SimpleMessage simpleMessage; private final AtomicBoolean complete = new AtomicBoolean(Boolean.FALSE); @@ -70,12 +71,12 @@ public AbstractPushRequest(HandleMsgContext handleMsgContext, Map protocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType); @@ -87,11 +88,11 @@ private EventMeshMessage getEventMeshMessage(CloudEvent cloudEvent) { } } - private CloudEvent getCloudEvent(EventMeshMessage eventMeshMessage) { + private CloudEvent getCloudEvent(SimpleMessage simpleMessage) { try { - String protocolType = Objects.requireNonNull(eventMeshMessage.getHeader().getProtocolType()); + String protocolType = Objects.requireNonNull(simpleMessage.getHeader().getProtocolType()); ProtocolAdaptor protocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType); - return protocolAdaptor.toCloudEvent(new SimpleMessageWrapper(eventMeshMessage)); + return protocolAdaptor.toCloudEvent(new SimpleMessageWrapper(simpleMessage)); } catch (Exception e) { logger.error("Error in getting CloudEvent from EventMeshMessage", e); return null; @@ -121,7 +122,7 @@ protected boolean isComplete() { private void finish() { AbstractContext context = handleMsgContext.getContext(); SubscriptionMode subscriptionMode = handleMsgContext.getSubscriptionMode(); - CloudEvent event = getCloudEvent(eventMeshMessage); + CloudEvent event = getCloudEvent(simpleMessage); if (eventMeshConsumer != null && context != null && event != null) { try { eventMeshConsumer.updateOffset(subscriptionMode, Collections.singletonList(event), context); @@ -137,7 +138,7 @@ protected void complete() { } protected void timeout() { - if (!isComplete() && System.currentTimeMillis() - lastPushTime >= Long.parseLong(eventMeshMessage.getTtl())) { + if (!isComplete() && System.currentTimeMillis() - lastPushTime >= Long.parseLong(simpleMessage.getTtl())) { delayRetry(); } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/StreamPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/StreamPushRequest.java index 82a0e492ee..4ed2ddd273 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/StreamPushRequest.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/StreamPushRequest.java @@ -3,6 +3,7 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.RandomUtils; +import org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage; import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.StreamTopicConfig; import org.apache.eventmesh.runtime.core.protocol.grpc.service.EventEmitter; @@ -17,9 +18,9 @@ public class StreamPushRequest extends AbstractPushRequest { private final Logger messageLogger = LoggerFactory.getLogger("message"); - private final Map>> idcEmitters; + private final Map>> idcEmitters; - private final List> totalEmitters; + private final List> totalEmitters; private final int startIdx; @@ -34,42 +35,42 @@ public StreamPushRequest(HandleMsgContext handleMsgContext, Map eventEmitter = selectEmitter(); + EventEmitter eventEmitter = selectEmitter(); if (eventEmitter == null) { return; } this.lastPushTime = System.currentTimeMillis(); - eventMeshMessage = EventMeshMessage.newBuilder(eventMeshMessage) + simpleMessage = SimpleMessage.newBuilder(simpleMessage) .putProperties(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP, String.valueOf(lastPushTime)) .build(); try { long cost = System.currentTimeMillis() - lastPushTime; // catch the error and retry, don't use eventEmitter.onNext() to hide the error - eventEmitter.getEmitter().onNext(eventMeshMessage); + eventEmitter.getEmitter().onNext(simpleMessage); messageLogger.info( "message|eventMesh2client|emitter|topic={}|bizSeqNo={}" - + "|uniqueId={}|cost={}", eventMeshMessage.getTopic(), - eventMeshMessage.getSeqNum(), eventMeshMessage.getUniqueId(), cost); + + "|uniqueId={}|cost={}", simpleMessage.getTopic(), + simpleMessage.getSeqNum(), simpleMessage.getUniqueId(), cost); complete(); } catch (Throwable t) { long cost = System.currentTimeMillis() - lastPushTime; messageLogger.error( "message|eventMesh2client|exception={} |emitter|topic={}|bizSeqNo={}" - + "|uniqueId={}|cost={}", t.getMessage(), eventMeshMessage.getTopic(), - eventMeshMessage.getSeqNum(), eventMeshMessage.getUniqueId(), cost, t); + + "|uniqueId={}|cost={}", t.getMessage(), simpleMessage.getTopic(), + simpleMessage.getSeqNum(), simpleMessage.getUniqueId(), cost, t); delayRetry(); } } - private EventEmitter selectEmitter() { - List> emitterList = MapUtils.getObject(idcEmitters, + private EventEmitter selectEmitter() { + List> emitterList = MapUtils.getObject(idcEmitters, eventMeshGrpcConfiguration.eventMeshIDC, null); if (CollectionUtils.isNotEmpty(emitterList)) { return emitterList.get((startIdx + retryTimes) % emitterList.size()); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/WebhookPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/WebhookPushRequest.java index 25e9993c4a..181aa75815 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/WebhookPushRequest.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/WebhookPushRequest.java @@ -25,6 +25,7 @@ import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.protocol.grpc.protos.RequestHeader; +import org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage; import org.apache.eventmesh.common.protocol.http.body.message.PushMessageRequestBody; import org.apache.eventmesh.common.protocol.http.common.ClientRetCode; import org.apache.eventmesh.common.protocol.http.common.ProtocolKey; @@ -82,7 +83,7 @@ public WebhookPushRequest(HandleMsgContext handleMsgContext, @Override public void tryPushRequest() { - if (eventMeshMessage == null) { + if (simpleMessage == null) { return; } @@ -104,23 +105,23 @@ public void tryPushRequest() { builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV, eventMeshGrpcConfiguration.eventMeshEnv); builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC, eventMeshGrpcConfiguration.eventMeshIDC); - RequestHeader requestHeader = eventMeshMessage.getHeader(); + RequestHeader requestHeader = simpleMessage.getHeader(); builder.addHeader(ProtocolKey.PROTOCOL_TYPE, requestHeader.getProtocolType()); builder.addHeader(ProtocolKey.PROTOCOL_DESC, requestHeader.getProtocolDesc()); builder.addHeader(ProtocolKey.PROTOCOL_VERSION, requestHeader.getProtocolVersion()); - builder.addHeader(ProtocolKey.CONTENT_TYPE, eventMeshMessage.getPropertiesOrDefault(ProtocolKey.CONTENT_TYPE, + builder.addHeader(ProtocolKey.CONTENT_TYPE, simpleMessage.getPropertiesOrDefault(ProtocolKey.CONTENT_TYPE, "application/cloudevents+json")); List body = new ArrayList<>(); - body.add(new BasicNameValuePair(PushMessageRequestBody.CONTENT, eventMeshMessage.getContent())); - body.add(new BasicNameValuePair(PushMessageRequestBody.BIZSEQNO, eventMeshMessage.getSeqNum())); - body.add(new BasicNameValuePair(PushMessageRequestBody.UNIQUEID, eventMeshMessage.getUniqueId())); + body.add(new BasicNameValuePair(PushMessageRequestBody.CONTENT, simpleMessage.getContent())); + body.add(new BasicNameValuePair(PushMessageRequestBody.BIZSEQNO, simpleMessage.getSeqNum())); + body.add(new BasicNameValuePair(PushMessageRequestBody.UNIQUEID, simpleMessage.getUniqueId())); body.add(new BasicNameValuePair(PushMessageRequestBody.RANDOMNO, handleMsgContext.getMsgRandomNo())); - body.add(new BasicNameValuePair(PushMessageRequestBody.TOPIC, eventMeshMessage.getTopic())); + body.add(new BasicNameValuePair(PushMessageRequestBody.TOPIC, simpleMessage.getTopic())); body.add(new BasicNameValuePair(PushMessageRequestBody.EXTFIELDS, - JsonUtils.serialize(eventMeshMessage.getPropertiesMap()))); + JsonUtils.serialize(simpleMessage.getPropertiesMap()))); - eventMeshMessage = EventMeshMessage.newBuilder(eventMeshMessage) + simpleMessage = SimpleMessage.newBuilder(simpleMessage) .putProperties(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP, String.valueOf(lastPushTime)) .build(); @@ -137,14 +138,14 @@ public void tryPushRequest() { eventMeshGrpcServer.getHttpClient().execute(builder, handleResponse()); messageLogger .info("message|eventMesh2client|url={}|topic={}|bizSeqNo={}|uniqueId={}", - selectedPushUrl, eventMeshMessage.getTopic(), eventMeshMessage.getSeqNum(), - eventMeshMessage.getUniqueId()); + selectedPushUrl, simpleMessage.getTopic(), simpleMessage.getSeqNum(), + simpleMessage.getUniqueId()); } catch (IOException e) { long cost = System.currentTimeMillis() - lastPushTime; messageLogger.error( "message|eventMesh2client|exception={} |emitter|topic={}|bizSeqNo={}" - + "|uniqueId={}|cost={}", e.getMessage(), eventMeshMessage.getTopic(), - eventMeshMessage.getSeqNum(), eventMeshMessage.getUniqueId(), cost, e); + + "|uniqueId={}|cost={}", e.getMessage(), simpleMessage.getTopic(), + simpleMessage.getSeqNum(), simpleMessage.getUniqueId(), cost, e); removeWaitingMap(this); delayRetry(); } @@ -153,10 +154,10 @@ public void tryPushRequest() { @Override public String toString() { return "asyncPushRequest={" - + "bizSeqNo=" + eventMeshMessage.getSeqNum() + + "bizSeqNo=" + simpleMessage.getSeqNum() + ",startIdx=" + startIdx + ",retryTimes=" + retryTimes - + ",uniqueId=" + eventMeshMessage.getUniqueId() + + ",uniqueId=" + simpleMessage.getUniqueId() + ",executeTime=" + DateFormatUtils.format(executeTime, Constants.DATE_FORMAT) + ",lastPushTime=" @@ -174,8 +175,8 @@ private ResponseHandler handleResponse() { //eventMeshHTTPServer.metrics.summaryMetrics.recordHttpPushMsgFailed(); messageLogger.info( "message|eventMesh2client|exception|url={}|topic={}|bizSeqNo={}" - + "|uniqueId={}|cost={}", selectedPushUrl, eventMeshMessage.getTopic(), - eventMeshMessage.getSeqNum(), eventMeshMessage.getUniqueId(), cost); + + "|uniqueId={}|cost={}", selectedPushUrl, simpleMessage.getTopic(), + simpleMessage.getSeqNum(), simpleMessage.getUniqueId(), cost); delayRetry(); } else { @@ -190,8 +191,8 @@ private ResponseHandler handleResponse() { ClientRetCode result = processResponseContent(res); messageLogger.info( "message|eventMesh2client|{}|url={}|topic={}|bizSeqNo={}" - + "|uniqueId={}|cost={}", result, selectedPushUrl, eventMeshMessage.getTopic(), - eventMeshMessage.getSeqNum(), eventMeshMessage.getUniqueId(), cost); + + "|uniqueId={}|cost={}", result, selectedPushUrl, simpleMessage.getTopic(), + simpleMessage.getSeqNum(), simpleMessage.getUniqueId(), cost); if (result == ClientRetCode.OK || result == ClientRetCode.FAIL) { complete(); } else if (result == ClientRetCode.RETRY || result == ClientRetCode.NOLISTEN) { @@ -218,7 +219,7 @@ private ClientRetCode processResponseContent(String content) { return ClientRetCode.FAIL; } catch (Exception e) { messageLogger.warn("url:{}, bizSeqno:{}, uniqueId:{}, httpResponse:{}", selectedPushUrl, - eventMeshMessage.getSeqNum(), eventMeshMessage.getUniqueId(), content); + simpleMessage.getSeqNum(), simpleMessage.getUniqueId(), content); return ClientRetCode.FAIL; } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ConsumerService.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ConsumerService.java index 2841066d1d..90936056e0 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ConsumerService.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ConsumerService.java @@ -5,6 +5,7 @@ import org.apache.eventmesh.common.protocol.grpc.protos.ConsumerServiceGrpc; import org.apache.eventmesh.common.protocol.grpc.protos.RequestHeader; import org.apache.eventmesh.common.protocol.grpc.protos.Response; +import org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage; import org.apache.eventmesh.common.protocol.grpc.protos.Subscription; import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer; @@ -51,12 +52,12 @@ public void subscribe(Subscription request, StreamObserver responseObs }); } - public void subscribeStream(Subscription request, StreamObserver responseObserver) { + public void subscribeStream(Subscription request, StreamObserver responseObserver) { logger.info("cmd={}|{}|client2eventMesh|from={}|to={}", "subscribeStream", EventMeshConstants.PROTOCOL_GRPC, request.getHeader().getIp(), eventMeshGrpcServer.getEventMeshGrpcConfiguration().eventMeshIp); - EventEmitter emitter = new EventEmitter<>(responseObserver); + EventEmitter emitter = new EventEmitter<>(responseObserver); threadPoolExecutor.submit(() -> { SubscribeStreamProcessor streamProcessor = new SubscribeStreamProcessor(eventMeshGrpcServer); try { @@ -70,12 +71,12 @@ public void subscribeStream(Subscription request, StreamObserver