Skip to content

Commit

Permalink
[ISSUE #460]Support custom retry times configuration when Subcription…
Browse files Browse the repository at this point in the history
…Type is SYNC (#463)

* [ISSUE #460]Support custom retry times configuration when SubcriptionType is SYNC

* fix log args
  • Loading branch information
ruanwenjun authored Jul 28, 2021
1 parent 00571d1 commit 8c450e1
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@

package org.apache.eventmesh.common.config;

import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.ThreadPoolFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
Expand All @@ -25,11 +31,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.ThreadPoolFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConfigurationWrapper {

public Logger logger = LoggerFactory.getLogger(this.getClass());
Expand Down Expand Up @@ -73,4 +74,21 @@ private void load() {
public String getProp(String key) {
return StringUtils.isEmpty(key) ? null : properties.getProperty(key, null);
}

public int getIntProp(String configKey, int defaultValue) {
String configValue = StringUtils.deleteWhitespace(getProp(configKey));
if (StringUtils.isEmpty(configValue)) {
return defaultValue;
}
Preconditions.checkState(StringUtils.isNumeric(configKey), String.format("%s error", configKey));
return Integer.parseInt(configValue);
}

public boolean getBoolProp(String configKey, boolean defaultValue) {
String configValue = StringUtils.deleteWhitespace(getProp(configKey));
if (StringUtils.isEmpty(configValue)) {
return defaultValue;
}
return Boolean.parseBoolean(configValue);
}
}
6 changes: 4 additions & 2 deletions eventmesh-runtime/conf/eventmesh.properties
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ eventMesh.server.session.upstreamBufferSize=20
eventMesh.server.global.scheduler=5
eventMesh.server.tcp.taskHandleExecutorPoolSize=8
#retry
eventMesh.server.retry.pushRetryTimes=3
eventMesh.server.retry.pushRetryDelayInMills=500
eventMesh.server.retry.async.pushRetryTimes=3
eventMesh.server.retry.sync.pushRetryTimes=3
eventMesh.server.retry.async.pushRetryDelayInMills=500
eventMesh.server.retry.sync.pushRetryDelayInMills=500
eventMesh.server.retry.pushRetryQueueSize=10000
#admin
eventMesh.server.admin.http.port=10106
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@

package org.apache.eventmesh.runtime.configuration;

import com.google.common.base.Preconditions;

import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.config.CommonConfiguration;
import org.apache.eventmesh.common.config.ConfigurationWrapper;

Expand Down Expand Up @@ -49,15 +46,19 @@ public class EventMeshTCPConfiguration extends CommonConfiguration {

public int eventMeshTcpTaskHandleExecutorPoolSize = Runtime.getRuntime().availableProcessors();

public int eventMeshTcpMsgDownStreamExecutorPoolSize = Runtime.getRuntime().availableProcessors() > 8 ? Runtime.getRuntime().availableProcessors() : 8;
public int eventMeshTcpMsgDownStreamExecutorPoolSize = Math.max(Runtime.getRuntime().availableProcessors(), 8);

public int eventMeshTcpSessionExpiredInMills = 60000;

public int eventMeshTcpSessionUpstreamBufferSize = 100;

public int eventMeshTcpMsgRetryTimes = 3;
public int eventMeshTcpMsgAsyncRetryTimes = 3;

public int eventMeshTcpMsgSyncRetryTimes = 1;

public int eventMeshTcpMsgRetrySyncDelayInMills = 500;

public int eventMeshTcpMsgRetryDelayInMills = 500;
public int eventMeshTcpMsgRetryAsyncDelayInMills = 500;

public int eventMeshTcpMsgRetryQueueSize = 10000;

Expand All @@ -81,118 +82,46 @@ public EventMeshTCPConfiguration(ConfigurationWrapper configurationWrapper) {
@Override
public void init() {
super.init();
String eventMeshTcpServerPortStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_TCP_PORT);
Preconditions.checkState(StringUtils.isNotEmpty(eventMeshTcpServerPortStr) && StringUtils.isNumeric(eventMeshTcpServerPortStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_SERVER_TCP_PORT));
eventMeshTcpServerPort = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTcpServerPortStr));

String eventMeshTcpIdleReadSecondsStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_READER_IDLE_SECONDS);
Preconditions.checkState(StringUtils.isNotEmpty(eventMeshTcpIdleReadSecondsStr) && StringUtils.isNumeric(eventMeshTcpIdleReadSecondsStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_SERVER_READER_IDLE_SECONDS));
eventMeshTcpIdleReadSeconds = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTcpIdleReadSecondsStr));

String eventMeshTcpIdleWriteSecondsStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_WRITER_IDLE_SECONDS);
Preconditions.checkState(StringUtils.isNotEmpty(eventMeshTcpIdleWriteSecondsStr) && StringUtils.isNumeric(eventMeshTcpIdleWriteSecondsStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_SERVER_WRITER_IDLE_SECONDS));
eventMeshTcpIdleWriteSeconds = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTcpIdleWriteSecondsStr));

String eventMeshTcpIdleAllSecondsStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_ALL_IDLE_SECONDS);
Preconditions.checkState(StringUtils.isNotEmpty(eventMeshTcpIdleAllSecondsStr) && StringUtils.isNumeric(eventMeshTcpIdleAllSecondsStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_SERVER_ALL_IDLE_SECONDS));
eventMeshTcpIdleAllSeconds = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTcpIdleAllSecondsStr));

String eventMeshTcpMsgReqnumPerSecondStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_MSG_REQ_NUM_PER_SECONDS);
Preconditions.checkState(StringUtils.isNotEmpty(eventMeshTcpMsgReqnumPerSecondStr) && StringUtils.isNumeric(eventMeshTcpMsgReqnumPerSecondStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_SERVER_MSG_REQ_NUM_PER_SECONDS));
eventMeshTcpMsgReqnumPerSecond = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTcpMsgReqnumPerSecondStr));

String eventMeshTcpClientMaxNumStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_CLIENT_MAX_NUM);
Preconditions.checkState(StringUtils.isNotEmpty(eventMeshTcpClientMaxNumStr) && StringUtils.isNumeric(eventMeshTcpClientMaxNumStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_SERVER_CLIENT_MAX_NUM));
eventMeshTcpClientMaxNum = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTcpClientMaxNumStr));

String eventMeshTcpServerEnabledStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_TCP_SERVER_ENABLED);
if (StringUtils.isNotEmpty(eventMeshTcpServerEnabledStr)) {
eventMeshTcpServerEnabled = Boolean.valueOf(StringUtils.deleteWhitespace(eventMeshTcpServerEnabledStr));
}
eventMeshTcpServerPort = configurationWrapper.getIntProp(ConfKeys.KEYS_EVENTMESH_SERVER_TCP_PORT, eventMeshTcpServerPort);

String eventMeshTcpGlobalSchedulerStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_GLOBAL_SCHEDULER);
if (StringUtils.isNotEmpty(eventMeshTcpGlobalSchedulerStr)) {
Preconditions.checkState(StringUtils.isNumeric(eventMeshTcpGlobalSchedulerStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_SERVER_GLOBAL_SCHEDULER));
eventMeshTcpGlobalScheduler = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTcpGlobalSchedulerStr));
}
eventMeshTcpIdleReadSeconds = configurationWrapper.getIntProp(ConfKeys.KEYS_EVENTMESH_SERVER_READER_IDLE_SECONDS, eventMeshTcpIdleReadSeconds);

String eventMeshTcpTaskHandleExecutorPoolSizeStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_TCP_TASK_HANDLE_POOL_SIZE);
if (StringUtils.isNotEmpty(eventMeshTcpTaskHandleExecutorPoolSizeStr)) {
Preconditions.checkState(StringUtils.isNumeric(eventMeshTcpTaskHandleExecutorPoolSizeStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_SERVER_TCP_TASK_HANDLE_POOL_SIZE));
eventMeshTcpTaskHandleExecutorPoolSize = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTcpTaskHandleExecutorPoolSizeStr));
}
eventMeshTcpIdleWriteSeconds = configurationWrapper.getIntProp(ConfKeys.KEYS_EVENTMESH_SERVER_WRITER_IDLE_SECONDS, eventMeshTcpIdleWriteSeconds);

