Skip to content

Commit

Permalink
finish concurrent flow control checker
Browse files Browse the repository at this point in the history
Signed-off-by: yunfeiyanggzq <[email protected]>
  • Loading branch information
yunfeiyanggzq authored and sczyh30 committed Sep 16, 2020
1 parent 3b07f78 commit b044383
Show file tree
Hide file tree
Showing 11 changed files with 696 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ public final class ClusterConstants {
public static final int MSG_TYPE_PING = 0;
public static final int MSG_TYPE_FLOW = 1;
public static final int MSG_TYPE_PARAM_FLOW = 2;
public static final int MSG_TYPE_CONCURRENT_FLOW_ACQUIRE = 3;
public static final int MSG_TYPE_CONCURRENT_FLOW_RELEASE = 4;


public static final int RESPONSE_STATUS_BAD = -1;
public static final int RESPONSE_STATUS_OK = 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.flow;

import com.alibaba.csp.sentinel.cluster.TokenResult;
import com.alibaba.csp.sentinel.cluster.TokenResultStatus;
import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager;
import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.CurrentConcurrencyManager;
import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.TokenCacheNode;
import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.TokenCacheNodeManager;
import com.alibaba.csp.sentinel.cluster.server.log.ClusterServerStatLogUtil;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;

import java.util.concurrent.atomic.AtomicInteger;

/**
* @author yunfeiyanggzq
*/
final public class ConcurrentClusterFlowChecker {

public static double calcGlobalThreshold(FlowRule rule) {
double count = rule.getCount();
switch (rule.getClusterConfig().getThresholdType()) {
case ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL:
return count;
case ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL:
default:
int connectedCount = ClusterFlowRuleManager.getConnectedCount(rule.getClusterConfig().getFlowId());
return count * connectedCount;
}
}

public static TokenResult acquireConcurrentToken(/*@Valid*/ String clientAddress, FlowRule rule, int acquireCount) {
long flowId = rule.getClusterConfig().getFlowId();
AtomicInteger nowCalls = CurrentConcurrencyManager.get(flowId);
if (nowCalls == null) {
RecordLog.warn("[ConcurrentClusterFlowChecker] Fail to get nowCalls by flowId<{}>", flowId);
return new TokenResult(TokenResultStatus.FAIL);
}

// check before enter the lock to improve the efficiency
if (nowCalls.get() + acquireCount > calcGlobalThreshold(rule)) {
ClusterServerStatLogUtil.log("concurrent|block|" + flowId, acquireCount);
return new TokenResult(TokenResultStatus.BLOCKED);
}

// ensure the atomicity of operations
// lock different nowCalls to improve the efficiency
synchronized (nowCalls) {
// check again whether the request can pass.
if (nowCalls.get() + acquireCount > calcGlobalThreshold(rule)) {
ClusterServerStatLogUtil.log("concurrent|block|" + flowId, acquireCount);
return new TokenResult(TokenResultStatus.BLOCKED);
} else {
nowCalls.getAndAdd(acquireCount);
}
}
ClusterServerStatLogUtil.log("concurrent|pass|" + flowId, acquireCount);
TokenCacheNode node = TokenCacheNode.generateTokenCacheNode(rule, acquireCount, clientAddress);
TokenCacheNodeManager.putTokenCacheNode(node.getTokenId(), node);
TokenResult tokenResult = new TokenResult(TokenResultStatus.OK);
tokenResult.setTokenId(node.getTokenId());
return tokenResult;
}

public static TokenResult releaseConcurrentToken(/*@Valid*/ long tokenId) {
TokenCacheNode node = TokenCacheNodeManager.getTokenCacheNode(tokenId);
if (node == null) {
RecordLog.info("[ConcurrentClusterFlowChecker] Token<{}> is already released", tokenId);
return new TokenResult(TokenResultStatus.ALREADY_RELEASE);
}
FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(node.getFlowId());
if (rule == null) {
RecordLog.info("[ConcurrentClusterFlowChecker] Fail to get rule by flowId<{}>", node.getFlowId());
return new TokenResult(TokenResultStatus.NO_RULE_EXISTS);
}
if (TokenCacheNodeManager.removeTokenCacheNode(tokenId) == null) {
RecordLog.info("[ConcurrentClusterFlowChecker] Token<{}> is already released for flowId<{}>", tokenId, node.getFlowId());
return new TokenResult(TokenResultStatus.ALREADY_RELEASE);
}
int acquireCount = node.getAcquireCount();
AtomicInteger nowCalls = CurrentConcurrencyManager.get(node.getFlowId());
nowCalls.getAndAdd(-1 * acquireCount);
ClusterServerStatLogUtil.log("concurrent|release|" + rule.getClusterConfig().getFlowId(), acquireCount);
return new TokenResult(TokenResultStatus.RELEASE_OK);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@
*/
package com.alibaba.csp.sentinel.cluster.flow;

import java.util.Collection;

import com.alibaba.csp.sentinel.cluster.TokenResultStatus;
import com.alibaba.csp.sentinel.cluster.TokenResult;
import com.alibaba.csp.sentinel.cluster.TokenResultStatus;
import com.alibaba.csp.sentinel.cluster.TokenService;
import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager;
import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterParamFlowRuleManager;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule;

import java.util.Collection;

/**
* Default implementation for cluster {@link TokenService}.
*
Expand Down Expand Up @@ -61,10 +61,35 @@ public TokenResult requestParamToken(Long ruleId, int acquireCount, Collection<O
return ClusterParamFlowChecker.acquireClusterToken(rule, acquireCount, params);
}

@Override
public TokenResult requestConcurrentToken(String clientAddress, Long ruleId, int acquireCount) {
if (notValidRequest(clientAddress, ruleId, acquireCount)) {
return badRequest();
}
// The rule should be valid.
FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(ruleId);
if (rule == null) {
return new TokenResult(TokenResultStatus.NO_RULE_EXISTS);
}
return ConcurrentClusterFlowChecker.acquireConcurrentToken(clientAddress, rule, acquireCount);
}

@Override
public void releaseConcurrentToken(Long tokenId) {
if (tokenId == null) {
return;
}
ConcurrentClusterFlowChecker.releaseConcurrentToken(tokenId);
}

private boolean notValidRequest(Long id, int count) {
return id == null || id <= 0 || count <= 0;
}

private boolean notValidRequest(String address, Long id, int count) {
return address == null || "".equals(address) || id == null || id <= 0 || count <= 0;
}

private TokenResult badRequest() {
return new TokenResult(TokenResultStatus.BAD_REQUEST);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,10 @@
*/
package com.alibaba.csp.sentinel.cluster.flow.rule;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterMetricStatistics;
import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.CurrentConcurrencyManager;
import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterMetric;
import com.alibaba.csp.sentinel.cluster.server.ServerConstants;
import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager;
import com.alibaba.csp.sentinel.cluster.server.connection.ConnectionManager;
import com.alibaba.csp.sentinel.cluster.server.util.ClusterRuleUtil;
import com.alibaba.csp.sentinel.log.RecordLog;
Expand All @@ -41,6 +34,9 @@
import com.alibaba.csp.sentinel.util.function.Function;
import com.alibaba.csp.sentinel.util.function.Predicate;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
* Manager for cluster flow rules.
*
Expand All @@ -54,12 +50,12 @@ public final class ClusterFlowRuleManager {
* for a specific namespace to do rule management manually.
*/
public static final Function<String, SentinelProperty<List<FlowRule>>> DEFAULT_PROPERTY_SUPPLIER =
new Function<String, SentinelProperty<List<FlowRule>>>() {
@Override
public SentinelProperty<List<FlowRule>> apply(String namespace) {
return new DynamicSentinelProperty<>();
}
};
new Function<String, SentinelProperty<List<FlowRule>>>() {
@Override
public SentinelProperty<List<FlowRule>> apply(String namespace) {
return new DynamicSentinelProperty<>();
}
};

/**
* (flowId, clusterRule)
Expand Down Expand Up @@ -87,7 +83,7 @@ public SentinelProperty<List<FlowRule>> apply(String namespace) {
* Cluster flow rule property supplier for a specific namespace.
*/
private static volatile Function<String, SentinelProperty<List<FlowRule>>> propertySupplier
= DEFAULT_PROPERTY_SUPPLIER;
= DEFAULT_PROPERTY_SUPPLIER;

private static final Object UPDATE_LOCK = new Object();

Expand Down Expand Up @@ -118,18 +114,18 @@ public static void register2Property(String namespace) {
AssertUtil.notEmpty(namespace, "namespace cannot be empty");
if (propertySupplier == null) {
RecordLog.warn(
"[ClusterFlowRuleManager] Cluster flow property supplier is absent, cannot register property");
"[ClusterFlowRuleManager] Cluster flow property supplier is absent, cannot register property");
return;
}
SentinelProperty<List<FlowRule>> property = propertySupplier.apply(namespace);
if (property == null) {
RecordLog.warn(
"[ClusterFlowRuleManager] Wrong created property from cluster flow property supplier, ignoring");
"[ClusterFlowRuleManager] Wrong created property from cluster flow property supplier, ignoring");
return;
}
synchronized (UPDATE_LOCK) {
RecordLog.info("[ClusterFlowRuleManager] Registering new property to cluster flow rule manager"
+ " for namespace <{}>", namespace);
+ " for namespace <{}>", namespace);
registerPropertyInternal(namespace, property);
}
}
Expand Down Expand Up @@ -180,7 +176,7 @@ public static void removeProperty(String namespace) {
PROPERTY_MAP.remove(namespace);
}
RecordLog.info("[ClusterFlowRuleManager] Removing property from cluster flow rule manager"
+ " for namespace <{}>", namespace);
+ " for namespace <{}>", namespace);
}
}

