Skip to content

Commit

Permalink
Merge pull request #1725 from hanbingleixue/xds-flowcontrol-service-1
Browse files Browse the repository at this point in the history
Fix some bugs in xds flow control
  • Loading branch information
Sherlockhan authored Jan 3, 2025
2 parents c2d5831 + 3626542 commit ecdfb43
Show file tree
Hide file tree
Showing 30 changed files with 318 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ runs:
shell: bash
run: |
sudo apt-get install gnupg curl
curl -fsSL https://www.mongodb.org/static/pgp/server-7.0.asc | \
curl -fsSL -k https://www.mongodb.org/static/pgp/server-7.0.asc | \
sudo gpg -o /usr/share/keyrings/mongodb-server-7.0.gpg \
--dearmor
echo "deb [ arch=amd64,arm64 signed-by=/usr/share/keyrings/mongodb-server-7.0.gpg ] https://repo.mongodb.org/apt/ubuntu jammy/mongodb-org/7.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-7.0.list
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.sermant.core.service.xds.entity;

import java.util.List;

/**
* Xds Retry Policy information
*
Expand All @@ -37,12 +39,7 @@ public class XdsRetryPolicy {
/**
* Specifies the conditions under which retry takes place
*/
private String retryOn;

/**
* Specifies the conditions under which retry takes place
*/
private String retryHostPredicate;
private List<String> retryConditions;

