Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize the performance of xds flow control functionality #1730

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,16 @@ public class CommonConst {
*/
public static final int DEFAULT_RESPONSE_CODE = -1;

/**
* the key of Scenario information for flow control
*/
public static final String SCENARIO_INFO = "flowControlScenario";

/**
* the key of request-information
*/
public static final String REQUEST_INFO = "REQUEST_INFO";

private CommonConst() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,12 @@ private boolean isPathMatched(XdsPathMatcher matcher, String path) {
}

private boolean isHeadersMatched(List<XdsHeaderMatcher> matchers, Map<String, String> headers) {
return matchers.stream()
.allMatch(xdsHeaderMatcher -> xdsHeaderMatcher.isMatch(headers));
for (XdsHeaderMatcher xdsHeaderMatcher : matchers) {
if (!xdsHeaderMatcher.isMatch(headers)) {
return false;
}
}
return true;
}

private String selectClusterByRoute(XdsRoute matchedRoute) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
import io.sermant.flowcontrol.common.core.ResolverManager;
import io.sermant.flowcontrol.common.core.match.MatchManager;
import io.sermant.flowcontrol.common.core.resolver.AbstractResolver;
import io.sermant.flowcontrol.common.entity.FlowControlScenario;
import io.sermant.flowcontrol.common.entity.RequestEntity;
import io.sermant.flowcontrol.common.util.StringUtils;

import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -71,17 +69,6 @@ public List<H> getHandlers(RequestEntity request) {
return createOrGetHandlers(businessNames);
}

/**
* gets the specified request handler
*
* @param flowControlScenario matched scenario information
* @return handler
*/
public List<H> getXdsHandlers(FlowControlScenario flowControlScenario) {
Optional<H> handlerOptions = createHandler(flowControlScenario, StringUtils.EMPTY);
return handlerOptions.map(Collections::singletonList).orElse(Collections.emptyList());
}

/**
* create handler
*
Expand Down Expand Up @@ -114,17 +101,6 @@ private Optional<H> create(String businessName) {
*/
protected abstract Optional<H> createHandler(String businessName, R rule);

/**
* create handler
*
* @param flowControlScenario matched business information
* @param businessName service scenario name
* @return handler
*/
public Optional<H> createHandler(FlowControlScenario flowControlScenario, String businessName) {
return Optional.empty();
}

/**
* get configuration key
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public boolean isNeedRetry(Throwable ex, XdsRetryPolicy retryPolicy) {
* @throws UnsupportedOperationException unsupported operation
*/
public Optional<String> getCode(Object result) {
throw new UnsupportedOperationException();
return Optional.empty();
}

/**
Expand All @@ -141,7 +141,7 @@ public Optional<String> getCode(Object result) {
* @throws UnsupportedOperationException unsupported operation
*/
public Optional<Set<String>> getHeaderNames(Object result) {
throw new UnsupportedOperationException();
return Optional.empty();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,15 @@ public interface Retry {
RetryFramework retryType();

/**
* get status code
* get status code
*
* @param result interface response result
* @return response status code
*/
Optional<String> getCode(Object result);

/**
* get header
* get header
*
* @param result interface response result
* @return response header names
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,11 @@
import io.sermant.flowcontrol.common.core.RuleUtils;
import io.sermant.flowcontrol.common.core.resolver.RetryResolver;
import io.sermant.flowcontrol.common.core.rule.RetryRule;
import io.sermant.flowcontrol.common.entity.FlowControlScenario;
import io.sermant.flowcontrol.common.entity.HttpRequestEntity;
import io.sermant.flowcontrol.common.handler.retry.policy.RetryOnUntriedPolicy;
import io.sermant.flowcontrol.common.handler.retry.policy.RetryPolicy;
import io.sermant.flowcontrol.common.util.StringUtils;
import io.sermant.flowcontrol.common.xds.handler.XdsHandler;

import java.util.List;
import java.util.Optional;

/**
* Retry context, used to manage retry policies based on different host framework types
Expand Down Expand Up @@ -141,21 +137,11 @@ public void buildRetryPolicy(HttpRequestEntity requestEntity) {
}

/**
* build test strategy
* build retry policy
*
* @param scenario scenario information
* @param retryPolicy retry policy information
*/
public void buildXdsRetryPolicy(FlowControlScenario scenario) {
if (StringUtils.isEmpty(scenario.getServiceName())
|| StringUtils.isEmpty(scenario.getRouteName())) {
return;
}
Optional<XdsRetryPolicy> retryPolicyOptional = XdsHandler.INSTANCE
.getRetryPolicy(scenario.getServiceName(), scenario.getRouteName());
if (!retryPolicyOptional.isPresent()) {
return;
}
XdsRetryPolicy retryPolicy = retryPolicyOptional.get();
public void buildXdsRetryPolicy(XdsRetryPolicy retryPolicy) {
policyThreadLocal.set(new RetryOnUntriedPolicy((int) retryPolicy.getMaxAttempts()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ public static int incrementActiveRequests(String serviceName, String clusterName
}

/**
* decrement Active Request
* decrease Active Request
*
* @param serviceName service name
* @param clusterName route name
* @param address request address
*/
public static void decrementActiveRequests(String serviceName, String clusterName, String address) {
public static void decreaseActiveRequests(String serviceName, String clusterName, String address) {
getActiveRequestCount(serviceName, clusterName, address).decrementAndGet();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.sermant.flowcontrol.common.entity.FlowControlScenario;
import io.sermant.flowcontrol.common.entity.HttpRequestEntity;
import io.sermant.flowcontrol.common.entity.RequestEntity;

import io.sermant.flowcontrol.common.util.XdsAbstractTest;
import org.junit.Assert;
import org.junit.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.sermant.core.service.xds.entity.XdsInstanceCircuitBreakers;
import io.sermant.flowcontrol.common.entity.FlowControlScenario;
import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
Expand All @@ -43,29 +44,62 @@ public class XdsCircuitBreakerManagerTest {
public void testActiveRequests() {
assertEquals(1, XdsCircuitBreakerManager.incrementActiveRequests(SERVICE_NAME, CLUSTER_NAME, ADDRESS));
assertEquals(2, XdsCircuitBreakerManager.incrementActiveRequests(SERVICE_NAME, CLUSTER_NAME, ADDRESS));
XdsCircuitBreakerManager.decrementActiveRequests(SERVICE_NAME, CLUSTER_NAME, ADDRESS);
XdsCircuitBreakerManager.decreaseActiveRequests(SERVICE_NAME, CLUSTER_NAME, ADDRESS);
assertEquals(2, XdsCircuitBreakerManager.incrementActiveRequests(SERVICE_NAME, CLUSTER_NAME, ADDRESS));
}

@Test
public void testCircuitBreaker() {
public void testCircuitBreaker() throws InterruptedException {
final FlowControlScenario scenarioInfo = new FlowControlScenario();
scenarioInfo.setServiceName(SERVICE_NAME);
scenarioInfo.setClusterName(CLUSTER_NAME);
scenarioInfo.setRouteName(ROUTE_NAME);
scenarioInfo.setAddress(ADDRESS);
final XdsInstanceCircuitBreakers circuitBreakers = new XdsInstanceCircuitBreakers();
circuitBreakers.setSplitExternalLocalOriginErrors(false);
circuitBreakers.setConsecutiveLocalOriginFailure(1);
circuitBreakers.setConsecutiveGatewayFailure(1);
circuitBreakers.setConsecutiveLocalOriginFailure(0);
circuitBreakers.setConsecutiveGatewayFailure(0);
circuitBreakers.setConsecutive5xxFailure(1);
circuitBreakers.setInterval(1000L);
circuitBreakers.setBaseEjectionTime(10000L);
boolean result = XdsCircuitBreakerManager.needsInstanceCircuitBreaker(scenarioInfo, ADDRESS);
assertFalse(result);

// Test the number of errors from the server reached the threshold
XdsCircuitBreakerManager.recordFailureRequest(scenarioInfo, ADDRESS, 500, circuitBreakers);
XdsCircuitBreakerManager.setCircuitBeakerStatus(circuitBreakers, scenarioInfo);
result = XdsCircuitBreakerManager.needsInstanceCircuitBreaker(scenarioInfo, ADDRESS);
assertTrue(result);

// Test whether the circuit breaker time has been exceeded and whether it has been restored
Thread.sleep(1100L);
result = XdsCircuitBreakerManager.needsInstanceCircuitBreaker(scenarioInfo, ADDRESS);
assertFalse(result);

// Test the number of errors from the local source reached the threshold
circuitBreakers.setSplitExternalLocalOriginErrors(true);
circuitBreakers.setConsecutiveLocalOriginFailure(1);
XdsThreadLocalUtil.setConnectionStatus(false);
XdsCircuitBreakerManager.recordFailureRequest(scenarioInfo, ADDRESS, -1, circuitBreakers);
XdsCircuitBreakerManager.setCircuitBeakerStatus(circuitBreakers, scenarioInfo);
result = XdsCircuitBreakerManager.needsInstanceCircuitBreaker(scenarioInfo, ADDRESS);
assertTrue(result);

// Test whether the actual circuit breaker time is the product of the number of circuit breakers multiplied
// by the configured circuit breaker time
Thread.sleep(1100L);
result = XdsCircuitBreakerManager.needsInstanceCircuitBreaker(scenarioInfo, ADDRESS);
assertTrue(result);
Thread.sleep(1000L);
result = XdsCircuitBreakerManager.needsInstanceCircuitBreaker(scenarioInfo, ADDRESS);
assertFalse(result);

// Test the number of errors from the gateway reached the threshold
circuitBreakers.setConsecutive5xxFailure(0);
circuitBreakers.setConsecutiveGatewayFailure(1);
XdsCircuitBreakerManager.recordFailureRequest(scenarioInfo, ADDRESS, 503, circuitBreakers);
XdsCircuitBreakerManager.setCircuitBeakerStatus(circuitBreakers, scenarioInfo);
result = XdsCircuitBreakerManager.needsInstanceCircuitBreaker(scenarioInfo, ADDRESS);
assertTrue(result);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.flowcontrol.common.xds.ratelimit;

import io.sermant.core.service.xds.entity.XdsTokenBucket;
import org.junit.Test;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
* RateLimitManager Test
*
* @author zhp
* @since 2024-12-02
*/
public class XdsRateLimitManagerTest {
private static final String SERVICE_NAME = "serviceA";

private static final String ROUTE_NAME = "routeA";

@Test
public void testFillAndConsumeToken() throws InterruptedException {
final XdsTokenBucket tokenBucket = new XdsTokenBucket();
tokenBucket.setMaxTokens(1);
tokenBucket.setTokensPerFill(1);
tokenBucket.setFillInterval(1000L);
boolean result = XdsRateLimitManager.fillAndConsumeToken(SERVICE_NAME, ROUTE_NAME, tokenBucket);
assertTrue(result);

// The situation where all tokens have been consumed
result = XdsRateLimitManager.fillAndConsumeToken(SERVICE_NAME, ROUTE_NAME, tokenBucket);
assertFalse(result);
Thread.sleep(1000L);

// Test token refill situation
result = XdsRateLimitManager.fillAndConsumeToken(SERVICE_NAME, ROUTE_NAME, tokenBucket);
assertTrue(result);

// Test all refilled tokens have been consumed
result = XdsRateLimitManager.fillAndConsumeToken(SERVICE_NAME, ROUTE_NAME, tokenBucket);
assertFalse(result);
}
}
Loading
Loading