Expand Down Expand Up @@ -253,7 +249,7 @@ public static List<FlowRule> getFlowRules(String namespace) {
* Load flow rules for a specific namespace. The former rules of the namespace will be replaced.
*
* @param namespace a valid namespace
* @param rules rule list
* @param rules rule list
*/
public static void loadRules(String namespace, List<FlowRule> rules) {
AssertUtil.notEmpty(namespace, "namespace cannot be empty");
Expand All @@ -278,6 +274,9 @@ private static void clearAndResetRulesFor(/*@Valid*/ String namespace) {
for (Long flowId : flowIdSet) {
FLOW_RULES.remove(flowId);
FLOW_NAMESPACE_MAP.remove(flowId);
if (CurrentConcurrencyManager.containsFlowId(flowId)) {
CurrentConcurrencyManager.remove(flowId);
}
}
flowIdSet.clear();
} else {
Expand All @@ -293,6 +292,9 @@ private static void clearAndResetRulesConditional(/*@Valid*/ String namespace, P
FLOW_RULES.remove(flowId);
FLOW_NAMESPACE_MAP.remove(flowId);
ClusterMetricStatistics.removeMetric(flowId);
if (CurrentConcurrencyManager.containsFlowId(flowId)) {
CurrentConcurrencyManager.remove(flowId);
}
}
}
oldIdSet.clear();
Expand Down Expand Up @@ -335,7 +337,7 @@ private static void applyClusterFlowRule(List<FlowRule> list, /*@Valid*/ String
}
if (!FlowRuleUtil.isValidRule(rule)) {
RecordLog.warn(
"[ClusterFlowRuleManager] Ignoring invalid flow rule when loading new flow rules: " + rule);
"[ClusterFlowRuleManager] Ignoring invalid flow rule when loading new flow rules: " + rule);
continue;
}
if (StringUtil.isBlank(rule.getLimitApp())) {
Expand All @@ -351,10 +353,13 @@ private static void applyClusterFlowRule(List<FlowRule> list, /*@Valid*/ String
ruleMap.put(flowId, rule);
FLOW_NAMESPACE_MAP.put(flowId, namespace);
flowIdSet.add(flowId);
if (!CurrentConcurrencyManager.containsFlowId(flowId)) {
CurrentConcurrencyManager.put(flowId, 0);
}

// Prepare cluster metric from valid flow ID.
ClusterMetricStatistics.putMetricIfAbsent(flowId,
new ClusterMetric(clusterConfig.getSampleCount(), clusterConfig.getWindowIntervalMs()));
new ClusterMetric(clusterConfig.getSampleCount(), clusterConfig.getWindowIntervalMs()));
}

// Cleanup unused cluster metrics.
Expand All @@ -381,16 +386,17 @@ public FlowRulePropertyListener(String namespace) {
public synchronized void configUpdate(List<FlowRule> conf) {
applyClusterFlowRule(conf, namespace);
RecordLog.info("[ClusterFlowRuleManager] Cluster flow rules received for namespace <{}>: {}",
namespace, FLOW_RULES);
namespace, FLOW_RULES);
}

@Override
public synchronized void configLoad(List<FlowRule> conf) {
applyClusterFlowRule(conf, namespace);
RecordLog.info("[ClusterFlowRuleManager] Cluster flow rules loaded for namespace <{}>: {}",
namespace, FLOW_RULES);
namespace, FLOW_RULES);
}
}