public long getMaxAttempts() {
return maxAttempts;
Expand All @@ -60,19 +57,11 @@ public void setPerTryTimeout(long perTryTimeout) {
this.perTryTimeout = perTryTimeout;
}

public String getRetryOn() {
return retryOn;
}

public void setRetryOn(String retryOn) {
this.retryOn = retryOn;
}

public String getRetryHostPredicate() {
return retryHostPredicate;
public List<String> getRetryConditions() {
return retryConditions;
}

public void setRetryHostPredicate(String retryHostPredicate) {
this.retryHostPredicate = retryHostPredicate;
public void setRetryConditions(List<String> retryConditions) {
this.retryConditions = retryConditions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.junit.Assert;
import org.junit.Test;

import java.util.Collections;

/**
* XdsRetryPolicyTest
*
Expand All @@ -29,12 +31,10 @@ public class XdsRetryPolicyTest {
@Test
public void testXdsRetryPolicy() {
XdsRetryPolicy xdsRetryPolicy = new XdsRetryPolicy();
xdsRetryPolicy.setRetryHostPredicate("PreviousHostsPredicate");
xdsRetryPolicy.setRetryOn("503");
xdsRetryPolicy.setRetryConditions(Collections.singletonList("503"));
xdsRetryPolicy.setMaxAttempts(8);
xdsRetryPolicy.setPerTryTimeout(2000);
Assert.assertEquals("PreviousHostsPredicate", xdsRetryPolicy.getRetryHostPredicate());
Assert.assertEquals("503", xdsRetryPolicy.getRetryOn());
Assert.assertEquals("503", xdsRetryPolicy.getRetryConditions().get(0));
Assert.assertEquals(8, xdsRetryPolicy.getMaxAttempts());
Assert.assertEquals(2000, xdsRetryPolicy.getPerTryTimeout());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.envoyproxy.envoy.extensions.filters.http.fault.v3.HTTPFault;
import io.envoyproxy.envoy.type.matcher.v3.StringMatcher;
import io.envoyproxy.envoy.type.v3.FractionalPercent;
import io.sermant.core.common.CommonConstant;
import io.sermant.core.common.LoggerFactory;
import io.sermant.core.service.xds.entity.XdsAbort;
import io.sermant.core.service.xds.entity.XdsDelay;
Expand Down Expand Up @@ -68,6 +69,7 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
Expand Down Expand Up @@ -264,13 +266,12 @@ private static XdsClusterWeight parseClusterWeight(ClusterWeight clusterWeight)

private static XdsRetryPolicy parseRetryPolicy(RetryPolicy retryPolicy) {
XdsRetryPolicy xdsRetryPolicy = new XdsRetryPolicy();
xdsRetryPolicy.setRetryOn(retryPolicy.getRetryOn());
if (!StringUtils.isEmpty(retryPolicy.getRetryOn())) {
xdsRetryPolicy.setRetryConditions(Arrays.asList(retryPolicy.getRetryOn().split(CommonConstant.COMMA)));
}
xdsRetryPolicy.setMaxAttempts(retryPolicy.getHostSelectionRetryMaxAttempts());
long perTryTimeout = Duration.ofSeconds(retryPolicy.getPerTryTimeout().getSeconds()).toMillis();
xdsRetryPolicy.setPerTryTimeout(perTryTimeout);
if (retryPolicy.getRetryHostPredicateCount() != 0) {
xdsRetryPolicy.setRetryHostPredicate(retryPolicy.getRetryHostPredicate(0).getName());
}
return xdsRetryPolicy;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@

import io.sermant.core.common.LoggerFactory;
import io.sermant.core.plugin.config.PluginConfigManager;
import io.sermant.core.service.xds.entity.XdsRetryPolicy;
import io.sermant.core.utils.CollectionUtils;
import io.sermant.flowcontrol.common.config.FlowControlConfig;
import io.sermant.flowcontrol.common.support.ReflectMethodCacheSupport;
import io.sermant.flowcontrol.common.xds.retry.RetryCondition;
import io.sermant.flowcontrol.common.xds.retry.RetryConditionType;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -63,16 +67,62 @@ protected final Class<? extends Throwable>[] findClass(String[] classNames) {
}

@Override
public boolean needRetry(Set<String> statusList, Object result) {
public boolean isNeedRetry(Set<String> statusList, Object result) {
if (result == null) {
return false;
}
final Optional<String> code = getCode(result);
return code.filter(statusList::contains).isPresent();
}

@Override
public boolean isNeedRetry(Object result, XdsRetryPolicy retryPolicy) {
if (result == null) {
return false;
}
List<String> conditions = retryPolicy.getRetryConditions();
if (CollectionUtils.isEmpty(conditions)) {
return false;
}
Optional<String> statusCodeOptional = this.getCode(result);
if (!statusCodeOptional.isPresent()) {
return false;
}
String statusCode = statusCodeOptional.get();
if (conditions.contains(statusCode)) {
return true;
}
for (String conditionName : conditions) {
Optional<RetryCondition> retryConditionOptional = RetryConditionType.getRetryConditionByName(conditionName);
if (!retryConditionOptional.isPresent()) {
continue;
}
if (retryConditionOptional.get().needRetry(this, null, statusCode, result)) {
return true;
}
}
return false;
}

@Override
public boolean isNeedRetry(Throwable ex, XdsRetryPolicy retryPolicy) {
if (ex == null) {
return false;
}
for (String conditionName : retryPolicy.getRetryConditions()) {
Optional<RetryCondition> retryConditionOptional = RetryConditionType.getRetryConditionByName(conditionName);
if (!retryConditionOptional.isPresent()) {
continue;
}
if (retryConditionOptional.get().needRetry(null, ex, null, null)) {
return true;
}
}
return false;
}

/**
* implemented by subclasses, if subclass implement {@link #needRetry(Set, Object)}, no need to implement the get
* implemented by subclasses, if subclass implement {@link #isNeedRetry(Set, Object)}, no need to implement the get
* code method
*
* @param result interface response result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package io.sermant.flowcontrol.common.handler.retry;

import io.sermant.core.service.xds.entity.XdsRetryPolicy;

import java.util.Optional;
import java.util.Set;

Expand All @@ -28,13 +30,33 @@
*/
public interface Retry {
/**
* needToRetry
* Retry based on the request result. Retrying is required if the request status is in the statusList.
*
* @param statusList List of status codes, valid only for http applications
* @param result responseResult
* @return retryOrNot
*/
boolean needRetry(Set<String> statusList, Object result);
boolean isNeedRetry(Set<String> statusList, Object result);

/**
* Retry based on the request result. If the request result meets the retry conditions in the retry policy,
* a retry will be executed
*
* @param result responseResult
* @param retryPolicy retry policy information
* @return retryOrNot
*/
boolean isNeedRetry(Object result, XdsRetryPolicy retryPolicy);

/**
* Retry based on the throwable. If the throwable during the execution of the request method meets the retry
* conditions in the retry policy, a retry will be executed
*
* @param throwable Exception thrown during retry
* @param retryPolicy Xds Retry Policy information
* @return retryOrNot
*/
boolean isNeedRetry(Throwable throwable, XdsRetryPolicy retryPolicy);

/**
* define which exceptions need to be retried
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,13 @@ private XdsRouterUtils() {
*/
public static Optional<XdsLocality> getLocalityInfoOfSelfService() {
if (localityObtainedFlag) {
return Optional.of(selfServiceLocality);
return Optional.ofNullable(selfServiceLocality);
}
synchronized (XdsRouterUtils.class) {
if (localityObtainedFlag) {
return Optional.of(selfServiceLocality);
return Optional.ofNullable(selfServiceLocality);
}
localityObtainedFlag = true;
String podIp = NetworkUtils.getKubernetesPodIp();
if (StringUtils.isEmpty(podIp)) {
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import io.sermant.flowcontrol.common.entity.FlowControlScenario;

import java.net.HttpURLConnection;

/**
* xds thread local utility class
*
Expand All @@ -29,6 +31,10 @@ public class XdsThreadLocalUtil {

private static final ThreadLocal<FlowControlScenario> FLOW_CONTROL_SCENARIO_THREAD_LOCAL = new ThreadLocal<>();

private static final ThreadLocal<HttpURLConnection> CONNECTION_IN_CURRENT_THREAD = new ThreadLocal<>();

private static final ThreadLocal<Boolean> IS_CONNECTED = new ThreadLocal<>();

private XdsThreadLocalUtil() {
}

Expand Down Expand Up @@ -57,6 +63,31 @@ public static void removeSendByteFlag() {
SEND_BYTE_FLAG.remove();
}

/**
* save the instance object of HttpURLConnection
*
* @param connection the instance object of HttpURLConnection
*/
public static void saveHttpUrlConnection(HttpURLConnection connection) {
CONNECTION_IN_CURRENT_THREAD.set(connection);
}

/**
* get the instance object of HttpURLConnection
*
* @return the instance object of HttpURLConnection
*/
public static HttpURLConnection getHttpUrlConnection() {
return CONNECTION_IN_CURRENT_THREAD.get();
}

/**
* remove the instance object of HttpURLConnection
*/
public static void removeHttpUrlConnection() {
CONNECTION_IN_CURRENT_THREAD.remove();
}

/**
* Set scenario information
*
Expand All @@ -81,4 +112,29 @@ public static FlowControlScenario getScenarioInfo() {
public static void removeScenarioInfo() {
FLOW_CONTROL_SCENARIO_THREAD_LOCAL.remove();
}

/**
* Set the connection status of the httpUrlConnection
*
* @param executeStatus the execution status of the connect method, true: executed, false: Not executed
*/
public static void setConnectionStatus(boolean executeStatus) {
IS_CONNECTED.set(executeStatus);
}

/**
* Is it already connected
*
* @return connection status
*/
public static boolean isConnected() {
return IS_CONNECTED.get() != null && IS_CONNECTED.get();
}

/**
* remove the connection status of the httpUrlConnection
*/
public static void removeConnectionStatus() {
IS_CONNECTED.remove();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public static void setCircuitBeakerStatus(XdsInstanceCircuitBreakers circuitBrea
XdsCircuitBreakerInfo circuitBreakerInfo = getCircuitBreakerInfo(scenarioInfo.getServiceName(),
scenarioInfo.getRouteName(), scenarioInfo.getAddress());
if (!XdsThreadLocalUtil.getSendByteFlag() && circuitBreakers.isSplitExternalLocalOriginErrors()
&& shouldCircuitBreakerByFailure(circuitBreakerInfo.getGateWayFailure(),
&& shouldCircuitBreakerByFailure(circuitBreakerInfo.getLocalFailure(),
circuitBreakers.getConsecutiveLocalOriginFailure(), circuitBreakers.getInterval())) {
openCircuitBreaker(circuitBreakerInfo, circuitBreakers.getInterval());
}
Expand Down Expand Up @@ -178,7 +178,7 @@ private static void recordRequestTime(Deque<Long> times, int failureRequestThres
if (failureRequestThreshold <= 0) {
return;
}
for (int i = times.size(); i >= failureRequestThreshold - 1 && !times.isEmpty(); i--) {
for (int i = times.size(); i >= failureRequestThreshold && !times.isEmpty(); i--) {
times.removeFirst();
}
times.add(currentTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package io.sermant.flowcontrol.common.handler.retry;

import io.sermant.core.service.xds.entity.XdsRetryPolicy;
import io.sermant.flowcontrol.common.core.rule.RetryRule;

import org.junit.Assert;
Expand Down Expand Up @@ -75,7 +76,17 @@ private void muteRetry(Object instance) {
private Retry buildRetry() {
return new Retry() {
@Override
public boolean needRetry(Set<String> statusList, Object result) {
public boolean isNeedRetry(Set<String> statusList, Object result) {
return false;
}

@Override
public boolean isNeedRetry(Object result, XdsRetryPolicy retryPolicy) {
return false;
}

@Override
public boolean isNeedRetry(Throwable throwable, XdsRetryPolicy retryPolicy) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public void setUp() {
Mockito.when(xdsCoreService.getLoadBalanceService()).thenReturn(xdsLoadBalanceService);
requestCircuitBreakers.setMaxRequests(1000);
instanceCircuitBreakers.setInterval(1000);
retryPolicy.setRetryOn("503");
retryPolicy.setRetryConditions(Collections.singletonList("503"));
rateLimit.setResponseHeaderOption(Collections.singletonList(new XdsHeaderOption()));
XdsDelay delay = new XdsDelay();
delay.setFixedDelay(1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void testGetRetryPolicy() {
final Optional<XdsRetryPolicy> result = XdsHandler.INSTANCE.getRetryPolicy(
SERVICE_NAME, ROUTE_NAME);
Assert.assertTrue(result.isPresent());
Assert.assertEquals(retryPolicy.getRetryOn(), result.get().getRetryOn());
Assert.assertEquals(retryPolicy.getRetryConditions().get(0), result.get().getRetryConditions().get(0));
}

@Test
Expand Down
Loading

0 comments on commit ecdfb43

Please sign in to comment.