From aaf7d5702b3d57e33a1a9d495a5420b227696ba0 Mon Sep 17 00:00:00 2001 From: hanbingleixue Date: Mon, 6 Jan 2025 11:43:42 +0800 Subject: [PATCH] Optimize the performance of xds flow control functionality Signed-off-by: hanbingleixue --- .../common/config/CommonConst.java | 10 ++ .../core/match/XdsRouteMatchManager.java | 8 +- .../handler/AbstractRequestHandler.java | 24 ---- .../common/handler/retry/AbstractRetry.java | 4 +- .../common/handler/retry/Retry.java | 4 +- .../common/handler/retry/RetryContext.java | 20 +-- .../xds/circuit/XdsCircuitBreakerManager.java | 4 +- .../core/match/XdsRouteMatchManagerTest.java | 1 - .../circuit/XdsCircuitBreakerManagerTest.java | 42 +++++- .../ratelimit/XdsRateLimitManagerTest.java | 58 ++++++++ .../AbstractXdsHttpClientInterceptor.java | 82 +++++++----- .../DispatcherServletInterceptor.java | 38 +++++- .../retry/AlibabaDubboInvokerInterceptor.java | 2 +- .../retry/ApacheDubboInvokerInterceptor.java | 2 +- .../retry/FeignRequestInterceptor.java | 2 +- .../retry/HttpRequestInterceptor.java | 2 +- .../retry/client/HttpClient4xInterceptor.java | 6 +- .../HttpUrlConnectionConnectInterceptor.java | 7 +- ...ttpUrlConnectionDisconnectInterceptor.java | 2 +- ...rlConnectionResponseStreamInterceptor.java | 55 +++++--- .../client/OkHttp3ClientInterceptor.java | 16 +-- ...HttpClientInterceptorChainInterceptor.java | 6 +- .../retry/handler/RetryHandlerV2.java | 55 +++++--- .../service/InterceptorSupporter.java | 24 +++- .../rest4j/XdsHttpFlowControlService.java | 57 ++++++++ .../DispatcherServletInterceptorTest.java | 3 + .../res4j/chain/AbstractChainHandler.java | 2 - .../res4j/chain/AbstractXdsChainHandler.java | 125 ++++++++++++++++++ .../res4j/chain/HandlerChainBuilder.java | 22 ++- .../res4j/chain/HandlerChainEntry.java | 6 +- .../res4j/chain/XdsHandlerChain.java | 47 +++++++ .../res4j/chain/XdsHandlerChainEntry.java | 84 ++++++++++++ .../res4j/chain/XdsRequestHandler.java | 61 +++++++++ .../chain/handler/BusinessRequestHandler.java | 38 ++++-- .../XdsBusinessClientRequestHandler.java | 36 ++--- .../XdsBusinessServerRequestHandler.java | 44 +++--- .../chain/handler/XdsFaultRequestHandler.java | 31 ++--- .../handler/XdsRateLimitRequestHandler.java | 27 ++-- .../XdsHttpFlowControlServiceImpl.java | 46 +++++++ ....sermant.core.plugin.service.PluginService | 1 + ...owcontrol.res4j.chain.AbstractChainHandler | 4 - ...ontrol.res4j.chain.AbstractXdsChainHandler | 20 +++ 42 files changed, 875 insertions(+), 253 deletions(-) create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/ratelimit/XdsRateLimitManagerTest.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/service/rest4j/XdsHttpFlowControlService.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/AbstractXdsChainHandler.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/XdsHandlerChain.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/XdsHandlerChainEntry.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/XdsRequestHandler.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/service/XdsHttpFlowControlServiceImpl.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/resources/META-INF/services/io.sermant.flowcontrol.res4j.chain.AbstractXdsChainHandler diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/config/CommonConst.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/config/CommonConst.java index 86aa5cbeaa..4a9b69d9b2 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/config/CommonConst.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/config/CommonConst.java @@ -284,6 +284,16 @@ public class CommonConst { */ public static final int DEFAULT_RESPONSE_CODE = -1; + /** + * the key of Scenario information for flow control + */ + public static final String SCENARIO_INFO = "flowControlScenario"; + + /** + * the key of request-information + */ + public static final String REQUEST_INFO = "REQUEST_INFO"; + private CommonConst() { } } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/core/match/XdsRouteMatchManager.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/core/match/XdsRouteMatchManager.java index bbb0570dac..fb9ca13df7 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/core/match/XdsRouteMatchManager.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/core/match/XdsRouteMatchManager.java @@ -91,8 +91,12 @@ private boolean isPathMatched(XdsPathMatcher matcher, String path) { } private boolean isHeadersMatched(List matchers, Map headers) { - return matchers.stream() - .allMatch(xdsHeaderMatcher -> xdsHeaderMatcher.isMatch(headers)); + for (XdsHeaderMatcher xdsHeaderMatcher : matchers) { + if (!xdsHeaderMatcher.isMatch(headers)) { + return false; + } + } + return true; } private String selectClusterByRoute(XdsRoute matchedRoute) { diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/AbstractRequestHandler.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/AbstractRequestHandler.java index 9c3ac10f5e..6ab9f4f770 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/AbstractRequestHandler.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/AbstractRequestHandler.java @@ -20,9 +20,7 @@ import io.sermant.flowcontrol.common.core.ResolverManager; import io.sermant.flowcontrol.common.core.match.MatchManager; import io.sermant.flowcontrol.common.core.resolver.AbstractResolver; -import io.sermant.flowcontrol.common.entity.FlowControlScenario; import io.sermant.flowcontrol.common.entity.RequestEntity; -import io.sermant.flowcontrol.common.util.StringUtils; import java.util.Collections; import java.util.List; @@ -71,17 +69,6 @@ public List getHandlers(RequestEntity request) { return createOrGetHandlers(businessNames); } - /** - * gets the specified request handler - * - * @param flowControlScenario matched scenario information - * @return handler - */ - public List getXdsHandlers(FlowControlScenario flowControlScenario) { - Optional handlerOptions = createHandler(flowControlScenario, StringUtils.EMPTY); - return handlerOptions.map(Collections::singletonList).orElse(Collections.emptyList()); - } - /** * create handler * @@ -114,17 +101,6 @@ private Optional create(String businessName) { */ protected abstract Optional createHandler(String businessName, R rule); - /** - * create handler - * - * @param flowControlScenario matched business information - * @param businessName service scenario name - * @return handler - */ - public Optional createHandler(FlowControlScenario flowControlScenario, String businessName) { - return Optional.empty(); - } - /** * get configuration key * diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/AbstractRetry.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/AbstractRetry.java index 83ed5af6ec..d9b31925c6 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/AbstractRetry.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/AbstractRetry.java @@ -130,7 +130,7 @@ public boolean isNeedRetry(Throwable ex, XdsRetryPolicy retryPolicy) { * @throws UnsupportedOperationException unsupported operation */ public Optional getCode(Object result) { - throw new UnsupportedOperationException(); + return Optional.empty(); } /** @@ -141,7 +141,7 @@ public Optional getCode(Object result) { * @throws UnsupportedOperationException unsupported operation */ public Optional> getHeaderNames(Object result) { - throw new UnsupportedOperationException(); + return Optional.empty(); } /** diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/Retry.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/Retry.java index ffb2139fb7..0e25cbfea1 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/Retry.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/Retry.java @@ -73,7 +73,7 @@ public interface Retry { RetryFramework retryType(); /** - * get status code + * get status code * * @param result interface response result * @return response status code @@ -81,7 +81,7 @@ public interface Retry { Optional getCode(Object result); /** - * get header + * get header * * @param result interface response result * @return response header names diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/RetryContext.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/RetryContext.java index d56c706cae..566dabf138 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/RetryContext.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/RetryContext.java @@ -21,15 +21,11 @@ import io.sermant.flowcontrol.common.core.RuleUtils; import io.sermant.flowcontrol.common.core.resolver.RetryResolver; import io.sermant.flowcontrol.common.core.rule.RetryRule; -import io.sermant.flowcontrol.common.entity.FlowControlScenario; import io.sermant.flowcontrol.common.entity.HttpRequestEntity; import io.sermant.flowcontrol.common.handler.retry.policy.RetryOnUntriedPolicy; import io.sermant.flowcontrol.common.handler.retry.policy.RetryPolicy; -import io.sermant.flowcontrol.common.util.StringUtils; -import io.sermant.flowcontrol.common.xds.handler.XdsHandler; import java.util.List; -import java.util.Optional; /** * Retry context, used to manage retry policies based on different host framework types @@ -141,21 +137,11 @@ public void buildRetryPolicy(HttpRequestEntity requestEntity) { } /** - * build test strategy + * build retry policy * - * @param scenario scenario information + * @param retryPolicy retry policy information */ - public void buildXdsRetryPolicy(FlowControlScenario scenario) { - if (StringUtils.isEmpty(scenario.getServiceName()) - || StringUtils.isEmpty(scenario.getRouteName())) { - return; - } - Optional retryPolicyOptional = XdsHandler.INSTANCE - .getRetryPolicy(scenario.getServiceName(), scenario.getRouteName()); - if (!retryPolicyOptional.isPresent()) { - return; - } - XdsRetryPolicy retryPolicy = retryPolicyOptional.get(); + public void buildXdsRetryPolicy(XdsRetryPolicy retryPolicy) { policyThreadLocal.set(new RetryOnUntriedPolicy((int) retryPolicy.getMaxAttempts())); } } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerManager.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerManager.java index c8c720890f..0ab666f192 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerManager.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerManager.java @@ -65,13 +65,13 @@ public static int incrementActiveRequests(String serviceName, String clusterName } /** - * decrement Active Request + * decrease Active Request * * @param serviceName service name * @param clusterName route name * @param address request address */ - public static void decrementActiveRequests(String serviceName, String clusterName, String address) { + public static void decreaseActiveRequests(String serviceName, String clusterName, String address) { getActiveRequestCount(serviceName, clusterName, address).decrementAndGet(); } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/core/match/XdsRouteMatchManagerTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/core/match/XdsRouteMatchManagerTest.java index 1ccdc049fc..55cd341dc1 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/core/match/XdsRouteMatchManagerTest.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/core/match/XdsRouteMatchManagerTest.java @@ -19,7 +19,6 @@ import io.sermant.flowcontrol.common.entity.FlowControlScenario; import io.sermant.flowcontrol.common.entity.HttpRequestEntity; import io.sermant.flowcontrol.common.entity.RequestEntity; - import io.sermant.flowcontrol.common.util.XdsAbstractTest; import org.junit.Assert; import org.junit.Test; diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerManagerTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerManagerTest.java index 6c895e39c6..c15612fd28 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerManagerTest.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerManagerTest.java @@ -18,6 +18,7 @@ import io.sermant.core.service.xds.entity.XdsInstanceCircuitBreakers; import io.sermant.flowcontrol.common.entity.FlowControlScenario; +import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -43,12 +44,12 @@ public class XdsCircuitBreakerManagerTest { public void testActiveRequests() { assertEquals(1, XdsCircuitBreakerManager.incrementActiveRequests(SERVICE_NAME, CLUSTER_NAME, ADDRESS)); assertEquals(2, XdsCircuitBreakerManager.incrementActiveRequests(SERVICE_NAME, CLUSTER_NAME, ADDRESS)); - XdsCircuitBreakerManager.decrementActiveRequests(SERVICE_NAME, CLUSTER_NAME, ADDRESS); + XdsCircuitBreakerManager.decreaseActiveRequests(SERVICE_NAME, CLUSTER_NAME, ADDRESS); assertEquals(2, XdsCircuitBreakerManager.incrementActiveRequests(SERVICE_NAME, CLUSTER_NAME, ADDRESS)); } @Test - public void testCircuitBreaker() { + public void testCircuitBreaker() throws InterruptedException { final FlowControlScenario scenarioInfo = new FlowControlScenario(); scenarioInfo.setServiceName(SERVICE_NAME); scenarioInfo.setClusterName(CLUSTER_NAME); @@ -56,16 +57,49 @@ public void testCircuitBreaker() { scenarioInfo.setAddress(ADDRESS); final XdsInstanceCircuitBreakers circuitBreakers = new XdsInstanceCircuitBreakers(); circuitBreakers.setSplitExternalLocalOriginErrors(false); - circuitBreakers.setConsecutiveLocalOriginFailure(1); - circuitBreakers.setConsecutiveGatewayFailure(1); + circuitBreakers.setConsecutiveLocalOriginFailure(0); + circuitBreakers.setConsecutiveGatewayFailure(0); circuitBreakers.setConsecutive5xxFailure(1); circuitBreakers.setInterval(1000L); circuitBreakers.setBaseEjectionTime(10000L); boolean result = XdsCircuitBreakerManager.needsInstanceCircuitBreaker(scenarioInfo, ADDRESS); assertFalse(result); + + // Test the number of errors from the server reached the threshold XdsCircuitBreakerManager.recordFailureRequest(scenarioInfo, ADDRESS, 500, circuitBreakers); XdsCircuitBreakerManager.setCircuitBeakerStatus(circuitBreakers, scenarioInfo); result = XdsCircuitBreakerManager.needsInstanceCircuitBreaker(scenarioInfo, ADDRESS); assertTrue(result); + + // Test whether the circuit breaker time has been exceeded and whether it has been restored + Thread.sleep(1100L); + result = XdsCircuitBreakerManager.needsInstanceCircuitBreaker(scenarioInfo, ADDRESS); + assertFalse(result); + + // Test the number of errors from the local source reached the threshold + circuitBreakers.setSplitExternalLocalOriginErrors(true); + circuitBreakers.setConsecutiveLocalOriginFailure(1); + XdsThreadLocalUtil.setConnectionStatus(false); + XdsCircuitBreakerManager.recordFailureRequest(scenarioInfo, ADDRESS, -1, circuitBreakers); + XdsCircuitBreakerManager.setCircuitBeakerStatus(circuitBreakers, scenarioInfo); + result = XdsCircuitBreakerManager.needsInstanceCircuitBreaker(scenarioInfo, ADDRESS); + assertTrue(result); + + // Test whether the actual circuit breaker time is the product of the number of circuit breakers multiplied + // by the configured circuit breaker time + Thread.sleep(1100L); + result = XdsCircuitBreakerManager.needsInstanceCircuitBreaker(scenarioInfo, ADDRESS); + assertTrue(result); + Thread.sleep(1000L); + result = XdsCircuitBreakerManager.needsInstanceCircuitBreaker(scenarioInfo, ADDRESS); + assertFalse(result); + + // Test the number of errors from the gateway reached the threshold + circuitBreakers.setConsecutive5xxFailure(0); + circuitBreakers.setConsecutiveGatewayFailure(1); + XdsCircuitBreakerManager.recordFailureRequest(scenarioInfo, ADDRESS, 503, circuitBreakers); + XdsCircuitBreakerManager.setCircuitBeakerStatus(circuitBreakers, scenarioInfo); + result = XdsCircuitBreakerManager.needsInstanceCircuitBreaker(scenarioInfo, ADDRESS); + assertTrue(result); } } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/ratelimit/XdsRateLimitManagerTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/ratelimit/XdsRateLimitManagerTest.java new file mode 100644 index 0000000000..8dd05e19e3 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/ratelimit/XdsRateLimitManagerTest.java @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.xds.ratelimit; + +import io.sermant.core.service.xds.entity.XdsTokenBucket; +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * RateLimitManager Test + * + * @author zhp + * @since 2024-12-02 + */ +public class XdsRateLimitManagerTest { + private static final String SERVICE_NAME = "serviceA"; + + private static final String ROUTE_NAME = "routeA"; + + @Test + public void testFillAndConsumeToken() throws InterruptedException { + final XdsTokenBucket tokenBucket = new XdsTokenBucket(); + tokenBucket.setMaxTokens(1); + tokenBucket.setTokensPerFill(1); + tokenBucket.setFillInterval(1000L); + boolean result = XdsRateLimitManager.fillAndConsumeToken(SERVICE_NAME, ROUTE_NAME, tokenBucket); + assertTrue(result); + + // The situation where all tokens have been consumed + result = XdsRateLimitManager.fillAndConsumeToken(SERVICE_NAME, ROUTE_NAME, tokenBucket); + assertFalse(result); + Thread.sleep(1000L); + + // Test token refill situation + result = XdsRateLimitManager.fillAndConsumeToken(SERVICE_NAME, ROUTE_NAME, tokenBucket); + assertTrue(result); + + // Test all refilled tokens have been consumed + result = XdsRateLimitManager.fillAndConsumeToken(SERVICE_NAME, ROUTE_NAME, tokenBucket); + assertFalse(result); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/AbstractXdsHttpClientInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/AbstractXdsHttpClientInterceptor.java index 50e67f9d27..4779349f69 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/AbstractXdsHttpClientInterceptor.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/AbstractXdsHttpClientInterceptor.java @@ -22,9 +22,11 @@ import io.sermant.core.service.xds.entity.ServiceInstance; import io.sermant.core.service.xds.entity.XdsInstanceCircuitBreakers; import io.sermant.core.service.xds.entity.XdsRequestCircuitBreakers; +import io.sermant.core.service.xds.entity.XdsRetryPolicy; import io.sermant.core.utils.CollectionUtils; import io.sermant.flowcontrol.common.config.CommonConst; import io.sermant.flowcontrol.common.entity.FlowControlScenario; +import io.sermant.flowcontrol.common.entity.RequestEntity; import io.sermant.flowcontrol.common.exception.InvokerWrapperException; import io.sermant.flowcontrol.common.handler.retry.RetryContext; import io.sermant.flowcontrol.common.handler.retry.policy.RetryPolicy; @@ -69,17 +71,13 @@ public abstract class AbstractXdsHttpClientInterceptor extends InterceptorSuppor protected final io.sermant.flowcontrol.common.handler.retry.Retry retry; - protected final String className; - /** * Constructor * * @param retry Retry instance, used for retry determination - * @param className The fully qualified naming of interceptors */ - public AbstractXdsHttpClientInterceptor(io.sermant.flowcontrol.common.handler.retry.Retry retry, String className) { + public AbstractXdsHttpClientInterceptor(io.sermant.flowcontrol.common.handler.retry.Retry retry) { this.retry = retry; - this.className = className; } /** @@ -115,11 +113,12 @@ public void executeWithRetryPolicy(ExecuteContext context) { Throwable ex = context.getThrowable(); // Create logical function for service invocation or retry - final Supplier retryFunc = createRetryFunc(context, result); + final Supplier retryFunc = createRetryFunc(context); RetryContext.INSTANCE.markRetry(retry); try { // first execution taking over the host logic result = retryFunc.get(); + context.changeResult(result); } catch (Throwable throwable) { ex = throwable; log(throwable); @@ -129,7 +128,7 @@ public void executeWithRetryPolicy(ExecuteContext context) { final List handlers = getRetryHandlers(); // Determine whether retry is necessary - if (!handlers.isEmpty() && needRetry(handlers.get(0), result, ex)) { + if (!handlers.isEmpty() && isNeedRetry(handlers.get(0), result, ex)) { // execute retry logic result = handlers.get(0).executeCheckedSupplier(retryFunc::get); context.skip(result); @@ -152,12 +151,16 @@ public void executeWithRetryPolicy(ExecuteContext context) { @Override public ExecuteContext doAfter(ExecuteContext context) { - XdsThreadLocalUtil.removeSendByteFlag(); FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo(); - if (context.getThrowable() == null && scenarioInfo != null) { - decrementActiveRequestsAndCountFailureRequests(context, scenarioInfo); + Object requestEntity = context.getLocalFieldValue(CommonConst.REQUEST_INFO); + if (requestEntity instanceof RequestEntity) { + getXdsHttpFlowControlService().onAfter((RequestEntity) requestEntity, context.getResult(), scenarioInfo); } - chooseHttpService().onAfter(className, context); + if (context.getThrowableOut() == null) { + decreaseActiveRequestsAndCountFailureRequests(context, scenarioInfo); + } + XdsThreadLocalUtil.removeSendByteFlag(); + XdsThreadLocalUtil.removeScenarioInfo(); return context; } @@ -165,16 +168,26 @@ public ExecuteContext doAfter(ExecuteContext context) { public ExecuteContext doThrow(ExecuteContext context) { XdsThreadLocalUtil.removeSendByteFlag(); FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo(); - if (scenarioInfo != null) { - decrementActiveRequestsAndCountFailureRequests(context, scenarioInfo); + Object requestEntity = context.getLocalFieldValue(CommonConst.REQUEST_INFO); + if (requestEntity instanceof RequestEntity) { + getXdsHttpFlowControlService().onThrow((RequestEntity) requestEntity, context.getThrowable(), scenarioInfo); + } + if (context.getThrowableOut() != null) { + decreaseActiveRequestsAndCountFailureRequests(context, scenarioInfo); + XdsThreadLocalUtil.removeSendByteFlag(); + XdsThreadLocalUtil.removeScenarioInfo(); } - chooseHttpService().onAfter(className, context); return context; } - private void decrementActiveRequestsAndCountFailureRequests(ExecuteContext context, - FlowControlScenario scenarioInfo) { - XdsCircuitBreakerManager.decrementActiveRequests(scenarioInfo.getServiceName(), scenarioInfo.getServiceName(), + private void decreaseActiveRequestsAndCountFailureRequests(ExecuteContext context, + FlowControlScenario scenarioInfo) { + if (scenarioInfo == null || StringUtils.isEmpty(scenarioInfo.getServiceName()) + || StringUtils.isEmpty(scenarioInfo.getClusterName()) + || StringUtils.isEmpty(scenarioInfo.getAddress())) { + return; + } + XdsCircuitBreakerManager.decreaseActiveRequests(scenarioInfo.getServiceName(), scenarioInfo.getClusterName(), scenarioInfo.getAddress()); int statusCode = getStatusCode(context); if (statusCode >= MIN_SUCCESS_CODE && statusCode <= MAX_SUCCESS_CODE) { @@ -190,8 +203,6 @@ private void decrementActiveRequestsAndCountFailureRequests(ExecuteContext conte * @param scenario scenario information */ private void handleFailedRequests(FlowControlScenario scenario, int statusCode) { - XdsCircuitBreakerManager.decrementActiveRequests(scenario.getServiceName(), scenario.getClusterName(), - scenario.getAddress()); Optional instanceCircuitBreakersOptional = XdsHandler.INSTANCE. getInstanceCircuitBreakers(scenario.getServiceName(), scenario.getClusterName()); if (!instanceCircuitBreakersOptional.isPresent()) { @@ -230,8 +241,7 @@ protected Optional chooseServiceInstanceForXds() { if (serviceInstanceSet.isEmpty()) { return Optional.empty(); } - boolean needRetry = RetryContext.INSTANCE.isPolicyNeedRetry(); - if (needRetry) { + if (RetryContext.INSTANCE.isPolicyNeedRetry()) { removeRetriedServiceInstance(serviceInstanceSet); } removeCircuitBreakerInstance(scenarioInfo, serviceInstanceSet); @@ -275,7 +285,8 @@ private void removeCircuitBreakerInstance(FlowControlScenario scenarioInfo, Set< float maxCircuitBreakerPercent = (float) outlierDetection.getMaxEjectionPercent() / HUNDRED; int maxCircuitBreakerInstances = (int) Math.floor(count * maxCircuitBreakerPercent); for (ServiceInstance serviceInstance : instanceSet) { - if (hasReachedCircuitBreakerThreshold(circuitBreakerInstances, maxCircuitBreakerInstances)) { + if (maxCircuitBreakerInstances > 0 + && hasReachedCircuitBreakerThreshold(circuitBreakerInstances, maxCircuitBreakerInstances)) { break; } String address = serviceInstance.getHost() + CommonConst.CONNECT + serviceInstance.getPort(); @@ -308,23 +319,29 @@ private boolean checkMinInstanceNum(XdsInstanceCircuitBreakers outlierDetection, * @return Retry Handlers */ protected List getRetryHandlers() { - if (XdsThreadLocalUtil.getScenarioInfo() != null) { - FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo(); - RetryContext.INSTANCE.buildXdsRetryPolicy(scenarioInfo); - return getRetryHandler().getXdsHandlers(scenarioInfo); + FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo(); + if (scenarioInfo == null || StringUtils.isEmpty(scenarioInfo.getServiceName()) + || StringUtils.isEmpty(scenarioInfo.getRouteName())) { + return Collections.emptyList(); + } + Optional retryPolicyOptional = XdsHandler.INSTANCE + .getRetryPolicy(scenarioInfo.getServiceName(), scenarioInfo.getRouteName()); + if (!retryPolicyOptional.isPresent()) { + return Collections.emptyList(); } - return Collections.emptyList(); + XdsRetryPolicy retryPolicy = retryPolicyOptional.get(); + RetryContext.INSTANCE.buildXdsRetryPolicy(retryPolicy); + return getRetryHandler().getXdsRetryHandlers(scenarioInfo, retryPolicy); } /** * create retry method * * @param context The execution context of the Interceptor - * @param result The call result of the enhanced method * @return Define Supplier for retry calls of service calls * @throws InvokerWrapperException InvokerWrapperException */ - protected Supplier createRetryFunc(ExecuteContext context, Object result) { + protected Supplier createRetryFunc(ExecuteContext context) { Object obj = context.getObject(); Method method = context.getMethod(); Object[] allArguments = context.getArguments(); @@ -332,9 +349,10 @@ protected Supplier createRetryFunc(ExecuteContext context, Object result return () -> { method.setAccessible(true); try { - preRetry(obj, method, allArguments, result, isFirstInvoke.get()); - Object invokeResult = method.invoke(obj, allArguments); + preRetry(obj, method, allArguments, context.getResult(), isFirstInvoke.get()); isFirstInvoke.compareAndSet(true, false); + Object invokeResult = method.invoke(obj, allArguments); + context.changeResult(invokeResult); return invokeResult; } catch (IllegalAccessException ignored) { isFirstInvoke.compareAndSet(true, false); @@ -342,7 +360,7 @@ protected Supplier createRetryFunc(ExecuteContext context, Object result isFirstInvoke.compareAndSet(true, false); throw new InvokerWrapperException(ex.getTargetException()); } - return result; + return context.getResult(); }; } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/DispatcherServletInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/DispatcherServletInterceptor.java index dc1a8a2aac..da33153895 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/DispatcherServletInterceptor.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/DispatcherServletInterceptor.java @@ -21,10 +21,14 @@ import io.sermant.core.utils.CollectionUtils; import io.sermant.core.utils.LogUtils; import io.sermant.core.utils.ReflectUtils; +import io.sermant.flowcontrol.common.config.CommonConst; import io.sermant.flowcontrol.common.config.ConfigConst; import io.sermant.flowcontrol.common.entity.FlowControlResult; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; import io.sermant.flowcontrol.common.entity.HttpRequestEntity; +import io.sermant.flowcontrol.common.entity.RequestEntity; import io.sermant.flowcontrol.common.entity.RequestEntity.RequestType; +import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil; import io.sermant.flowcontrol.service.InterceptorSupporter; import java.io.IOException; @@ -122,7 +126,15 @@ protected final ExecuteContext doBefore(ExecuteContext context) throws Exception if (!httpRequestEntity.isPresent()) { return context; } - chooseHttpService().onBefore(className, httpRequestEntity.get(), result); + HttpRequestEntity requestEntity = httpRequestEntity.get(); + if (xdsFlowControlConfig.isEnable()) { + getXdsHttpFlowControlService().onBefore(requestEntity, result); + context.setLocalFieldValue(CommonConst.SCENARIO_INFO, XdsThreadLocalUtil.getScenarioInfo()); + context.setLocalFieldValue(CommonConst.REQUEST_INFO, requestEntity); + XdsThreadLocalUtil.removeScenarioInfo(); + } else { + chooseHttpService().onBefore(className, requestEntity, result); + } if (!result.isSkip()) { return context; } @@ -144,14 +156,34 @@ protected final ExecuteContext doBefore(ExecuteContext context) throws Exception @Override protected final ExecuteContext doAfter(ExecuteContext context) { - chooseHttpService().onAfter(className, context.getResult()); + if (!xdsFlowControlConfig.isEnable()) { + chooseHttpService().onAfter(className, context.getThrowable()); + LogUtils.printHttpRequestAfterPoint(context); + return context; + } + Object scenarioInfo = context.getLocalFieldValue(CommonConst.SCENARIO_INFO); + Object requestEntity = context.getLocalFieldValue(CommonConst.REQUEST_INFO); + if (scenarioInfo instanceof FlowControlScenario && requestEntity instanceof HttpRequestEntity) { + getXdsHttpFlowControlService().onAfter((RequestEntity) requestEntity, context.getThrowable(), + (FlowControlScenario) scenarioInfo); + } LogUtils.printHttpRequestAfterPoint(context); return context; } @Override protected final ExecuteContext doThrow(ExecuteContext context) { - chooseHttpService().onThrow(className, context.getThrowable()); + if (!xdsFlowControlConfig.isEnable()) { + chooseHttpService().onThrow(className, context.getThrowable()); + LogUtils.printHttpRequestOnThrowPoint(context); + return context; + } + Object scenarioInfo = context.getLocalFieldValue(CommonConst.SCENARIO_INFO); + Object requestEntity = context.getLocalFieldValue(CommonConst.REQUEST_INFO); + if (scenarioInfo instanceof FlowControlScenario && requestEntity instanceof HttpRequestEntity) { + getXdsHttpFlowControlService().onThrow((RequestEntity) requestEntity, context.getThrowable(), + (FlowControlScenario) scenarioInfo); + } LogUtils.printHttpRequestOnThrowPoint(context); return context; } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/AlibabaDubboInvokerInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/AlibabaDubboInvokerInterceptor.java index 9fdc29e1fa..229b204ecf 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/AlibabaDubboInvokerInterceptor.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/AlibabaDubboInvokerInterceptor.java @@ -188,7 +188,7 @@ protected final ExecuteContext doAfter(ExecuteContext context) { if (invocation.getInvoker() != null) { final List handlers = getRetryHandler() .getHandlers(convertToAlibabaDubboEntity(invocation)); - if (!handlers.isEmpty() && needRetry(handlers.get(0), result, ((RpcResult) result).getException())) { + if (!handlers.isEmpty() && isNeedRetry(handlers.get(0), result, ((RpcResult) result).getException())) { result = handlers.get(0).executeCheckedSupplier( () -> invokeRetryMethod(context.getObject(), allArguments, context.getResult(), true, true)); diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/ApacheDubboInvokerInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/ApacheDubboInvokerInterceptor.java index 1c17dd1b36..5daae7df69 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/ApacheDubboInvokerInterceptor.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/ApacheDubboInvokerInterceptor.java @@ -211,7 +211,7 @@ protected final ExecuteContext doAfter(ExecuteContext context) { result = invokeRetryMethod(context.getObject(), allArguments, result, false, false); final List handlers = getRetryHandler() .getHandlers(convertToApacheDubboEntity(invocation)); - if (!handlers.isEmpty() && needRetry(handlers.get(0), result, ((AsyncRpcResult) result).getException())) { + if (!handlers.isEmpty() && isNeedRetry(handlers.get(0), result, ((AsyncRpcResult) result).getException())) { RetryContext.INSTANCE.markRetry(retry); result = handlers.get(0) .executeCheckedSupplier(() -> invokeRetryMethod(context.getObject(), allArguments, diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/FeignRequestInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/FeignRequestInterceptor.java index 8b0c31bd62..4d93cf0e75 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/FeignRequestInterceptor.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/FeignRequestInterceptor.java @@ -154,7 +154,7 @@ private void executeWithRetryPolicy(ExecuteContext context) { RetryContext.INSTANCE.buildRetryPolicy(httpRequestEntity.get()); final List handlers = getRetryHandler() .getHandlers(httpRequestEntity.get()); - if (!handlers.isEmpty() && needRetry(handlers.get(0), result, ex)) { + if (!handlers.isEmpty() && isNeedRetry(handlers.get(0), result, ex)) { result = handlers.get(0).executeCheckedSupplier(retryFunc::get); } } catch (Throwable throwable) { diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/HttpRequestInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/HttpRequestInterceptor.java index 1936630b17..363b92db45 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/HttpRequestInterceptor.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/HttpRequestInterceptor.java @@ -123,7 +123,7 @@ private void executeWithRetryPolicy(ExecuteContext context, HttpRequestEntity ht try { RetryContext.INSTANCE.buildRetryPolicy(httpRequestEntity); final List handlers = getRetryHandler().getHandlers(httpRequestEntity); - if (!handlers.isEmpty() && needRetry(handlers.get(0), result, ex)) { + if (!handlers.isEmpty() && isNeedRetry(handlers.get(0), result, ex)) { // retry only one policy request.getHeaders().add(RETRY_KEY, RETRY_VALUE); result = handlers.get(0).executeCheckedSupplier(retryFunc::get); diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpClient4xInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpClient4xInterceptor.java index a9cbdab491..edf8cd638f 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpClient4xInterceptor.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpClient4xInterceptor.java @@ -65,7 +65,7 @@ public class HttpClient4xInterceptor extends AbstractXdsHttpClientInterceptor { * Constructor */ public HttpClient4xInterceptor() { - super(new HttpClientRetry(), HttpClient4xInterceptor.class.getName()); + super(new HttpClientRetry()); } /** @@ -92,7 +92,9 @@ public ExecuteContext doBefore(ExecuteContext context) { final FlowControlResult flowControlResult = new FlowControlResult(); // Execute the flow control handler chain, with only fault for XDS - chooseHttpService().onBefore(className, httpRequestEntity.get(), flowControlResult); + HttpRequestEntity requestEntity = httpRequestEntity.get(); + getXdsHttpFlowControlService().onBefore(httpRequestEntity.get(), flowControlResult); + context.setLocalFieldValue(CommonConst.REQUEST_INFO, requestEntity); // When triggering some flow control rules, it is necessary to skip execution and return the result directly if (flowControlResult.isSkip()) { diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionConnectInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionConnectInterceptor.java index d539e87ca1..7ca4df233e 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionConnectInterceptor.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionConnectInterceptor.java @@ -60,14 +60,11 @@ public class HttpUrlConnectionConnectInterceptor extends AbstractXdsHttpClientIn * Constructor */ public HttpUrlConnectionConnectInterceptor() { - super(new HttpUrlConnectionRetry(), HttpUrlConnectionConnectInterceptor.class.getName()); + super(new HttpUrlConnectionRetry()); } @Override public ExecuteContext doBefore(ExecuteContext context) { - if (!(context.getObject() instanceof HttpURLConnection)) { - return context; - } HttpURLConnection connection = (HttpURLConnection) context.getObject(); // Parse the service name, request path, and request header in the request information and convert them into @@ -79,7 +76,7 @@ public ExecuteContext doBefore(ExecuteContext context) { final FlowControlResult flowControlResult = new FlowControlResult(); // Execute the flow control handler chain, with only fault for XDS - chooseHttpService().onBefore(className, httpRequestEntity.get(), flowControlResult); + getXdsHttpFlowControlService().onBefore(httpRequestEntity.get(), flowControlResult); // When triggering some flow control rules, it is necessary to skip execution and return the result directly if (flowControlResult.isSkip()) { diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionDisconnectInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionDisconnectInterceptor.java index 4bf752cce3..9263c51832 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionDisconnectInterceptor.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionDisconnectInterceptor.java @@ -37,7 +37,7 @@ public class HttpUrlConnectionDisconnectInterceptor extends AbstractXdsHttpClien * Constructor */ public HttpUrlConnectionDisconnectInterceptor() { - super(null, HttpUrlConnectionDisconnectInterceptor.class.getCanonicalName()); + super(null); } @Override diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionResponseStreamInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionResponseStreamInterceptor.java index a1abf01103..4656588ff7 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionResponseStreamInterceptor.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionResponseStreamInterceptor.java @@ -16,6 +16,7 @@ package io.sermant.flowcontrol.retry.client; +import io.github.resilience4j.retry.RetryConfig; import io.sermant.core.common.LoggerFactory; import io.sermant.core.plugin.agent.entity.ExecuteContext; import io.sermant.core.service.xds.entity.ServiceInstance; @@ -38,7 +39,6 @@ import java.net.MalformedURLException; import java.net.Proxy; import java.net.URL; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -59,12 +59,11 @@ public class HttpUrlConnectionResponseStreamInterceptor extends AbstractXdsHttpC * Constructor */ public HttpUrlConnectionResponseStreamInterceptor() { - super(new HttpUrlConnectionRetry(), HttpUrlConnectionResponseStreamInterceptor.class.getCanonicalName()); + super(new HttpUrlConnectionRetry()); } @Override protected ExecuteContext doBefore(ExecuteContext context) throws Exception { - // Remove the status to prevent multiple executions of the same request due to enhanced logic XdsThreadLocalUtil.removeConnectionStatus(); executeWithRetryPolicy(context); return context; @@ -80,6 +79,11 @@ public ExecuteContext doThrow(ExecuteContext context) { return context; } + @Override + protected boolean canInvoke(ExecuteContext context) { + return XdsThreadLocalUtil.isConnected() && XdsThreadLocalUtil.getScenarioInfo() != null; + } + @Override protected int getStatusCode(ExecuteContext context) { HttpURLConnection connection = (HttpURLConnection) context.getObject(); @@ -92,8 +96,19 @@ protected int getStatusCode(ExecuteContext context) { } @Override - protected boolean canInvoke(ExecuteContext context) { - return XdsThreadLocalUtil.isConnected() && XdsThreadLocalUtil.getScenarioInfo() != null; + protected boolean isNeedRetry(io.github.resilience4j.retry.Retry retry, Object result, Throwable throwable) { + final RetryConfig retryConfig = retry.getRetryConfig(); + boolean isNeedRetry = retryConfig.getExceptionPredicate().test(throwable); + if (isNeedRetry) { + final long interval = retryConfig.getIntervalBiFunction().apply(1, null); + try { + // wait according to the first wait time + Thread.sleep(interval); + } catch (InterruptedException e) { + LOGGER.log(Level.WARNING, "Interruption error:", e); + } + } + return isNeedRetry; } @Override @@ -180,6 +195,9 @@ public static class HttpUrlConnectionRetry extends AbstractRetry { @Override public Optional getCode(Object result) { HttpURLConnection connection = XdsThreadLocalUtil.getHttpUrlConnection(); + if (connection == null) { + return Optional.empty(); + } try { return Optional.of(String.valueOf(connection.getResponseCode())); } catch (IOException io) { @@ -188,20 +206,6 @@ public Optional getCode(Object result) { } } - @Override - public Optional> getHeaderNames(Object result) { - HttpURLConnection connection = XdsThreadLocalUtil.getHttpUrlConnection(); - Set headerNames = new HashSet<>(); - if (MapUtils.isEmpty(connection.getHeaderFields())) { - return Optional.empty(); - } - Set headers = connection.getHeaderFields().keySet(); - for (Map.Entry> header : connection.getHeaderFields().entrySet()) { - headers.add(header.getKey()); - } - return Optional.of(headerNames); - } - @Override public boolean isNeedRetry(Throwable throwable, XdsRetryPolicy retryPolicy) { List conditions = retryPolicy.getRetryConditions(); @@ -231,6 +235,19 @@ public boolean isNeedRetry(Object result, XdsRetryPolicy retryPolicy) { return this.isNeedRetry((Throwable) null, retryPolicy); } + @Override + public Optional> getHeaderNames(Object result) { + HttpURLConnection connection = XdsThreadLocalUtil.getHttpUrlConnection(); + if (connection == null) { + return Optional.empty(); + } + Map> headers = connection.getHeaderFields(); + if (MapUtils.isEmpty(headers)) { + return Optional.empty(); + } + return Optional.of(headers.keySet()); + } + @Override public Class[] retryExceptions() { return getRetryExceptions(); diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/OkHttp3ClientInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/OkHttp3ClientInterceptor.java index a109557874..f60a550626 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/OkHttp3ClientInterceptor.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/OkHttp3ClientInterceptor.java @@ -55,13 +55,11 @@ public class OkHttp3ClientInterceptor extends AbstractXdsHttpClientInterceptor { private static final String REQUEST_FIELD_NAME = "originalRequest"; - private final String className = HttpClient4xInterceptor.class.getName(); - /** * Constructor */ public OkHttp3ClientInterceptor() { - super(new OkHttp3Retry(), OkHttp3ClientInterceptor.class.getCanonicalName()); + super(new OkHttp3Retry()); } @Override @@ -83,13 +81,12 @@ protected ExecuteContext doBefore(ExecuteContext context) throws Exception { final FlowControlResult flowControlResult = new FlowControlResult(); // Execute the flow control handler chain, with only fault for XDS - chooseHttpService().onBefore(className, httpRequestEntity.get(), flowControlResult); + getXdsHttpFlowControlService().onBefore(httpRequestEntity.get(), flowControlResult); // When triggering some flow control rules, it is necessary to skip execution and return the result directly if (flowControlResult.isSkip()) { Response.Builder builder = new Response.Builder(); - context.skip(builder.code(flowControlResult.getResponse().getCode()) - .protocol(Protocol.HTTP_1_1) + context.skip(builder.code(flowControlResult.getResponse().getCode()).protocol(Protocol.HTTP_1_1) .message(flowControlResult.buildResponseMsg()).request(request).build()); return context; } @@ -182,7 +179,7 @@ private Object copyNewCall(Object object, Request newRequest) { } @Override - public Supplier createRetryFunc(ExecuteContext context, Object result) { + public Supplier createRetryFunc(ExecuteContext context) { Object obj = context.getObject(); Method method = context.getMethod(); Object[] allArguments = context.getArguments(); @@ -191,9 +188,10 @@ public Supplier createRetryFunc(ExecuteContext context, Object result) { method.setAccessible(true); try { Request request = (Request) context.getLocalFieldValue(REQUEST_FIELD_NAME); - preRetry(obj, method, allArguments, result, isFirstInvoke.get()); Object newCall = copyNewCall(obj, request); + preRetry(newCall, method, allArguments, context.getResult(), isFirstInvoke.get()); Object invokeResult = method.invoke(newCall, allArguments); + context.changeResult(invokeResult); isFirstInvoke.compareAndSet(true, false); return invokeResult; } catch (IllegalAccessException ignored) { @@ -201,7 +199,7 @@ public Supplier createRetryFunc(ExecuteContext context, Object result) { } catch (InvocationTargetException ex) { throw new InvokerWrapperException(ex.getTargetException()); } - return result; + return context.getResult(); }; } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/OkHttpClientInterceptorChainInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/OkHttpClientInterceptorChainInterceptor.java index f82895692a..e7d29b6112 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/OkHttpClientInterceptorChainInterceptor.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/OkHttpClientInterceptorChainInterceptor.java @@ -50,13 +50,11 @@ * @since 2024-12-20 */ public class OkHttpClientInterceptorChainInterceptor extends AbstractXdsHttpClientInterceptor { - private final String className = HttpClient4xInterceptor.class.getName(); - /** * Constructor */ public OkHttpClientInterceptorChainInterceptor() { - super(new OkHttpRetry(), OkHttpClientInterceptorChainInterceptor.class.getCanonicalName()); + super(new OkHttpRetry()); } @Override @@ -76,7 +74,7 @@ protected ExecuteContext doBefore(ExecuteContext context) throws Exception { final FlowControlResult flowControlResult = new FlowControlResult(); // Execute the flow control handler chain, with only fault for XDS - chooseHttpService().onBefore(className, httpRequestEntity.get(), flowControlResult); + getXdsHttpFlowControlService().onBefore(httpRequestEntity.get(), flowControlResult); // When triggering some flow control rules, it is necessary to skip execution and return the result directly if (flowControlResult.isSkip()) { diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/RetryHandlerV2.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/RetryHandlerV2.java index 912ab850eb..9f2fb36546 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/RetryHandlerV2.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/RetryHandlerV2.java @@ -28,11 +28,15 @@ import io.sermant.flowcontrol.common.entity.FlowControlScenario; import io.sermant.flowcontrol.common.handler.AbstractRequestHandler; import io.sermant.flowcontrol.common.handler.retry.RetryContext; -import io.sermant.flowcontrol.common.xds.handler.XdsHandler; import io.sermant.flowcontrol.retry.FeignRequestInterceptor.FeignRetry; import io.sermant.flowcontrol.retry.HttpRequestInterceptor.HttpRetry; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; /** * based on resilience4j retry @@ -43,27 +47,48 @@ public class RetryHandlerV2 extends AbstractRequestHandler { private final RetryPredicateCreator retryPredicateCreator = new DefaultRetryPredicateCreator(); - @Override - public Optional createHandler(FlowControlScenario flowControlScenario, String businessName) { + /** + * XDS Handler cache, where the Key of the first level is the service name, + * the Key of the second level is the route name, the Key of the three level is the xdsRetryPolicy, value + * is Retry instance, The Retry instance is thread-safe can be used to decorate multiple requests + */ + private final Map>>> xdsHandlers = new ConcurrentHashMap<>(); + + /** + * gets the specified retry handler + * + * @param scenario Scenario information for flow control + * @param xdsRetryPolicy retry policy information + * @return handler + */ + public List getXdsRetryHandlers(FlowControlScenario scenario, XdsRetryPolicy xdsRetryPolicy) { + Map>> serviceRetryHandlers = xdsHandlers.computeIfAbsent( + scenario.getServiceName(), k -> new HashMap<>()); + Map> routeRetryHandlers = serviceRetryHandlers.computeIfAbsent(scenario.getRouteName(), + k -> new HashMap<>()); + String retryName = xdsRetryPolicy.toString(); + Optional retryHandlerOptions = routeRetryHandlers.computeIfAbsent(retryName, s -> { + // Clear the original handler to prevent the use of the original handler during configuration refresh + routeRetryHandlers.clear(); + return createHandler(xdsRetryPolicy, retryName); + }); + return retryHandlerOptions.map(Collections::singletonList).orElse(Collections.emptyList()); + } + + private Optional createHandler(XdsRetryPolicy xdsRetryPolicy, String businessName) { final io.sermant.flowcontrol.common.handler.retry.Retry retry = RetryContext.INSTANCE.getRetry(); if (retry == null) { return Optional.empty(); } - Optional retryPolicyOptional = XdsHandler.INSTANCE - .getRetryPolicy(flowControlScenario.getServiceName(), flowControlScenario.getRouteName()); - if (!retryPolicyOptional.isPresent()) { - return Optional.empty(); - } - XdsRetryPolicy retryPolicy = retryPolicyOptional.get(); - if (retryPolicy.getPerTryTimeout() <= 0 || CollectionUtils.isEmpty(retryPolicy.getRetryConditions()) - || retryPolicy.getMaxAttempts() <= 0) { + if (xdsRetryPolicy.getPerTryTimeout() <= 0 || CollectionUtils.isEmpty(xdsRetryPolicy.getRetryConditions()) + || xdsRetryPolicy.getMaxAttempts() <= 0) { return Optional.empty(); } final RetryConfig retryConfig = RetryConfig.custom() - .maxAttempts((int)retryPolicy.getMaxAttempts()) - .retryOnResult(retryPredicateCreator.createResultPredicate(retry, retryPolicy)) - .retryOnException(retryPredicateCreator.createExceptionPredicate(retry, retryPolicy)) - .intervalFunction(IntervalFunction.of(retryPolicy.getPerTryTimeout())) + .maxAttempts((int)xdsRetryPolicy.getMaxAttempts()) + .retryOnResult(retryPredicateCreator.createResultPredicate(retry, xdsRetryPolicy)) + .retryOnException(retryPredicateCreator.createExceptionPredicate(retry, xdsRetryPolicy)) + .intervalFunction(IntervalFunction.of(xdsRetryPolicy.getPerTryTimeout())) .failAfterMaxAttempts(false) .build(); return Optional.of(RetryRegistry.of(retryConfig).retry(businessName)); diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/service/InterceptorSupporter.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/service/InterceptorSupporter.java index 33f2418554..8d41adfd7b 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/service/InterceptorSupporter.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/service/InterceptorSupporter.java @@ -32,6 +32,7 @@ import io.sermant.flowcontrol.retry.handler.RetryHandlerV2; import io.sermant.flowcontrol.service.rest4j.DubboRest4jService; import io.sermant.flowcontrol.service.rest4j.HttpRest4jService; +import io.sermant.flowcontrol.service.rest4j.XdsHttpFlowControlService; import io.sermant.flowcontrol.service.sen.DubboSenService; import io.sermant.flowcontrol.service.sen.HttpSenService; @@ -89,6 +90,8 @@ public abstract class InterceptorSupporter extends ReflectMethodCacheSupport imp private HttpService httpService; + private XdsHttpFlowControlService xdsHttpFlowControlService; + /** * constructor */ @@ -156,6 +159,23 @@ protected final HttpService chooseHttpService() { return httpService; } + /** + * gets the selected XDS flow control service + * + * @return HttpService + */ + protected XdsHttpFlowControlService getXdsHttpFlowControlService() { + if (xdsHttpFlowControlService == null) { + lock.lock(); + try { + xdsHttpFlowControlService = PluginServiceManager.getPluginService(XdsHttpFlowControlService.class); + } finally { + lock.unlock(); + } + } + return xdsHttpFlowControlService; + } + /** * create retry method * @@ -189,12 +209,12 @@ protected Supplier createRetryFunc(Object obj, Method method, Object[] a * @param throwable Exception information for the first execution * @return check through */ - protected final boolean needRetry(io.github.resilience4j.retry.Retry retry, Object result, Throwable throwable) { - final long interval = retry.getRetryConfig().getIntervalBiFunction().apply(1, null); + protected boolean isNeedRetry(io.github.resilience4j.retry.Retry retry, Object result, Throwable throwable) { final RetryConfig retryConfig = retry.getRetryConfig(); boolean isNeedRetry = isMatchResult(result, retryConfig.getResultPredicate()) || isTargetException(throwable, retryConfig.getExceptionPredicate()); if (isNeedRetry) { + final long interval = retryConfig.getIntervalBiFunction().apply(1, null); try { // wait according to the first wait time Thread.sleep(interval); diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/service/rest4j/XdsHttpFlowControlService.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/service/rest4j/XdsHttpFlowControlService.java new file mode 100644 index 0000000000..70447dc5b9 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/service/rest4j/XdsHttpFlowControlService.java @@ -0,0 +1,57 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.service.rest4j; + +import io.sermant.core.plugin.service.PluginService; +import io.sermant.flowcontrol.common.entity.FlowControlResult; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; +import io.sermant.flowcontrol.common.entity.RequestEntity; + +/** + * XDS flow control service, used to execute flow control processor chains for HTTP clients, Currently, + * the handlers are rate limited handler and error injection handler + * + * @author zhp + * @since 2024-12-28 + */ +public interface XdsHttpFlowControlService extends PluginService { + /** + * Used to perform pre operation of the flow control processor chain before the request + * + * @param requestEntity request information + * @param fixedResult fixed result + */ + void onBefore(RequestEntity requestEntity, FlowControlResult fixedResult); + + /** + * Used to perform post operations on the flow control processor chain after a request + * + * @param requestEntity request information + * @param result response result + * @param flowControlScenario Scenario information for flow control + */ + void onAfter(RequestEntity requestEntity, Object result, FlowControlScenario flowControlScenario); + + /** + * Used to handle exceptions in the flow control processor chain during request exceptions + * + * @param requestEntity request information + * @param throwable exception message + * @param flowControlScenario Scenario information for flow control + */ + void onThrow(RequestEntity requestEntity, Throwable throwable, FlowControlScenario flowControlScenario); +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/test/java/io/sermant/flowcontrol/DispatcherServletInterceptorTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/test/java/io/sermant/flowcontrol/DispatcherServletInterceptorTest.java index 3eca9f3d1b..ca7835530f 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/test/java/io/sermant/flowcontrol/DispatcherServletInterceptorTest.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/test/java/io/sermant/flowcontrol/DispatcherServletInterceptorTest.java @@ -22,6 +22,7 @@ import io.sermant.core.service.ServiceManager; import io.sermant.flowcontrol.common.config.ConfigConst; import io.sermant.flowcontrol.common.config.FlowControlConfig; +import io.sermant.flowcontrol.common.config.XdsFlowControlConfig; import io.sermant.flowcontrol.common.entity.FlowControlResult; import io.sermant.flowcontrol.common.entity.RequestEntity; import io.sermant.flowcontrol.service.rest4j.HttpRest4jService; @@ -62,6 +63,8 @@ public void setUp() { pluginConfigManagerMockedStatic = Mockito.mockStatic(PluginConfigManager.class); pluginConfigManagerMockedStatic.when(() -> PluginConfigManager.getPluginConfig(FlowControlConfig.class)) .thenReturn(flowControlConfig); + pluginConfigManagerMockedStatic.when(() -> PluginConfigManager.getPluginConfig(XdsFlowControlConfig.class)) + .thenReturn(new XdsFlowControlConfig()); serviceManagerMockedStatic = Mockito.mockStatic(ServiceManager.class); serviceManagerMockedStatic.when(() -> ServiceManager.getService(HttpRest4jService.class)) .thenReturn(createRestService()); diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/AbstractChainHandler.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/AbstractChainHandler.java index 1162b840ad..f677223daf 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/AbstractChainHandler.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/AbstractChainHandler.java @@ -31,8 +31,6 @@ * @since 2022-07-11 */ public abstract class AbstractChainHandler implements RequestHandler, Comparable { - protected static final String MATCHED_SCENARIO_NAMES = "__MATCHED_SCENARIO_ENTITY__"; - protected static final XdsFlowControlConfig XDS_FLOW_CONTROL_CONFIG = PluginConfigManager.getPluginConfig(XdsFlowControlConfig.class); diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/AbstractXdsChainHandler.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/AbstractXdsChainHandler.java new file mode 100644 index 0000000000..6c7061d094 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/AbstractXdsChainHandler.java @@ -0,0 +1,125 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.res4j.chain; + +import io.sermant.core.plugin.config.PluginConfigManager; +import io.sermant.flowcontrol.common.config.XdsFlowControlConfig; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; +import io.sermant.flowcontrol.common.entity.RequestEntity; +import io.sermant.flowcontrol.common.entity.RequestEntity.RequestType; + +/** + * abstract Xds handler chain, the order of execution is as follows: + *

onBefore -> onThrow(executed when an exception exists) -> onResult

+ * + * @author zhp + * @since 2024-12-28 + */ +public abstract class AbstractXdsChainHandler implements XdsRequestHandler, Comparable { + protected static final XdsFlowControlConfig XDS_FLOW_CONTROL_CONFIG = + PluginConfigManager.getPluginConfig(XdsFlowControlConfig.class); + + private AbstractXdsChainHandler next; + + @Override + public void onBefore(RequestEntity requestEntity, FlowControlScenario flowControlScenario) { + AbstractXdsChainHandler cur = getNextHandler(requestEntity, flowControlScenario); + if (cur != null) { + cur.onBefore(requestEntity, flowControlScenario); + } + } + + @Override + public void onThrow(RequestEntity requestEntity, FlowControlScenario flowControlScenario, Throwable throwable) { + AbstractXdsChainHandler cur = getNextHandler(requestEntity, flowControlScenario); + if (cur != null) { + cur.onThrow(requestEntity, flowControlScenario, throwable); + } + } + + @Override + public void onAfter(RequestEntity requestEntity, FlowControlScenario flowControlScenario, Object result) { + AbstractXdsChainHandler cur = getNextHandler(requestEntity, flowControlScenario); + if (cur != null) { + cur.onAfter(requestEntity, flowControlScenario, result); + } + } + + private AbstractXdsChainHandler getNextHandler(RequestEntity entity, FlowControlScenario flowControlScenario) { + AbstractXdsChainHandler handler = next; + while (handler != null) { + if (!isNeedSkip(handler, entity, flowControlScenario)) { + break; + } + handler = handler.getNext(); + } + return handler; + } + + private boolean isNeedSkip(AbstractXdsChainHandler handler, RequestEntity requestEntity, + FlowControlScenario scenario) { + final RequestType direct = handler.direct(); + if (direct != RequestType.BOTH && requestEntity.getRequestType() != direct) { + return true; + } + return handler.isSkip(requestEntity, scenario); + } + + /** + * request direction, both are processed by default, In actual processing, the request direction determines whether + * it needs to be processed by the current handler + * + * @return RequestType + */ + protected RequestType direct() { + return RequestType.BOTH; + } + + /** + * whether to skip the current handler + * + * @param requestEntity request-information + * @param flowControlScenario matched scenario information + * @return skip or not + */ + protected boolean isSkip(RequestEntity requestEntity, FlowControlScenario flowControlScenario) { + return !XDS_FLOW_CONTROL_CONFIG.isEnable(); + } + + @Override + public int compareTo(AbstractXdsChainHandler handler) { + return getOrder() - handler.getOrder(); + } + + @Override + public boolean equals(Object obj) { + return super.equals(obj); + } + + @Override + public int hashCode() { + return super.hashCode(); + } + + public void setNext(AbstractXdsChainHandler handler) { + this.next = handler; + } + + public AbstractXdsChainHandler getNext() { + return next; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/HandlerChainBuilder.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/HandlerChainBuilder.java index 2a0f3d08f1..86d71fa945 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/HandlerChainBuilder.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/HandlerChainBuilder.java @@ -41,11 +41,17 @@ public enum HandlerChainBuilder { private static final List HANDLERS = new ArrayList<>(HANDLER_SIZE); + private static final List XDS_HANDLERS = new ArrayList<>(HANDLER_SIZE); + static { - for (AbstractChainHandler handler : ServiceLoader.load(AbstractChainHandler.class, HandlerChainBuilder.class - .getClassLoader())) { + ClassLoader classLoader = HandlerChainBuilder.class.getClassLoader(); + for (AbstractChainHandler handler : ServiceLoader.load(AbstractChainHandler.class, classLoader)) { HANDLERS.add(handler); } + + for (AbstractXdsChainHandler handler : ServiceLoader.load(AbstractXdsChainHandler.class, classLoader)) { + XDS_HANDLERS.add(handler); + } } /** @@ -60,6 +66,18 @@ public HandlerChain build() { return processorChain; } + /** + * build Xds chain + * + * @return ProcessorChain execution chain + */ + public XdsHandlerChain buildXdsHandlerChain() { + Collections.sort(XDS_HANDLERS); + final XdsHandlerChain xdsHandlerChain = new XdsHandlerChain(); + XDS_HANDLERS.forEach(xdsHandlerChain::addLastHandler); + return xdsHandlerChain; + } + /** * get handler chain * diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/HandlerChainEntry.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/HandlerChainEntry.java index 35c9311785..bcc9220c49 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/HandlerChainEntry.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/HandlerChainEntry.java @@ -20,6 +20,7 @@ import io.sermant.core.common.LoggerFactory; import io.sermant.flowcontrol.common.entity.FlowControlResult; import io.sermant.flowcontrol.common.entity.RequestEntity; +import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil; import io.sermant.flowcontrol.res4j.chain.context.ChainContext; import io.sermant.flowcontrol.res4j.chain.context.RequestContext; import io.sermant.flowcontrol.res4j.util.FlowControlExceptionUtils; @@ -99,7 +100,8 @@ private String formatSourceName(String sourceName, boolean isProvider) { */ public void onResult(String sourceName, Object result) { try { - chain.onResult(ChainContext.getThreadLocalContext(sourceName), null, result); + chain.onResult(ChainContext.getThreadLocalContext(sourceName), XdsThreadLocalUtil.getScenarioInfo(), + result); } finally { ChainContext.remove(sourceName); } @@ -135,7 +137,7 @@ public void onDubboResult(String sourceName, Object result, boolean isProvider) public void onThrow(String sourceName, Throwable throwable) { final RequestContext context = ChainContext.getThreadLocalContext(sourceName); context.save(HandlerConstants.OCCURRED_REQUEST_EXCEPTION, throwable); - chain.onThrow(context, null, throwable); + chain.onThrow(context, XdsThreadLocalUtil.getScenarioInfo(), throwable); } /** diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/XdsHandlerChain.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/XdsHandlerChain.java new file mode 100644 index 0000000000..c536e4737b --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/XdsHandlerChain.java @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.res4j.chain; + +/** + * Xds HandlerChain + * + * @author zhp + * @since 2024-12-28 + */ +public class XdsHandlerChain extends AbstractXdsChainHandler { + private AbstractXdsChainHandler tail; + + /** + * add Handler + * + * @param handler handler + */ + public void addLastHandler(AbstractXdsChainHandler handler) { + if (tail == null) { + tail = handler; + setNext(handler); + return; + } + tail.setNext(handler); + tail = handler; + } + + @Override + public int getOrder() { + return 0; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/XdsHandlerChainEntry.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/XdsHandlerChainEntry.java new file mode 100644 index 0000000000..b917fbf812 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/XdsHandlerChainEntry.java @@ -0,0 +1,84 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.res4j.chain; + +import io.sermant.core.common.LoggerFactory; +import io.sermant.flowcontrol.common.entity.FlowControlResult; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; +import io.sermant.flowcontrol.common.entity.RequestEntity; +import io.sermant.flowcontrol.res4j.util.FlowControlExceptionUtils; + +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * request chain entry class + * + * @author zhp + * @since 2024-12-28 + */ +public enum XdsHandlerChainEntry { + /** + * singleton + */ + INSTANCE; + + private static final Logger LOGGER = LoggerFactory.getLogger(); + + /** + * HandlerChain + */ + private final XdsHandlerChain chain = HandlerChainBuilder.INSTANCE.buildXdsHandlerChain(); + + /** + * pre-method + * + * @param requestEntity request body + * @param flowControlResult flow control result + */ + public void onBefore(RequestEntity requestEntity, FlowControlResult flowControlResult) { + try { + chain.onBefore(requestEntity, null); + } catch (Exception ex) { + flowControlResult.setRequestType(requestEntity.getRequestType()); + FlowControlExceptionUtils.handleException(ex, flowControlResult); + LOGGER.log(Level.FINE, ex, ex::getMessage); + } + } + + /** + * postset method + * + * @param requestEntity request body + * @param result execution result + * @param flowControlScenario Scenario information for flow control + */ + public void onAfter(RequestEntity requestEntity, Object result, FlowControlScenario flowControlScenario) { + chain.onAfter(requestEntity, flowControlScenario, result); + } + + /** + * exception method + * + * @param requestEntity request body + * @param throwable exception message + * @param flowControlScenario Scenario information for flow control + */ + public void onThrow(RequestEntity requestEntity, Throwable throwable, FlowControlScenario flowControlScenario) { + chain.onThrow(requestEntity, flowControlScenario, throwable); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/XdsRequestHandler.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/XdsRequestHandler.java new file mode 100644 index 0000000000..daa1502dfc --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/XdsRequestHandler.java @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.res4j.chain; + +import io.sermant.flowcontrol.common.entity.FlowControlScenario; +import io.sermant.flowcontrol.common.entity.RequestEntity; + +/** + * Xds request handler definition + * + * @author zhp + * @since 2024-12-31 + */ +public interface XdsRequestHandler { + /** + * request processing + * + * @param requestEntity request-information + * @param flowControlScenario matched business information + */ + void onBefore(RequestEntity requestEntity, FlowControlScenario flowControlScenario); + + /** + * response processing + * + * @param requestEntity request-information + * @param flowControlScenario matched business information + * @param result response result + */ + void onAfter(RequestEntity requestEntity, FlowControlScenario flowControlScenario, Object result); + + /** + * response processing + * + * @param requestEntity request-information + * @param flowControlScenario matched business information + * @param throwable throwable + */ + void onThrow(RequestEntity requestEntity, FlowControlScenario flowControlScenario, Throwable throwable); + + /** + * priority + * + * @return priority the smaller the value the higher the priority + */ + int getOrder(); +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/BusinessRequestHandler.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/BusinessRequestHandler.java index 60a86a5475..4579cc2186 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/BusinessRequestHandler.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/BusinessRequestHandler.java @@ -17,6 +17,7 @@ package io.sermant.flowcontrol.res4j.chain.handler; +import io.sermant.core.utils.CollectionUtils; import io.sermant.flowcontrol.common.core.match.MatchManager; import io.sermant.flowcontrol.common.entity.FlowControlScenario; import io.sermant.flowcontrol.res4j.chain.AbstractChainHandler; @@ -33,33 +34,46 @@ * @since 2022-07-05 */ public class BusinessRequestHandler extends AbstractChainHandler { + private static final String MATCHED_SCENARIO_INFO = "__MATCHED_SCENARIO_INFO__"; + @Override - public void onBefore(RequestContext context, FlowControlScenario scenarioInfo) { - final Set matchBusinessNames = MatchManager.INSTANCE.matchWithCache(context.getRequestEntity()); - if (scenarioInfo != null) { - scenarioInfo.setMatchedScenarioNames(matchBusinessNames); - super.onBefore(context, scenarioInfo); - } else { - FlowControlScenario flowControlScenario = new FlowControlScenario(); - flowControlScenario.setMatchedScenarioNames(matchBusinessNames); - super.onBefore(context, flowControlScenario); + public void onBefore(RequestContext context, FlowControlScenario flowControlScenario) { + final Set matchedBusinessNames = MatchManager.INSTANCE.matchWithCache(context.getRequestEntity()); + if (CollectionUtils.isEmpty(matchedBusinessNames)) { + return; } + FlowControlScenario scenarioInfo = new FlowControlScenario(); + scenarioInfo.setMatchedScenarioNames(matchedBusinessNames); + context.save(MATCHED_SCENARIO_INFO, scenarioInfo); + super.onBefore(context, scenarioInfo); } @Override - public void onThrow(RequestContext context, FlowControlScenario scenarioInfo, Throwable throwable) { + public void onThrow(RequestContext context, FlowControlScenario flowControlScenario, Throwable throwable) { + final FlowControlScenario scenarioInfo = getMatchedScenarioInfo(context); + if (scenarioInfo == null || CollectionUtils.isEmpty(scenarioInfo.getMatchedScenarioNames())) { + return; + } super.onThrow(context, scenarioInfo, throwable); } @Override - public void onResult(RequestContext context, FlowControlScenario scenarioInfo, Object result) { + public void onResult(RequestContext context, FlowControlScenario flowControlScenario, Object result) { + final FlowControlScenario scenarioInfo = getMatchedScenarioInfo(context); + if (scenarioInfo == null || CollectionUtils.isEmpty(scenarioInfo.getMatchedScenarioNames())) { + return; + } try { super.onResult(context, scenarioInfo, result); } finally { - ChainContext.getThreadLocalContext(context.getSourceName()).remove(MATCHED_SCENARIO_NAMES); + ChainContext.getThreadLocalContext(context.getSourceName()).remove(MATCHED_SCENARIO_INFO); } } + private FlowControlScenario getMatchedScenarioInfo(RequestContext context) { + return context.get(MATCHED_SCENARIO_INFO, FlowControlScenario.class); + } + @Override public int getOrder() { return HandlerConstants.BUSINESS_ORDER; diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsBusinessClientRequestHandler.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsBusinessClientRequestHandler.java index 0938bff1cd..e93fa0b3e5 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsBusinessClientRequestHandler.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsBusinessClientRequestHandler.java @@ -19,10 +19,9 @@ import io.sermant.flowcontrol.common.core.match.XdsRouteMatchManager; import io.sermant.flowcontrol.common.entity.FlowControlScenario; import io.sermant.flowcontrol.common.entity.RequestEntity; -import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil; -import io.sermant.flowcontrol.res4j.chain.AbstractChainHandler; +import io.sermant.flowcontrol.common.entity.RequestEntity.RequestType; +import io.sermant.flowcontrol.res4j.chain.AbstractXdsChainHandler; import io.sermant.flowcontrol.res4j.chain.HandlerConstants; -import io.sermant.flowcontrol.res4j.chain.context.RequestContext; /** * Business handler class for client requests, Get the matched scenario information based on the routing match rule @@ -30,28 +29,22 @@ * @author zhp * @since 2024-12-05 */ -public class XdsBusinessClientRequestHandler extends AbstractChainHandler { +public class XdsBusinessClientRequestHandler extends AbstractXdsChainHandler { @Override - public void onBefore(RequestContext context, FlowControlScenario scenario) { + public void onBefore(RequestEntity requestEntity, FlowControlScenario scenario) { FlowControlScenario matchedScenario = XdsRouteMatchManager.INSTANCE.getMatchedScenarioInfo( - context.getRequestEntity(), context.getRequestEntity().getServiceName()); - context.save(MATCHED_SCENARIO_NAMES, matchedScenario); - XdsThreadLocalUtil.setScenarioInfo(matchedScenario); - super.onBefore(context, matchedScenario); + requestEntity, requestEntity.getServiceName()); + super.onBefore(requestEntity, matchedScenario); } @Override - public void onThrow(RequestContext context, FlowControlScenario scenario, Throwable throwable) { - super.onThrow(context, scenario, throwable); + public void onThrow(RequestEntity requestEntity, FlowControlScenario scenario, Throwable throwable) { + super.onThrow(requestEntity, scenario, throwable); } @Override - public void onResult(RequestContext context, FlowControlScenario scenario, Object result) { - try { - super.onResult(context, scenario, result); - } finally { - XdsThreadLocalUtil.removeScenarioInfo(); - } + public void onAfter(RequestEntity requestEntity, FlowControlScenario scenario, Object result) { + super.onAfter(requestEntity, scenario, result); } @Override @@ -60,12 +53,7 @@ public int getOrder() { } @Override - protected boolean isSkip(RequestContext context, FlowControlScenario scenario) { - return !XDS_FLOW_CONTROL_CONFIG.isEnable(); - } - - @Override - protected RequestEntity.RequestType direct() { - return RequestEntity.RequestType.CLIENT; + protected RequestType direct() { + return RequestType.CLIENT; } } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsBusinessServerRequestHandler.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsBusinessServerRequestHandler.java index fc8314fa82..c0d083eccd 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsBusinessServerRequestHandler.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsBusinessServerRequestHandler.java @@ -21,10 +21,10 @@ import io.sermant.flowcontrol.common.core.match.XdsRouteMatchManager; import io.sermant.flowcontrol.common.entity.FlowControlScenario; import io.sermant.flowcontrol.common.entity.RequestEntity; +import io.sermant.flowcontrol.common.entity.RequestEntity.RequestType; import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil; -import io.sermant.flowcontrol.res4j.chain.AbstractChainHandler; +import io.sermant.flowcontrol.res4j.chain.AbstractXdsChainHandler; import io.sermant.flowcontrol.res4j.chain.HandlerConstants; -import io.sermant.flowcontrol.res4j.chain.context.RequestContext; /** * Business handler class for server requests, Get the matched scenario information based on the routing match rule @@ -32,34 +32,33 @@ * @author zhp * @since 2024-12-05 */ -public class XdsBusinessServerRequestHandler extends AbstractChainHandler { +public class XdsBusinessServerRequestHandler extends AbstractXdsChainHandler { private final ServiceMeta serviceMeta = ConfigManager.getConfig(ServiceMeta.class); @Override - public void onBefore(RequestContext context, FlowControlScenario scenarioInfo) { - if (XdsThreadLocalUtil.getScenarioInfo() != null) { - super.onBefore(context, XdsThreadLocalUtil.getScenarioInfo()); - return; - } + public void onBefore(RequestEntity requestEntity, FlowControlScenario scenarioInfo) { FlowControlScenario matchedScenarioEntity = XdsRouteMatchManager.INSTANCE.getMatchedScenarioInfo( - context.getRequestEntity(), serviceMeta.getService()); - context.save(MATCHED_SCENARIO_NAMES, matchedScenarioEntity); + requestEntity, serviceMeta.getService()); XdsThreadLocalUtil.setScenarioInfo(matchedScenarioEntity); - super.onBefore(context, matchedScenarioEntity); + super.onBefore(requestEntity, matchedScenarioEntity); } @Override - public void onThrow(RequestContext context, FlowControlScenario scenarioInfo, Throwable throwable) { - super.onThrow(context, scenarioInfo, throwable); + public void onThrow(RequestEntity requestEntity, FlowControlScenario scenarioInfo, Throwable throwable) { + if (scenarioInfo == null) { + super.onThrow(requestEntity, XdsThreadLocalUtil.getScenarioInfo(), throwable); + return; + } + super.onThrow(requestEntity, scenarioInfo, throwable); } @Override - public void onResult(RequestContext context, FlowControlScenario scenarioInfo, Object result) { - try { - super.onResult(context, scenarioInfo, result); - } finally { - XdsThreadLocalUtil.removeScenarioInfo(); + public void onAfter(RequestEntity requestEntity, FlowControlScenario scenarioInfo, Object result) { + if (scenarioInfo == null) { + super.onAfter(requestEntity, XdsThreadLocalUtil.getScenarioInfo(), result); + return; } + super.onAfter(requestEntity, scenarioInfo, result); } @Override @@ -68,12 +67,7 @@ public int getOrder() { } @Override - protected boolean isSkip(RequestContext context, FlowControlScenario scenarioInfo) { - return !XDS_FLOW_CONTROL_CONFIG.isEnable(); - } - - @Override - protected RequestEntity.RequestType direct() { - return RequestEntity.RequestType.SERVER; + protected RequestType direct() { + return RequestType.SERVER; } } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsFaultRequestHandler.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsFaultRequestHandler.java index 874a72a6bc..a2e2f6d362 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsFaultRequestHandler.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsFaultRequestHandler.java @@ -26,11 +26,11 @@ import io.sermant.flowcontrol.common.core.rule.fault.FaultException; import io.sermant.flowcontrol.common.entity.FlowControlScenario; import io.sermant.flowcontrol.common.entity.RequestEntity; +import io.sermant.flowcontrol.common.entity.RequestEntity.RequestType; import io.sermant.flowcontrol.common.util.RandomUtil; import io.sermant.flowcontrol.common.xds.handler.XdsHandler; -import io.sermant.flowcontrol.res4j.chain.AbstractChainHandler; +import io.sermant.flowcontrol.res4j.chain.AbstractXdsChainHandler; import io.sermant.flowcontrol.res4j.chain.HandlerConstants; -import io.sermant.flowcontrol.res4j.chain.context.RequestContext; import java.util.Optional; import java.util.logging.Level; @@ -42,27 +42,27 @@ * @author zhp * @since 2024-12-05 */ -public class XdsFaultRequestHandler extends AbstractChainHandler { +public class XdsFaultRequestHandler extends AbstractXdsChainHandler { private static final String MESSAGE = "Request has been aborted by fault-ThrowException"; private static final Logger LOGGER = LoggerFactory.getLogger(); @Override - public void onBefore(RequestContext context, FlowControlScenario scenarioInfo) { + public void onBefore(RequestEntity requestEntity, FlowControlScenario scenarioInfo) { Optional xdsHttpFaultOptional = XdsHandler.INSTANCE. getHttpFault(scenarioInfo.getServiceName(), scenarioInfo.getRouteName()); if (!xdsHttpFaultOptional.isPresent()) { - super.onBefore(context, scenarioInfo); + super.onBefore(requestEntity, scenarioInfo); return; } XdsHttpFault xdsHttpFault = xdsHttpFaultOptional.get(); if (xdsHttpFault.getAbort() == null && xdsHttpFault.getDelay() == null) { - super.onBefore(context, scenarioInfo); + super.onBefore(requestEntity, scenarioInfo); return; } executeAbort(xdsHttpFault.getAbort()); executeDelay(xdsHttpFault.getDelay()); - super.onBefore(context, scenarioInfo); + super.onBefore(requestEntity, scenarioInfo); } private void executeAbort(XdsAbort xdsAbort) { @@ -101,27 +101,24 @@ private void executeDelay(XdsDelay delay) { } @Override - public void onThrow(RequestContext context, FlowControlScenario scenarioInfo, Throwable throwable) { - super.onThrow(context, scenarioInfo, throwable); + public void onThrow(RequestEntity requestEntity, FlowControlScenario scenarioInfo, Throwable throwable) { + super.onThrow(requestEntity, scenarioInfo, throwable); } @Override - public void onResult(RequestContext context, FlowControlScenario scenarioInfo, Object result) { - super.onResult(context, scenarioInfo, result); + public void onAfter(RequestEntity requestEntity, FlowControlScenario scenarioInfo, Object result) { + super.onAfter(requestEntity, scenarioInfo, result); } @Override - protected boolean isSkip(RequestContext context, FlowControlScenario scenarioInfo) { - if (!XDS_FLOW_CONTROL_CONFIG.isEnable()) { - return true; - } + protected boolean isSkip(RequestEntity requestEntity, FlowControlScenario scenarioInfo) { return scenarioInfo == null || StringUtils.isEmpty(scenarioInfo.getServiceName()) || StringUtils.isEmpty(scenarioInfo.getRouteName()); } @Override - protected RequestEntity.RequestType direct() { - return RequestEntity.RequestType.CLIENT; + protected RequestType direct() { + return RequestType.CLIENT; } @Override diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsRateLimitRequestHandler.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsRateLimitRequestHandler.java index 3b88353b77..4ffab180be 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsRateLimitRequestHandler.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsRateLimitRequestHandler.java @@ -22,12 +22,12 @@ import io.sermant.core.utils.StringUtils; import io.sermant.flowcontrol.common.entity.FlowControlScenario; import io.sermant.flowcontrol.common.entity.RequestEntity; +import io.sermant.flowcontrol.common.entity.RequestEntity.RequestType; import io.sermant.flowcontrol.common.util.RandomUtil; import io.sermant.flowcontrol.common.xds.handler.XdsHandler; import io.sermant.flowcontrol.common.xds.ratelimit.XdsRateLimitManager; -import io.sermant.flowcontrol.res4j.chain.AbstractChainHandler; +import io.sermant.flowcontrol.res4j.chain.AbstractXdsChainHandler; import io.sermant.flowcontrol.res4j.chain.HandlerConstants; -import io.sermant.flowcontrol.res4j.chain.context.RequestContext; import io.sermant.flowcontrol.res4j.exceptions.RateLimitException; import java.util.Optional; @@ -38,11 +38,11 @@ * @author zhp * @since 2024-12-05 */ -public class XdsRateLimitRequestHandler extends AbstractChainHandler { +public class XdsRateLimitRequestHandler extends AbstractXdsChainHandler { @Override - public void onBefore(RequestContext context, FlowControlScenario scenarioInfo) { + public void onBefore(RequestEntity requestEntity, FlowControlScenario scenarioInfo) { handleRateLimit(scenarioInfo); - super.onBefore(context, scenarioInfo); + super.onBefore(requestEntity, scenarioInfo); } private void handleRateLimit(FlowControlScenario scenarioInfo) { @@ -72,13 +72,13 @@ private void handleRateLimit(FlowControlScenario scenarioInfo) { } @Override - public void onThrow(RequestContext context, FlowControlScenario scenarioInfo, Throwable throwable) { - super.onThrow(context, scenarioInfo, throwable); + public void onThrow(RequestEntity requestEntity, FlowControlScenario scenarioInfo, Throwable throwable) { + super.onThrow(requestEntity, scenarioInfo, throwable); } @Override - public void onResult(RequestContext context, FlowControlScenario scenarioInfo, Object result) { - super.onResult(context, scenarioInfo, result); + public void onAfter(RequestEntity requestEntity, FlowControlScenario scenarioInfo, Object result) { + super.onAfter(requestEntity, scenarioInfo, result); } @Override @@ -87,16 +87,13 @@ public int getOrder() { } @Override - protected boolean isSkip(RequestContext context, FlowControlScenario scenarioInfo) { - if (!XDS_FLOW_CONTROL_CONFIG.isEnable()) { - return true; - } + protected boolean isSkip(RequestEntity requestEntity, FlowControlScenario scenarioInfo) { return scenarioInfo == null || StringUtils.isEmpty(scenarioInfo.getServiceName()) || StringUtils.isEmpty(scenarioInfo.getRouteName()); } @Override - protected RequestEntity.RequestType direct() { - return RequestEntity.RequestType.SERVER; + protected RequestType direct() { + return RequestType.SERVER; } } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/service/XdsHttpFlowControlServiceImpl.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/service/XdsHttpFlowControlServiceImpl.java new file mode 100644 index 0000000000..2fcefbd6d9 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/service/XdsHttpFlowControlServiceImpl.java @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.res4j.service; + +import io.sermant.flowcontrol.common.entity.FlowControlResult; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; +import io.sermant.flowcontrol.common.entity.RequestEntity; +import io.sermant.flowcontrol.res4j.chain.XdsHandlerChainEntry; +import io.sermant.flowcontrol.service.rest4j.XdsHttpFlowControlService; + +/** + * http request interception logic implementation + * + * @author zhp + * @since 2024-12-28 + */ +public class XdsHttpFlowControlServiceImpl implements XdsHttpFlowControlService { + @Override + public void onBefore(RequestEntity requestEntity, FlowControlResult flowControlResult) { + XdsHandlerChainEntry.INSTANCE.onBefore(requestEntity, flowControlResult); + } + + @Override + public void onAfter(RequestEntity requestEntity, Object result, FlowControlScenario flowControlScenario) { + XdsHandlerChainEntry.INSTANCE.onAfter(requestEntity, result, flowControlScenario); + } + + @Override + public void onThrow(RequestEntity requestEntity, Throwable throwable, FlowControlScenario flowControlScenario) { + XdsHandlerChainEntry.INSTANCE.onThrow(requestEntity, throwable, flowControlScenario); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/resources/META-INF/services/io.sermant.core.plugin.service.PluginService b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/resources/META-INF/services/io.sermant.core.plugin.service.PluginService index af8e5ba1b7..c67646a6f8 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/resources/META-INF/services/io.sermant.core.plugin.service.PluginService +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/resources/META-INF/services/io.sermant.core.plugin.service.PluginService @@ -19,3 +19,4 @@ io.sermant.flowcontrol.res4j.service.HttpRest4jServiceImpl io.sermant.flowcontrol.res4j.service.DubboRest4jServiceImpl io.sermant.flowcontrol.res4j.service.ServiceCollectorService io.sermant.flowcontrol.res4j.service.SystemStatusSlidingWindow +io.sermant.flowcontrol.res4j.service.XdsHttpFlowControlServiceImpl diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/resources/META-INF/services/io.sermant.flowcontrol.res4j.chain.AbstractChainHandler b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/resources/META-INF/services/io.sermant.flowcontrol.res4j.chain.AbstractChainHandler index 3ea32db77d..5d38258705 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/resources/META-INF/services/io.sermant.flowcontrol.res4j.chain.AbstractChainHandler +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/resources/META-INF/services/io.sermant.flowcontrol.res4j.chain.AbstractChainHandler @@ -24,7 +24,3 @@ io.sermant.flowcontrol.res4j.chain.handler.CircuitBreakerServerReqHandler io.sermant.flowcontrol.res4j.chain.handler.InstanceIsolationRequestHandler io.sermant.flowcontrol.res4j.chain.handler.FaultRequestHandler io.sermant.flowcontrol.res4j.chain.handler.SystemServerReqHandler -io.sermant.flowcontrol.res4j.chain.handler.XdsRateLimitRequestHandler -io.sermant.flowcontrol.res4j.chain.handler.XdsBusinessClientRequestHandler -io.sermant.flowcontrol.res4j.chain.handler.XdsBusinessServerRequestHandler -io.sermant.flowcontrol.res4j.chain.handler.XdsFaultRequestHandler diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/resources/META-INF/services/io.sermant.flowcontrol.res4j.chain.AbstractXdsChainHandler b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/resources/META-INF/services/io.sermant.flowcontrol.res4j.chain.AbstractXdsChainHandler new file mode 100644 index 0000000000..1d7e044a5d --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/resources/META-INF/services/io.sermant.flowcontrol.res4j.chain.AbstractXdsChainHandler @@ -0,0 +1,20 @@ +# +# Copyright (C) 2024-2024 Sermant Authors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +io.sermant.flowcontrol.res4j.chain.handler.XdsRateLimitRequestHandler +io.sermant.flowcontrol.res4j.chain.handler.XdsBusinessClientRequestHandler +io.sermant.flowcontrol.res4j.chain.handler.XdsBusinessServerRequestHandler +io.sermant.flowcontrol.res4j.chain.handler.XdsFaultRequestHandler