String eventMeshTcpMsgDownStreamExecutorPoolSizeStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_TCP_MSG_DOWNSTREAM_POOL_SIZE);
if(StringUtils.isNotEmpty(eventMeshTcpMsgDownStreamExecutorPoolSizeStr)){
Preconditions.checkState(StringUtils.isNumeric(eventMeshTcpMsgDownStreamExecutorPoolSizeStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_SERVER_TCP_MSG_DOWNSTREAM_POOL_SIZE));
eventMeshTcpMsgDownStreamExecutorPoolSize = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTcpMsgDownStreamExecutorPoolSizeStr));
}
eventMeshTcpIdleAllSeconds = configurationWrapper.getIntProp(ConfKeys.KEYS_EVENTMESH_SERVER_ALL_IDLE_SECONDS, eventMeshTcpIdleAllSeconds);

String eventMeshTcpSessionExpiredInMillsStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_SESSION_EXPIRED_TIME);
if (StringUtils.isNotEmpty(eventMeshTcpSessionExpiredInMillsStr)) {
Preconditions.checkState(StringUtils.isNumeric(eventMeshTcpSessionExpiredInMillsStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_SERVER_SESSION_EXPIRED_TIME));
eventMeshTcpSessionExpiredInMills = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTcpSessionExpiredInMillsStr));
}
eventMeshTcpMsgReqnumPerSecond = configurationWrapper.getIntProp(ConfKeys.KEYS_EVENTMESH_SERVER_MSG_REQ_NUM_PER_SECONDS, eventMeshTcpMsgReqnumPerSecond);

String eventMeshTcpSessionUpstreamBufferSizeStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_SESSION_UPSTREAM_BUFFER_SIZE);
if (StringUtils.isNotEmpty(eventMeshTcpSessionUpstreamBufferSizeStr)) {
Preconditions.checkState(StringUtils.isNumeric(eventMeshTcpSessionUpstreamBufferSizeStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_SERVER_SESSION_UPSTREAM_BUFFER_SIZE));
eventMeshTcpSessionUpstreamBufferSize = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTcpSessionUpstreamBufferSizeStr));
}
eventMeshTcpClientMaxNum = configurationWrapper.getIntProp(ConfKeys.KEYS_EVENTMESH_SERVER_CLIENT_MAX_NUM, eventMeshTcpClientMaxNum);

eventMeshTcpServerEnabled = configurationWrapper.getBoolProp(ConfKeys.KEYS_EVENTMESH_TCP_SERVER_ENABLED, eventMeshTcpServerEnabled);

eventMeshTcpGlobalScheduler = configurationWrapper.getIntProp(ConfKeys.KEYS_EVENTMESH_SERVER_GLOBAL_SCHEDULER, eventMeshTcpGlobalScheduler);

eventMeshTcpTaskHandleExecutorPoolSize = configurationWrapper.getIntProp(ConfKeys.KEYS_EVENTMESH_SERVER_TCP_TASK_HANDLE_POOL_SIZE, eventMeshTcpTaskHandleExecutorPoolSize);

eventMeshTcpMsgDownStreamExecutorPoolSize = configurationWrapper.getIntProp(ConfKeys.KEYS_EVENTMESH_SERVER_TCP_MSG_DOWNSTREAM_POOL_SIZE, eventMeshTcpMsgDownStreamExecutorPoolSize);

eventMeshTcpSessionExpiredInMills = configurationWrapper.getIntProp(ConfKeys.KEYS_EVENTMESH_SERVER_SESSION_EXPIRED_TIME, eventMeshTcpSessionExpiredInMills);

eventMeshTcpSessionUpstreamBufferSize = configurationWrapper.getIntProp(ConfKeys.KEYS_EVENTMESH_SERVER_SESSION_UPSTREAM_BUFFER_SIZE, eventMeshTcpSessionUpstreamBufferSize);

