From b4545befe9f283ec252d8b82b1f91260a8346791 Mon Sep 17 00:00:00 2001 From: hanbingleixue Date: Tue, 24 Dec 2024 14:35:44 +0800 Subject: [PATCH] Add the common module with XDS circuit breaker and current limiting functions Signed-off-by: hanbingleixue --- .../core/service/xds/entity/XdsAbort.java | 6 +- .../core/service/xds/entity/XdsDelay.java | 6 +- .../core/service/xds/entity/XdsAbortTest.java | 5 +- .../core/service/xds/entity/XdsDelayTest.java | 5 +- .../xds/utils/RdsProtocolTransformer.java | 14 +- .../sermant-flowcontrol/config/config.yaml | 22 +- .../common/config/CommonConst.java | 15 + .../common/config/XdsFlowControlConfig.java | 74 +++++ .../core/match/XdsRouteMatchManager.java | 121 ++++++++ .../common/entity/FlowControlResponse.java | 28 ++ .../common/entity/FlowControlScenario.java | 92 ++++++ .../handler/AbstractRequestHandler.java | 34 ++- .../common/handler/retry/AbstractRetry.java | 15 +- .../common/handler/retry/Retry.java | 24 +- .../common/handler/retry/RetryContext.java | 34 ++- .../retry/policy/RetryOnSamePolicy.java | 74 ----- .../handler/retry/policy/RetryPolicy.java | 12 +- .../flowcontrol/common/util/RandomUtil.java | 53 ++++ .../common/util/XdsRouterUtils.java | 144 +++++++++ .../common/util/XdsThreadLocalUtil.java | 80 +++++ .../xds/circuit/XdsCircuitBreakerInfo.java | 126 ++++++++ .../xds/circuit/XdsCircuitBreakerManager.java | 203 +++++++++++++ .../common/xds/handler/XdsHandler.java | 276 ++++++++++++++++++ .../common/xds/lb/XdsLoadBalancer.java | 37 +++ .../common/xds/lb/XdsLoadBalancerFactory.java | 82 ++++++ .../common/xds/lb/XdsRandomLoadBalancer.java | 48 +++ .../xds/lb/XdsRoundRobinLoadBalancer.java | 56 ++++ .../xds/ratelimit/XdsRateLimitInfo.java | 58 ++++ .../xds/ratelimit/XdsRateLimitManager.java | 81 +++++ .../config/XdsFlowControlConfigTest.java | 63 ++++ .../core/match/XdsRouteMatchManagerTest.java | 117 ++++++++ .../entity/FlowControlScenarioTest.java | 76 +++++ .../handler/retry/RetryContextTest.java | 15 +- .../common/util/RandomUtilTest.java | 39 +++ .../common/util/XdsRouterUtilsTest.java | 112 +++++++ .../common/util/XdsThreadLocalUtilTest.java | 61 ++++ .../circuit/XdsCircuitBreakerInfoTest.java | 94 ++++++ .../circuit/XdsCircuitBreakerManagerTest.java | 71 +++++ .../common/xds/handler/XdsHandlerTest.java | 221 ++++++++++++++ .../xds/ratelimit/XdsRateLimitInfoTest.java | 53 ++++ 40 files changed, 2637 insertions(+), 110 deletions(-) create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/config/XdsFlowControlConfig.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/core/match/XdsRouteMatchManager.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/entity/FlowControlScenario.java delete mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/policy/RetryOnSamePolicy.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/util/RandomUtil.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/util/XdsRouterUtils.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/util/XdsThreadLocalUtil.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerInfo.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerManager.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/handler/XdsHandler.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/lb/XdsLoadBalancer.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/lb/XdsLoadBalancerFactory.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/lb/XdsRandomLoadBalancer.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/lb/XdsRoundRobinLoadBalancer.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/ratelimit/XdsRateLimitInfo.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/ratelimit/XdsRateLimitManager.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/config/XdsFlowControlConfigTest.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/core/match/XdsRouteMatchManagerTest.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/entity/FlowControlScenarioTest.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/util/RandomUtilTest.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/util/XdsRouterUtilsTest.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/util/XdsThreadLocalUtilTest.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerInfoTest.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerManagerTest.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/handler/XdsHandlerTest.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/ratelimit/XdsRateLimitInfoTest.java diff --git a/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/entity/XdsAbort.java b/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/entity/XdsAbort.java index ceb76e7cdd..027506ece0 100644 --- a/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/entity/XdsAbort.java +++ b/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/entity/XdsAbort.java @@ -31,7 +31,7 @@ public class XdsAbort { /** * The percentage of requests/ operations/ connections that will be aborted with the error code */ - private int percentage; + private FractionalPercent percentage; public int getHttpStatus() { return httpStatus; @@ -41,11 +41,11 @@ public void setHttpStatus(int httpStatus) { this.httpStatus = httpStatus; } - public int getPercentage() { + public FractionalPercent getPercentage() { return percentage; } - public void setPercentage(int percentage) { + public void setPercentage(FractionalPercent percentage) { this.percentage = percentage; } } diff --git a/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/entity/XdsDelay.java b/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/entity/XdsDelay.java index 3d63f923d9..c94de7e51d 100644 --- a/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/entity/XdsDelay.java +++ b/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/entity/XdsDelay.java @@ -31,7 +31,7 @@ public class XdsDelay { /** * The percentage of requests on which the delay will be injected */ - private int percentage; + private FractionalPercent percentage; public long getFixedDelay() { return fixedDelay; @@ -41,11 +41,11 @@ public void setFixedDelay(long fixedDelay) { this.fixedDelay = fixedDelay; } - public int getPercentage() { + public FractionalPercent getPercentage() { return percentage; } - public void setPercentage(int percentage) { + public void setPercentage(FractionalPercent percentage) { this.percentage = percentage; } } diff --git a/sermant-agentcore/sermant-agentcore-core/src/test/java/io/sermant/core/service/xds/entity/XdsAbortTest.java b/sermant-agentcore/sermant-agentcore-core/src/test/java/io/sermant/core/service/xds/entity/XdsAbortTest.java index e97fc1e0b1..3f5325dfcf 100644 --- a/sermant-agentcore/sermant-agentcore-core/src/test/java/io/sermant/core/service/xds/entity/XdsAbortTest.java +++ b/sermant-agentcore/sermant-agentcore-core/src/test/java/io/sermant/core/service/xds/entity/XdsAbortTest.java @@ -29,9 +29,10 @@ public class XdsAbortTest { @Test public void testXdsAbort() { XdsAbort abort = new XdsAbort(); - abort.setPercentage(100); + FractionalPercent percent = new FractionalPercent(); + abort.setPercentage(percent); abort.setHttpStatus(200); Assert.assertEquals(200, abort.getHttpStatus()); - Assert.assertEquals(100, abort.getPercentage(), 0); + Assert.assertEquals(percent, abort.getPercentage()); } } diff --git a/sermant-agentcore/sermant-agentcore-core/src/test/java/io/sermant/core/service/xds/entity/XdsDelayTest.java b/sermant-agentcore/sermant-agentcore-core/src/test/java/io/sermant/core/service/xds/entity/XdsDelayTest.java index 47fb845ee0..97a05fec04 100644 --- a/sermant-agentcore/sermant-agentcore-core/src/test/java/io/sermant/core/service/xds/entity/XdsDelayTest.java +++ b/sermant-agentcore/sermant-agentcore-core/src/test/java/io/sermant/core/service/xds/entity/XdsDelayTest.java @@ -29,9 +29,10 @@ public class XdsDelayTest { @Test public void testXdsDelay() { XdsDelay delay = new XdsDelay(); - delay.setPercentage(100); + FractionalPercent fractionalPercent = new FractionalPercent(); + delay.setPercentage(fractionalPercent); delay.setFixedDelay(200L); Assert.assertEquals(200, delay.getFixedDelay()); - Assert.assertEquals(100, delay.getPercentage(), 0); + Assert.assertEquals(fractionalPercent, delay.getPercentage()); } } diff --git a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/utils/RdsProtocolTransformer.java b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/utils/RdsProtocolTransformer.java index d8963ccc47..1be0673f4d 100644 --- a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/utils/RdsProtocolTransformer.java +++ b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/utils/RdsProtocolTransformer.java @@ -294,7 +294,12 @@ private static XdsHttpFault parseHttpFault(HTTPFault httpFault) { private static XdsAbort parseAbort(FaultAbort faultAbort) { XdsAbort xdsAbort = new XdsAbort(); - xdsAbort.setPercentage(faultAbort.getPercentage().getNumerator()); + io.sermant.core.service.xds.entity.FractionalPercent fractionalPercent = + new io.sermant.core.service.xds.entity.FractionalPercent(); + fractionalPercent.setNumerator(faultAbort.getPercentage().getNumerator()); + fractionalPercent.setDenominator(DenominatorType.getValueByName(faultAbort.getPercentage() + .getDenominator().name())); + xdsAbort.setPercentage(fractionalPercent); xdsAbort.setHttpStatus(faultAbort.getHttpStatus()); return xdsAbort; } @@ -303,7 +308,12 @@ private static XdsDelay parseDelay(FaultDelay faultDelay) { XdsDelay xdsDelay = new XdsDelay(); long fixedDelay = Duration.ofSeconds(faultDelay.getFixedDelay().getSeconds()).toMillis(); xdsDelay.setFixedDelay(fixedDelay); - xdsDelay.setPercentage(faultDelay.getPercentage().getNumerator()); + io.sermant.core.service.xds.entity.FractionalPercent fractionalPercent = + new io.sermant.core.service.xds.entity.FractionalPercent(); + fractionalPercent.setNumerator(faultDelay.getPercentage().getNumerator()); + fractionalPercent.setDenominator(DenominatorType.getValueByName(faultDelay.getPercentage() + .getDenominator().name())); + xdsDelay.setPercentage(fractionalPercent); return xdsDelay; } diff --git a/sermant-plugins/sermant-flowcontrol/config/config.yaml b/sermant-plugins/sermant-flowcontrol/config/config.yaml index eca2a07e41..298e9d0559 100644 --- a/sermant-plugins/sermant-flowcontrol/config/config.yaml +++ b/sermant-plugins/sermant-flowcontrol/config/config.yaml @@ -1,6 +1,20 @@ # FlowControl configuration flow.control.plugin: - useCseRule: true # whether to configure cse rules - enable-start-monitor: false # whether to enable indicator monitoring - enable-system-adaptive: false # whether to enable system adaptive flow control - enable-system-rule: false # whether to enable system rule flow control + # whether to configure cse rules + useCseRule: true + # whether to enable indicator monitoring + enable-start-monitor: false + # whether to enable system adaptive flow control + enable-system-adaptive: false + # whether to enable system rule flow control + enable-system-rule: false +xds.flow.control.config: + # Whether to enable Xds flow control + enable: false + retry: + # The specified response status codes that need to be retried. Retry will be performed when the response's status + # code matches one of the specified codes. + x-sermant-retriable-status-codes: + # The specified response header names that need to be retried. Retry will be performed when the response contains + # the specified headers. + x-sermant-retriable-header-names: \ No newline at end of file diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/config/CommonConst.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/config/CommonConst.java index 4da2bae4d7..86aa5cbeaa 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/config/CommonConst.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/config/CommonConst.java @@ -269,6 +269,21 @@ public class CommonConst { */ public static final String REQUEST_START_TIME = "requestStartTime"; + /** + * the connect for request address + */ + public static final String CONNECT = ":"; + + /** + * point + */ + public static final String ESCAPED_POINT = "\\."; + + /** + * Default response status code + */ + public static final int DEFAULT_RESPONSE_CODE = -1; + private CommonConst() { } } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/config/XdsFlowControlConfig.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/config/XdsFlowControlConfig.java new file mode 100644 index 0000000000..26e7a55f6d --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/config/XdsFlowControlConfig.java @@ -0,0 +1,74 @@ +/* + * Copyright (C) 2021-2022 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.sermant.flowcontrol.common.config; + +import io.sermant.core.config.common.ConfigFieldKey; +import io.sermant.core.config.common.ConfigTypeKey; +import io.sermant.core.plugin.config.PluginConfig; + +import java.util.List; + +/** + * retry configuration class + * + * @author zhouss + * @since 2022-01-28 + */ +@ConfigTypeKey("xds.flow.control.config") +public class XdsFlowControlConfig implements PluginConfig { + /** + * Specify the response code for retry, and retry will be executed when the response code is included + */ + @ConfigFieldKey("retry.x-sermant-retriable-status-codes") + private List retryStatusCodes; + + /** + * Specify the response code for retry, and retry will be executed when the response header is included + */ + @ConfigFieldKey("retry.x-sermant-retriable-header-names") + private List retryHeaderNames; + + /** + * xds flow control switch + */ + private boolean enable; + + public List getRetryStatusCodes() { + return retryStatusCodes; + } + + public void setRetryStatusCodes(List retryStatusCodes) { + this.retryStatusCodes = retryStatusCodes; + } + + public List getRetryHeaderNames() { + return retryHeaderNames; + } + + public void setRetryHeaderNames(List retryHeaderNames) { + this.retryHeaderNames = retryHeaderNames; + } + + public boolean isEnable() { + return enable; + } + + public void setEnable(boolean enable) { + this.enable = enable; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/core/match/XdsRouteMatchManager.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/core/match/XdsRouteMatchManager.java new file mode 100644 index 0000000000..bbb0570dac --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/core/match/XdsRouteMatchManager.java @@ -0,0 +1,121 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.core.match; + +import io.sermant.core.service.xds.entity.XdsHeaderMatcher; +import io.sermant.core.service.xds.entity.XdsPathMatcher; +import io.sermant.core.service.xds.entity.XdsRoute; +import io.sermant.core.service.xds.entity.XdsRouteAction; +import io.sermant.core.service.xds.entity.XdsRouteAction.XdsClusterWeight; +import io.sermant.core.service.xds.entity.XdsRouteAction.XdsWeightedClusters; +import io.sermant.core.service.xds.entity.XdsRouteMatch; +import io.sermant.core.utils.CollectionUtils; +import io.sermant.core.utils.StringUtils; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; +import io.sermant.flowcontrol.common.entity.RequestEntity; +import io.sermant.flowcontrol.common.util.RandomUtil; +import io.sermant.flowcontrol.common.xds.handler.XdsHandler; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * XdsRouteMatchManager, Get BusinessEntity based on xDS routing rules + * + * @author zhp + * @since 2024-12-20 + **/ +public enum XdsRouteMatchManager { + /** + * singleton + */ + INSTANCE; + + /** + * get matched scenario information + * + * @param requestEntity request-information + * @param serviceName service name + * @return matched business information + */ + public FlowControlScenario getMatchedScenarioInfo(RequestEntity requestEntity, String serviceName) { + FlowControlScenario scenario = new FlowControlScenario(); + scenario.setServiceName(serviceName); + Optional matchedRouteOptional = getMatchedRoute(requestEntity, serviceName); + if (!matchedRouteOptional.isPresent()) { + return scenario; + } + XdsRoute matchedRoute = matchedRouteOptional.get(); + scenario.setRouteName(matchedRoute.getName()); + scenario.setClusterName(selectClusterByRoute(matchedRoute)); + return scenario; + } + + private Optional getMatchedRoute(RequestEntity requestEntity, String serviceName) { + List routes = + XdsHandler.INSTANCE.getServiceRouteByServiceName(serviceName); + for (XdsRoute route : routes) { + XdsRouteMatch routeMatch = route.getRouteMatch(); + + // check path matching + if (!isPathMatched(routeMatch.getPathMatcher(), requestEntity.getApiPath())) { + continue; + } + + // check head matching + if (!isHeadersMatched(routeMatch.getHeaderMatchers(), requestEntity.getHeaders())) { + continue; + } + return Optional.of(route); + } + return Optional.empty(); + } + + private boolean isPathMatched(XdsPathMatcher matcher, String path) { + return matcher.isMatch(path); + } + + private boolean isHeadersMatched(List matchers, Map headers) { + return matchers.stream() + .allMatch(xdsHeaderMatcher -> xdsHeaderMatcher.isMatch(headers)); + } + + private String selectClusterByRoute(XdsRoute matchedRoute) { + XdsRouteAction routeAction = matchedRoute.getRouteAction(); + String cluster = routeAction.getCluster(); + if (!routeAction.isWeighted() || routeAction.getWeightedClusters() == null) { + return cluster; + } + XdsWeightedClusters weightedClusters = routeAction.getWeightedClusters(); + List clusters = weightedClusters.getClusters(); + int totalWeight = weightedClusters.getTotalWeight(); + if (CollectionUtils.isEmpty(clusters) || totalWeight == 0) { + return StringUtils.EMPTY; + } + int randomWeight = RandomUtil.randomInt(totalWeight); + + int currentWeight = 0; + for (XdsClusterWeight clusterWeight : clusters) { + currentWeight += clusterWeight.getWeight(); + if (randomWeight < currentWeight) { + return clusterWeight.getClusterName(); + } + } + return StringUtils.EMPTY; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/entity/FlowControlResponse.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/entity/FlowControlResponse.java index b0bea33071..57d83b33a5 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/entity/FlowControlResponse.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/entity/FlowControlResponse.java @@ -17,6 +17,10 @@ package io.sermant.flowcontrol.common.entity; +import java.util.Collections; +import java.util.List; +import java.util.Map; + /** * flow control response * @@ -33,6 +37,11 @@ public class FlowControlResponse { */ private Object result; + /** + * response + */ + private Map> headers; + /** * Whether to replace the actual response result, if true, replace */ @@ -54,6 +63,21 @@ public FlowControlResponse(String msg, int code) { this.code = code; } + /** + * flow control response results + * + * @param msg prompt message + * @param code response code + * @param headers response headers + * @param result response result + */ + public FlowControlResponse(String msg, int code, Map> headers, Object result) { + this.msg = msg; + this.code = code; + this.headers = headers; + this.result = result; + } + /** * flow control response results * @@ -91,4 +115,8 @@ public Object getResult() { public boolean isReplaceResult() { return isReplaceResult; } + + public Map> getHeaders() { + return headers == null ? Collections.emptyMap() : headers; + } } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/entity/FlowControlScenario.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/entity/FlowControlScenario.java new file mode 100644 index 0000000000..2993f9e706 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/entity/FlowControlScenario.java @@ -0,0 +1,92 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.entity; + +import java.util.Set; + +/** + * Scenario information for flow control, primarily used to retrieve matching flow control rules + * + * @author zhp + * @since 2024-11-27 + */ +public class FlowControlScenario { + /** + * matched service scenario name + */ + private Set matchedScenarioNames; + + /** + * The name of the downstream service + */ + private String serviceName; + + /** + * cluster name + */ + private String clusterName; + + /** + * route rule name + */ + private String routeName; + + /** + * request Address,ip:port + */ + private String address; + + public Set getMatchedScenarioNames() { + return matchedScenarioNames; + } + + public void setMatchedScenarioNames(Set matchedScenarioNames) { + this.matchedScenarioNames = matchedScenarioNames; + } + + public String getServiceName() { + return serviceName; + } + + public void setServiceName(String serviceName) { + this.serviceName = serviceName; + } + + public String getClusterName() { + return clusterName; + } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + public String getRouteName() { + return routeName; + } + + public void setRouteName(String routeName) { + this.routeName = routeName; + } + + public String getAddress() { + return address; + } + + public void setAddress(String address) { + this.address = address; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/AbstractRequestHandler.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/AbstractRequestHandler.java index b24e2c6355..653836ca1e 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/AbstractRequestHandler.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/AbstractRequestHandler.java @@ -17,11 +17,13 @@ package io.sermant.flowcontrol.common.handler; +import io.sermant.flowcontrol.common.config.CommonConst; import io.sermant.flowcontrol.common.core.ResolverManager; import io.sermant.flowcontrol.common.core.match.MatchManager; import io.sermant.flowcontrol.common.core.resolver.AbstractResolver; -import io.sermant.flowcontrol.common.core.rule.AbstractRule; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; import io.sermant.flowcontrol.common.entity.RequestEntity; +import io.sermant.flowcontrol.common.util.StringUtils; import java.util.Collections; import java.util.List; @@ -39,7 +41,7 @@ * @author zhouss * @since 2022-01-22 */ -public abstract class AbstractRequestHandler { +public abstract class AbstractRequestHandler { /** * Handler cache */ @@ -70,10 +72,21 @@ public List getHandlers(RequestEntity request) { return createOrGetHandlers(businessNames); } + /** + * gets the specified request handler + * + * @param flowControlScenario matched scenario information + * @return handler + */ + public List getXdsHandlers(FlowControlScenario flowControlScenario) { + Optional handlerOptions = createHandler(flowControlScenario, StringUtils.EMPTY); + return handlerOptions.map(Collections::singletonList).orElse(Collections.emptyList()); + } + /** * create handler * - * @param businessNames matched service name + * @param businessNames matching service scenarios * @return handler */ public List createOrGetHandlers(Set businessNames) { @@ -90,7 +103,7 @@ private Optional create(String businessName) { if (rule == null) { return Optional.empty(); } - return createProcessor(businessName, rule); + return createHandler(businessName, rule); } /** @@ -100,7 +113,18 @@ private Optional create(String businessName) { * @param rule matching resolution rules * @return handler */ - protected abstract Optional createProcessor(String businessName, R rule); + protected abstract Optional createHandler(String businessName, R rule); + + /** + * create handler + * + * @param flowControlScenario matched business information + * @param businessName service scenario name + * @return handler + */ + public Optional createHandler(FlowControlScenario flowControlScenario, String businessName) { + return Optional.empty(); + } /** * get configuration key diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/AbstractRetry.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/AbstractRetry.java index 37013156e9..d0ca324c2c 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/AbstractRetry.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/AbstractRetry.java @@ -79,7 +79,18 @@ public boolean needRetry(Set statusList, Object result) { * @return response status code * @throws UnsupportedOperationException unsupported operation */ - protected Optional getCode(Object result) { + public Optional getCode(Object result) { + throw new UnsupportedOperationException(); + } + + /** + * Get the name of the response header in the response information + * + * @param result interface response result + * @return response header names + * @throws UnsupportedOperationException unsupported operation + */ + public Optional> getHeaderNames(Object result) { throw new UnsupportedOperationException(); } @@ -97,7 +108,7 @@ protected final Class[] getRetryExceptions() { final RetryFramework retryFramework = retryType(); final FlowControlConfig pluginConfig = PluginConfigManager.getPluginConfig(FlowControlConfig.class); String[] retryExceptions; - if (retryFramework == RetryFramework.SPRING_CLOUD) { + if (retryFramework == RetryFramework.SPRING_CLOUD || retryFramework == RetryFramework.SPRING) { retryExceptions = pluginConfig.getSpringRetryExceptions(); } else if (retryFramework == RetryFramework.ALIBABA_DUBBO) { retryExceptions = pluginConfig.getAlibabaDubboRetryExceptions(); diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/Retry.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/Retry.java index fbc3ec2ee5..e3db641444 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/Retry.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/Retry.java @@ -17,6 +17,7 @@ package io.sermant.flowcontrol.common.handler.retry; +import java.util.Optional; import java.util.Set; /** @@ -49,6 +50,22 @@ public interface Retry { */ RetryFramework retryType(); + /** + * get status code + * + * @param result interface response result + * @return response status code + */ + Optional getCode(Object result); + + /** + * get header + * + * @param result interface response result + * @return response header names + */ + Optional> getHeaderNames(Object result); + /** * retryFrame * @@ -68,6 +85,11 @@ enum RetryFramework { /** * apache dubbo retry */ - APACHE_DUBBO + APACHE_DUBBO, + + /** + * Spring retry + */ + SPRING; } } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/RetryContext.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/RetryContext.java index 1a3600a95c..d56c706cae 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/RetryContext.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/RetryContext.java @@ -17,14 +17,19 @@ package io.sermant.flowcontrol.common.handler.retry; +import io.sermant.core.service.xds.entity.XdsRetryPolicy; import io.sermant.flowcontrol.common.core.RuleUtils; import io.sermant.flowcontrol.common.core.resolver.RetryResolver; import io.sermant.flowcontrol.common.core.rule.RetryRule; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; import io.sermant.flowcontrol.common.entity.HttpRequestEntity; -import io.sermant.flowcontrol.common.handler.retry.policy.RetryOnSamePolicy; +import io.sermant.flowcontrol.common.handler.retry.policy.RetryOnUntriedPolicy; import io.sermant.flowcontrol.common.handler.retry.policy.RetryPolicy; +import io.sermant.flowcontrol.common.util.StringUtils; +import io.sermant.flowcontrol.common.xds.handler.XdsHandler; import java.util.List; +import java.util.Optional; /** * Retry context, used to manage retry policies based on different host framework types @@ -87,7 +92,7 @@ public boolean isPolicyNeedRetry() { if (retryPolicy == null) { return false; } - return retryPolicy.isRetry() && retryPolicy.needRetry(); + return retryPolicy.isRetry() && retryPolicy.isReachedRetryThreshold(); } /** @@ -104,12 +109,12 @@ public RetryPolicy getRetryPolicy() { * * @param serviceInstance service instance */ - public void updateServiceInstance(Object serviceInstance) { + public void updateRetriedServiceInstance(Object serviceInstance) { final RetryPolicy retryPolicy = getRetryPolicy(); if (retryPolicy == null) { return; } - retryPolicy.update(serviceInstance); + retryPolicy.updateRetriedInstance(serviceInstance); retryPolicy.retryMark(); } @@ -119,7 +124,7 @@ public void updateServiceInstance(Object serviceInstance) { * @param retryRule retry rule */ public void buildRetryPolicy(RetryRule retryRule) { - policyThreadLocal.set(new RetryOnSamePolicy(retryRule.getRetryOnSame())); + policyThreadLocal.set(new RetryOnUntriedPolicy(retryRule.getRetryOnSame())); } /** @@ -134,4 +139,23 @@ public void buildRetryPolicy(HttpRequestEntity requestEntity) { RetryContext.INSTANCE.buildRetryPolicy(rule.get(0)); } } + + /** + * build test strategy + * + * @param scenario scenario information + */ + public void buildXdsRetryPolicy(FlowControlScenario scenario) { + if (StringUtils.isEmpty(scenario.getServiceName()) + || StringUtils.isEmpty(scenario.getRouteName())) { + return; + } + Optional retryPolicyOptional = XdsHandler.INSTANCE + .getRetryPolicy(scenario.getServiceName(), scenario.getRouteName()); + if (!retryPolicyOptional.isPresent()) { + return; + } + XdsRetryPolicy retryPolicy = retryPolicyOptional.get(); + policyThreadLocal.set(new RetryOnUntriedPolicy((int) retryPolicy.getMaxAttempts())); + } } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/policy/RetryOnSamePolicy.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/policy/RetryOnSamePolicy.java deleted file mode 100644 index cc5e6b7141..0000000000 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/policy/RetryOnSamePolicy.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright (C) 2022-2022 Huawei Technologies Co., Ltd. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package io.sermant.flowcontrol.common.handler.retry.policy; - -/** - * Retry on the same instance, regardless of thread safety, only on thread variables - * - * @author zhouss - * @since 2022-07-25 - */ -public class RetryOnSamePolicy implements RetryPolicy { - private final int retryOnSame; - - private Object lastRetryInstance; - - private int hasTriedCount; - - private boolean isRetry; - - private boolean isFirstMark = true; - - /** - * retry constructor - * - * @param retryOnSame the number of retries for the same instance - */ - public RetryOnSamePolicy(int retryOnSame) { - this.retryOnSame = retryOnSame; - } - - @Override - public boolean needRetry() { - return hasTriedCount < retryOnSame; - } - - @Override - public void retryMark() { - if (!isFirstMark) { - this.hasTriedCount++; - } - this.isRetry = true; - isFirstMark = false; - } - - @Override - public boolean isRetry() { - return isRetry; - } - - @Override - public Object getLastRetryServer() { - return lastRetryInstance; - } - - @Override - public void update(Object instance) { - this.lastRetryInstance = instance; - } -} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/policy/RetryPolicy.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/policy/RetryPolicy.java index 588814369e..3cb3b4203e 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/policy/RetryPolicy.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/policy/RetryPolicy.java @@ -17,6 +17,8 @@ package io.sermant.flowcontrol.common.handler.retry.policy; +import java.util.List; + /** * retry strategy * @@ -25,11 +27,11 @@ */ public interface RetryPolicy { /** - * need to retry + * is reached max attempts * * @return retry or not */ - boolean needRetry(); + boolean isReachedRetryThreshold(); /** * retry mark @@ -44,16 +46,16 @@ public interface RetryPolicy { boolean isRetry(); /** - * Gets the last retry instance + * Gets All retry instance * * @return retry instance */ - Object getLastRetryServer(); + List getAllRetriedInstance(); /** * update retry instance * * @param instance instance */ - void update(Object instance); + void updateRetriedInstance(Object instance); } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/util/RandomUtil.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/util/RandomUtil.java new file mode 100644 index 0000000000..fddf9f8c30 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/util/RandomUtil.java @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.util; + +import java.util.Random; + +/** + * Random util + * + * @author zhp + * @since 2024-12-05 + */ +public class RandomUtil { + private static final Random RANDOM = new Random(); + + private RandomUtil() { + } + + /** + * Generate random numbers + * + * @param min The minimum value of a random number + * @param max The maximum value of a random number + * @return random numbers + */ + public static int randomInt(int min, int max) { + return RANDOM.nextInt(max - min) + min; + } + + /** + * Generate random numbers + * + * @param max The maximum value of a random number + * @return random numbers + */ + public static int randomInt(int max) { + return RANDOM.nextInt(max); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/util/XdsRouterUtils.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/util/XdsRouterUtils.java new file mode 100644 index 0000000000..517f3a01d3 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/util/XdsRouterUtils.java @@ -0,0 +1,144 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.util; + +import io.sermant.core.config.ConfigManager; +import io.sermant.core.plugin.config.ServiceMeta; +import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.core.service.xds.entity.XdsLocality; +import io.sermant.core.utils.NetworkUtils; +import io.sermant.core.utils.StringUtils; +import io.sermant.flowcontrol.common.xds.handler.XdsHandler; + +import java.net.URI; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * XdsRouterUtils + * + * @author zhp + * @since 2024-12-10 + **/ +public class XdsRouterUtils { + private static final String LOCAL_HOST = "localhost"; + + /** + * the locality information of the host microservice itself + */ + private static XdsLocality selfServiceLocality; + + private static volatile boolean localityObtainedFlag = false; + + private XdsRouterUtils() { + } + + /** + * get XdsLocality of self-service + * + * @return XdsLocality + */ + public static Optional getLocalityInfoOfSelfService() { + if (localityObtainedFlag) { + return Optional.of(selfServiceLocality); + } + synchronized (XdsRouterUtils.class) { + if (localityObtainedFlag) { + return Optional.of(selfServiceLocality); + } + String podIp = NetworkUtils.getKubernetesPodIp(); + if (StringUtils.isEmpty(podIp)) { + return Optional.empty(); + } + Set serviceInstances = XdsHandler.INSTANCE + .getServiceInstanceByServiceName(ConfigManager.getConfig(ServiceMeta.class).getService()); + Optional serviceInstance = getMatchedServiceInstanceByPodIp(serviceInstances, podIp); + if (!serviceInstance.isPresent()) { + return Optional.empty(); + } + Optional validXdsLocality = createValidXdsLocality(serviceInstance.get().getMetaData()); + selfServiceLocality = validXdsLocality.orElse(null); + return validXdsLocality; + } + } + + /** + * update localityObtainedFlag + * + * @param flag locality obtained flag + */ + public static void updateLocalityObtainedFlag(boolean flag) { + localityObtainedFlag = flag; + } + + private static Optional getMatchedServiceInstanceByPodIp(Set serviceInstances, + String podIp) { + return serviceInstances.stream() + .filter(serviceInstance -> podIp.equals(serviceInstance.getHost())) + .findFirst(); + } + + /** + * rebuild new url by XdsServiceInstance + * + * @param oldUri old uri + * @param serviceInstance xds service instance + * @return new url + */ + public static String rebuildUrlByXdsServiceInstance(URI oldUri, ServiceInstance serviceInstance) { + StringBuilder builder = new StringBuilder(); + builder.append(oldUri.getScheme()) + .append("://") + .append(serviceInstance.getHost()) + .append(":") + .append(serviceInstance.getPort()) + .append(oldUri.getPath()); + String query = oldUri.getQuery(); + if (StringUtils.isEmpty(query)) { + return builder.toString(); + } + builder.append("?").append(query); + return builder.toString(); + } + + /** + * isXdsRouteRequired + * + * @param serviceName serviceName + * @return isXdsRouteRequired + */ + public static boolean isXdsRouteRequired(String serviceName) { + // if service is localhost or started not with lowercase, so no xds routing required + return !StringUtils.isEmpty(serviceName) && !serviceName.equals(LOCAL_HOST) + && Character.isLowerCase(serviceName.charAt(0)); + } + + private static Optional createValidXdsLocality(Map metaData) { + XdsLocality locality = new XdsLocality(); + String region = metaData.get("region"); + String zone = metaData.get("zone"); + String subZone = metaData.get("sub_zone"); + if (StringUtils.isEmpty(region) || StringUtils.isEmpty(zone) && !StringUtils.isEmpty(subZone)) { + return Optional.empty(); + } + locality.setRegion(region); + locality.setZone(zone); + locality.setSubZone(subZone); + return Optional.of(locality); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/util/XdsThreadLocalUtil.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/util/XdsThreadLocalUtil.java new file mode 100644 index 0000000000..9a59d46d6c --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/util/XdsThreadLocalUtil.java @@ -0,0 +1,80 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.util; + +import io.sermant.flowcontrol.common.entity.FlowControlScenario; + +/** + * xds thread local utility class + * + * @author zhp + * @since 2024-11-30 + */ +public class XdsThreadLocalUtil { + private static final ThreadLocal SEND_BYTE_FLAG = new ThreadLocal<>(); + + private static final ThreadLocal FLOW_CONTROL_SCENARIO_THREAD_LOCAL = new ThreadLocal<>(); + + private XdsThreadLocalUtil() { + } + + /** + * Set byte send flag + * + * @param flag byte send flag + */ + public static void setSendByteFlag(boolean flag) { + SEND_BYTE_FLAG.set(flag); + } + + /** + * get byte send flag + */ + public static boolean getSendByteFlag() { + return SEND_BYTE_FLAG.get() != null && SEND_BYTE_FLAG.get(); + } + + /** + * remove byte send flag + */ + public static void removeSendByteFlag() { + SEND_BYTE_FLAG.remove(); + } + + /** + * Set scenario information + * + * @param flowControlScenario scenario information + */ + public static void setScenarioInfo(FlowControlScenario flowControlScenario) { + FLOW_CONTROL_SCENARIO_THREAD_LOCAL.set(flowControlScenario); + } + + /** + * get scenario information + */ + public static FlowControlScenario getScenarioInfo() { + return FLOW_CONTROL_SCENARIO_THREAD_LOCAL.get(); + } + + /** + * remove business information + */ + public static void removeScenarioInfo() { + FLOW_CONTROL_SCENARIO_THREAD_LOCAL.remove(); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerInfo.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerInfo.java new file mode 100644 index 0000000000..24c4f07323 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerInfo.java @@ -0,0 +1,126 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.xds.circuit; + +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Circuit Breaker information + * + * @author zhp + * @since 2024-12-02 + */ +public class XdsCircuitBreakerInfo { + /** + * Number of local failures + */ + private Deque localFailure; + + /** + * Number of gateway errors, Response status code 502,503,504 is gateway error + */ + private Deque gateWayFailure; + + /** + * The number of server errors + */ + private Deque serverFailure; + + /** + * Identification of whether the circuit breaker is open or not + */ + private boolean isOpen; + + /** + * End time of circuit breaker + */ + private long circuitBreakerEndTime; + + /** + * Number of circuit breakers + */ + private AtomicInteger circuitBreakerCount; + + /** + * Constructor + */ + public XdsCircuitBreakerInfo() { + this.localFailure = new ArrayDeque<>(); + this.gateWayFailure = new ArrayDeque<>(); + this.serverFailure = new ArrayDeque<>(); + this.circuitBreakerCount = new AtomicInteger(0); + } + + public Deque getLocalFailure() { + return localFailure; + } + + public void setLocalFailure(Deque localFailure) { + this.localFailure = localFailure; + } + + public Deque getGateWayFailure() { + return gateWayFailure; + } + + public void setGateWayFailure(Deque gateWayFailure) { + this.gateWayFailure = gateWayFailure; + } + + public Deque getServerFailure() { + return serverFailure; + } + + public void setServerFailure(Deque serverFailure) { + this.serverFailure = serverFailure; + } + + public boolean isOpen() { + return isOpen; + } + + public void setOpen(boolean open) { + isOpen = open; + } + + public long getCircuitBreakerEndTime() { + return circuitBreakerEndTime; + } + + public void setCircuitBreakerEndTime(long circuitBreakerEndTime) { + this.circuitBreakerEndTime = circuitBreakerEndTime; + } + + public AtomicInteger getCircuitBreakerCount() { + return circuitBreakerCount; + } + + public void setCircuitBreakerCount(AtomicInteger circuitBreakerCount) { + this.circuitBreakerCount = circuitBreakerCount; + } + + /** + * reset the data + */ + public void cleanRequestData() { + this.localFailure.clear(); + this.gateWayFailure.clear(); + this.serverFailure.clear(); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerManager.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerManager.java new file mode 100644 index 0000000000..dc3cc8186b --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerManager.java @@ -0,0 +1,203 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.xds.circuit; + +import io.sermant.core.service.xds.entity.XdsInstanceCircuitBreakers; +import io.sermant.core.utils.CollectionUtils; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; +import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil; + +import java.util.Deque; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Circuit Breaker manager + * + * @author zhp + * @since 2024-12-02 + */ +public class XdsCircuitBreakerManager { + /** + * The map that stores Circuit breaker information, where the Key of the first level is the service name, + * the Key of the second level is the cluster name, the Key of the three level is the server address + */ + private static final Map>> INSTANCE_CIRCUIT_BREAKER_MAP = + new ConcurrentHashMap<>(); + + /** + * The map that stores the count of active requests, where the Key of the first level is the service name, + * the Key of the second level is the cluster name, the Key of the three level is the server address + */ + private static final Map>> REQUEST_CIRCUIT_BREAKER_MAP = + new ConcurrentHashMap<>(); + + private static final String GATE_WAY_FAILURE = "502,503,504"; + + private XdsCircuitBreakerManager() { + } + + /** + * increment Active Request + * + * @param serviceName service name + * @param clusterName route name + * @param address request address + * @return active request num + */ + public static int incrementActiveRequests(String serviceName, String clusterName, String address) { + return getActiveRequestCount(serviceName, clusterName, address).incrementAndGet(); + } + + /** + * decrement Active Request + * + * @param serviceName service name + * @param clusterName route name + * @param address request address + */ + public static void decrementActiveRequests(String serviceName, String clusterName, String address) { + getActiveRequestCount(serviceName, clusterName, address).decrementAndGet(); + } + + /** + * Determine whether instance circuit breaking is opened + * + * @param scenarioInfo Flow Control Scenario information + * @param address service address + * @return The result of the check, where true indicates that instance circuit breaking is required + */ + public static boolean needsInstanceCircuitBreaker(FlowControlScenario scenarioInfo, String address) { + XdsCircuitBreakerInfo circuitBreakerInfo = getCircuitBreakerInfo(scenarioInfo.getServiceName(), + scenarioInfo.getRouteName(), address); + return isCircuitBreakerOpen(circuitBreakerInfo); + } + + /** + * set circuit breaker status, The circuit breaker will be open when the number of failed instance calls reaches + * the threshold. + * + * @param circuitBreakers circuitBreakers rule + * @param scenarioInfo scenario information + */ + public static void setCircuitBeakerStatus(XdsInstanceCircuitBreakers circuitBreakers, + FlowControlScenario scenarioInfo) { + XdsCircuitBreakerInfo circuitBreakerInfo = getCircuitBreakerInfo(scenarioInfo.getServiceName(), + scenarioInfo.getRouteName(), scenarioInfo.getAddress()); + if (!XdsThreadLocalUtil.getSendByteFlag() && circuitBreakers.isSplitExternalLocalOriginErrors() + && shouldCircuitBreakerByFailure(circuitBreakerInfo.getGateWayFailure(), + circuitBreakers.getConsecutiveLocalOriginFailure(), circuitBreakers.getInterval())) { + openCircuitBreaker(circuitBreakerInfo, circuitBreakers.getInterval()); + } + if (shouldCircuitBreakerByFailure(circuitBreakerInfo.getGateWayFailure(), + circuitBreakers.getConsecutiveGatewayFailure(), circuitBreakers.getInterval())) { + openCircuitBreaker(circuitBreakerInfo, circuitBreakers.getInterval()); + } + if (shouldCircuitBreakerByFailure(circuitBreakerInfo.getServerFailure(), + circuitBreakers.getConsecutive5xxFailure(), circuitBreakers.getInterval())) { + openCircuitBreaker(circuitBreakerInfo, circuitBreakers.getInterval()); + } + } + + private static void openCircuitBreaker(XdsCircuitBreakerInfo circuitBreakerInfo, long interval) { + circuitBreakerInfo.setOpen(true); + circuitBreakerInfo.getCircuitBreakerCount().incrementAndGet(); + circuitBreakerInfo.cleanRequestData(); + circuitBreakerInfo.setCircuitBreakerEndTime( + System.currentTimeMillis() + circuitBreakerInfo.getCircuitBreakerCount().get() * interval); + } + + private static boolean shouldCircuitBreakerByFailure(Deque times, int failureRequestThreshold, + long interval) { + if (failureRequestThreshold <= 0 || CollectionUtils.isEmpty(times) || times.size() < failureRequestThreshold) { + return false; + } + for (int i = times.size(); i > failureRequestThreshold; i--) { + times.removeFirst(); + } + long currentTime = System.currentTimeMillis(); + Long time = times.getFirst(); + return currentTime - time <= interval; + } + + /** + * Check if circuit breaker is open + * + * @param circuitBreakerInfo circuit Breaker information + * @return if circuit breaker is open + */ + private static boolean isCircuitBreakerOpen(XdsCircuitBreakerInfo circuitBreakerInfo) { + return circuitBreakerInfo.isOpen() + && circuitBreakerInfo.getCircuitBreakerEndTime() > System.currentTimeMillis(); + } + + /** + * record failure request + * + * @param scenarioInfo scenario information + * @param address request address + * @param code response code + * @param circuitBreakers circuit rule + */ + public static void recordFailureRequest(FlowControlScenario scenarioInfo, String address, int code, + XdsInstanceCircuitBreakers circuitBreakers) { + XdsCircuitBreakerInfo circuitBreakerInfo = getCircuitBreakerInfo(scenarioInfo.getServiceName(), + scenarioInfo.getRouteName(), address); + if (isCircuitBreakerOpen(circuitBreakerInfo)) { + return; + } + long currentTime = System.currentTimeMillis(); + if (!XdsThreadLocalUtil.getSendByteFlag()) { + recordRequestTime(circuitBreakerInfo.getLocalFailure(), circuitBreakers.getConsecutiveLocalOriginFailure(), + currentTime); + } + if (code != 0 && GATE_WAY_FAILURE.contains(String.valueOf(code))) { + recordRequestTime(circuitBreakerInfo.getGateWayFailure(), circuitBreakers.getConsecutiveGatewayFailure(), + currentTime); + } + recordRequestTime(circuitBreakerInfo.getServerFailure(), circuitBreakers.getConsecutive5xxFailure(), + currentTime); + } + + private static void recordRequestTime(Deque times, int failureRequestThreshold, long currentTime) { + if (failureRequestThreshold <= 0) { + return; + } + for (int i = times.size(); i >= failureRequestThreshold - 1 && !times.isEmpty(); i--) { + times.removeFirst(); + } + times.add(currentTime); + } + + private static XdsCircuitBreakerInfo getCircuitBreakerInfo(String serviceName, String routeName, + String address) { + Map> serviceCircuitBreakerMap = INSTANCE_CIRCUIT_BREAKER_MAP. + computeIfAbsent(serviceName, key -> new ConcurrentHashMap<>()); + Map instanceCircuitBreakerMap = serviceCircuitBreakerMap. + computeIfAbsent(routeName, key -> new ConcurrentHashMap<>()); + return instanceCircuitBreakerMap.computeIfAbsent(address, key -> new XdsCircuitBreakerInfo()); + } + + private static AtomicInteger getActiveRequestCount(String serviceName, String clusterName, String address) { + Map> clusterCircuitBreakerMap = REQUEST_CIRCUIT_BREAKER_MAP. + computeIfAbsent(serviceName, key -> new ConcurrentHashMap<>()); + Map requestCircuitBreakerMap = clusterCircuitBreakerMap. + computeIfAbsent(clusterName, key -> new ConcurrentHashMap<>()); + return requestCircuitBreakerMap.computeIfAbsent(address, key -> new AtomicInteger()); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/handler/XdsHandler.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/handler/XdsHandler.java new file mode 100644 index 0000000000..e54cf29463 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/handler/XdsHandler.java @@ -0,0 +1,276 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.xds.handler; + +import io.sermant.core.common.CommonConstant; +import io.sermant.core.common.LoggerFactory; +import io.sermant.core.service.ServiceManager; +import io.sermant.core.service.xds.XdsCoreService; +import io.sermant.core.service.xds.XdsFlowControlService; +import io.sermant.core.service.xds.XdsLoadBalanceService; +import io.sermant.core.service.xds.XdsRouteService; +import io.sermant.core.service.xds.XdsServiceDiscovery; +import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.core.service.xds.entity.XdsClusterLoadAssigment; +import io.sermant.core.service.xds.entity.XdsHttpFault; +import io.sermant.core.service.xds.entity.XdsInstanceCircuitBreakers; +import io.sermant.core.service.xds.entity.XdsLbPolicy; +import io.sermant.core.service.xds.entity.XdsLocality; +import io.sermant.core.service.xds.entity.XdsRateLimit; +import io.sermant.core.service.xds.entity.XdsRequestCircuitBreakers; +import io.sermant.core.service.xds.entity.XdsRetryPolicy; +import io.sermant.core.service.xds.entity.XdsRoute; +import io.sermant.core.utils.StringUtils; +import io.sermant.flowcontrol.common.util.XdsRouterUtils; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +/** + * Xds handler + * + * @author zhp + * @since 2024-11-28 + */ +public enum XdsHandler { + /** + * singleton + */ + INSTANCE; + + /** + * Length of the cluster name after splitting + */ + private static final int CLUSTER_NAME_SPLIT_LENGTH = 4; + + private XdsFlowControlService xdsFlowControlService; + + private XdsRouteService xdsRouteService; + + private XdsServiceDiscovery xdsServiceDiscovery; + + private XdsLoadBalanceService xdsLoadBalanceService; + + /** + * constructor + */ + XdsHandler() { + Logger logger = LoggerFactory.getLogger(); + try { + XdsCoreService xdsCoreService = ServiceManager.getService(XdsCoreService.class); + xdsRouteService = xdsCoreService.getXdsRouteService(); + xdsServiceDiscovery = xdsCoreService.getXdsServiceDiscovery(); + xdsFlowControlService = xdsCoreService.getXdsFlowControlService(); + xdsLoadBalanceService = xdsCoreService.getLoadBalanceService(); + } catch (IllegalArgumentException e) { + logger.severe("XdsCoreService not started"); + } + } + + /** + * get request circuit breaker information of cluster + * + * @param serviceName service name + * @param clusterName cluster name + * @return circuit breaker rules + */ + public Optional getRequestCircuitBreakers(String serviceName, String clusterName) { + if (xdsFlowControlService == null || StringUtils.isEmpty(clusterName) || StringUtils.isEmpty(serviceName)) { + return Optional.empty(); + } + return xdsFlowControlService.getRequestCircuitBreakers(serviceName, clusterName); + } + + /** + * get instance circuit breaker information of cluster + * + * @param serviceName service name + * @param clusterName cluster name + * @return circuit breaker rules + */ + public Optional getInstanceCircuitBreakers(String serviceName, String clusterName) { + if (xdsFlowControlService == null || StringUtils.isEmpty(clusterName) || StringUtils.isEmpty(serviceName)) { + return Optional.empty(); + } + return xdsFlowControlService.getInstanceCircuitBreakers(serviceName, clusterName); + } + + /** + * get retry policy of route name + * + * @param serviceName service name + * @param routeName route name + * @return retry policy + */ + public Optional getRetryPolicy(String serviceName, String routeName) { + if (xdsFlowControlService == null || StringUtils.isEmpty(routeName) || StringUtils.isEmpty(serviceName)) { + return Optional.empty(); + } + return xdsFlowControlService.getRetryPolicy(serviceName, routeName); + } + + /** + * get rate limit of route name + * + * @param serviceName service name + * @param routeName route name + * @param clusterName cluster name + * @return rate limit rule + */ + public Optional getRateLimit(String serviceName, String routeName, String clusterName) { + if (xdsFlowControlService == null || StringUtils.isEmpty(clusterName) || StringUtils.isEmpty(routeName) + || StringUtils.isEmpty(serviceName)) { + return Optional.empty(); + } + String[] clusterInfo = clusterName.split(CommonConstant.ESCAPED_VERTICAL_LINE); + if (clusterInfo.length != CLUSTER_NAME_SPLIT_LENGTH) { + return Optional.empty(); + } + return xdsFlowControlService.getRateLimit(serviceName, routeName, clusterInfo[1]); + } + + /** + * get http fault of route name + * + * @param serviceName service name + * @param routeName route name + * @return http fault rule + */ + public Optional getHttpFault(String serviceName, String routeName) { + if (xdsFlowControlService == null || StringUtils.isEmpty(routeName) || StringUtils.isEmpty(serviceName)) { + return Optional.empty(); + } + return xdsFlowControlService.getHttpFault(serviceName, routeName); + } + + /** + * get http fault of route name + * + * @param serviceName service name + * @return route rules + */ + public List getServiceRouteByServiceName(String serviceName) { + if (xdsRouteService == null || StringUtils.isEmpty(serviceName)) { + return Collections.emptyList(); + } + return xdsRouteService.getServiceRoute(serviceName); + } + + /** + * get ServiceInstance of service name + * + * @param serviceName service name + * @return route rules + */ + public Set getServiceInstanceByServiceName(String serviceName) { + if (xdsServiceDiscovery == null || StringUtils.isEmpty(serviceName)) { + return Collections.emptySet(); + } + return xdsServiceDiscovery.getServiceInstance(serviceName); + } + + /** + * get ServiceInstance of service name and cluster name + * + * @param serviceName service name + * @param clusterName cluster name + * @return lb policy + */ + public Optional getLbPolicyOfCluster(String serviceName, String clusterName) { + if (xdsLoadBalanceService == null || StringUtils.isEmpty(serviceName) || StringUtils.isEmpty(clusterName)) { + return Optional.empty(); + } + return Optional.of(xdsLoadBalanceService.getLbPolicyOfCluster(serviceName, clusterName)); + } + + /** + * get ServiceInstance of service name and cluster name + * + * @param serviceName service name + * @param clusterName cluster name + * @return Service Instance + */ + public Set getAllServerInstance(String serviceName, String clusterName) { + if (xdsServiceDiscovery == null || xdsRouteService == null) { + return Collections.emptySet(); + } + if (StringUtils.isEmpty(serviceName) || StringUtils.isEmpty(clusterName)) { + return Collections.emptySet(); + } + Optional loadAssigmentOptional = + xdsServiceDiscovery.getClusterServiceInstance(serviceName, clusterName); + if (!loadAssigmentOptional.isPresent()) { + return xdsServiceDiscovery.getServiceInstance(serviceName); + } + XdsClusterLoadAssigment clusterLoadAssigment = loadAssigmentOptional.get(); + if (!xdsRouteService.isLocalityRoute(serviceName, clusterLoadAssigment.getClusterName())) { + Set serviceInstances = getServiceInstanceOfCluster(clusterLoadAssigment); + return serviceInstances.isEmpty() ? xdsServiceDiscovery.getServiceInstance(serviceName) : serviceInstances; + } + + // get locality info of self-service and route by locality + Optional localityInfoOfSelfService = XdsRouterUtils.getLocalityInfoOfSelfService(); + if (localityInfoOfSelfService.isPresent()) { + Set serviceInstances = getServiceInstanceOfLocalityCluster(clusterLoadAssigment, + localityInfoOfSelfService.get()); + if (!serviceInstances.isEmpty()) { + return serviceInstances; + } + } + Set serviceInstances = getServiceInstanceOfCluster(clusterLoadAssigment); + return serviceInstances.isEmpty() ? xdsServiceDiscovery.getServiceInstance(serviceName) : serviceInstances; + } + + private Set getServiceInstanceOfLocalityCluster(XdsClusterLoadAssigment clusterLoadAssigment, + XdsLocality locality) { + return clusterLoadAssigment.getLocalityInstances().entrySet().stream() + .filter(xdsLocalitySetEntry -> isSameLocality(locality, xdsLocalitySetEntry.getKey())) + .flatMap(xdsLocalitySetEntry -> xdsLocalitySetEntry.getValue().stream()) + .collect(Collectors.toSet()); + } + + private boolean isSameLocality(XdsLocality selfLocality, XdsLocality serviceLocality) { + if (!selfLocality.getRegion().equals(serviceLocality.getRegion())) { + return false; + } + if (StringUtils.isEmpty(selfLocality.getZone())) { + return true; + } + if (!selfLocality.getZone().equals(serviceLocality.getZone())) { + return false; + } + if (StringUtils.isEmpty(selfLocality.getSubZone())) { + return true; + } + return selfLocality.getSubZone().equals(serviceLocality.getSubZone()); + } + + private Set getServiceInstanceOfCluster(XdsClusterLoadAssigment clusterLoadAssigment) { + Set serviceInstances = new HashSet<>(); + for (Map.Entry> xdsLocalitySetEntry : clusterLoadAssigment + .getLocalityInstances().entrySet()) { + serviceInstances.addAll(xdsLocalitySetEntry.getValue()); + } + return serviceInstances; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/lb/XdsLoadBalancer.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/lb/XdsLoadBalancer.java new file mode 100644 index 0000000000..418c97ea00 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/lb/XdsLoadBalancer.java @@ -0,0 +1,37 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.xds.lb; + +import io.sermant.core.service.xds.entity.ServiceInstance; + +import java.util.List; + +/** + * XdsLoadBalancer + * + * @author daizhenyu + * @since 2024-08-30 + **/ +public interface XdsLoadBalancer { + /** + * select instance by loadbalancer + * + * @param instances service instance + * @return selected instance + */ + ServiceInstance selectInstance(List instances); +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/lb/XdsLoadBalancerFactory.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/lb/XdsLoadBalancerFactory.java new file mode 100644 index 0000000000..c270c75454 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/lb/XdsLoadBalancerFactory.java @@ -0,0 +1,82 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.xds.lb; + +import io.sermant.core.common.LoggerFactory; +import io.sermant.core.service.xds.entity.XdsLbPolicy; +import io.sermant.flowcontrol.common.xds.handler.XdsHandler; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.logging.Logger; + +/** + * XdsLoadBalancerFactory + * + * @author daizhenyu + * @since 2024-08-30 + **/ +public class XdsLoadBalancerFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(); + + private static final Map LOAD_BALANCERS = new ConcurrentHashMap<>(); + + private static final String RANDOM = "RANDOM"; + + private XdsLoadBalancerFactory() { + } + + /** + * getRoundRobinLoadBalancer + * + * @param clusterName cluster name + * @return XdsLoadBalancer + */ + private static XdsLoadBalancer getRoundRobinLoadBalancer(String clusterName) { + return LOAD_BALANCERS.computeIfAbsent(clusterName, key -> new XdsRoundRobinLoadBalancer()); + } + + /** + * getRandomLoadBalancer + * + * @return XdsLoadBalancer + */ + private static XdsLoadBalancer getRandomLoadBalancer() { + return LOAD_BALANCERS.computeIfAbsent(RANDOM, key -> new XdsRandomLoadBalancer()); + } + + /** + * getLoadBalancer + * + * @param serviceName service name + * @param clusterName cluster name + * @return XdsLoadBalancer + */ + public static XdsLoadBalancer getLoadBalancer(String serviceName, String clusterName) { + Optional lbPolicyOptional = + XdsHandler.INSTANCE.getLbPolicyOfCluster(serviceName, clusterName); + if (!lbPolicyOptional.isPresent()) { + return getRoundRobinLoadBalancer(clusterName); + } + XdsLbPolicy lbPolicy = lbPolicyOptional.get(); + if (lbPolicy == XdsLbPolicy.RANDOM) { + return getRandomLoadBalancer(); + } + return getRoundRobinLoadBalancer(clusterName); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/lb/XdsRandomLoadBalancer.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/lb/XdsRandomLoadBalancer.java new file mode 100644 index 0000000000..c27c4e275f --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/lb/XdsRandomLoadBalancer.java @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.xds.lb; + +import io.sermant.core.service.xds.entity.ServiceInstance; + +import java.util.List; +import java.util.Random; + +/** + * XdsRandomLoadBalancer + * + * @author daizhenyu + * @since 2024-08-30 + **/ +public class XdsRandomLoadBalancer implements XdsLoadBalancer { + private final Random random; + + /** + * Constructor + */ + public XdsRandomLoadBalancer() { + this.random = new Random(); + } + + @Override + public ServiceInstance selectInstance(List instances) { + // Select a random index from the list + int index = random.nextInt(instances.size()); + + // Return the randomly selected instance + return instances.get(index); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/lb/XdsRoundRobinLoadBalancer.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/lb/XdsRoundRobinLoadBalancer.java new file mode 100644 index 0000000000..4d1ab698be --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/lb/XdsRoundRobinLoadBalancer.java @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.xds.lb; + +import io.sermant.core.service.xds.entity.ServiceInstance; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * XdsRoundRobinLoadBalancer + * + * @author daizhenyu + * @since 2024-08-30 + **/ +public class XdsRoundRobinLoadBalancer implements XdsLoadBalancer { + private final AtomicInteger index; + + /** + * constructor + */ + public XdsRoundRobinLoadBalancer() { + this.index = new AtomicInteger(0); + } + + @Override + public ServiceInstance selectInstance(List instances) { + synchronized (XdsRoundRobinLoadBalancer.class) { + // safely calculate the index based on the current size of the instances list + int currentIndex = index.getAndUpdate(i -> (i + 1) % instances.size()); + + // double-check size to avoid index out of bounds + if (currentIndex >= instances.size()) { + currentIndex = 0; + index.set(1); + } + + // return the instance at the current index + return instances.get(currentIndex); + } + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/ratelimit/XdsRateLimitInfo.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/ratelimit/XdsRateLimitInfo.java new file mode 100644 index 0000000000..5694d9ec24 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/ratelimit/XdsRateLimitInfo.java @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.xds.ratelimit; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * xds rate limit information + * + * @author zhp + * @since 2024-12-05 + */ +public class XdsRateLimitInfo { + private long lastFilledTime; + + private AtomicInteger currentTokens; + + /** + * Constructor + * + * @param lastFilledTime Time to fill tokens + * @param currentTokens Current number of tokens + */ + public XdsRateLimitInfo(long lastFilledTime, int currentTokens) { + this.lastFilledTime = lastFilledTime; + this.currentTokens = new AtomicInteger(currentTokens); + } + + public long getLastFilledTime() { + return lastFilledTime; + } + + public void setLastFilledTime(long lastFilledTime) { + this.lastFilledTime = lastFilledTime; + } + + public AtomicInteger getCurrentTokens() { + return currentTokens; + } + + public void setCurrentTokens(AtomicInteger currentTokens) { + this.currentTokens = currentTokens; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/ratelimit/XdsRateLimitManager.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/ratelimit/XdsRateLimitManager.java new file mode 100644 index 0000000000..792716477b --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/ratelimit/XdsRateLimitManager.java @@ -0,0 +1,81 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.xds.ratelimit; + +import io.sermant.core.service.xds.entity.XdsTokenBucket; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Circuit Breaker manager + * + * @author zhp + * @since 2024-12-02 + */ +public class XdsRateLimitManager { + /** + * The map that stores rate limiting information, where the Key of the first level is the service name + * and the Key of the second level is the route name + */ + private static final Map> RATE_LIMIT_INFO_MAP = new ConcurrentHashMap<>(); + + private XdsRateLimitManager() { + } + + /** + * fill and Consumer token + * + * @param serviceName service name + * @param routeName route name + * @param tokenBucket token rule info + * @return the result of Consumer results + */ + public static synchronized boolean fillAndConsumeToken(String serviceName, String routeName, + XdsTokenBucket tokenBucket) { + Map limitInfoMap = RATE_LIMIT_INFO_MAP.computeIfAbsent(serviceName, + key -> new ConcurrentHashMap<>()); + XdsRateLimitInfo xdsRateLimitInfo = limitInfoMap.computeIfAbsent(routeName, key -> + new XdsRateLimitInfo(System.currentTimeMillis(), tokenBucket.getMaxTokens())); + fillToken(xdsRateLimitInfo, tokenBucket); + AtomicInteger currentTokens = xdsRateLimitInfo.getCurrentTokens(); + if (1 <= currentTokens.get()) { + currentTokens.addAndGet(-1); + return true; + } + return false; + } + + /** + * Fill tokenBucket, the token will be consumption when receiving the requests + * + * @param xdsRateLimitInfo current rate limit info + * @param tokenBucket token rule information + */ + private static void fillToken(XdsRateLimitInfo xdsRateLimitInfo, XdsTokenBucket tokenBucket) { + long now = System.currentTimeMillis(); + long timeElapsed = now - xdsRateLimitInfo.getLastFilledTime(); + long tokensToAdd = (timeElapsed / tokenBucket.getFillInterval()) * tokenBucket.getTokensPerFill(); + if (tokensToAdd > 0) { + int currentToken = xdsRateLimitInfo.getCurrentTokens().get(); + xdsRateLimitInfo.getCurrentTokens().set((int) Math.min(tokenBucket.getMaxTokens(), + currentToken + tokensToAdd)); + xdsRateLimitInfo.setLastFilledTime(now); + } + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/config/XdsFlowControlConfigTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/config/XdsFlowControlConfigTest.java new file mode 100644 index 0000000000..100e187a75 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/config/XdsFlowControlConfigTest.java @@ -0,0 +1,63 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.config; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +/** + * XdsFlowControlConfig Test + * + * @since 2024-12-10 + * @author zhp + */ +public class XdsFlowControlConfigTest { + + private XdsFlowControlConfig xdsFlowControlConfigUnderTest; + + @Before + public void setUp() throws Exception { + xdsFlowControlConfigUnderTest = new XdsFlowControlConfig(); + } + + @Test + public void testRetryStatusCodesGetterAndSetter() { + final List retryStatusCodes = Collections.singletonList("value"); + xdsFlowControlConfigUnderTest.setRetryStatusCodes(retryStatusCodes); + assertEquals(retryStatusCodes, xdsFlowControlConfigUnderTest.getRetryStatusCodes()); + } + + @Test + public void testRetryHeaderNamesGetterAndSetter() { + final List retryHeaderNames = Collections.singletonList("value"); + xdsFlowControlConfigUnderTest.setRetryHeaderNames(retryHeaderNames); + assertEquals(retryHeaderNames, xdsFlowControlConfigUnderTest.getRetryHeaderNames()); + } + + @Test + public void testEnableGetterAndSetter() { + final boolean enable = false; + xdsFlowControlConfigUnderTest.setEnable(enable); + assertFalse(xdsFlowControlConfigUnderTest.isEnable()); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/core/match/XdsRouteMatchManagerTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/core/match/XdsRouteMatchManagerTest.java new file mode 100644 index 0000000000..22e8b41c21 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/core/match/XdsRouteMatchManagerTest.java @@ -0,0 +1,117 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.core.match; + +import io.sermant.core.service.ServiceManager; +import io.sermant.core.service.xds.XdsCoreService; +import io.sermant.core.service.xds.XdsFlowControlService; +import io.sermant.core.service.xds.XdsLoadBalanceService; +import io.sermant.core.service.xds.XdsRouteService; +import io.sermant.core.service.xds.XdsServiceDiscovery; +import io.sermant.core.service.xds.entity.XdsPathMatcher; +import io.sermant.core.service.xds.entity.XdsRoute; +import io.sermant.core.service.xds.entity.XdsRouteAction; +import io.sermant.core.service.xds.entity.XdsRouteMatch; +import io.sermant.core.service.xds.entity.match.ExactMatchStrategy; +import io.sermant.core.service.xds.entity.match.MatchStrategy; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; +import io.sermant.flowcontrol.common.entity.HttpRequestEntity; +import io.sermant.flowcontrol.common.entity.RequestEntity; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +/** + * XdsRouteMatchManager Test + * @author zhp + * @since 2024-12-18 + */ +public class XdsRouteMatchManagerTest { + private static final String SERVICE_NAME = "provider"; + + private static final String PATH = "/test"; + + private static final String ROUTE_NAME = "routeA"; + + private static final String CLUSTER_NAME = "outbound|8080||serviceA.default.svc.cluster.local"; + + private RequestEntity requestEntity; + + private MockedStatic serviceManagerMockedStatic; + + @Before + public void init() { + requestEntity = new HttpRequestEntity.Builder() + .setRequestType(RequestEntity.RequestType.CLIENT).setApiPath(PATH) + .setHeaders(new HashMap<>()).setMethod("test").setServiceName(SERVICE_NAME).build(); + List xdsRouteList = new ArrayList<>(); + XdsRoute xdsRoute = new XdsRoute(); + xdsRoute.setName(ROUTE_NAME); + XdsRouteMatch xdsRouteMatch = new XdsRouteMatch(); + MatchStrategy matchStrategy = new ExactMatchStrategy(PATH); + XdsPathMatcher xdsPathMatcher = new XdsPathMatcher(matchStrategy, false); + xdsRouteMatch.setPathMatcher(xdsPathMatcher); + xdsRouteMatch.setHeaderMatchers(new ArrayList<>()); + xdsRoute.setRouteMatch(xdsRouteMatch); + XdsRouteAction xdsRouteAction = new XdsRouteAction(); + xdsRouteAction.setWeighted(true); + XdsRouteAction.XdsClusterWeight xdsClusterWeight = new XdsRouteAction.XdsClusterWeight(); + xdsClusterWeight.setClusterName(CLUSTER_NAME); + xdsClusterWeight.setWeight(100); + XdsRouteAction.XdsWeightedClusters xdsWeightedClusters = new XdsRouteAction.XdsWeightedClusters(); + xdsWeightedClusters.setClusters(Collections.singletonList(xdsClusterWeight)); + xdsWeightedClusters.setTotalWeight(100); + xdsRouteAction.setWeightedClusters(xdsWeightedClusters); + xdsRoute.setRouteAction(xdsRouteAction); + xdsRouteList.add(xdsRoute); + XdsCoreService xdsCoreService = Mockito.mock(XdsCoreService.class); + serviceManagerMockedStatic = Mockito.mockStatic(ServiceManager.class); + serviceManagerMockedStatic.when(()->ServiceManager.getService(XdsCoreService.class)).thenReturn(xdsCoreService); + XdsRouteService xdsRouteService = Mockito.mock(XdsRouteService.class); + XdsServiceDiscovery serviceDiscovery = Mockito.mock(XdsServiceDiscovery.class); + XdsFlowControlService xdsFlowControlService = Mockito.mock(XdsFlowControlService.class); + XdsLoadBalanceService xdsLoadBalanceService = Mockito.mock(XdsLoadBalanceService.class); + Mockito.when(xdsCoreService.getXdsRouteService()).thenReturn(xdsRouteService); + Mockito.when(xdsCoreService.getXdsServiceDiscovery()).thenReturn(serviceDiscovery); + Mockito.when(xdsCoreService.getXdsFlowControlService()).thenReturn(xdsFlowControlService); + Mockito.when(xdsCoreService.getLoadBalanceService()).thenReturn(xdsLoadBalanceService); + Mockito.when(xdsRouteService.getServiceRoute(SERVICE_NAME)).thenReturn(xdsRouteList); + } + + @After + public void tearDown() throws Exception { + serviceManagerMockedStatic.close(); + } + + @Test + public void testGetMatchedScenarioInfo() { + final FlowControlScenario result = XdsRouteMatchManager.INSTANCE.getMatchedScenarioInfo(requestEntity, SERVICE_NAME); + Assert.assertNotNull(result); + Assert.assertEquals(SERVICE_NAME, result.getServiceName()); + Assert.assertEquals(CLUSTER_NAME, result.getClusterName()); + Assert.assertEquals(ROUTE_NAME, result.getRouteName()); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/entity/FlowControlScenarioTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/entity/FlowControlScenarioTest.java new file mode 100644 index 0000000000..91f0bb2817 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/entity/FlowControlScenarioTest.java @@ -0,0 +1,76 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.entity; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +/** + * FlowControlScenario Test + * + * @since 2024-12-10 + * @author zhp + */ +public class FlowControlScenarioTest { + private FlowControlScenario flowControlScenario; + + @Before + public void setUp() throws Exception { + flowControlScenario = new FlowControlScenario(); + } + + @Test + public void testBusinessNamesGetterAndSetter() { + final Set businessNames = new HashSet<>(Collections.singletonList("value")); + flowControlScenario.setMatchedScenarioNames(businessNames); + assertEquals(businessNames, flowControlScenario.getMatchedScenarioNames()); + } + + @Test + public void testServiceNameGetterAndSetter() { + final String serviceName = "serviceName"; + flowControlScenario.setServiceName(serviceName); + assertEquals(serviceName, flowControlScenario.getServiceName()); + } + + @Test + public void testClusterNameGetterAndSetter() { + final String clusterName = "clusterName"; + flowControlScenario.setClusterName(clusterName); + assertEquals(clusterName, flowControlScenario.getClusterName()); + } + + @Test + public void testRouteNameGetterAndSetter() { + final String routeName = "routeName"; + flowControlScenario.setRouteName(routeName); + assertEquals(routeName, flowControlScenario.getRouteName()); + } + + @Test + public void testAddressGetterAndSetter() { + final String address = "127.0.0.1:8080"; + flowControlScenario.setAddress(address); + assertEquals(address, flowControlScenario.getAddress()); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/handler/retry/RetryContextTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/handler/retry/RetryContextTest.java index e762c37493..eae232f739 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/handler/retry/RetryContextTest.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/handler/retry/RetryContextTest.java @@ -22,6 +22,7 @@ import org.junit.Assert; import org.junit.Test; +import java.util.Optional; import java.util.Set; /** @@ -61,13 +62,13 @@ public void testRetryPolicy() { } // If the number of retries exceeds the maximum, retry is complete - RetryContext.INSTANCE.updateServiceInstance(instance); + RetryContext.INSTANCE.updateRetriedServiceInstance(instance); Assert.assertFalse(RetryContext.INSTANCE.isPolicyNeedRetry()); RetryContext.INSTANCE.remove(); } private void muteRetry(Object instance) { - RetryContext.INSTANCE.updateServiceInstance(instance); + RetryContext.INSTANCE.updateRetriedServiceInstance(instance); Assert.assertTrue(RetryContext.INSTANCE.isPolicyNeedRetry()); } @@ -87,6 +88,16 @@ public Class[] retryExceptions() { public RetryFramework retryType() { return RetryFramework.ALIBABA_DUBBO; } + + @Override + public Optional getCode(Object result) { + return Optional.empty(); + } + + @Override + public Optional> getHeaderNames(Object result) { + return Optional.empty(); + } }; } } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/util/RandomUtilTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/util/RandomUtilTest.java new file mode 100644 index 0000000000..fad148cf4e --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/util/RandomUtilTest.java @@ -0,0 +1,39 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.util; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * RandomUtil test + * + * @author zhp + * @since 2024-12-05 + */ +public class RandomUtilTest { + @Test + public void testRandomInt1() { + assertEquals(0, RandomUtil.randomInt(0, 1)); + } + + @Test + public void testRandomInt2() { + assertEquals(0, RandomUtil.randomInt(1)); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/util/XdsRouterUtilsTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/util/XdsRouterUtilsTest.java new file mode 100644 index 0000000000..cfc0392a02 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/util/XdsRouterUtilsTest.java @@ -0,0 +1,112 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.util; + +import io.sermant.core.config.ConfigManager; +import io.sermant.core.plugin.config.ServiceMeta; +import io.sermant.core.service.ServiceManager; +import io.sermant.core.service.xds.XdsCoreService; +import io.sermant.core.service.xds.XdsFlowControlService; +import io.sermant.core.service.xds.XdsServiceDiscovery; +import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.core.service.xds.entity.XdsLocality; +import io.sermant.core.utils.NetworkUtils; +import io.sermant.implement.service.xds.entity.XdsServiceInstance; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * XdsRouterUtilTest + * + * @author zhp + * @since 2024-12-10 + **/ +public class XdsRouterUtilsTest { + private static MockedStatic serviceManager; + + private static MockedStatic networkUtils; + + private static MockedStatic configManager; + + private XdsServiceDiscovery serviceDiscovery; + + @Before + public void setUp() { + XdsCoreService xdsCoreService = Mockito.mock(XdsCoreService.class); + serviceManager = Mockito.mockStatic(ServiceManager.class); + Mockito.when(ServiceManager.getService(XdsCoreService.class)).thenReturn(xdsCoreService); + networkUtils = Mockito.mockStatic(NetworkUtils.class); + Mockito.when(NetworkUtils.getKubernetesPodIp()).thenReturn("127.0.0.1"); + + serviceDiscovery = Mockito.mock(XdsServiceDiscovery.class); + Mockito.when(xdsCoreService.getXdsServiceDiscovery()).thenReturn(serviceDiscovery); + XdsFlowControlService xdsFlowControlService = Mockito.mock(XdsFlowControlService.class); + Mockito.when(xdsCoreService.getXdsFlowControlService()).thenReturn(xdsFlowControlService); + configManager = Mockito.mockStatic(ConfigManager.class); + } + + @AfterClass + public static void tearDown() { + serviceManager.close(); + networkUtils.close(); + configManager.close(); + } + + @Test + public void testGetLocalityInfoOfSelfService() { + Mockito.when(serviceDiscovery.getServiceInstance("provider")) + .thenReturn(createServiceInstance4Service(Arrays.asList("127.0.0.1", "host", "localhost"))); + // not find matched service instance + ServiceMeta meta = new ServiceMeta(); + meta.setService("consumer"); + Mockito.when(ConfigManager.getConfig(ServiceMeta.class)).thenReturn(meta); + Optional localityInfo = XdsRouterUtils.getLocalityInfoOfSelfService(); + Assert.assertFalse(localityInfo.isPresent()); + + // find matched service instance + meta.setService("provider"); + XdsRouterUtils.updateLocalityObtainedFlag(false); + localityInfo = XdsRouterUtils.getLocalityInfoOfSelfService(); + Assert.assertTrue(localityInfo.isPresent()); + Assert.assertEquals("127.0.0.1", localityInfo.get().getRegion()); + } + + private static Set createServiceInstance4Service(List hosts) { + Set serviceInstances = new HashSet<>(); + for (String host : hosts) { + XdsServiceInstance serviceInstance = new XdsServiceInstance(); + serviceInstance.setHost(host); + Map metaData = new HashMap<>(); + metaData.put("region", host); + serviceInstance.setMetadata(metaData); + serviceInstances.add(serviceInstance); + } + return serviceInstances; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/util/XdsThreadLocalUtilTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/util/XdsThreadLocalUtilTest.java new file mode 100644 index 0000000000..579899a8aa --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/util/XdsThreadLocalUtilTest.java @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.util; + +import io.sermant.flowcontrol.common.entity.FlowControlScenario; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** + * XdsThreadLocalUtil Test + * + * @author zhp + * @since 2024-11-30 + */ +public class XdsThreadLocalUtilTest { + @Test + public void testSetAndRemoveSendByteFlag() { + assertFalse(XdsThreadLocalUtil.getSendByteFlag()); + XdsThreadLocalUtil.setSendByteFlag(true); + assertTrue(XdsThreadLocalUtil.getSendByteFlag()); + XdsThreadLocalUtil.removeSendByteFlag(); + assertFalse(XdsThreadLocalUtil.getSendByteFlag()); + } + + @Test + public void testSetScenarioInfo() { + final FlowControlScenario flowControlScenario = new FlowControlScenario(); + flowControlScenario.setMatchedScenarioNames(new HashSet<>(Collections.singletonList("value"))); + flowControlScenario.setServiceName("serviceName"); + flowControlScenario.setClusterName("clusterName"); + flowControlScenario.setRouteName("routeName"); + flowControlScenario.setAddress("address"); + XdsThreadLocalUtil.setScenarioInfo(flowControlScenario); + FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo(); + assertEquals(scenarioInfo, flowControlScenario); + XdsThreadLocalUtil.removeScenarioInfo(); + assertNull(XdsThreadLocalUtil.getScenarioInfo()); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerInfoTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerInfoTest.java new file mode 100644 index 0000000000..70f5711fdf --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerInfoTest.java @@ -0,0 +1,94 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.xds.circuit; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Deque; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +/** + * Circuit Breaker information Test + * + * @author zhp + * @since 2024-12-02 + */ +public class XdsCircuitBreakerInfoTest { + private XdsCircuitBreakerInfo xdsCircuitBreakerInfoUnderTest; + + @Before + public void setUp() throws Exception { + xdsCircuitBreakerInfoUnderTest = new XdsCircuitBreakerInfo(); + } + + @Test + public void testLocalFailureGetterAndSetter() { + final Deque localFailure = new ConcurrentLinkedDeque<>(); + localFailure.add(1L); + xdsCircuitBreakerInfoUnderTest.setLocalFailure(localFailure); + assertEquals(localFailure, xdsCircuitBreakerInfoUnderTest.getLocalFailure()); + } + + @Test + public void testServerFailureGetterAndSetter() { + final Deque serverFailure = new ConcurrentLinkedDeque<>(); + serverFailure.add(1L); + xdsCircuitBreakerInfoUnderTest.setServerFailure(serverFailure); + assertEquals(serverFailure, xdsCircuitBreakerInfoUnderTest.getServerFailure()); + } + + @Test + public void testGateWayFailureGetterAndSetter() { + final Deque gateWayFailure = new ConcurrentLinkedDeque<>(); + gateWayFailure.add(1L); + xdsCircuitBreakerInfoUnderTest.setGateWayFailure(gateWayFailure); + assertEquals(gateWayFailure, xdsCircuitBreakerInfoUnderTest.getGateWayFailure()); + } + + @Test + public void testIsOpenGetterAndSetter() { + final boolean isOpen = false; + xdsCircuitBreakerInfoUnderTest.setOpen(isOpen); + assertFalse(xdsCircuitBreakerInfoUnderTest.isOpen()); + } + + @Test + public void testCircuitBreakerTimeGetterAndSetter() { + final long circuitBreakerTime = System.currentTimeMillis(); + xdsCircuitBreakerInfoUnderTest.setCircuitBreakerEndTime(circuitBreakerTime); + assertEquals(circuitBreakerTime, xdsCircuitBreakerInfoUnderTest.getCircuitBreakerEndTime()); + } + + @Test + public void testCircuitBreakerCountGetterAndSetter() { + final AtomicInteger circuitBreakerCount = new AtomicInteger(1); + xdsCircuitBreakerInfoUnderTest.setCircuitBreakerCount(circuitBreakerCount); + assertEquals(circuitBreakerCount, xdsCircuitBreakerInfoUnderTest.getCircuitBreakerCount()); + } + + @Test + public void testCleanRequestDate() { + testCircuitBreakerTimeGetterAndSetter(); + xdsCircuitBreakerInfoUnderTest.cleanRequestData(); + assertEquals(0, xdsCircuitBreakerInfoUnderTest.getServerFailure().size()); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerManagerTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerManagerTest.java new file mode 100644 index 0000000000..6c895e39c6 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerManagerTest.java @@ -0,0 +1,71 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.xds.circuit; + +import io.sermant.core.service.xds.entity.XdsInstanceCircuitBreakers; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Circuit Breaker manager Test + * + * @author zhp + * @since 2024-12-02 + */ +public class XdsCircuitBreakerManagerTest { + private static final String SERVICE_NAME = "provider"; + + private static final String ROUTE_NAME = "routeA"; + + private static final String CLUSTER_NAME = "clusterA"; + + private static final String ADDRESS = "127.0.0.1:8080"; + + @Test + public void testActiveRequests() { + assertEquals(1, XdsCircuitBreakerManager.incrementActiveRequests(SERVICE_NAME, CLUSTER_NAME, ADDRESS)); + assertEquals(2, XdsCircuitBreakerManager.incrementActiveRequests(SERVICE_NAME, CLUSTER_NAME, ADDRESS)); + XdsCircuitBreakerManager.decrementActiveRequests(SERVICE_NAME, CLUSTER_NAME, ADDRESS); + assertEquals(2, XdsCircuitBreakerManager.incrementActiveRequests(SERVICE_NAME, CLUSTER_NAME, ADDRESS)); + } + + @Test + public void testCircuitBreaker() { + final FlowControlScenario scenarioInfo = new FlowControlScenario(); + scenarioInfo.setServiceName(SERVICE_NAME); + scenarioInfo.setClusterName(CLUSTER_NAME); + scenarioInfo.setRouteName(ROUTE_NAME); + scenarioInfo.setAddress(ADDRESS); + final XdsInstanceCircuitBreakers circuitBreakers = new XdsInstanceCircuitBreakers(); + circuitBreakers.setSplitExternalLocalOriginErrors(false); + circuitBreakers.setConsecutiveLocalOriginFailure(1); + circuitBreakers.setConsecutiveGatewayFailure(1); + circuitBreakers.setConsecutive5xxFailure(1); + circuitBreakers.setInterval(1000L); + circuitBreakers.setBaseEjectionTime(10000L); + boolean result = XdsCircuitBreakerManager.needsInstanceCircuitBreaker(scenarioInfo, ADDRESS); + assertFalse(result); + XdsCircuitBreakerManager.recordFailureRequest(scenarioInfo, ADDRESS, 500, circuitBreakers); + XdsCircuitBreakerManager.setCircuitBeakerStatus(circuitBreakers, scenarioInfo); + result = XdsCircuitBreakerManager.needsInstanceCircuitBreaker(scenarioInfo, ADDRESS); + assertTrue(result); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/handler/XdsHandlerTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/handler/XdsHandlerTest.java new file mode 100644 index 0000000000..83caeb6b3c --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/handler/XdsHandlerTest.java @@ -0,0 +1,221 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.xds.handler; + +import io.sermant.core.service.ServiceManager; +import io.sermant.core.service.xds.XdsCoreService; +import io.sermant.core.service.xds.XdsFlowControlService; +import io.sermant.core.service.xds.XdsLoadBalanceService; +import io.sermant.core.service.xds.XdsRouteService; +import io.sermant.core.service.xds.XdsServiceDiscovery; +import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.core.service.xds.entity.XdsDelay; +import io.sermant.core.service.xds.entity.XdsHeaderOption; +import io.sermant.core.service.xds.entity.XdsHttpFault; +import io.sermant.core.service.xds.entity.XdsInstanceCircuitBreakers; +import io.sermant.core.service.xds.entity.XdsLbPolicy; +import io.sermant.core.service.xds.entity.XdsPathMatcher; +import io.sermant.core.service.xds.entity.XdsRateLimit; +import io.sermant.core.service.xds.entity.XdsRequestCircuitBreakers; +import io.sermant.core.service.xds.entity.XdsRetryPolicy; +import io.sermant.core.service.xds.entity.XdsRoute; +import io.sermant.core.service.xds.entity.XdsRouteAction; +import io.sermant.core.service.xds.entity.XdsRouteMatch; +import io.sermant.core.service.xds.entity.match.ExactMatchStrategy; +import io.sermant.core.service.xds.entity.match.MatchStrategy; +import io.sermant.implement.service.xds.entity.XdsServiceInstance; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +/** + * Xds Flow Control handler Test + * + * @author zhp + * @since 2024-11-28 + */ +public class XdsHandlerTest { + private MockedStatic serviceManagerMockedStatic; + + private static final String SERVICE_NAME = "provider"; + + private static final String ROUTE_NAME = "routeA"; + + private static final String PATH = "/test"; + + private static final String CLUSTER_NAME = "outbound|8080||serviceA.default.svc.cluster.local"; + + private final XdsRequestCircuitBreakers requestCircuitBreakers = new XdsRequestCircuitBreakers(); + + private final XdsInstanceCircuitBreakers instanceCircuitBreakers = new XdsInstanceCircuitBreakers(); + + private final XdsRetryPolicy retryPolicy = new XdsRetryPolicy(); + + private final XdsRateLimit rateLimit = new XdsRateLimit(); + + private final XdsHttpFault httpFault = new XdsHttpFault(); + + @Before + public void init() { + List xdsRouteList = new ArrayList<>(); + XdsRoute xdsRoute = new XdsRoute(); + xdsRoute.setName(ROUTE_NAME); + XdsRouteMatch xdsRouteMatch = new XdsRouteMatch(); + MatchStrategy matchStrategy = new ExactMatchStrategy(PATH); + XdsPathMatcher xdsPathMatcher = new XdsPathMatcher(matchStrategy, false); + xdsRouteMatch.setPathMatcher(xdsPathMatcher); + xdsRouteMatch.setHeaderMatchers(new ArrayList<>()); + xdsRoute.setRouteMatch(xdsRouteMatch); + XdsRouteAction xdsRouteAction = new XdsRouteAction(); + xdsRouteAction.setWeighted(true); + XdsRouteAction.XdsClusterWeight xdsClusterWeight = new XdsRouteAction.XdsClusterWeight(); + xdsClusterWeight.setClusterName(CLUSTER_NAME); + xdsClusterWeight.setWeight(100); + XdsRouteAction.XdsWeightedClusters xdsWeightedClusters = new XdsRouteAction.XdsWeightedClusters(); + xdsWeightedClusters.setClusters(Collections.singletonList(xdsClusterWeight)); + xdsWeightedClusters.setTotalWeight(100); + xdsRouteAction.setWeightedClusters(xdsWeightedClusters); + xdsRoute.setRouteAction(xdsRouteAction); + xdsRouteList.add(xdsRoute); + XdsCoreService xdsCoreService = Mockito.mock(XdsCoreService.class); + serviceManagerMockedStatic = Mockito.mockStatic(ServiceManager.class); + serviceManagerMockedStatic.when(()->ServiceManager.getService(XdsCoreService.class)).thenReturn(xdsCoreService); + XdsRouteService xdsRouteService = Mockito.mock(XdsRouteService.class); + XdsServiceDiscovery serviceDiscovery = Mockito.mock(XdsServiceDiscovery.class); + XdsFlowControlService xdsFlowControlService = Mockito.mock(XdsFlowControlService.class); + XdsLoadBalanceService xdsLoadBalanceService = Mockito.mock(XdsLoadBalanceService.class); + Mockito.when(xdsCoreService.getXdsRouteService()).thenReturn(xdsRouteService); + Mockito.when(xdsCoreService.getXdsServiceDiscovery()).thenReturn(serviceDiscovery); + Mockito.when(xdsCoreService.getXdsFlowControlService()).thenReturn(xdsFlowControlService); + Mockito.when(xdsCoreService.getLoadBalanceService()).thenReturn(xdsLoadBalanceService); + requestCircuitBreakers.setMaxRequests(1000); + retryPolicy.setRetryOn("503"); + rateLimit.setResponseHeaderOption(Collections.singletonList(new XdsHeaderOption())); + XdsDelay delay = new XdsDelay(); + delay.setFixedDelay(1000); + httpFault.setDelay(delay); + Mockito.when(xdsRouteService.getServiceRoute(SERVICE_NAME)).thenReturn(xdsRouteList); + Mockito.when(xdsFlowControlService.getRequestCircuitBreakers(SERVICE_NAME, CLUSTER_NAME)) + .thenReturn(Optional.of(requestCircuitBreakers)); + Mockito.when(xdsFlowControlService.getInstanceCircuitBreakers(SERVICE_NAME, CLUSTER_NAME)) + .thenReturn(Optional.of(instanceCircuitBreakers)); + Mockito.when(xdsFlowControlService.getRetryPolicy(SERVICE_NAME, ROUTE_NAME)) + .thenReturn(Optional.of(retryPolicy)); + Mockito.when(xdsFlowControlService.getRateLimit(SERVICE_NAME, ROUTE_NAME, "8080")) + .thenReturn(Optional.of(rateLimit)); + Mockito.when(xdsFlowControlService.getHttpFault(SERVICE_NAME, ROUTE_NAME)) + .thenReturn(Optional.of(httpFault)); + Mockito.when(serviceDiscovery.getServiceInstance(SERVICE_NAME)) + .thenReturn(createServiceInstance4Service(Arrays.asList("127.0.0.1", "host", "localhost"))); + Mockito.when(xdsLoadBalanceService.getLbPolicyOfCluster(SERVICE_NAME, CLUSTER_NAME)) + .thenReturn(XdsLbPolicy.RANDOM); + } + + @After + public void tearDown() throws Exception { + serviceManagerMockedStatic.close(); + } + + @Test + public void testGetRequestCircuitBreakers() { + final Optional result = XdsHandler.INSTANCE.getRequestCircuitBreakers( + SERVICE_NAME, CLUSTER_NAME); + Assert.assertTrue(result.isPresent()); + Assert.assertEquals(requestCircuitBreakers.getMaxRequests(), result.get().getMaxRequests()); + } + + @Test + public void testGetInstanceCircuitBreakers() { + final Optional result = XdsHandler.INSTANCE.getInstanceCircuitBreakers( + SERVICE_NAME, CLUSTER_NAME); + Assert.assertTrue(result.isPresent()); + Assert.assertEquals(instanceCircuitBreakers, result.get()); + } + + @Test + public void testGetRetryPolicy() { + final Optional result = XdsHandler.INSTANCE.getRetryPolicy( + SERVICE_NAME, ROUTE_NAME); + Assert.assertTrue(result.isPresent()); + Assert.assertEquals(retryPolicy.getRetryOn(), result.get().getRetryOn()); + } + + @Test + public void testGetRateLimit() { + final Optional result = XdsHandler.INSTANCE.getRateLimit( + SERVICE_NAME, ROUTE_NAME, CLUSTER_NAME); + Assert.assertTrue(result.isPresent()); + Assert.assertEquals(1, result.get().getResponseHeaderOption().size()); + } + + @Test + public void testGetHttpFault() { + final Optional result = XdsHandler.INSTANCE.getHttpFault( + SERVICE_NAME, ROUTE_NAME); + Assert.assertTrue(result.isPresent()); + Assert.assertEquals(httpFault.getDelay().getFixedDelay(), result.get().getDelay().getFixedDelay()); + } + + @Test + public void testGetServiceRouteByServiceName() { + final List result = XdsHandler.INSTANCE.getServiceRouteByServiceName(SERVICE_NAME); + assertEquals(1, result.size()); + } + + @Test + public void testGetServiceInstanceByServiceName() { + final Set result = XdsHandler.INSTANCE. + getServiceInstanceByServiceName(SERVICE_NAME); + assertEquals(3, result.size()); + } + + @Test + public void testGetLbPolicyOfCluster() { + final Optional result = XdsHandler.INSTANCE.getLbPolicyOfCluster(SERVICE_NAME, + CLUSTER_NAME); + assertEquals(Optional.of(XdsLbPolicy.RANDOM), result); + } + + private static Set createServiceInstance4Service(List hosts) { + Set serviceInstances = new HashSet<>(); + for (String host : hosts) { + XdsServiceInstance serviceInstance = new XdsServiceInstance(); + serviceInstance.setHost(host); + Map metaData = new HashMap<>(); + metaData.put("region", host); + serviceInstance.setMetadata(metaData); + serviceInstances.add(serviceInstance); + } + return serviceInstances; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/ratelimit/XdsRateLimitInfoTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/ratelimit/XdsRateLimitInfoTest.java new file mode 100644 index 0000000000..49566451cf --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/ratelimit/XdsRateLimitInfoTest.java @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.xds.ratelimit; + +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; + +/** + * xds rate limit information Testz + * + * @author zhp + * @since 2024-12-05 + */ +public class XdsRateLimitInfoTest { + private XdsRateLimitInfo xdsRateLimitInfoUnderTest; + + @Before + public void setUp() throws Exception { + xdsRateLimitInfoUnderTest = new XdsRateLimitInfo(0L, 0); + } + + @Test + public void testLastFilledTimeGetterAndSetter() { + final long lastFilledTime = 0L; + xdsRateLimitInfoUnderTest.setLastFilledTime(lastFilledTime); + assertEquals(lastFilledTime, xdsRateLimitInfoUnderTest.getLastFilledTime()); + } + + @Test + public void testCurrentTokensGetterAndSetter() { + final AtomicInteger currentTokens = new AtomicInteger(0); + xdsRateLimitInfoUnderTest.setCurrentTokens(currentTokens); + assertEquals(currentTokens, xdsRateLimitInfoUnderTest.getCurrentTokens()); + } +}