diff --git a/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/entity/XdsAbort.java b/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/entity/XdsAbort.java index ceb76e7cdd..027506ece0 100644 --- a/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/entity/XdsAbort.java +++ b/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/entity/XdsAbort.java @@ -31,7 +31,7 @@ public class XdsAbort { /** * The percentage of requests/ operations/ connections that will be aborted with the error code */ - private int percentage; + private FractionalPercent percentage; public int getHttpStatus() { return httpStatus; @@ -41,11 +41,11 @@ public void setHttpStatus(int httpStatus) { this.httpStatus = httpStatus; } - public int getPercentage() { + public FractionalPercent getPercentage() { return percentage; } - public void setPercentage(int percentage) { + public void setPercentage(FractionalPercent percentage) { this.percentage = percentage; } } diff --git a/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/entity/XdsDelay.java b/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/entity/XdsDelay.java index 3d63f923d9..c94de7e51d 100644 --- a/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/entity/XdsDelay.java +++ b/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/entity/XdsDelay.java @@ -31,7 +31,7 @@ public class XdsDelay { /** * The percentage of requests on which the delay will be injected */ - private int percentage; + private FractionalPercent percentage; public long getFixedDelay() { return fixedDelay; @@ -41,11 +41,11 @@ public void setFixedDelay(long fixedDelay) { this.fixedDelay = fixedDelay; } - public int getPercentage() { + public FractionalPercent getPercentage() { return percentage; } - public void setPercentage(int percentage) { + public void setPercentage(FractionalPercent percentage) { this.percentage = percentage; } } diff --git a/sermant-agentcore/sermant-agentcore-core/src/test/java/io/sermant/core/service/xds/entity/XdsAbortTest.java b/sermant-agentcore/sermant-agentcore-core/src/test/java/io/sermant/core/service/xds/entity/XdsAbortTest.java index e97fc1e0b1..3f5325dfcf 100644 --- a/sermant-agentcore/sermant-agentcore-core/src/test/java/io/sermant/core/service/xds/entity/XdsAbortTest.java +++ b/sermant-agentcore/sermant-agentcore-core/src/test/java/io/sermant/core/service/xds/entity/XdsAbortTest.java @@ -29,9 +29,10 @@ public class XdsAbortTest { @Test public void testXdsAbort() { XdsAbort abort = new XdsAbort(); - abort.setPercentage(100); + FractionalPercent percent = new FractionalPercent(); + abort.setPercentage(percent); abort.setHttpStatus(200); Assert.assertEquals(200, abort.getHttpStatus()); - Assert.assertEquals(100, abort.getPercentage(), 0); + Assert.assertEquals(percent, abort.getPercentage()); } } diff --git a/sermant-agentcore/sermant-agentcore-core/src/test/java/io/sermant/core/service/xds/entity/XdsDelayTest.java b/sermant-agentcore/sermant-agentcore-core/src/test/java/io/sermant/core/service/xds/entity/XdsDelayTest.java index 47fb845ee0..97a05fec04 100644 --- a/sermant-agentcore/sermant-agentcore-core/src/test/java/io/sermant/core/service/xds/entity/XdsDelayTest.java +++ b/sermant-agentcore/sermant-agentcore-core/src/test/java/io/sermant/core/service/xds/entity/XdsDelayTest.java @@ -29,9 +29,10 @@ public class XdsDelayTest { @Test public void testXdsDelay() { XdsDelay delay = new XdsDelay(); - delay.setPercentage(100); + FractionalPercent fractionalPercent = new FractionalPercent(); + delay.setPercentage(fractionalPercent); delay.setFixedDelay(200L); Assert.assertEquals(200, delay.getFixedDelay()); - Assert.assertEquals(100, delay.getPercentage(), 0); + Assert.assertEquals(fractionalPercent, delay.getPercentage()); } } diff --git a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/utils/RdsProtocolTransformer.java b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/utils/RdsProtocolTransformer.java index d8963ccc47..1be0673f4d 100644 --- a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/utils/RdsProtocolTransformer.java +++ b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/utils/RdsProtocolTransformer.java @@ -294,7 +294,12 @@ private static XdsHttpFault parseHttpFault(HTTPFault httpFault) { private static XdsAbort parseAbort(FaultAbort faultAbort) { XdsAbort xdsAbort = new XdsAbort(); - xdsAbort.setPercentage(faultAbort.getPercentage().getNumerator()); + io.sermant.core.service.xds.entity.FractionalPercent fractionalPercent = + new io.sermant.core.service.xds.entity.FractionalPercent(); + fractionalPercent.setNumerator(faultAbort.getPercentage().getNumerator()); + fractionalPercent.setDenominator(DenominatorType.getValueByName(faultAbort.getPercentage() + .getDenominator().name())); + xdsAbort.setPercentage(fractionalPercent); xdsAbort.setHttpStatus(faultAbort.getHttpStatus()); return xdsAbort; } @@ -303,7 +308,12 @@ private static XdsDelay parseDelay(FaultDelay faultDelay) { XdsDelay xdsDelay = new XdsDelay(); long fixedDelay = Duration.ofSeconds(faultDelay.getFixedDelay().getSeconds()).toMillis(); xdsDelay.setFixedDelay(fixedDelay); - xdsDelay.setPercentage(faultDelay.getPercentage().getNumerator()); + io.sermant.core.service.xds.entity.FractionalPercent fractionalPercent = + new io.sermant.core.service.xds.entity.FractionalPercent(); + fractionalPercent.setNumerator(faultDelay.getPercentage().getNumerator()); + fractionalPercent.setDenominator(DenominatorType.getValueByName(faultDelay.getPercentage() + .getDenominator().name())); + xdsDelay.setPercentage(fractionalPercent); return xdsDelay; } diff --git a/sermant-plugins/sermant-flowcontrol/config/config.yaml b/sermant-plugins/sermant-flowcontrol/config/config.yaml index eca2a07e41..ffa2288e27 100644 --- a/sermant-plugins/sermant-flowcontrol/config/config.yaml +++ b/sermant-plugins/sermant-flowcontrol/config/config.yaml @@ -1,6 +1,20 @@ # FlowControl configuration flow.control.plugin: - useCseRule: true # whether to configure cse rules - enable-start-monitor: false # whether to enable indicator monitoring - enable-system-adaptive: false # whether to enable system adaptive flow control - enable-system-rule: false # whether to enable system rule flow control + # whether to configure cse rules + useCseRule: true + # whether to enable indicator monitoring + enable-start-monitor: false + # whether to enable system adaptive flow control + enable-system-adaptive: false + # whether to enable system rule flow control + enable-system-rule: false +xds.flow.control.config: + # Whether to enable Xds flow control + enable: false + retry: + # The specified response status codes that need to be retried. Retry will be performed when the response's status + # code matches one of the specified codes. + x-sermant-retriable-status-codes: + # The specified response header names that need to be retried. Retry will be performed when the response contains + # the specified headers. + x-sermant-retriable-header-names: diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/config/CommonConst.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/config/CommonConst.java index 4da2bae4d7..86aa5cbeaa 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/config/CommonConst.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/config/CommonConst.java @@ -269,6 +269,21 @@ public class CommonConst { */ public static final String REQUEST_START_TIME = "requestStartTime"; + /** + * the connect for request address + */ + public static final String CONNECT = ":"; + + /** + * point + */ + public static final String ESCAPED_POINT = "\\."; + + /** + * Default response status code + */ + public static final int DEFAULT_RESPONSE_CODE = -1; + private CommonConst() { } } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/config/XdsFlowControlConfig.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/config/XdsFlowControlConfig.java new file mode 100644 index 0000000000..26e7a55f6d --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/config/XdsFlowControlConfig.java @@ -0,0 +1,74 @@ +/* + * Copyright (C) 2021-2022 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.sermant.flowcontrol.common.config; + +import io.sermant.core.config.common.ConfigFieldKey; +import io.sermant.core.config.common.ConfigTypeKey; +import io.sermant.core.plugin.config.PluginConfig; + +import java.util.List; + +/** + * retry configuration class + * + * @author zhouss + * @since 2022-01-28 + */ +@ConfigTypeKey("xds.flow.control.config") +public class XdsFlowControlConfig implements PluginConfig { + /** + * Specify the response code for retry, and retry will be executed when the response code is included + */ + @ConfigFieldKey("retry.x-sermant-retriable-status-codes") + private List retryStatusCodes; + + /** + * Specify the response code for retry, and retry will be executed when the response header is included + */ + @ConfigFieldKey("retry.x-sermant-retriable-header-names") + private List retryHeaderNames; + + /** + * xds flow control switch + */ + private boolean enable; + + public List getRetryStatusCodes() { + return retryStatusCodes; + } + + public void setRetryStatusCodes(List retryStatusCodes) { + this.retryStatusCodes = retryStatusCodes; + } + + public List getRetryHeaderNames() { + return retryHeaderNames; + } + + public void setRetryHeaderNames(List retryHeaderNames) { + this.retryHeaderNames = retryHeaderNames; + } + + public boolean isEnable() { + return enable; + } + + public void setEnable(boolean enable) { + this.enable = enable; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/core/match/XdsRouteMatchManager.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/core/match/XdsRouteMatchManager.java new file mode 100644 index 0000000000..bbb0570dac --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/core/match/XdsRouteMatchManager.java @@ -0,0 +1,121 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.core.match; + +import io.sermant.core.service.xds.entity.XdsHeaderMatcher; +import io.sermant.core.service.xds.entity.XdsPathMatcher; +import io.sermant.core.service.xds.entity.XdsRoute; +import io.sermant.core.service.xds.entity.XdsRouteAction; +import io.sermant.core.service.xds.entity.XdsRouteAction.XdsClusterWeight; +import io.sermant.core.service.xds.entity.XdsRouteAction.XdsWeightedClusters; +import io.sermant.core.service.xds.entity.XdsRouteMatch; +import io.sermant.core.utils.CollectionUtils; +import io.sermant.core.utils.StringUtils; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; +import io.sermant.flowcontrol.common.entity.RequestEntity; +import io.sermant.flowcontrol.common.util.RandomUtil; +import io.sermant.flowcontrol.common.xds.handler.XdsHandler; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * XdsRouteMatchManager, Get BusinessEntity based on xDS routing rules + * + * @author zhp + * @since 2024-12-20 + **/ +public enum XdsRouteMatchManager { + /** + * singleton + */ + INSTANCE; + + /** + * get matched scenario information + * + * @param requestEntity request-information + * @param serviceName service name + * @return matched business information + */ + public FlowControlScenario getMatchedScenarioInfo(RequestEntity requestEntity, String serviceName) { + FlowControlScenario scenario = new FlowControlScenario(); + scenario.setServiceName(serviceName); + Optional matchedRouteOptional = getMatchedRoute(requestEntity, serviceName); + if (!matchedRouteOptional.isPresent()) { + return scenario; + } + XdsRoute matchedRoute = matchedRouteOptional.get(); + scenario.setRouteName(matchedRoute.getName()); + scenario.setClusterName(selectClusterByRoute(matchedRoute)); + return scenario; + } + + private Optional getMatchedRoute(RequestEntity requestEntity, String serviceName) { + List routes = + XdsHandler.INSTANCE.getServiceRouteByServiceName(serviceName); + for (XdsRoute route : routes) { + XdsRouteMatch routeMatch = route.getRouteMatch(); + + // check path matching + if (!isPathMatched(routeMatch.getPathMatcher(), requestEntity.getApiPath())) { + continue; + } + + // check head matching + if (!isHeadersMatched(routeMatch.getHeaderMatchers(), requestEntity.getHeaders())) { + continue; + } + return Optional.of(route); + } + return Optional.empty(); + } + + private boolean isPathMatched(XdsPathMatcher matcher, String path) { + return matcher.isMatch(path); + } + + private boolean isHeadersMatched(List matchers, Map headers) { + return matchers.stream() + .allMatch(xdsHeaderMatcher -> xdsHeaderMatcher.isMatch(headers)); + } + + private String selectClusterByRoute(XdsRoute matchedRoute) { + XdsRouteAction routeAction = matchedRoute.getRouteAction(); + String cluster = routeAction.getCluster(); + if (!routeAction.isWeighted() || routeAction.getWeightedClusters() == null) { + return cluster; + } + XdsWeightedClusters weightedClusters = routeAction.getWeightedClusters(); + List clusters = weightedClusters.getClusters(); + int totalWeight = weightedClusters.getTotalWeight(); + if (CollectionUtils.isEmpty(clusters) || totalWeight == 0) { + return StringUtils.EMPTY; + } + int randomWeight = RandomUtil.randomInt(totalWeight); + + int currentWeight = 0; + for (XdsClusterWeight clusterWeight : clusters) { + currentWeight += clusterWeight.getWeight(); + if (randomWeight < currentWeight) { + return clusterWeight.getClusterName(); + } + } + return StringUtils.EMPTY; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/entity/FlowControlResponse.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/entity/FlowControlResponse.java index b0bea33071..57d83b33a5 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/entity/FlowControlResponse.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/entity/FlowControlResponse.java @@ -17,6 +17,10 @@ package io.sermant.flowcontrol.common.entity; +import java.util.Collections; +import java.util.List; +import java.util.Map; + /** * flow control response * @@ -33,6 +37,11 @@ public class FlowControlResponse { */ private Object result; + /** + * response + */ + private Map> headers; + /** * Whether to replace the actual response result, if true, replace */ @@ -54,6 +63,21 @@ public FlowControlResponse(String msg, int code) { this.code = code; } + /** + * flow control response results + * + * @param msg prompt message + * @param code response code + * @param headers response headers + * @param result response result + */ + public FlowControlResponse(String msg, int code, Map> headers, Object result) { + this.msg = msg; + this.code = code; + this.headers = headers; + this.result = result; + } + /** * flow control response results * @@ -91,4 +115,8 @@ public Object getResult() { public boolean isReplaceResult() { return isReplaceResult; } + + public Map> getHeaders() { + return headers == null ? Collections.emptyMap() : headers; + } } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/entity/FlowControlScenario.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/entity/FlowControlScenario.java new file mode 100644 index 0000000000..2993f9e706 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/entity/FlowControlScenario.java @@ -0,0 +1,92 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.entity; + +import java.util.Set; + +/** + * Scenario information for flow control, primarily used to retrieve matching flow control rules + * + * @author zhp + * @since 2024-11-27 + */ +public class FlowControlScenario { + /** + * matched service scenario name + */ + private Set matchedScenarioNames; + + /** + * The name of the downstream service + */ + private String serviceName; + + /** + * cluster name + */ + private String clusterName; + + /** + * route rule name + */ + private String routeName; + + /** + * request Address,ip:port + */ + private String address; + + public Set getMatchedScenarioNames() { + return matchedScenarioNames; + } + + public void setMatchedScenarioNames(Set matchedScenarioNames) { + this.matchedScenarioNames = matchedScenarioNames; + } + + public String getServiceName() { + return serviceName; + } + + public void setServiceName(String serviceName) { + this.serviceName = serviceName; + } + + public String getClusterName() { + return clusterName; + } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + public String getRouteName() { + return routeName; + } + + public void setRouteName(String routeName) { + this.routeName = routeName; + } + + public String getAddress() { + return address; + } + + public void setAddress(String address) { + this.address = address; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/util/RandomUtil.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/util/RandomUtil.java new file mode 100644 index 0000000000..fddf9f8c30 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/util/RandomUtil.java @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.util; + +import java.util.Random; + +/** + * Random util + * + * @author zhp + * @since 2024-12-05 + */ +public class RandomUtil { + private static final Random RANDOM = new Random(); + + private RandomUtil() { + } + + /** + * Generate random numbers + * + * @param min The minimum value of a random number + * @param max The maximum value of a random number + * @return random numbers + */ + public static int randomInt(int min, int max) { + return RANDOM.nextInt(max - min) + min; + } + + /** + * Generate random numbers + * + * @param max The maximum value of a random number + * @return random numbers + */ + public static int randomInt(int max) { + return RANDOM.nextInt(max); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/util/XdsRouterUtils.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/util/XdsRouterUtils.java new file mode 100644 index 0000000000..517f3a01d3 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/util/XdsRouterUtils.java @@ -0,0 +1,144 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.util; + +import io.sermant.core.config.ConfigManager; +import io.sermant.core.plugin.config.ServiceMeta; +import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.core.service.xds.entity.XdsLocality; +import io.sermant.core.utils.NetworkUtils; +import io.sermant.core.utils.StringUtils; +import io.sermant.flowcontrol.common.xds.handler.XdsHandler; + +import java.net.URI; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * XdsRouterUtils + * + * @author zhp + * @since 2024-12-10 + **/ +public class XdsRouterUtils { + private static final String LOCAL_HOST = "localhost"; + + /** + * the locality information of the host microservice itself + */ + private static XdsLocality selfServiceLocality; + + private static volatile boolean localityObtainedFlag = false; + + private XdsRouterUtils() { + } + + /** + * get XdsLocality of self-service + * + * @return XdsLocality + */ + public static Optional getLocalityInfoOfSelfService() { + if (localityObtainedFlag) { + return Optional.of(selfServiceLocality); + } + synchronized (XdsRouterUtils.class) { + if (localityObtainedFlag) { + return Optional.of(selfServiceLocality); + } + String podIp = NetworkUtils.getKubernetesPodIp(); + if (StringUtils.isEmpty(podIp)) { + return Optional.empty(); + } + Set serviceInstances = XdsHandler.INSTANCE + .getServiceInstanceByServiceName(ConfigManager.getConfig(ServiceMeta.class).getService()); + Optional serviceInstance = getMatchedServiceInstanceByPodIp(serviceInstances, podIp); + if (!serviceInstance.isPresent()) { + return Optional.empty(); + } + Optional validXdsLocality = createValidXdsLocality(serviceInstance.get().getMetaData()); + selfServiceLocality = validXdsLocality.orElse(null); + return validXdsLocality; + } + } + + /** + * update localityObtainedFlag + * + * @param flag locality obtained flag + */ + public static void updateLocalityObtainedFlag(boolean flag) { + localityObtainedFlag = flag; + } + + private static Optional getMatchedServiceInstanceByPodIp(Set serviceInstances, + String podIp) { + return serviceInstances.stream() + .filter(serviceInstance -> podIp.equals(serviceInstance.getHost())) + .findFirst(); + } + + /** + * rebuild new url by XdsServiceInstance + * + * @param oldUri old uri + * @param serviceInstance xds service instance + * @return new url + */ + public static String rebuildUrlByXdsServiceInstance(URI oldUri, ServiceInstance serviceInstance) { + StringBuilder builder = new StringBuilder(); + builder.append(oldUri.getScheme()) + .append("://") + .append(serviceInstance.getHost()) + .append(":") + .append(serviceInstance.getPort()) + .append(oldUri.getPath()); + String query = oldUri.getQuery(); + if (StringUtils.isEmpty(query)) { + return builder.toString(); + } + builder.append("?").append(query); + return builder.toString(); + } + + /** + * isXdsRouteRequired + * + * @param serviceName serviceName + * @return isXdsRouteRequired + */ + public static boolean isXdsRouteRequired(String serviceName) { + // if service is localhost or started not with lowercase, so no xds routing required + return !StringUtils.isEmpty(serviceName) && !serviceName.equals(LOCAL_HOST) + && Character.isLowerCase(serviceName.charAt(0)); + } + + private static Optional createValidXdsLocality(Map metaData) { + XdsLocality locality = new XdsLocality(); + String region = metaData.get("region"); + String zone = metaData.get("zone"); + String subZone = metaData.get("sub_zone"); + if (StringUtils.isEmpty(region) || StringUtils.isEmpty(zone) && !StringUtils.isEmpty(subZone)) { + return Optional.empty(); + } + locality.setRegion(region); + locality.setZone(zone); + locality.setSubZone(subZone); + return Optional.of(locality); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/util/XdsThreadLocalUtil.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/util/XdsThreadLocalUtil.java new file mode 100644 index 0000000000..f6912bf840 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/util/XdsThreadLocalUtil.java @@ -0,0 +1,84 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.util; + +import io.sermant.flowcontrol.common.entity.FlowControlScenario; + +/** + * xds thread local utility class + * + * @author zhp + * @since 2024-11-30 + */ +public class XdsThreadLocalUtil { + private static final ThreadLocal SEND_BYTE_FLAG = new ThreadLocal<>(); + + private static final ThreadLocal FLOW_CONTROL_SCENARIO_THREAD_LOCAL = new ThreadLocal<>(); + + private XdsThreadLocalUtil() { + } + + /** + * Set byte send flag + * + * @param flag byte send flag + */ + public static void setSendByteFlag(boolean flag) { + SEND_BYTE_FLAG.set(flag); + } + + /** + * get byte send flag + * + * @return byte send flag + */ + public static boolean getSendByteFlag() { + return SEND_BYTE_FLAG.get() != null && SEND_BYTE_FLAG.get(); + } + + /** + * remove byte send flag + */ + public static void removeSendByteFlag() { + SEND_BYTE_FLAG.remove(); + } + + /** + * Set scenario information + * + * @param flowControlScenario scenario information + */ + public static void setScenarioInfo(FlowControlScenario flowControlScenario) { + FLOW_CONTROL_SCENARIO_THREAD_LOCAL.set(flowControlScenario); + } + + /** + * get scenario information + * + * @return flowControl scenario information + */ + public static FlowControlScenario getScenarioInfo() { + return FLOW_CONTROL_SCENARIO_THREAD_LOCAL.get(); + } + + /** + * remove business information + */ + public static void removeScenarioInfo() { + FLOW_CONTROL_SCENARIO_THREAD_LOCAL.remove(); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerInfo.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerInfo.java new file mode 100644 index 0000000000..05f8282172 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerInfo.java @@ -0,0 +1,126 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.xds.circuit; + +import java.util.Deque; +import java.util.LinkedList; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Circuit Breaker information + * + * @author zhp + * @since 2024-12-02 + */ +public class XdsCircuitBreakerInfo { + /** + * Number of local failures + */ + private Deque localFailure; + + /** + * Number of gateway errors, Response status code 502,503,504 is gateway error + */ + private Deque gateWayFailure; + + /** + * The number of server errors + */ + private Deque serverFailure; + + /** + * Identification of whether the circuit breaker is open or not + */ + private boolean isOpen; + + /** + * End time of circuit breaker + */ + private long circuitBreakerEndTime; + + /** + * Number of circuit breakers + */ + private AtomicInteger circuitBreakerCount; + + /** + * Constructor + */ + public XdsCircuitBreakerInfo() { + this.localFailure = new LinkedList<>(); + this.gateWayFailure = new LinkedList<>(); + this.serverFailure = new LinkedList<>(); + this.circuitBreakerCount = new AtomicInteger(0); + } + + public Deque getLocalFailure() { + return localFailure; + } + + public void setLocalFailure(Deque localFailure) { + this.localFailure = localFailure; + } + + public Deque getGateWayFailure() { + return gateWayFailure; + } + + public void setGateWayFailure(Deque gateWayFailure) { + this.gateWayFailure = gateWayFailure; + } + + public Deque getServerFailure() { + return serverFailure; + } + + public void setServerFailure(Deque serverFailure) { + this.serverFailure = serverFailure; + } + + public boolean isOpen() { + return isOpen; + } + + public void setOpen(boolean open) { + isOpen = open; + } + + public long getCircuitBreakerEndTime() { + return circuitBreakerEndTime; + } + + public void setCircuitBreakerEndTime(long circuitBreakerEndTime) { + this.circuitBreakerEndTime = circuitBreakerEndTime; + } + + public AtomicInteger getCircuitBreakerCount() { + return circuitBreakerCount; + } + + public void setCircuitBreakerCount(AtomicInteger circuitBreakerCount) { + this.circuitBreakerCount = circuitBreakerCount; + } + + /** + * reset the data + */ + public void cleanRequestData() { + this.localFailure.clear(); + this.gateWayFailure.clear(); + this.serverFailure.clear(); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerManager.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerManager.java new file mode 100644 index 0000000000..dc3cc8186b --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerManager.java @@ -0,0 +1,203 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.xds.circuit; + +import io.sermant.core.service.xds.entity.XdsInstanceCircuitBreakers; +import io.sermant.core.utils.CollectionUtils; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; +import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil; + +import java.util.Deque; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Circuit Breaker manager + * + * @author zhp + * @since 2024-12-02 + */ +public class XdsCircuitBreakerManager { + /** + * The map that stores Circuit breaker information, where the Key of the first level is the service name, + * the Key of the second level is the cluster name, the Key of the three level is the server address + */ + private static final Map>> INSTANCE_CIRCUIT_BREAKER_MAP = + new ConcurrentHashMap<>(); + + /** + * The map that stores the count of active requests, where the Key of the first level is the service name, + * the Key of the second level is the cluster name, the Key of the three level is the server address + */ + private static final Map>> REQUEST_CIRCUIT_BREAKER_MAP = + new ConcurrentHashMap<>(); + + private static final String GATE_WAY_FAILURE = "502,503,504"; + + private XdsCircuitBreakerManager() { + } + + /** + * increment Active Request + * + * @param serviceName service name + * @param clusterName route name + * @param address request address + * @return active request num + */ + public static int incrementActiveRequests(String serviceName, String clusterName, String address) { + return getActiveRequestCount(serviceName, clusterName, address).incrementAndGet(); + } + + /** + * decrement Active Request + * + * @param serviceName service name + * @param clusterName route name + * @param address request address + */ + public static void decrementActiveRequests(String serviceName, String clusterName, String address) { + getActiveRequestCount(serviceName, clusterName, address).decrementAndGet(); + } + + /** + * Determine whether instance circuit breaking is opened + * + * @param scenarioInfo Flow Control Scenario information + * @param address service address + * @return The result of the check, where true indicates that instance circuit breaking is required + */ + public static boolean needsInstanceCircuitBreaker(FlowControlScenario scenarioInfo, String address) { + XdsCircuitBreakerInfo circuitBreakerInfo = getCircuitBreakerInfo(scenarioInfo.getServiceName(), + scenarioInfo.getRouteName(), address); + return isCircuitBreakerOpen(circuitBreakerInfo); + } + + /** + * set circuit breaker status, The circuit breaker will be open when the number of failed instance calls reaches + * the threshold. + * + * @param circuitBreakers circuitBreakers rule + * @param scenarioInfo scenario information + */ + public static void setCircuitBeakerStatus(XdsInstanceCircuitBreakers circuitBreakers, + FlowControlScenario scenarioInfo) { + XdsCircuitBreakerInfo circuitBreakerInfo = getCircuitBreakerInfo(scenarioInfo.getServiceName(), + scenarioInfo.getRouteName(), scenarioInfo.getAddress()); + if (!XdsThreadLocalUtil.getSendByteFlag() && circuitBreakers.isSplitExternalLocalOriginErrors() + && shouldCircuitBreakerByFailure(circuitBreakerInfo.getGateWayFailure(), + circuitBreakers.getConsecutiveLocalOriginFailure(), circuitBreakers.getInterval())) { + openCircuitBreaker(circuitBreakerInfo, circuitBreakers.getInterval()); + } + if (shouldCircuitBreakerByFailure(circuitBreakerInfo.getGateWayFailure(), + circuitBreakers.getConsecutiveGatewayFailure(), circuitBreakers.getInterval())) { + openCircuitBreaker(circuitBreakerInfo, circuitBreakers.getInterval()); + } + if (shouldCircuitBreakerByFailure(circuitBreakerInfo.getServerFailure(), + circuitBreakers.getConsecutive5xxFailure(), circuitBreakers.getInterval())) { + openCircuitBreaker(circuitBreakerInfo, circuitBreakers.getInterval()); + } + } + + private static void openCircuitBreaker(XdsCircuitBreakerInfo circuitBreakerInfo, long interval) { + circuitBreakerInfo.setOpen(true); + circuitBreakerInfo.getCircuitBreakerCount().incrementAndGet(); + circuitBreakerInfo.cleanRequestData(); + circuitBreakerInfo.setCircuitBreakerEndTime( + System.currentTimeMillis() + circuitBreakerInfo.getCircuitBreakerCount().get() * interval); + } + + private static boolean shouldCircuitBreakerByFailure(Deque times, int failureRequestThreshold, + long interval) { + if (failureRequestThreshold <= 0 || CollectionUtils.isEmpty(times) || times.size() < failureRequestThreshold) { + return false; + } + for (int i = times.size(); i > failureRequestThreshold; i--) { + times.removeFirst(); + } + long currentTime = System.currentTimeMillis(); + Long time = times.getFirst(); + return currentTime - time <= interval; + } + + /** + * Check if circuit breaker is open + * + * @param circuitBreakerInfo circuit Breaker information + * @return if circuit breaker is open + */ + private static boolean isCircuitBreakerOpen(XdsCircuitBreakerInfo circuitBreakerInfo) { + return circuitBreakerInfo.isOpen() + && circuitBreakerInfo.getCircuitBreakerEndTime() > System.currentTimeMillis(); + } + + /** + * record failure request + * + * @param scenarioInfo scenario information + * @param address request address + * @param code response code + * @param circuitBreakers circuit rule + */ + public static void recordFailureRequest(FlowControlScenario scenarioInfo, String address, int code, + XdsInstanceCircuitBreakers circuitBreakers) { + XdsCircuitBreakerInfo circuitBreakerInfo = getCircuitBreakerInfo(scenarioInfo.getServiceName(), + scenarioInfo.getRouteName(), address); + if (isCircuitBreakerOpen(circuitBreakerInfo)) { + return; + } + long currentTime = System.currentTimeMillis(); + if (!XdsThreadLocalUtil.getSendByteFlag()) { + recordRequestTime(circuitBreakerInfo.getLocalFailure(), circuitBreakers.getConsecutiveLocalOriginFailure(), + currentTime); + } + if (code != 0 && GATE_WAY_FAILURE.contains(String.valueOf(code))) { + recordRequestTime(circuitBreakerInfo.getGateWayFailure(), circuitBreakers.getConsecutiveGatewayFailure(), + currentTime); + } + recordRequestTime(circuitBreakerInfo.getServerFailure(), circuitBreakers.getConsecutive5xxFailure(), + currentTime); + } + + private static void recordRequestTime(Deque times, int failureRequestThreshold, long currentTime) { + if (failureRequestThreshold <= 0) { + return; + } + for (int i = times.size(); i >= failureRequestThreshold - 1 && !times.isEmpty(); i--) { + times.removeFirst(); + } + times.add(currentTime); + } + + private static XdsCircuitBreakerInfo getCircuitBreakerInfo(String serviceName, String routeName, + String address) { + Map> serviceCircuitBreakerMap = INSTANCE_CIRCUIT_BREAKER_MAP. + computeIfAbsent(serviceName, key -> new ConcurrentHashMap<>()); + Map instanceCircuitBreakerMap = serviceCircuitBreakerMap. + computeIfAbsent(routeName, key -> new ConcurrentHashMap<>()); + return instanceCircuitBreakerMap.computeIfAbsent(address, key -> new XdsCircuitBreakerInfo()); + } + + private static AtomicInteger getActiveRequestCount(String serviceName, String clusterName, String address) { + Map> clusterCircuitBreakerMap = REQUEST_CIRCUIT_BREAKER_MAP. + computeIfAbsent(serviceName, key -> new ConcurrentHashMap<>()); + Map requestCircuitBreakerMap = clusterCircuitBreakerMap. + computeIfAbsent(clusterName, key -> new ConcurrentHashMap<>()); + return requestCircuitBreakerMap.computeIfAbsent(address, key -> new AtomicInteger()); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/handler/XdsHandler.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/handler/XdsHandler.java new file mode 100644 index 0000000000..71b598b9f7 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/handler/XdsHandler.java @@ -0,0 +1,276 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.xds.handler; + +import io.sermant.core.common.CommonConstant; +import io.sermant.core.common.LoggerFactory; +import io.sermant.core.service.ServiceManager; +import io.sermant.core.service.xds.XdsCoreService; +import io.sermant.core.service.xds.XdsFlowControlService; +import io.sermant.core.service.xds.XdsLoadBalanceService; +import io.sermant.core.service.xds.XdsRouteService; +import io.sermant.core.service.xds.XdsServiceDiscovery; +import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.core.service.xds.entity.XdsClusterLoadAssigment; +import io.sermant.core.service.xds.entity.XdsHttpFault; +import io.sermant.core.service.xds.entity.XdsInstanceCircuitBreakers; +import io.sermant.core.service.xds.entity.XdsLbPolicy; +import io.sermant.core.service.xds.entity.XdsLocality; +import io.sermant.core.service.xds.entity.XdsRateLimit; +import io.sermant.core.service.xds.entity.XdsRequestCircuitBreakers; +import io.sermant.core.service.xds.entity.XdsRetryPolicy; +import io.sermant.core.service.xds.entity.XdsRoute; +import io.sermant.core.utils.StringUtils; +import io.sermant.flowcontrol.common.util.XdsRouterUtils; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +/** + * Xds handler + * + * @author zhp + * @since 2024-11-28 + */ +public enum XdsHandler { + /** + * singleton + */ + INSTANCE; + + /** + * Length of the cluster name after splitting + */ + private static final int CLUSTER_NAME_SPLIT_LENGTH = 4; + + private XdsFlowControlService xdsFlowControlService; + + private XdsRouteService xdsRouteService; + + private XdsServiceDiscovery xdsServiceDiscovery; + + private XdsLoadBalanceService xdsLoadBalanceService; + + /** + * constructor + */ + XdsHandler() { + Logger logger = LoggerFactory.getLogger(); + try { + XdsCoreService xdsCoreService = ServiceManager.getService(XdsCoreService.class); + xdsRouteService = xdsCoreService.getXdsRouteService(); + xdsServiceDiscovery = xdsCoreService.getXdsServiceDiscovery(); + xdsFlowControlService = xdsCoreService.getXdsFlowControlService(); + xdsLoadBalanceService = xdsCoreService.getLoadBalanceService(); + } catch (IllegalArgumentException e) { + logger.severe("XdsCoreService not started"); + } + } + + /** + * get request circuit breaker information of cluster + * + * @param serviceName service name + * @param clusterName cluster name + * @return circuit breaker rules + */ + public Optional getRequestCircuitBreakers(String serviceName, String clusterName) { + if (xdsFlowControlService == null || StringUtils.isEmpty(clusterName) || StringUtils.isEmpty(serviceName)) { + return Optional.empty(); + } + return xdsFlowControlService.getRequestCircuitBreakers(serviceName, clusterName); + } + + /** + * get instance circuit breaker information of cluster + * + * @param serviceName service name + * @param clusterName cluster name + * @return circuit breaker rules + */ + public Optional getInstanceCircuitBreakers(String serviceName, String clusterName) { + if (xdsFlowControlService == null || StringUtils.isEmpty(clusterName) || StringUtils.isEmpty(serviceName)) { + return Optional.empty(); + } + return xdsFlowControlService.getInstanceCircuitBreakers(serviceName, clusterName); + } + + /** + * get retry policy of route name + * + * @param serviceName service name + * @param routeName route name + * @return retry policy + */ + public Optional getRetryPolicy(String serviceName, String routeName) { + if (xdsFlowControlService == null || StringUtils.isEmpty(routeName) || StringUtils.isEmpty(serviceName)) { + return Optional.empty(); + } + return xdsFlowControlService.getRetryPolicy(serviceName, routeName); + } + + /** + * get rate limit of route name + * + * @param serviceName service name + * @param routeName route name + * @param clusterName cluster name + * @return rate limit rule + */ + public Optional getRateLimit(String serviceName, String routeName, String clusterName) { + if (xdsFlowControlService == null || StringUtils.isEmpty(clusterName) || StringUtils.isEmpty(routeName) + || StringUtils.isEmpty(serviceName)) { + return Optional.empty(); + } + String[] clusterInfo = clusterName.split(CommonConstant.ESCAPED_VERTICAL_LINE); + if (clusterInfo.length != CLUSTER_NAME_SPLIT_LENGTH) { + return Optional.empty(); + } + return xdsFlowControlService.getRateLimit(serviceName, routeName, clusterInfo[1]); + } + + /** + * get http fault of route name + * + * @param serviceName service name + * @param routeName route name + * @return http fault rule + */ + public Optional getHttpFault(String serviceName, String routeName) { + if (xdsFlowControlService == null || StringUtils.isEmpty(routeName) || StringUtils.isEmpty(serviceName)) { + return Optional.empty(); + } + return xdsFlowControlService.getHttpFault(serviceName, routeName); + } + + /** + * get http fault of route name + * + * @param serviceName service name + * @return route rules + */ + public List getServiceRouteByServiceName(String serviceName) { + if (xdsRouteService == null || StringUtils.isEmpty(serviceName)) { + return Collections.emptyList(); + } + return xdsRouteService.getServiceRoute(serviceName); + } + + /** + * get ServiceInstance of service name + * + * @param serviceName service name + * @return route rules + */ + public Set getServiceInstanceByServiceName(String serviceName) { + if (xdsServiceDiscovery == null || StringUtils.isEmpty(serviceName)) { + return Collections.emptySet(); + } + return xdsServiceDiscovery.getServiceInstance(serviceName); + } + + /** + * get ServiceInstance of service name and cluster name + * + * @param serviceName service name + * @param clusterName cluster name + * @return lb policy + */ + public Optional getLbPolicyOfCluster(String serviceName, String clusterName) { + if (xdsLoadBalanceService == null || StringUtils.isEmpty(serviceName) || StringUtils.isEmpty(clusterName)) { + return Optional.empty(); + } + return Optional.of(xdsLoadBalanceService.getLbPolicyOfCluster(serviceName, clusterName)); + } + + /** + * get ServiceInstance of service name and cluster name + * + * @param serviceName service name + * @param clusterName cluster name + * @return Service Instance + */ + public Set getMatchedServiceInstance(String serviceName, String clusterName) { + if (xdsServiceDiscovery == null || xdsRouteService == null) { + return Collections.emptySet(); + } + if (StringUtils.isEmpty(serviceName) || StringUtils.isEmpty(clusterName)) { + return Collections.emptySet(); + } + Optional loadAssigmentOptional = + xdsServiceDiscovery.getClusterServiceInstance(serviceName, clusterName); + if (!loadAssigmentOptional.isPresent()) { + return xdsServiceDiscovery.getServiceInstance(serviceName); + } + XdsClusterLoadAssigment clusterLoadAssigment = loadAssigmentOptional.get(); + if (!xdsRouteService.isLocalityRoute(serviceName, clusterLoadAssigment.getClusterName())) { + Set serviceInstances = getServiceInstanceOfCluster(clusterLoadAssigment); + return serviceInstances.isEmpty() ? xdsServiceDiscovery.getServiceInstance(serviceName) : serviceInstances; + } + + // get locality info of self-service and route by locality + Optional localityInfoOfSelfService = XdsRouterUtils.getLocalityInfoOfSelfService(); + if (localityInfoOfSelfService.isPresent()) { + Set serviceInstances = getServiceInstanceOfLocalityCluster(clusterLoadAssigment, + localityInfoOfSelfService.get()); + if (!serviceInstances.isEmpty()) { + return serviceInstances; + } + } + Set serviceInstances = getServiceInstanceOfCluster(clusterLoadAssigment); + return serviceInstances.isEmpty() ? xdsServiceDiscovery.getServiceInstance(serviceName) : serviceInstances; + } + + private Set getServiceInstanceOfLocalityCluster(XdsClusterLoadAssigment clusterLoadAssigment, + XdsLocality locality) { + return clusterLoadAssigment.getLocalityInstances().entrySet().stream() + .filter(xdsLocalitySetEntry -> isSameLocality(locality, xdsLocalitySetEntry.getKey())) + .flatMap(xdsLocalitySetEntry -> xdsLocalitySetEntry.getValue().stream()) + .collect(Collectors.toSet()); + } + + private boolean isSameLocality(XdsLocality selfLocality, XdsLocality serviceLocality) { + if (!selfLocality.getRegion().equals(serviceLocality.getRegion())) { + return false; + } + if (StringUtils.isEmpty(selfLocality.getZone())) { + return true; + } + if (!selfLocality.getZone().equals(serviceLocality.getZone())) { + return false; + } + if (StringUtils.isEmpty(selfLocality.getSubZone())) { + return true; + } + return selfLocality.getSubZone().equals(serviceLocality.getSubZone()); + } + + private Set getServiceInstanceOfCluster(XdsClusterLoadAssigment clusterLoadAssigment) { + Set serviceInstances = new HashSet<>(); + for (Map.Entry> xdsLocalitySetEntry : clusterLoadAssigment + .getLocalityInstances().entrySet()) { + serviceInstances.addAll(xdsLocalitySetEntry.getValue()); + } + return serviceInstances; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/lb/XdsLoadBalancer.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/lb/XdsLoadBalancer.java new file mode 100644 index 0000000000..418c97ea00 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/lb/XdsLoadBalancer.java @@ -0,0 +1,37 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.xds.lb; + +import io.sermant.core.service.xds.entity.ServiceInstance; + +import java.util.List; + +/** + * XdsLoadBalancer + * + * @author daizhenyu + * @since 2024-08-30 + **/ +public interface XdsLoadBalancer { + /** + * select instance by loadbalancer + * + * @param instances service instance + * @return selected instance + */ + ServiceInstance selectInstance(List instances); +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/lb/XdsLoadBalancerFactory.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/lb/XdsLoadBalancerFactory.java new file mode 100644 index 0000000000..c270c75454 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/lb/XdsLoadBalancerFactory.java @@ -0,0 +1,82 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.xds.lb; + +import io.sermant.core.common.LoggerFactory; +import io.sermant.core.service.xds.entity.XdsLbPolicy; +import io.sermant.flowcontrol.common.xds.handler.XdsHandler; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.logging.Logger; + +/** + * XdsLoadBalancerFactory + * + * @author daizhenyu + * @since 2024-08-30 + **/ +public class XdsLoadBalancerFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(); + + private static final Map LOAD_BALANCERS = new ConcurrentHashMap<>(); + + private static final String RANDOM = "RANDOM"; + + private XdsLoadBalancerFactory() { + } + + /** + * getRoundRobinLoadBalancer + * + * @param clusterName cluster name + * @return XdsLoadBalancer + */ + private static XdsLoadBalancer getRoundRobinLoadBalancer(String clusterName) { + return LOAD_BALANCERS.computeIfAbsent(clusterName, key -> new XdsRoundRobinLoadBalancer()); + } + + /** + * getRandomLoadBalancer + * + * @return XdsLoadBalancer + */ + private static XdsLoadBalancer getRandomLoadBalancer() { + return LOAD_BALANCERS.computeIfAbsent(RANDOM, key -> new XdsRandomLoadBalancer()); + } + + /** + * getLoadBalancer + * + * @param serviceName service name + * @param clusterName cluster name + * @return XdsLoadBalancer + */ + public static XdsLoadBalancer getLoadBalancer(String serviceName, String clusterName) { + Optional lbPolicyOptional = + XdsHandler.INSTANCE.getLbPolicyOfCluster(serviceName, clusterName); + if (!lbPolicyOptional.isPresent()) { + return getRoundRobinLoadBalancer(clusterName); + } + XdsLbPolicy lbPolicy = lbPolicyOptional.get(); + if (lbPolicy == XdsLbPolicy.RANDOM) { + return getRandomLoadBalancer(); + } + return getRoundRobinLoadBalancer(clusterName); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/lb/XdsRandomLoadBalancer.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/lb/XdsRandomLoadBalancer.java new file mode 100644 index 0000000000..c27c4e275f --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/lb/XdsRandomLoadBalancer.java @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.xds.lb; + +import io.sermant.core.service.xds.entity.ServiceInstance; + +import java.util.List; +import java.util.Random; + +/** + * XdsRandomLoadBalancer + * + * @author daizhenyu + * @since 2024-08-30 + **/ +public class XdsRandomLoadBalancer implements XdsLoadBalancer { + private final Random random; + + /** + * Constructor + */ + public XdsRandomLoadBalancer() { + this.random = new Random(); + } + + @Override + public ServiceInstance selectInstance(List instances) { + // Select a random index from the list + int index = random.nextInt(instances.size()); + + // Return the randomly selected instance + return instances.get(index); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/lb/XdsRoundRobinLoadBalancer.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/lb/XdsRoundRobinLoadBalancer.java new file mode 100644 index 0000000000..4d1ab698be --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/lb/XdsRoundRobinLoadBalancer.java @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.xds.lb; + +import io.sermant.core.service.xds.entity.ServiceInstance; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * XdsRoundRobinLoadBalancer + * + * @author daizhenyu + * @since 2024-08-30 + **/ +public class XdsRoundRobinLoadBalancer implements XdsLoadBalancer { + private final AtomicInteger index; + + /** + * constructor + */ + public XdsRoundRobinLoadBalancer() { + this.index = new AtomicInteger(0); + } + + @Override + public ServiceInstance selectInstance(List instances) { + synchronized (XdsRoundRobinLoadBalancer.class) { + // safely calculate the index based on the current size of the instances list + int currentIndex = index.getAndUpdate(i -> (i + 1) % instances.size()); + + // double-check size to avoid index out of bounds + if (currentIndex >= instances.size()) { + currentIndex = 0; + index.set(1); + } + + // return the instance at the current index + return instances.get(currentIndex); + } + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/ratelimit/XdsRateLimitInfo.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/ratelimit/XdsRateLimitInfo.java new file mode 100644 index 0000000000..5694d9ec24 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/ratelimit/XdsRateLimitInfo.java @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.xds.ratelimit; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * xds rate limit information + * + * @author zhp + * @since 2024-12-05 + */ +public class XdsRateLimitInfo { + private long lastFilledTime; + + private AtomicInteger currentTokens; + + /** + * Constructor + * + * @param lastFilledTime Time to fill tokens + * @param currentTokens Current number of tokens + */ + public XdsRateLimitInfo(long lastFilledTime, int currentTokens) { + this.lastFilledTime = lastFilledTime; + this.currentTokens = new AtomicInteger(currentTokens); + } + + public long getLastFilledTime() { + return lastFilledTime; + } + + public void setLastFilledTime(long lastFilledTime) { + this.lastFilledTime = lastFilledTime; + } + + public AtomicInteger getCurrentTokens() { + return currentTokens; + } + + public void setCurrentTokens(AtomicInteger currentTokens) { + this.currentTokens = currentTokens; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/ratelimit/XdsRateLimitManager.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/ratelimit/XdsRateLimitManager.java new file mode 100644 index 0000000000..792716477b --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/ratelimit/XdsRateLimitManager.java @@ -0,0 +1,81 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.xds.ratelimit; + +import io.sermant.core.service.xds.entity.XdsTokenBucket; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Circuit Breaker manager + * + * @author zhp + * @since 2024-12-02 + */ +public class XdsRateLimitManager { + /** + * The map that stores rate limiting information, where the Key of the first level is the service name + * and the Key of the second level is the route name + */ + private static final Map> RATE_LIMIT_INFO_MAP = new ConcurrentHashMap<>(); + + private XdsRateLimitManager() { + } + + /** + * fill and Consumer token + * + * @param serviceName service name + * @param routeName route name + * @param tokenBucket token rule info + * @return the result of Consumer results + */ + public static synchronized boolean fillAndConsumeToken(String serviceName, String routeName, + XdsTokenBucket tokenBucket) { + Map limitInfoMap = RATE_LIMIT_INFO_MAP.computeIfAbsent(serviceName, + key -> new ConcurrentHashMap<>()); + XdsRateLimitInfo xdsRateLimitInfo = limitInfoMap.computeIfAbsent(routeName, key -> + new XdsRateLimitInfo(System.currentTimeMillis(), tokenBucket.getMaxTokens())); + fillToken(xdsRateLimitInfo, tokenBucket); + AtomicInteger currentTokens = xdsRateLimitInfo.getCurrentTokens(); + if (1 <= currentTokens.get()) { + currentTokens.addAndGet(-1); + return true; + } + return false; + } + + /** + * Fill tokenBucket, the token will be consumption when receiving the requests + * + * @param xdsRateLimitInfo current rate limit info + * @param tokenBucket token rule information + */ + private static void fillToken(XdsRateLimitInfo xdsRateLimitInfo, XdsTokenBucket tokenBucket) { + long now = System.currentTimeMillis(); + long timeElapsed = now - xdsRateLimitInfo.getLastFilledTime(); + long tokensToAdd = (timeElapsed / tokenBucket.getFillInterval()) * tokenBucket.getTokensPerFill(); + if (tokensToAdd > 0) { + int currentToken = xdsRateLimitInfo.getCurrentTokens().get(); + xdsRateLimitInfo.getCurrentTokens().set((int) Math.min(tokenBucket.getMaxTokens(), + currentToken + tokensToAdd)); + xdsRateLimitInfo.setLastFilledTime(now); + } + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/config/XdsFlowControlConfigTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/config/XdsFlowControlConfigTest.java new file mode 100644 index 0000000000..100e187a75 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/config/XdsFlowControlConfigTest.java @@ -0,0 +1,63 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.config; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +/** + * XdsFlowControlConfig Test + * + * @since 2024-12-10 + * @author zhp + */ +public class XdsFlowControlConfigTest { + + private XdsFlowControlConfig xdsFlowControlConfigUnderTest; + + @Before + public void setUp() throws Exception { + xdsFlowControlConfigUnderTest = new XdsFlowControlConfig(); + } + + @Test + public void testRetryStatusCodesGetterAndSetter() { + final List retryStatusCodes = Collections.singletonList("value"); + xdsFlowControlConfigUnderTest.setRetryStatusCodes(retryStatusCodes); + assertEquals(retryStatusCodes, xdsFlowControlConfigUnderTest.getRetryStatusCodes()); + } + + @Test + public void testRetryHeaderNamesGetterAndSetter() { + final List retryHeaderNames = Collections.singletonList("value"); + xdsFlowControlConfigUnderTest.setRetryHeaderNames(retryHeaderNames); + assertEquals(retryHeaderNames, xdsFlowControlConfigUnderTest.getRetryHeaderNames()); + } + + @Test + public void testEnableGetterAndSetter() { + final boolean enable = false; + xdsFlowControlConfigUnderTest.setEnable(enable); + assertFalse(xdsFlowControlConfigUnderTest.isEnable()); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/core/match/XdsRouteMatchManagerTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/core/match/XdsRouteMatchManagerTest.java new file mode 100644 index 0000000000..1ccdc049fc --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/core/match/XdsRouteMatchManagerTest.java @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.core.match; + +import io.sermant.flowcontrol.common.entity.FlowControlScenario; +import io.sermant.flowcontrol.common.entity.HttpRequestEntity; +import io.sermant.flowcontrol.common.entity.RequestEntity; + +import io.sermant.flowcontrol.common.util.XdsAbstractTest; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; + +/** + * XdsRouteMatchManager Test + * @author zhp + * @since 2024-12-18 + */ +public class XdsRouteMatchManagerTest extends XdsAbstractTest { + + @Test + public void testGetMatchedScenarioInfo() { + HttpRequestEntity requestEntity = new HttpRequestEntity.Builder() + .setRequestType(RequestEntity.RequestType.CLIENT).setApiPath(PATH) + .setHeaders(new HashMap<>()).setMethod("test").setServiceName(SERVICE_NAME).build(); + final FlowControlScenario result = XdsRouteMatchManager.INSTANCE. + getMatchedScenarioInfo(requestEntity, SERVICE_NAME); + Assert.assertNotNull(result); + Assert.assertEquals(SERVICE_NAME, result.getServiceName()); + Assert.assertEquals(CLUSTER_NAME, result.getClusterName()); + Assert.assertEquals(ROUTE_NAME, result.getRouteName()); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/entity/FlowControlScenarioTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/entity/FlowControlScenarioTest.java new file mode 100644 index 0000000000..91f0bb2817 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/entity/FlowControlScenarioTest.java @@ -0,0 +1,76 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.entity; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +/** + * FlowControlScenario Test + * + * @since 2024-12-10 + * @author zhp + */ +public class FlowControlScenarioTest { + private FlowControlScenario flowControlScenario; + + @Before + public void setUp() throws Exception { + flowControlScenario = new FlowControlScenario(); + } + + @Test + public void testBusinessNamesGetterAndSetter() { + final Set businessNames = new HashSet<>(Collections.singletonList("value")); + flowControlScenario.setMatchedScenarioNames(businessNames); + assertEquals(businessNames, flowControlScenario.getMatchedScenarioNames()); + } + + @Test + public void testServiceNameGetterAndSetter() { + final String serviceName = "serviceName"; + flowControlScenario.setServiceName(serviceName); + assertEquals(serviceName, flowControlScenario.getServiceName()); + } + + @Test + public void testClusterNameGetterAndSetter() { + final String clusterName = "clusterName"; + flowControlScenario.setClusterName(clusterName); + assertEquals(clusterName, flowControlScenario.getClusterName()); + } + + @Test + public void testRouteNameGetterAndSetter() { + final String routeName = "routeName"; + flowControlScenario.setRouteName(routeName); + assertEquals(routeName, flowControlScenario.getRouteName()); + } + + @Test + public void testAddressGetterAndSetter() { + final String address = "127.0.0.1:8080"; + flowControlScenario.setAddress(address); + assertEquals(address, flowControlScenario.getAddress()); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/util/RandomUtilTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/util/RandomUtilTest.java new file mode 100644 index 0000000000..fad148cf4e --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/util/RandomUtilTest.java @@ -0,0 +1,39 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.util; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * RandomUtil test + * + * @author zhp + * @since 2024-12-05 + */ +public class RandomUtilTest { + @Test + public void testRandomInt1() { + assertEquals(0, RandomUtil.randomInt(0, 1)); + } + + @Test + public void testRandomInt2() { + assertEquals(0, RandomUtil.randomInt(1)); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/util/XdsAbstractTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/util/XdsAbstractTest.java new file mode 100644 index 0000000000..4d3d2ac672 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/util/XdsAbstractTest.java @@ -0,0 +1,168 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.util; + +import io.sermant.core.config.ConfigManager; +import io.sermant.core.service.ServiceManager; +import io.sermant.core.service.xds.XdsCoreService; +import io.sermant.core.service.xds.XdsFlowControlService; +import io.sermant.core.service.xds.XdsLoadBalanceService; +import io.sermant.core.service.xds.XdsRouteService; +import io.sermant.core.service.xds.XdsServiceDiscovery; +import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.core.service.xds.entity.XdsDelay; +import io.sermant.core.service.xds.entity.XdsHeaderOption; +import io.sermant.core.service.xds.entity.XdsHttpFault; +import io.sermant.core.service.xds.entity.XdsInstanceCircuitBreakers; +import io.sermant.core.service.xds.entity.XdsLbPolicy; +import io.sermant.core.service.xds.entity.XdsPathMatcher; +import io.sermant.core.service.xds.entity.XdsRateLimit; +import io.sermant.core.service.xds.entity.XdsRequestCircuitBreakers; +import io.sermant.core.service.xds.entity.XdsRetryPolicy; +import io.sermant.core.service.xds.entity.XdsRoute; +import io.sermant.core.service.xds.entity.XdsRouteAction; +import io.sermant.core.service.xds.entity.XdsRouteMatch; +import io.sermant.core.service.xds.entity.match.ExactMatchStrategy; +import io.sermant.core.service.xds.entity.match.MatchStrategy; +import io.sermant.core.utils.NetworkUtils; +import io.sermant.implement.service.xds.entity.XdsServiceInstance; +import org.junit.After; +import org.junit.Before; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * XdsAbstractTest + * + * @author zhp + * @since 2024-12-10 + **/ +public abstract class XdsAbstractTest { + private MockedStatic serviceManagerMockedStatic; + + private static MockedStatic networkUtils; + + private static MockedStatic configManager; + + protected static final String SERVICE_NAME = "provider"; + + protected static final String ROUTE_NAME = "routeA"; + + protected static final String PATH = "/test"; + + protected static final String CLUSTER_NAME = "outbound|8080||serviceA.default.svc.cluster.local"; + + protected final XdsRequestCircuitBreakers requestCircuitBreakers = new XdsRequestCircuitBreakers(); + + protected final XdsInstanceCircuitBreakers instanceCircuitBreakers = new XdsInstanceCircuitBreakers(); + + protected final XdsRetryPolicy retryPolicy = new XdsRetryPolicy(); + + protected final XdsRateLimit rateLimit = new XdsRateLimit(); + + protected final XdsHttpFault httpFault = new XdsHttpFault(); + + @Before + public void setUp() { + networkUtils = Mockito.mockStatic(NetworkUtils.class); + Mockito.when(NetworkUtils.getKubernetesPodIp()).thenReturn("127.0.0.1"); + configManager = Mockito.mockStatic(ConfigManager.class); + List xdsRouteList = new ArrayList<>(); + XdsRoute xdsRoute = new XdsRoute(); + xdsRoute.setName(ROUTE_NAME); + XdsRouteMatch xdsRouteMatch = new XdsRouteMatch(); + MatchStrategy matchStrategy = new ExactMatchStrategy(PATH); + XdsPathMatcher xdsPathMatcher = new XdsPathMatcher(matchStrategy, false); + xdsRouteMatch.setPathMatcher(xdsPathMatcher); + xdsRouteMatch.setHeaderMatchers(new ArrayList<>()); + xdsRoute.setRouteMatch(xdsRouteMatch); + XdsRouteAction xdsRouteAction = new XdsRouteAction(); + xdsRouteAction.setWeighted(true); + XdsRouteAction.XdsClusterWeight xdsClusterWeight = new XdsRouteAction.XdsClusterWeight(); + xdsClusterWeight.setClusterName(CLUSTER_NAME); + xdsClusterWeight.setWeight(100); + XdsRouteAction.XdsWeightedClusters xdsWeightedClusters = new XdsRouteAction.XdsWeightedClusters(); + xdsWeightedClusters.setClusters(Collections.singletonList(xdsClusterWeight)); + xdsWeightedClusters.setTotalWeight(100); + xdsRouteAction.setWeightedClusters(xdsWeightedClusters); + xdsRoute.setRouteAction(xdsRouteAction); + xdsRouteList.add(xdsRoute); + XdsCoreService xdsCoreService = Mockito.mock(XdsCoreService.class); + serviceManagerMockedStatic = Mockito.mockStatic(ServiceManager.class); + serviceManagerMockedStatic.when(()->ServiceManager.getService(XdsCoreService.class)).thenReturn(xdsCoreService); + XdsRouteService xdsRouteService = Mockito.mock(XdsRouteService.class); + XdsServiceDiscovery serviceDiscovery = Mockito.mock(XdsServiceDiscovery.class); + XdsFlowControlService xdsFlowControlService = Mockito.mock(XdsFlowControlService.class); + XdsLoadBalanceService xdsLoadBalanceService = Mockito.mock(XdsLoadBalanceService.class); + Mockito.when(xdsCoreService.getXdsRouteService()).thenReturn(xdsRouteService); + Mockito.when(xdsCoreService.getXdsServiceDiscovery()).thenReturn(serviceDiscovery); + Mockito.when(xdsCoreService.getXdsFlowControlService()).thenReturn(xdsFlowControlService); + Mockito.when(xdsCoreService.getLoadBalanceService()).thenReturn(xdsLoadBalanceService); + requestCircuitBreakers.setMaxRequests(1000); + instanceCircuitBreakers.setInterval(1000); + retryPolicy.setRetryOn("503"); + rateLimit.setResponseHeaderOption(Collections.singletonList(new XdsHeaderOption())); + XdsDelay delay = new XdsDelay(); + delay.setFixedDelay(1000); + httpFault.setDelay(delay); + Mockito.when(xdsRouteService.getServiceRoute(SERVICE_NAME)).thenReturn(xdsRouteList); + Mockito.when(xdsFlowControlService.getRequestCircuitBreakers(SERVICE_NAME, CLUSTER_NAME)) + .thenReturn(Optional.of(requestCircuitBreakers)); + Mockito.when(xdsFlowControlService.getInstanceCircuitBreakers(SERVICE_NAME, CLUSTER_NAME)) + .thenReturn(Optional.of(instanceCircuitBreakers)); + Mockito.when(xdsFlowControlService.getRetryPolicy(SERVICE_NAME, ROUTE_NAME)) + .thenReturn(Optional.of(retryPolicy)); + Mockito.when(xdsFlowControlService.getRateLimit(SERVICE_NAME, ROUTE_NAME, "8080")) + .thenReturn(Optional.of(rateLimit)); + Mockito.when(xdsFlowControlService.getHttpFault(SERVICE_NAME, ROUTE_NAME)) + .thenReturn(Optional.of(httpFault)); + Mockito.when(serviceDiscovery.getServiceInstance(SERVICE_NAME)) + .thenReturn(createServiceInstance4Service(Arrays.asList("127.0.0.1", "host", "localhost"))); + Mockito.when(xdsLoadBalanceService.getLbPolicyOfCluster(SERVICE_NAME, CLUSTER_NAME)) + .thenReturn(XdsLbPolicy.RANDOM); + } + + @After + public void tearDown() { + serviceManagerMockedStatic.close(); + networkUtils.close(); + configManager.close(); + } + + private static Set createServiceInstance4Service(List hosts) { + Set serviceInstances = new HashSet<>(); + for (String host : hosts) { + XdsServiceInstance serviceInstance = new XdsServiceInstance(); + serviceInstance.setHost(host); + Map metaData = new HashMap<>(); + metaData.put("region", host); + serviceInstance.setMetadata(metaData); + serviceInstances.add(serviceInstance); + } + return serviceInstances; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/util/XdsRouterUtilsTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/util/XdsRouterUtilsTest.java new file mode 100644 index 0000000000..6676c766e0 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/util/XdsRouterUtilsTest.java @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.util; + +import io.sermant.core.config.ConfigManager; +import io.sermant.core.plugin.config.ServiceMeta; +import io.sermant.core.service.xds.entity.XdsLocality; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.Optional; + +/** + * XdsRouterUtilTest + * + * @author zhp + * @since 2024-12-10 + **/ +public class XdsRouterUtilsTest extends XdsAbstractTest { + @Test + public void testGetLocalityInfoOfSelfService() { + // not find matched service instance + ServiceMeta meta = new ServiceMeta(); + meta.setService("consumer"); + Mockito.when(ConfigManager.getConfig(ServiceMeta.class)).thenReturn(meta); + Optional localityInfo = XdsRouterUtils.getLocalityInfoOfSelfService(); + Assert.assertFalse(localityInfo.isPresent()); + + // find matched service instance + meta.setService("provider"); + XdsRouterUtils.updateLocalityObtainedFlag(false); + localityInfo = XdsRouterUtils.getLocalityInfoOfSelfService(); + Assert.assertTrue(localityInfo.isPresent()); + Assert.assertEquals("127.0.0.1", localityInfo.get().getRegion()); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/util/XdsThreadLocalUtilTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/util/XdsThreadLocalUtilTest.java new file mode 100644 index 0000000000..579899a8aa --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/util/XdsThreadLocalUtilTest.java @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.util; + +import io.sermant.flowcontrol.common.entity.FlowControlScenario; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** + * XdsThreadLocalUtil Test + * + * @author zhp + * @since 2024-11-30 + */ +public class XdsThreadLocalUtilTest { + @Test + public void testSetAndRemoveSendByteFlag() { + assertFalse(XdsThreadLocalUtil.getSendByteFlag()); + XdsThreadLocalUtil.setSendByteFlag(true); + assertTrue(XdsThreadLocalUtil.getSendByteFlag()); + XdsThreadLocalUtil.removeSendByteFlag(); + assertFalse(XdsThreadLocalUtil.getSendByteFlag()); + } + + @Test + public void testSetScenarioInfo() { + final FlowControlScenario flowControlScenario = new FlowControlScenario(); + flowControlScenario.setMatchedScenarioNames(new HashSet<>(Collections.singletonList("value"))); + flowControlScenario.setServiceName("serviceName"); + flowControlScenario.setClusterName("clusterName"); + flowControlScenario.setRouteName("routeName"); + flowControlScenario.setAddress("address"); + XdsThreadLocalUtil.setScenarioInfo(flowControlScenario); + FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo(); + assertEquals(scenarioInfo, flowControlScenario); + XdsThreadLocalUtil.removeScenarioInfo(); + assertNull(XdsThreadLocalUtil.getScenarioInfo()); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerInfoTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerInfoTest.java new file mode 100644 index 0000000000..70f5711fdf --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerInfoTest.java @@ -0,0 +1,94 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.xds.circuit; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Deque; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +/** + * Circuit Breaker information Test + * + * @author zhp + * @since 2024-12-02 + */ +public class XdsCircuitBreakerInfoTest { + private XdsCircuitBreakerInfo xdsCircuitBreakerInfoUnderTest; + + @Before + public void setUp() throws Exception { + xdsCircuitBreakerInfoUnderTest = new XdsCircuitBreakerInfo(); + } + + @Test + public void testLocalFailureGetterAndSetter() { + final Deque localFailure = new ConcurrentLinkedDeque<>(); + localFailure.add(1L); + xdsCircuitBreakerInfoUnderTest.setLocalFailure(localFailure); + assertEquals(localFailure, xdsCircuitBreakerInfoUnderTest.getLocalFailure()); + } + + @Test + public void testServerFailureGetterAndSetter() { + final Deque serverFailure = new ConcurrentLinkedDeque<>(); + serverFailure.add(1L); + xdsCircuitBreakerInfoUnderTest.setServerFailure(serverFailure); + assertEquals(serverFailure, xdsCircuitBreakerInfoUnderTest.getServerFailure()); + } + + @Test + public void testGateWayFailureGetterAndSetter() { + final Deque gateWayFailure = new ConcurrentLinkedDeque<>(); + gateWayFailure.add(1L); + xdsCircuitBreakerInfoUnderTest.setGateWayFailure(gateWayFailure); + assertEquals(gateWayFailure, xdsCircuitBreakerInfoUnderTest.getGateWayFailure()); + } + + @Test + public void testIsOpenGetterAndSetter() { + final boolean isOpen = false; + xdsCircuitBreakerInfoUnderTest.setOpen(isOpen); + assertFalse(xdsCircuitBreakerInfoUnderTest.isOpen()); + } + + @Test + public void testCircuitBreakerTimeGetterAndSetter() { + final long circuitBreakerTime = System.currentTimeMillis(); + xdsCircuitBreakerInfoUnderTest.setCircuitBreakerEndTime(circuitBreakerTime); + assertEquals(circuitBreakerTime, xdsCircuitBreakerInfoUnderTest.getCircuitBreakerEndTime()); + } + + @Test + public void testCircuitBreakerCountGetterAndSetter() { + final AtomicInteger circuitBreakerCount = new AtomicInteger(1); + xdsCircuitBreakerInfoUnderTest.setCircuitBreakerCount(circuitBreakerCount); + assertEquals(circuitBreakerCount, xdsCircuitBreakerInfoUnderTest.getCircuitBreakerCount()); + } + + @Test + public void testCleanRequestDate() { + testCircuitBreakerTimeGetterAndSetter(); + xdsCircuitBreakerInfoUnderTest.cleanRequestData(); + assertEquals(0, xdsCircuitBreakerInfoUnderTest.getServerFailure().size()); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerManagerTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerManagerTest.java new file mode 100644 index 0000000000..6c895e39c6 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerManagerTest.java @@ -0,0 +1,71 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.xds.circuit; + +import io.sermant.core.service.xds.entity.XdsInstanceCircuitBreakers; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Circuit Breaker manager Test + * + * @author zhp + * @since 2024-12-02 + */ +public class XdsCircuitBreakerManagerTest { + private static final String SERVICE_NAME = "provider"; + + private static final String ROUTE_NAME = "routeA"; + + private static final String CLUSTER_NAME = "clusterA"; + + private static final String ADDRESS = "127.0.0.1:8080"; + + @Test + public void testActiveRequests() { + assertEquals(1, XdsCircuitBreakerManager.incrementActiveRequests(SERVICE_NAME, CLUSTER_NAME, ADDRESS)); + assertEquals(2, XdsCircuitBreakerManager.incrementActiveRequests(SERVICE_NAME, CLUSTER_NAME, ADDRESS)); + XdsCircuitBreakerManager.decrementActiveRequests(SERVICE_NAME, CLUSTER_NAME, ADDRESS); + assertEquals(2, XdsCircuitBreakerManager.incrementActiveRequests(SERVICE_NAME, CLUSTER_NAME, ADDRESS)); + } + + @Test + public void testCircuitBreaker() { + final FlowControlScenario scenarioInfo = new FlowControlScenario(); + scenarioInfo.setServiceName(SERVICE_NAME); + scenarioInfo.setClusterName(CLUSTER_NAME); + scenarioInfo.setRouteName(ROUTE_NAME); + scenarioInfo.setAddress(ADDRESS); + final XdsInstanceCircuitBreakers circuitBreakers = new XdsInstanceCircuitBreakers(); + circuitBreakers.setSplitExternalLocalOriginErrors(false); + circuitBreakers.setConsecutiveLocalOriginFailure(1); + circuitBreakers.setConsecutiveGatewayFailure(1); + circuitBreakers.setConsecutive5xxFailure(1); + circuitBreakers.setInterval(1000L); + circuitBreakers.setBaseEjectionTime(10000L); + boolean result = XdsCircuitBreakerManager.needsInstanceCircuitBreaker(scenarioInfo, ADDRESS); + assertFalse(result); + XdsCircuitBreakerManager.recordFailureRequest(scenarioInfo, ADDRESS, 500, circuitBreakers); + XdsCircuitBreakerManager.setCircuitBeakerStatus(circuitBreakers, scenarioInfo); + result = XdsCircuitBreakerManager.needsInstanceCircuitBreaker(scenarioInfo, ADDRESS); + assertTrue(result); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/handler/XdsHandlerTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/handler/XdsHandlerTest.java new file mode 100644 index 0000000000..756ae137e2 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/handler/XdsHandlerTest.java @@ -0,0 +1,104 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.xds.handler; + +import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.core.service.xds.entity.XdsHttpFault; +import io.sermant.core.service.xds.entity.XdsInstanceCircuitBreakers; +import io.sermant.core.service.xds.entity.XdsLbPolicy; +import io.sermant.core.service.xds.entity.XdsRateLimit; +import io.sermant.core.service.xds.entity.XdsRequestCircuitBreakers; +import io.sermant.core.service.xds.entity.XdsRetryPolicy; +import io.sermant.core.service.xds.entity.XdsRoute; +import io.sermant.flowcontrol.common.util.XdsAbstractTest; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +/** + * Xds Flow Control handler Test + * + * @author zhp + * @since 2024-11-28 + */ +public class XdsHandlerTest extends XdsAbstractTest { + @Test + public void testGetRequestCircuitBreakers() { + final Optional result = XdsHandler.INSTANCE.getRequestCircuitBreakers( + SERVICE_NAME, CLUSTER_NAME); + Assert.assertTrue(result.isPresent()); + Assert.assertEquals(requestCircuitBreakers.getMaxRequests(), result.get().getMaxRequests()); + } + + @Test + public void testGetInstanceCircuitBreakers() { + final Optional result = XdsHandler.INSTANCE.getInstanceCircuitBreakers( + SERVICE_NAME, CLUSTER_NAME); + Assert.assertTrue(result.isPresent()); + Assert.assertEquals(instanceCircuitBreakers.getInterval(), result.get().getInterval()); + } + + @Test + public void testGetRetryPolicy() { + final Optional result = XdsHandler.INSTANCE.getRetryPolicy( + SERVICE_NAME, ROUTE_NAME); + Assert.assertTrue(result.isPresent()); + Assert.assertEquals(retryPolicy.getRetryOn(), result.get().getRetryOn()); + } + + @Test + public void testGetRateLimit() { + final Optional result = XdsHandler.INSTANCE.getRateLimit( + SERVICE_NAME, ROUTE_NAME, CLUSTER_NAME); + Assert.assertTrue(result.isPresent()); + Assert.assertEquals(1, result.get().getResponseHeaderOption().size()); + } + + @Test + public void testGetHttpFault() { + final Optional result = XdsHandler.INSTANCE.getHttpFault( + SERVICE_NAME, ROUTE_NAME); + Assert.assertTrue(result.isPresent()); + Assert.assertEquals(httpFault.getDelay().getFixedDelay(), result.get().getDelay().getFixedDelay()); + } + + @Test + public void testGetServiceRouteByServiceName() { + final List result = XdsHandler.INSTANCE.getServiceRouteByServiceName(SERVICE_NAME); + assertEquals(1, result.size()); + } + + @Test + public void testGetServiceInstanceByServiceName() { + final Set result = XdsHandler.INSTANCE. + getServiceInstanceByServiceName(SERVICE_NAME); + assertEquals(3, result.size()); + } + + @Test + public void testGetLbPolicyOfCluster() { + final Optional result = XdsHandler.INSTANCE.getLbPolicyOfCluster(SERVICE_NAME, + CLUSTER_NAME); + assertEquals(Optional.of(XdsLbPolicy.RANDOM), result); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/ratelimit/XdsRateLimitInfoTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/ratelimit/XdsRateLimitInfoTest.java new file mode 100644 index 0000000000..49566451cf --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/ratelimit/XdsRateLimitInfoTest.java @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.common.xds.ratelimit; + +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; + +/** + * xds rate limit information Testz + * + * @author zhp + * @since 2024-12-05 + */ +public class XdsRateLimitInfoTest { + private XdsRateLimitInfo xdsRateLimitInfoUnderTest; + + @Before + public void setUp() throws Exception { + xdsRateLimitInfoUnderTest = new XdsRateLimitInfo(0L, 0); + } + + @Test + public void testLastFilledTimeGetterAndSetter() { + final long lastFilledTime = 0L; + xdsRateLimitInfoUnderTest.setLastFilledTime(lastFilledTime); + assertEquals(lastFilledTime, xdsRateLimitInfoUnderTest.getLastFilledTime()); + } + + @Test + public void testCurrentTokensGetterAndSetter() { + final AtomicInteger currentTokens = new AtomicInteger(0); + xdsRateLimitInfoUnderTest.setCurrentTokens(currentTokens); + assertEquals(currentTokens, xdsRateLimitInfoUnderTest.getCurrentTokens()); + } +}