Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add XDS flow control function #1698

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions sermant-plugins/sermant-flowcontrol/config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@ flow.control.plugin:
xds.flow.control.config:
# Whether to enable Xds flow control
enable: false
retry:
# The specified response status codes that need to be retried. Retry will be performed when the response's status
# code matches one of the specified codes.
x-sermant-retriable-status-codes:
# The specified response header names that need to be retried. Retry will be performed when the response contains
# the specified headers.
x-sermant-retriable-header-names:
# The specified response status codes that need to be retried. Retry will be performed when the response's status
# code matches one of the specified codes.
x-sermant-retriable-status-codes:
# The specified response header names that need to be retried. Retry will be performed when the response contains
# the specified headers.
x-sermant-retriable-header-names:
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ public class XdsFlowControlConfig implements PluginConfig {
/**
* Specify the response code for retry, and retry will be executed when the response code is included
*/
@ConfigFieldKey("retry.x-sermant-retriable-status-codes")
@ConfigFieldKey("x-sermant-retriable-status-codes")
private List<String> retryStatusCodes;

/**
* Specify the response code for retry, and retry will be executed when the response header is included
*/
@ConfigFieldKey("retry.x-sermant-retriable-header-names")
@ConfigFieldKey("x-sermant-retriable-header-names")
private List<String> retryHeaderNames;

/**
Expand Down
21 changes: 21 additions & 0 deletions sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
<netflix-core.version>1.4.7.RELEASE</netflix-core.version>
<spring.cloud.context.version>2.2.0.RELEASE</spring.cloud.context.version>
<google.guava>31.1-jre</google.guava>
<apache-httpclient.version>4.5.13</apache-httpclient.version>
<okhttp.version>4.11.0</okhttp.version>
<okhttp.sq.version>2.7.5</okhttp.sq.version>
</properties>
<dependencies>
<!--compile-->
Expand Down Expand Up @@ -111,6 +114,24 @@
<version>${google.guava}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${apache-httpclient.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>${okhttp.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.squareup.okhttp</groupId>
<artifactId>okhttp</artifactId>
<version>${okhttp.sq.version}</version>
<scope>provided</scope>
</dependency>
<!--test-->
<dependency>
<groupId>junit</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.sermant.flowcontrol;

import io.sermant.core.plugin.agent.entity.ExecuteContext;
import io.sermant.core.utils.CollectionUtils;
import io.sermant.core.utils.LogUtils;
import io.sermant.core.utils.ReflectUtils;
import io.sermant.flowcontrol.common.config.ConfigConst;
Expand All @@ -30,6 +31,7 @@
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -120,13 +122,21 @@ protected final ExecuteContext doBefore(ExecuteContext context) throws Exception
return context;
}
chooseHttpService().onBefore(className, httpRequestEntity.get(), result);
if (result.isSkip()) {
context.skip(null);
final Object response = allArguments[1];
if (response != null) {
setStatus.accept(response, result.getResponse().getCode());
getWriter.apply(response).print(result.buildResponseMsg());
if (!result.isSkip()) {
return context;
}
context.skip(null);
final Object response = allArguments[1];
if (response == null) {
return context;
}
setStatus.accept(response, result.getResponse().getCode());
getWriter.apply(response).print(result.buildResponseMsg());
for (Map.Entry<String, List<String>> entry : result.getResponse().getHeaders().entrySet()) {
if (CollectionUtils.isEmpty(entry.getValue())) {
continue;
}
setResponseHeader(response, entry.getKey(), entry.getValue().get(0));
}
return context;
}
Expand Down Expand Up @@ -167,6 +177,11 @@ private String getHeader(Object httpServletRequest, String key) {
new Object[]{key}).orElse(null);
}

private Optional<Object> setResponseHeader(Object httpServletResponse, String key, String value) {
return ReflectUtils.invokeMethod(httpServletResponse, "setHeader",
new Class[]{String.class, String.class}, new Object[]{key, value});
}

private PrintWriter getWriter(Object httpServletRequest) {
return (PrintWriter) ReflectUtils.invokeMethodWithNoneParameter(httpServletRequest, "getWriter")
.orElse(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,12 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -209,10 +212,29 @@ public static class FeignRetry extends AbstractRetry {

@Override
public Optional<String> getCode(Object result) {
Optional<Object> resultOptional = getMethodResult(result, "status");
return resultOptional.map(String::valueOf);
}

@Override
public Optional<Set<String>> getHeaderNames(Object result) {
Optional<Object> resultOptional = getMethodResult(result, "headers");
if (!resultOptional.isPresent() || !(resultOptional.get() instanceof Map)) {
return Optional.empty();
}
Map<?, ?> headers = (Map<?, ?>) resultOptional.get();
Set<String> headerNames = new HashSet<>();
for (Map.Entry<?, ?> entry : headers.entrySet()) {
headerNames.add(entry.getKey().toString());
}
return Optional.of(headerNames);
}

private Optional<Object> getMethodResult(Object result, String methodName) {
final Optional<Method> status = getInvokerMethod(result.getClass().getName() + METHOD_KEY, fn -> {
final Method method;
try {
method = result.getClass().getDeclaredMethod("status");
method = result.getClass().getDeclaredMethod(methodName);
method.setAccessible(true);
return method;
} catch (NoSuchMethodException ex) {
Expand All @@ -225,7 +247,7 @@ public Optional<String> getCode(Object result) {
return Optional.empty();
}
try {
return Optional.of(String.valueOf(status.get().invoke(result)));
return Optional.of(status.get().invoke(result));
} catch (IllegalAccessException ex) {
LOGGER.warning(String.format(Locale.ENGLISH, "Can not find method status from class [%s]!",
result.getClass().getCanonicalName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.sermant.flowcontrol.common.entity.RequestEntity.RequestType;
import io.sermant.flowcontrol.common.handler.retry.AbstractRetry;
import io.sermant.flowcontrol.common.handler.retry.RetryContext;
import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil;
import io.sermant.flowcontrol.inject.DefaultClientHttpResponse;
import io.sermant.flowcontrol.inject.RetryClientHttpResponse;
import io.sermant.flowcontrol.service.InterceptorSupporter;
Expand All @@ -39,9 +40,12 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.logging.Logger;

Expand Down Expand Up @@ -95,12 +99,12 @@ protected final ExecuteContext doBefore(ExecuteContext context) {
if (flowControlResult.isSkip()) {
context.skip(new DefaultClientHttpResponse(flowControlResult));
} else {
tryExeWithRetry(context);
tryExeWithRetry(context, httpRequestEntity.get());
}
return context;
}

private void tryExeWithRetry(ExecuteContext context) {
private void tryExeWithRetry(ExecuteContext context, HttpRequestEntity httpRequestEntity) {
final Object[] allArguments = context.getArguments();
final HttpRequest request = (HttpRequest) context.getObject();
Object result;
Expand All @@ -118,12 +122,8 @@ private void tryExeWithRetry(ExecuteContext context) {
}
context.afterMethod(result, ex);
try {
final Optional<HttpRequestEntity> httpRequestEntity = convertToHttpEntity(request);
if (!httpRequestEntity.isPresent()) {
return;
}
RetryContext.INSTANCE.buildRetryPolicy(httpRequestEntity.get());
final List<Retry> handlers = getRetryHandler().getHandlers(httpRequestEntity.get());
RetryContext.INSTANCE.buildRetryPolicy(httpRequestEntity);
final List<Retry> handlers = getRetryHandler().getHandlers(httpRequestEntity);
if (!handlers.isEmpty() && needRetry(handlers.get(0), result, ex)) {
// retry only one policy
request.getHeaders().add(RETRY_KEY, RETRY_VALUE);
Expand All @@ -143,6 +143,7 @@ private void tryExeWithRetry(ExecuteContext context) {
@Override
protected ExecuteContext doThrow(ExecuteContext context) {
chooseHttpService().onThrow(className, context.getThrowable());
XdsThreadLocalUtil.removeSendByteFlag();
return context;
}

Expand All @@ -155,6 +156,7 @@ protected final ExecuteContext doAfter(ExecuteContext context) throws IOExceptio
chooseHttpService().onThrow(className, defaultException);
}
chooseHttpService().onAfter(className, context.getResult());
XdsThreadLocalUtil.removeSendByteFlag();
return context;
}

Expand All @@ -176,9 +178,38 @@ public static class HttpRetry extends AbstractRetry {

@Override
public Optional<String> getCode(Object result) {
Optional<Object> resultOptional = getMethodResult(result, "getRawStatusCode");
return resultOptional.map(String::valueOf);
}

@Override
public Optional<Set<String>> getHeaderNames(Object result) {
Optional<Object> resultOptional = getMethodResult(result, "getHeaders");
if (!resultOptional.isPresent() || !(resultOptional.get() instanceof Map)) {
return Optional.empty();
}
Map<?, ?> headers = (Map<?, ?>) resultOptional.get();
Set<String> headerNames = new HashSet<>();
for (Map.Entry<?, ?> entry : headers.entrySet()) {
headerNames.add(entry.getKey().toString());
}
return Optional.of(headerNames);
}

@Override
public Class<? extends Throwable>[] retryExceptions() {
return getRetryExceptions();
}

@Override
public RetryFramework retryType() {
return RetryFramework.SPRING_CLOUD;
}

private Optional<Object> getMethodResult(Object result, String methodName) {
final Optional<Method> getRawStatusCode = getInvokerMethod(result.getClass().getName() + METHOD_KEY, fn -> {
try {
final Method method = result.getClass().getDeclaredMethod("getRawStatusCode");
final Method method = result.getClass().getDeclaredMethod(methodName);
method.setAccessible(true);
return method;
} catch (NoSuchMethodException ex) {
Expand All @@ -192,7 +223,7 @@ public Optional<String> getCode(Object result) {
return Optional.empty();
}
try {
return Optional.of(String.valueOf(getRawStatusCode.get().invoke(result)));
return Optional.of(getRawStatusCode.get().invoke(result));
} catch (IllegalAccessException ex) {
LOGGER.warning(String.format(Locale.ENGLISH,
"Can not find method getRawStatusCode from class [%s]!",
Expand All @@ -203,15 +234,5 @@ public Optional<String> getCode(Object result) {
}
return Optional.empty();
}

@Override
public Class<? extends Throwable>[] retryExceptions() {
return getRetryExceptions();
}

@Override
public RetryFramework retryType() {
return RetryFramework.SPRING_CLOUD;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@

package io.sermant.flowcontrol.retry.handler;

import io.sermant.core.service.xds.entity.XdsRetryPolicy;
import io.sermant.core.utils.CollectionUtils;
import io.sermant.core.utils.ReflectUtils;
import io.sermant.core.utils.StringUtils;
import io.sermant.flowcontrol.common.core.rule.RetryRule;
import io.sermant.flowcontrol.common.exception.InvokerWrapperException;
import io.sermant.flowcontrol.common.handler.retry.Retry;
import io.sermant.flowcontrol.common.xds.retry.RetryCondition;
import io.sermant.flowcontrol.common.xds.retry.RetryConditionType;

import java.io.IOException;
import java.net.ConnectException;
Expand Down Expand Up @@ -67,6 +72,12 @@ public Predicate<Throwable> createExceptionPredicate(Class<? extends Throwable>[
.orElseGet(() -> throwable -> true);
}

@Override
public Predicate<Throwable> createExceptionPredicate(Class<? extends Throwable>[] retryExceptions,
XdsRetryPolicy policy) {
return (Throwable ex) -> needRetry(ex, policy);
}

private Predicate<Throwable> createExceptionPredicate(Class<? extends Throwable> retryClass) {
return (Throwable ex) -> {
if (retryClass.isAssignableFrom(getRealExceptionClass(ex))) {
Expand Down Expand Up @@ -120,4 +131,58 @@ public Predicate<Object> createResultPredicate(Retry retry, RetryRule rule) {
}
return result -> retry.needRetry(new HashSet<>(retryOnResponseStatus), result);
}

@Override
public Predicate<Object> createResultPredicate(Retry retry, XdsRetryPolicy xdsRetryPolicy) {
return result -> needRetry(retry, result, xdsRetryPolicy);
}

private boolean needRetry(Retry retry, Object result, XdsRetryPolicy retryPolicy) {
List<String> conditions = getRetryConditions(retryPolicy);
if (CollectionUtils.isEmpty(conditions)) {
return false;
}
Optional<String> statusCodeOptional = retry.getCode(result);
if (!statusCodeOptional.isPresent()) {
return false;
}
String statusCode = statusCodeOptional.get();
if (conditions.contains(statusCode)) {
return true;
}
for (String conditionName : conditions) {
Optional<RetryCondition> retryConditionOptional = RetryConditionType.getRetryConditionByName(conditionName);
if (!retryConditionOptional.isPresent()) {
continue;
}
boolean needRetry = retryConditionOptional.get().needRetry(retry, null, statusCode, result);
if (needRetry) {
return true;
}
}
return false;
}

private boolean needRetry(Throwable ex, XdsRetryPolicy retryPolicy) {
List<String> conditions = getRetryConditions(retryPolicy);
for (String conditionName : conditions) {
Optional<RetryCondition> retryConditionOptional = RetryConditionType.getRetryConditionByName(conditionName);
if (!retryConditionOptional.isPresent()) {
continue;
}
boolean needRetry = retryConditionOptional.get().needRetry(null, ex, null, null);
if (needRetry) {
return true;
}
}
return false;
}

private static List<String> getRetryConditions(XdsRetryPolicy xdsRetryPolicy) {
String retryOn = xdsRetryPolicy.getRetryOn();
if (StringUtils.isExist(retryOn)) {
return Arrays.asList(retryOn.split(","));
}
return Collections.emptyList();
}
}
Loading
Loading