Skip to content

Commit

Permalink
feat(alibaba#10727): when the doPushWithCallback push task fails, it …
Browse files Browse the repository at this point in the history
…no longer retries forever, but only the default number of retries.
  • Loading branch information
Bo-Qiu committed Jul 4, 2023
1 parent 7ad446b commit d9c4d7b
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class PushDelayTask extends AbstractDelayTask {

private Set<String> targetClients;

private int retriedCount = 0;

public PushDelayTask(Service service, long delay) {
this.service = service;
pushToAll = true;
Expand All @@ -53,6 +55,25 @@ public PushDelayTask(Service service, long delay, String targetClient) {
setLastProcessTime(System.currentTimeMillis());
}

public PushDelayTask(Service service, long delay, int retriedCount) {
this.service = service;
pushToAll = true;
targetClients = null;
this.retriedCount = retriedCount;
setTaskInterval(delay);
setLastProcessTime(System.currentTimeMillis());
}

public PushDelayTask(Service service, long delay, String targetClient, int retriedCount) {
this.service = service;
this.pushToAll = false;
this.targetClients = new HashSet<>(1);
this.targetClients.add(targetClient);
this.retriedCount = retriedCount;
setTaskInterval(delay);
setLastProcessTime(System.currentTimeMillis());
}

@Override
public void merge(AbstractDelayTask task) {
if (!(task instanceof PushDelayTask)) {
Expand Down Expand Up @@ -80,4 +101,8 @@ public boolean isPushToAll() {
public Set<String> getTargetClients() {
return targetClients;
}

public int getRetriedCount() {
return retriedCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.alibaba.nacos.naming.push.v2.PushDataWrapper;
import com.alibaba.nacos.naming.push.v2.hook.PushResult;
import com.alibaba.nacos.naming.push.v2.hook.PushResultHookHolder;
import com.alibaba.nacos.sys.env.EnvUtil;

import java.util.Collection;

Expand Down Expand Up @@ -74,7 +75,9 @@ public void run() {
}
} catch (Exception e) {
Loggers.PUSH.error("Push task for service" + service.getGroupedServiceName() + " execute failed ", e);
delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L));
if (delayTask.getRetriedCount() < EnvUtil.getPushRetryTimes()) {
delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L, delayTask.getRetriedCount() + 1));
}
}
}

Expand Down Expand Up @@ -156,8 +159,11 @@ public void onFail(Throwable e) {
subscriber.getIp());
if (!(e instanceof NoRequiredRetryException)) {
Loggers.PUSH.error("Reason detail: ", e);
delayTaskEngine.addTask(service,
new PushDelayTask(service, PushConfig.getInstance().getPushTaskRetryDelay(), clientId));
if (delayTask.getRetriedCount() < EnvUtil.getPushRetryTimes()) {
delayTaskEngine.addTask(service,
new PushDelayTask(service, PushConfig.getInstance().getPushTaskRetryDelay(), clientId,
delayTask.getRetriedCount() + 1));
}
}
PushResult result = PushResult
.pushFailed(service, clientId, actualServiceInfo, subscriber, pushCostTime, e, isPushToAll);
Expand Down
2 changes: 2 additions & 0 deletions naming/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,5 @@ server.tomcat.accesslog.enabled=true
server.tomcat.accesslog.pattern=%h %l %u %t "%r" %s %b %D
# default current work dir
server.tomcat.basedir=file:.
# push retry times
push.retry.times=3
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.v2.NoRequiredRetryException;
import com.alibaba.nacos.sys.env.EnvUtil;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.mock.env.MockEnvironment;

import java.util.Collections;
import java.util.Optional;
Expand Down Expand Up @@ -91,6 +93,9 @@ public void setUp() {
when(delayTaskExecuteEngine.getMetadataManager()).thenReturn(metadataManager);
when(metadataManager.getServiceMetadata(service)).thenReturn(Optional.empty());
ApplicationUtils.injectContext(context);
MockEnvironment mockEnvironment = new MockEnvironment();
mockEnvironment.setProperty("push.retry.times", "0");
EnvUtil.setEnvironment(mockEnvironment);
}

@Test
Expand Down Expand Up @@ -140,4 +145,20 @@ public void testRunFailedWithRetry() {
assertEquals(1, MetricsMonitor.getFailedPushMonitor().get());
verify(delayTaskExecuteEngine).addTask(eq(service), any(PushDelayTask.class));
}

@Test
public void testRunFailedWithRetryLimit() {
PushDelayTask delayTask = new PushDelayTask(service, 0L);
PushExecuteTask executeTask = new PushExecuteTask(service, delayTaskExecuteEngine, delayTask);
pushExecutor.setShouldSuccess(false);
pushExecutor.setFailedException(new RuntimeException());
executeTask.run();
assertEquals(1, MetricsMonitor.getFailedPushMonitor().get());
verify(delayTaskExecuteEngine, never()).addTask(eq(service), any(PushDelayTask.class));

EnvUtil.setPushRetryTimes(1);
executeTask.run();
assertEquals(2, MetricsMonitor.getFailedPushMonitor().get());
verify(delayTaskExecuteEngine).addTask(eq(service), any(PushDelayTask.class));
}
}
31 changes: 26 additions & 5 deletions sys/src/main/java/com/alibaba/nacos/sys/env/EnvUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.HashMap;

/**
* Its own configuration information manipulation tool class.
Expand Down Expand Up @@ -103,19 +103,25 @@ public class EnvUtil {
private static final String NACOS_TEMP_DIR_1 = "data";

private static final String NACOS_TEMP_DIR_2 = "tmp";

private static final String NACOS_CUSTOM_ENVIRONMENT_ENABLED = "nacos.custom.environment.enabled";

private static final String NACOS_CUSTOM_CONFIG_NAME = "customFirstNacosConfig";


private static final int DEFAULT_PUSH_RETRY_TIME = 3;

private static final String PUSH_RETRY_TIME = "push.retry.times";

private static Integer pushRetryTimes;

@JustForTest
private static String confPath = "";

@JustForTest
private static String nacosHomePath = null;

private static ConfigurableEnvironment environment;

/**
* customEnvironment.
*/
Expand Down Expand Up @@ -228,6 +234,21 @@ public static void setContextPath(String contextPath) {
EnvUtil.contextPath = contextPath;
}

public static int getPushRetryTimes() {
if (null == pushRetryTimes) {
try {
pushRetryTimes = getProperty(PUSH_RETRY_TIME, Integer.class, DEFAULT_PUSH_RETRY_TIME);
} catch (Exception e) {
pushRetryTimes = DEFAULT_PUSH_RETRY_TIME;
}
}
return pushRetryTimes;
}

public static void setPushRetryTimes(int pushRetryTimes) {
EnvUtil.pushRetryTimes = pushRetryTimes;
}

@JustForTest
public static void setIsStandalone(Boolean isStandalone) {
EnvUtil.isStandalone = isStandalone;
Expand Down

0 comments on commit d9c4d7b

Please sign in to comment.