private ClusterFlowRuleManager() {}
private ClusterFlowRuleManager() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent;

import com.alibaba.csp.sentinel.cluster.flow.ConcurrentClusterFlowChecker;
import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager;
import com.alibaba.csp.sentinel.cluster.server.log.ClusterServerStatLogUtil;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;

import java.util.Set;

/**
* @author yunfeiyanggzq
*/
public class ClusterConcurrentCheckerLogListener implements Runnable {
@Override
public void run() {
try {
collectInformation();
} catch (Exception e) {
RecordLog.warn("[ClusterConcurrentCheckerLogListener] Failed to record concurrent flow control regularly", e);
}
}

private void collectInformation() {
Set<Long> keySet = CurrentConcurrencyManager.getConcurrencyMapKeySet();
for (long flowId : keySet) {
FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(flowId);
if (rule == null || CurrentConcurrencyManager.get(flowId).get() == 0) {
continue;
}
double concurrencyLevel = ConcurrentClusterFlowChecker.calcGlobalThreshold(rule);
String resource = rule.getResource();
ClusterServerStatLogUtil.log(String.format("concurrent|resource:%s|flowId:%dl|concurrencyLevel:%fl|currentConcurrency", resource, flowId,concurrencyLevel),CurrentConcurrencyManager.get(flowId).get());
}
if (TokenCacheNodeManager.getSize() != 0){
ClusterServerStatLogUtil.log("flow|totalTokenSize", TokenCacheNodeManager.getSize());
}

}
}
Loading

0 comments on commit b044383

Please sign in to comment.