//========================================eventMesh retry config=============================================//
String eventMeshTcpMsgRetryTimesStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_RETRY_PUSH_RETRY_TIMES);
if (StringUtils.isNotEmpty(eventMeshTcpMsgRetryTimesStr)) {
Preconditions.checkState(StringUtils.isNumeric(eventMeshTcpMsgRetryTimesStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_SERVER_RETRY_PUSH_RETRY_TIMES));
eventMeshTcpMsgRetryTimes = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTcpMsgRetryTimesStr));
}
eventMeshTcpMsgAsyncRetryTimes = configurationWrapper.getIntProp(ConfKeys.KEYS_EVENTMESH_SERVER_RETRY_ASYNC_PUSH_RETRY_TIMES, eventMeshTcpMsgAsyncRetryTimes);
eventMeshTcpMsgSyncRetryTimes = configurationWrapper.getIntProp(ConfKeys.KEYS_EVENTMESH_SERVER_RETRY_SYNC_PUSH_RETRY_TIMES, eventMeshTcpMsgSyncRetryTimes);

String eventMeshTcpMsgRetryDelayInMillsStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_RETRY_PUSH_RETRY_DELAY);
if (StringUtils.isNotEmpty(eventMeshTcpMsgRetryDelayInMillsStr)) {
Preconditions.checkState(StringUtils.isNumeric(eventMeshTcpMsgRetryDelayInMillsStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_SERVER_RETRY_PUSH_RETRY_DELAY));
eventMeshTcpMsgRetryDelayInMills = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTcpMsgRetryDelayInMillsStr));
}
eventMeshTcpMsgRetryAsyncDelayInMills = configurationWrapper.getIntProp(ConfKeys.KEYS_EVENTMESH_SERVER_RETRY_ASYNC_PUSH_RETRY_DELAY, eventMeshTcpMsgRetryAsyncDelayInMills);
eventMeshTcpMsgRetrySyncDelayInMills = configurationWrapper.getIntProp(ConfKeys.KEYS_EVENTMESH_SERVER_RETRY_SYNC_PUSH_RETRY_DELAY, eventMeshTcpMsgRetrySyncDelayInMills);

String eventMeshTcpMsgRetryQueueSizeStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_RETRY_PUSH_RETRY_QUEUE_SIZE);
if (StringUtils.isNotEmpty(eventMeshTcpMsgRetryQueueSizeStr)) {
Preconditions.checkState(StringUtils.isNumeric(eventMeshTcpMsgRetryQueueSizeStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_SERVER_RETRY_PUSH_RETRY_QUEUE_SIZE));
eventMeshTcpMsgRetryQueueSize = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTcpMsgRetryQueueSizeStr));
}
eventMeshTcpMsgRetryQueueSize = configurationWrapper.getIntProp(ConfKeys.KEYS_EVENTMESH_SERVER_RETRY_PUSH_RETRY_QUEUE_SIZE, eventMeshTcpMsgRetryQueueSize);

String eventMeshTcpRebalanceIntervalStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_TCP_REBALANCE_INTERVAL);
if (StringUtils.isNotEmpty(eventMeshTcpRebalanceIntervalStr)) {
Preconditions.checkState(StringUtils.isNumeric(eventMeshTcpRebalanceIntervalStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_SERVER_TCP_REBALANCE_INTERVAL));
eventMeshTcpRebalanceIntervalInMills = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTcpRebalanceIntervalStr));
}
eventMeshTcpRebalanceIntervalInMills = configurationWrapper.getIntProp(ConfKeys.KEYS_EVENTMESH_SERVER_TCP_REBALANCE_INTERVAL, eventMeshTcpRebalanceIntervalInMills);

String eventMeshServerAdminPortStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_ADMIN_HTTP_PORT);
if (StringUtils.isNotEmpty(eventMeshServerAdminPortStr)) {
Preconditions.checkState(StringUtils.isNumeric(eventMeshServerAdminPortStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_SERVER_ADMIN_HTTP_PORT));
eventMeshServerAdminPort = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshServerAdminPortStr));
}
eventMeshServerAdminPort = configurationWrapper.getIntProp(ConfKeys.KEYS_EVENTMESH_SERVER_ADMIN_HTTP_PORT, eventMeshServerAdminPort);

String eventMeshTcpSendBackEnabledStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_TCP_SEND_BACK_ENABLED);
if (StringUtils.isNotEmpty(eventMeshTcpSendBackEnabledStr)) {
eventMeshTcpSendBackEnabled = Boolean.valueOf(StringUtils.deleteWhitespace(eventMeshTcpSendBackEnabledStr));
}
eventMeshTcpSendBackEnabled = configurationWrapper.getBoolProp(ConfKeys.KEYS_EVENTMESH_TCP_SEND_BACK_ENABLED, eventMeshTcpSendBackEnabled);

