Skip to content

Commit

Permalink
[Issue apache#417] more update Grpc Message Model name to SimpleMessage
Browse files Browse the repository at this point in the history
  • Loading branch information
jinrongluo authored and xwm1992 committed Feb 16, 2022
1 parent 7b7716d commit da6b4fc
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup;

import lombok.Getter;
import org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage;
import org.apache.eventmesh.common.protocol.grpc.protos.Subscription.SubscriptionItem.SubscriptionMode;
import java.util.Date;
import lombok.Builder;
Expand All @@ -41,7 +42,7 @@ public class ConsumerGroupClient {

private String url;

private EventEmitter<EventMeshMessage> eventEmitter;
private EventEmitter<SimpleMessage> eventEmitter;

private final SubscriptionMode subscriptionMode;

Expand All @@ -60,7 +61,7 @@ public class ConsumerGroupClient {
public void setUrl(String url) {
this.url = url;
}
public void setEventEmitter(EventEmitter<EventMeshMessage> emitter) {
public void setEventEmitter(EventEmitter<SimpleMessage> emitter) {
this.eventEmitter = emitter;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup;

import org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage;
import org.apache.eventmesh.common.protocol.grpc.protos.Subscription.SubscriptionItem.SubscriptionMode;
import org.apache.eventmesh.runtime.core.protocol.grpc.service.EventEmitter;
import org.slf4j.Logger;
Expand All @@ -18,15 +19,15 @@ public class StreamTopicConfig extends ConsumerGroupTopicConfig {
* Key: IDC
* Value: list of emitters with Client_IP:port
*/
private final Map<String, Map<String, EventEmitter<EventMeshMessage>>> idcEmitterMap = new ConcurrentHashMap<>();
private final Map<String, Map<String, EventEmitter<SimpleMessage>>> idcEmitterMap = new ConcurrentHashMap<>();

/**
* Key: IDC
* Value: list of emitters
*/
private Map<String, List<EventEmitter<EventMeshMessage>>> idcEmitters = new ConcurrentHashMap<>();
private Map<String, List<EventEmitter<SimpleMessage>>> idcEmitters = new ConcurrentHashMap<>();

private List<EventEmitter<EventMeshMessage>> totalEmitters = new LinkedList<>();
private List<EventEmitter<SimpleMessage>> totalEmitters = new LinkedList<>();

public StreamTopicConfig(String consumerGroup, String topic, SubscriptionMode subscriptionMode) {
super(consumerGroup, topic, subscriptionMode, GrpcType.STREAM);
Expand All @@ -42,8 +43,8 @@ public synchronized void registerClient(ConsumerGroupClient client) {
String idc = client.getIdc();
String clientIp = client.getIp();
String clientPid = client.getPid();
EventEmitter<EventMeshMessage> emitter = client.getEventEmitter();
Map<String, EventEmitter<EventMeshMessage>> emitters = idcEmitterMap.computeIfAbsent(idc, k -> new HashMap<>());
EventEmitter<SimpleMessage> emitter = client.getEventEmitter();
Map<String, EventEmitter<SimpleMessage>> emitters = idcEmitterMap.computeIfAbsent(idc, k -> new HashMap<>());
emitters.put(clientIp + ":" + clientPid, emitter);

idcEmitters = buildIdcEmitter();
Expand All @@ -56,7 +57,7 @@ public void deregisterClient(ConsumerGroupClient client) {
String clientIp = client.getIp();
String clientPid = client.getPid();

Map<String, EventEmitter<EventMeshMessage>> emitters = idcEmitterMap.get(idc);
Map<String, EventEmitter<SimpleMessage>> emitters = idcEmitterMap.get(idc);
if (emitters == null) {
return;
}
Expand Down Expand Up @@ -96,26 +97,26 @@ public GrpcType getGrpcType() {
return grpcType;
}

public Map<String, List<EventEmitter<EventMeshMessage>>> getIdcEmitters() {
public Map<String, List<EventEmitter<SimpleMessage>>> getIdcEmitters() {
return idcEmitters;
}

public List<EventEmitter<EventMeshMessage>> getTotalEmitters() {
public List<EventEmitter<SimpleMessage>> getTotalEmitters() {
return totalEmitters;
}

private Map<String, List<EventEmitter<EventMeshMessage>>> buildIdcEmitter() {
Map<String, List<EventEmitter<EventMeshMessage>>> result = new HashMap<>();
for (Map.Entry<String, Map<String, EventEmitter<EventMeshMessage>>> entry : idcEmitterMap.entrySet()) {
List<EventEmitter<EventMeshMessage>> emitterList = new LinkedList<>(entry.getValue().values());
private Map<String, List<EventEmitter<SimpleMessage>>> buildIdcEmitter() {
Map<String, List<EventEmitter<SimpleMessage>>> result = new HashMap<>();
for (Map.Entry<String, Map<String, EventEmitter<SimpleMessage>>> entry : idcEmitterMap.entrySet()) {
List<EventEmitter<SimpleMessage>> emitterList = new LinkedList<>(entry.getValue().values());
result.put(entry.getKey(), emitterList);
}
return result;
}

private List<EventEmitter<EventMeshMessage>> buildTotalEmitter() {
List<EventEmitter<EventMeshMessage>> emitterList = new LinkedList<>();
for (List<EventEmitter<EventMeshMessage>> emitters : idcEmitters.values()) {
private List<EventEmitter<SimpleMessage>> buildTotalEmitter() {
List<EventEmitter<SimpleMessage>> emitterList = new LinkedList<>();
for (List<EventEmitter<SimpleMessage>> emitters : idcEmitters.values()) {
emitterList.addAll(emitters);
}
return emitterList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.apache.eventmesh.api.exception.AclException;
import org.apache.eventmesh.common.protocol.grpc.common.StatusCode;
import org.apache.eventmesh.common.protocol.grpc.protos.RequestHeader;
import org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage;
import org.apache.eventmesh.common.protocol.grpc.protos.Subscription;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.utils.JsonUtils;
Expand Down Expand Up @@ -37,7 +38,7 @@ public SubscribeStreamProcessor(EventMeshGrpcServer eventMeshGrpcServer) {
this.eventMeshGrpcServer = eventMeshGrpcServer;
}

public void process(Subscription subscription, EventEmitter<EventMeshMessage> emitter) throws Exception {
public void process(Subscription subscription, EventEmitter<SimpleMessage> emitter) throws Exception {

RequestHeader header = subscription.getHeader();

Expand Down Expand Up @@ -109,38 +110,38 @@ public void process(Subscription subscription, EventEmitter<EventMeshMessage> em
sendResp(subscription, StatusCode.SUCCESS, "subscribe success", emitter);
}

private void sendRespAndComplete(Subscription subscription, StatusCode code, EventEmitter<EventMeshMessage> emitter) {
private void sendRespAndComplete(Subscription subscription, StatusCode code, EventEmitter<SimpleMessage> emitter) {
Map<String, String> resp = new HashMap<>();
resp.put("respCode", code.getRetCode());
resp.put("respMsg", code.getErrMsg());

RequestHeader header = subscription.getHeader();
EventMeshMessage eventMeshMessage = EventMeshMessage.newBuilder()
SimpleMessage simpleMessage = SimpleMessage.newBuilder()
.setHeader(header)
.setContent(JsonUtils.serialize(resp))
.build();

emitter.onNext(eventMeshMessage);
emitter.onNext(simpleMessage);
emitter.onCompleted();
}

private void sendRespAndComplete(Subscription subscription, StatusCode code, String message, EventEmitter<EventMeshMessage> emitter) {
private void sendRespAndComplete(Subscription subscription, StatusCode code, String message, EventEmitter<SimpleMessage> emitter) {
sendResp(subscription, code, message, emitter);
emitter.onCompleted();
}

private void sendResp(Subscription subscription, StatusCode code, String message, EventEmitter<EventMeshMessage> emitter) {
private void sendResp(Subscription subscription, StatusCode code, String message, EventEmitter<SimpleMessage> emitter) {
Map<String, String> resp = new HashMap<>();
resp.put("respCode", code.getRetCode());
resp.put("respMsg", code.getErrMsg() + " " + message);

RequestHeader header = subscription.getHeader();
EventMeshMessage eventMeshMessage = EventMeshMessage.newBuilder()
SimpleMessage simpleMessage = SimpleMessage.newBuilder()
.setHeader(header)
.setContent(JsonUtils.serialize(resp))
.build();

emitter.onNext(eventMeshMessage);
emitter.onNext(simpleMessage);
}

private void doAclCheck(Subscription subscription) throws AclException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.ProtocolTransportObject;
import org.apache.eventmesh.common.protocol.grpc.common.SimpleMessageWrapper;
import org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage;
import org.apache.eventmesh.common.protocol.grpc.protos.Subscription.SubscriptionItem.SubscriptionMode;
import org.apache.eventmesh.protocol.api.ProtocolAdaptor;
import org.apache.eventmesh.protocol.api.ProtocolPluginFactory;
Expand Down Expand Up @@ -58,7 +59,7 @@ public abstract class AbstractPushRequest extends RetryContext {

protected HandleMsgContext handleMsgContext;
// protected CloudEvent event;
protected EventMeshMessage eventMeshMessage;
protected SimpleMessage simpleMessage;

private final AtomicBoolean complete = new AtomicBoolean(Boolean.FALSE);

Expand All @@ -70,12 +71,12 @@ public AbstractPushRequest(HandleMsgContext handleMsgContext, Map<String, Set<Ab
this.eventMeshGrpcConfiguration = handleMsgContext.getEventMeshGrpcServer().getEventMeshGrpcConfiguration();
this.grpcRetryer = handleMsgContext.getEventMeshGrpcServer().getGrpcRetryer();
CloudEvent event = handleMsgContext.getEvent();
this.eventMeshMessage = getEventMeshMessage(event);
this.simpleMessage = getSimpleMessage(event);
}

public abstract void tryPushRequest();

private EventMeshMessage getEventMeshMessage(CloudEvent cloudEvent) {
private SimpleMessage getSimpleMessage(CloudEvent cloudEvent) {
try {
String protocolType = Objects.requireNonNull(cloudEvent.getExtension(Constants.PROTOCOL_TYPE)).toString();
ProtocolAdaptor<ProtocolTransportObject> protocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType);
Expand All @@ -87,11 +88,11 @@ private EventMeshMessage getEventMeshMessage(CloudEvent cloudEvent) {
}
}

private CloudEvent getCloudEvent(EventMeshMessage eventMeshMessage) {
private CloudEvent getCloudEvent(SimpleMessage simpleMessage) {
try {
String protocolType = Objects.requireNonNull(eventMeshMessage.getHeader().getProtocolType());
String protocolType = Objects.requireNonNull(simpleMessage.getHeader().getProtocolType());
ProtocolAdaptor<ProtocolTransportObject> protocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType);
return protocolAdaptor.toCloudEvent(new SimpleMessageWrapper(eventMeshMessage));
return protocolAdaptor.toCloudEvent(new SimpleMessageWrapper(simpleMessage));
} catch (Exception e) {
logger.error("Error in getting CloudEvent from EventMeshMessage", e);
return null;
Expand Down Expand Up @@ -121,7 +122,7 @@ protected boolean isComplete() {
private void finish() {
AbstractContext context = handleMsgContext.getContext();
SubscriptionMode subscriptionMode = handleMsgContext.getSubscriptionMode();
CloudEvent event = getCloudEvent(eventMeshMessage);
CloudEvent event = getCloudEvent(simpleMessage);
if (eventMeshConsumer != null && context != null && event != null) {
try {
eventMeshConsumer.updateOffset(subscriptionMode, Collections.singletonList(event), context);
Expand All @@ -137,7 +138,7 @@ protected void complete() {
}

protected void timeout() {
if (!isComplete() && System.currentTimeMillis() - lastPushTime >= Long.parseLong(eventMeshMessage.getTtl())) {
if (!isComplete() && System.currentTimeMillis() - lastPushTime >= Long.parseLong(simpleMessage.getTtl())) {
delayRetry();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.StreamTopicConfig;
import org.apache.eventmesh.runtime.core.protocol.grpc.service.EventEmitter;
Expand All @@ -17,9 +18,9 @@ public class StreamPushRequest extends AbstractPushRequest {

private final Logger messageLogger = LoggerFactory.getLogger("message");

private final Map<String, List<EventEmitter<EventMeshMessage>>> idcEmitters;
private final Map<String, List<EventEmitter<SimpleMessage>>> idcEmitters;

private final List<EventEmitter<EventMeshMessage>> totalEmitters;
private final List<EventEmitter<SimpleMessage>> totalEmitters;

private final int startIdx;

Expand All @@ -34,42 +35,42 @@ public StreamPushRequest(HandleMsgContext handleMsgContext, Map<String, Set<Abst

@Override
public void tryPushRequest() {
if (eventMeshMessage == null) {
if (simpleMessage == null) {
return;
}

EventEmitter<EventMeshMessage> eventEmitter = selectEmitter();
EventEmitter<SimpleMessage> eventEmitter = selectEmitter();

if (eventEmitter == null) {
return;
}
this.lastPushTime = System.currentTimeMillis();

eventMeshMessage = EventMeshMessage.newBuilder(eventMeshMessage)
simpleMessage = SimpleMessage.newBuilder(simpleMessage)
.putProperties(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP, String.valueOf(lastPushTime))
.build();
try {
long cost = System.currentTimeMillis() - lastPushTime;
// catch the error and retry, don't use eventEmitter.onNext() to hide the error
eventEmitter.getEmitter().onNext(eventMeshMessage);
eventEmitter.getEmitter().onNext(simpleMessage);
messageLogger.info(
"message|eventMesh2client|emitter|topic={}|bizSeqNo={}"
+ "|uniqueId={}|cost={}", eventMeshMessage.getTopic(),
eventMeshMessage.getSeqNum(), eventMeshMessage.getUniqueId(), cost);
+ "|uniqueId={}|cost={}", simpleMessage.getTopic(),
simpleMessage.getSeqNum(), simpleMessage.getUniqueId(), cost);
complete();
} catch (Throwable t) {
long cost = System.currentTimeMillis() - lastPushTime;
messageLogger.error(
"message|eventMesh2client|exception={} |emitter|topic={}|bizSeqNo={}"
+ "|uniqueId={}|cost={}", t.getMessage(), eventMeshMessage.getTopic(),
eventMeshMessage.getSeqNum(), eventMeshMessage.getUniqueId(), cost, t);
+ "|uniqueId={}|cost={}", t.getMessage(), simpleMessage.getTopic(),
simpleMessage.getSeqNum(), simpleMessage.getUniqueId(), cost, t);

delayRetry();
}
}

private EventEmitter<EventMeshMessage> selectEmitter() {
List<EventEmitter<EventMeshMessage>> emitterList = MapUtils.getObject(idcEmitters,
private EventEmitter<SimpleMessage> selectEmitter() {
List<EventEmitter<SimpleMessage>> emitterList = MapUtils.getObject(idcEmitters,
eventMeshGrpcConfiguration.eventMeshIDC, null);
if (CollectionUtils.isNotEmpty(emitterList)) {
return emitterList.get((startIdx + retryTimes) % emitterList.size());
Expand Down
Loading

0 comments on commit da6b4fc

Please sign in to comment.