String eventMeshTcpPushFailIsolateTimeInMillsStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_PUSH_FAIL_ISOLATE_TIME);
if (StringUtils.isNotEmpty(eventMeshTcpPushFailIsolateTimeInMillsStr)) {
Preconditions.checkState(StringUtils.isNumeric(eventMeshTcpPushFailIsolateTimeInMillsStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_SERVER_PUSH_FAIL_ISOLATE_TIME));
eventMeshTcpPushFailIsolateTimeInMills = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTcpPushFailIsolateTimeInMillsStr));
}
eventMeshTcpPushFailIsolateTimeInMills = configurationWrapper.getIntProp(ConfKeys.KEYS_EVENTMESH_SERVER_PUSH_FAIL_ISOLATE_TIME, eventMeshTcpPushFailIsolateTimeInMills);
}

public TrafficShapingConfig getGtc() {
Expand All @@ -218,8 +147,10 @@ static class ConfKeys {
public static String KEYS_EVENTMESH_SERVER_SESSION_EXPIRED_TIME = "eventMesh.server.session.expiredInMills";
public static String KEYS_EVENTMESH_SERVER_SESSION_UPSTREAM_BUFFER_SIZE = "eventMesh.server.session.upstreamBufferSize";
public static String KEYS_EVENTMESH_SERVER_SESSION_DOWNSTREAM_UNACK_SIZE = "eventMesh.server.session.downstreamUnackSize";
public static String KEYS_EVENTMESH_SERVER_RETRY_PUSH_RETRY_TIMES = "eventMesh.server.retry.pushRetryTimes";
public static String KEYS_EVENTMESH_SERVER_RETRY_PUSH_RETRY_DELAY = "eventMesh.server.retry.pushRetryDelayInMills";
public static String KEYS_EVENTMESH_SERVER_RETRY_ASYNC_PUSH_RETRY_TIMES = "eventMesh.server.retry.async.pushRetryTimes";
public static String KEYS_EVENTMESH_SERVER_RETRY_SYNC_PUSH_RETRY_TIMES = "eventMesh.server.retry.sync.pushRetryTimes";
public static String KEYS_EVENTMESH_SERVER_RETRY_ASYNC_PUSH_RETRY_DELAY = "eventMesh.server.retry.async.pushRetryDelayInMills";
public static String KEYS_EVENTMESH_SERVER_RETRY_SYNC_PUSH_RETRY_DELAY = "eventMesh.server.retry.sync.pushRetryDelayInMills";
public static String KEYS_EVENTMESH_SERVER_RETRY_PUSH_RETRY_QUEUE_SIZE = "eventMesh.server.retry.pushRetryQueueSize";
public static String KEYS_EVENTMESH_SERVER_ADMIN_HTTP_PORT = "eventMesh.server.admin.http.port";
public static String KEYS_EVENTMESH_TCP_SERVER_ENABLED = "eventMesh.server.tcp.enabled";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,15 @@ public void operationComplete(ChannelFuture future) throws Exception {
logger.warn("isolate client:{},isolateTime:{}", session.getClient(), isolateTime);

//retry
long delayTime = SubcriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType()) ? 0 : session.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryDelayInMills;
long delayTime = SubcriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType())
? session.getEventMeshTCPConfiguration().eventMeshTcpMsgRetrySyncDelayInMills
: session.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryAsyncDelayInMills;
downStreamMsgContext.delay(delayTime);
session.getClientGroupWrapper().get().getEventMeshTcpRetryer().pushRetry(downStreamMsgContext);
} else {
deliveredMsgsCount.incrementAndGet();
logger.info("downstreamMsg success,seq:{}, retryTimes:{}, bizSeq:{}", downStreamMsgContext.seq, downStreamMsgContext.retryTimes, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt));
logger.info("downstreamMsg success,seq:{}, retryTimes:{}, bizSeq:{}", downStreamMsgContext.seq,
downStreamMsgContext.retryTimes, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt));

if (session.isIsolated()) {
logger.info("cancel isolated,client:{}", session.getClient());
Expand Down
Loading

0 comments on commit 8c450e1

Please sign in to comment.