Skip to content

Commit

Permalink
Refactor RateLimiterController: improve accuracy and support maxQps >…
Browse files Browse the repository at this point in the history
… 1000

* Rename to ThrottlingController
* Improve accuracy: use nanoseconds when necessary and support maxQps threshold > 1000 (i.e. wait < 1ms)

Signed-off-by: Eric Zhao <[email protected]>
  • Loading branch information
sczyh30 committed Nov 30, 2022
1 parent aa54e41 commit 6dc2fdb
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.controller.DefaultController;
import com.alibaba.csp.sentinel.slots.block.flow.controller.RateLimiterController;
import com.alibaba.csp.sentinel.slots.block.flow.controller.ThrottlingController;
import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpController;
import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpRateLimiterController;
import com.alibaba.csp.sentinel.util.StringUtil;
Expand Down Expand Up @@ -136,7 +136,7 @@ private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule)
return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
ColdFactorProperty.coldFactor);
case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
return new ThrottlingController(rule.getMaxQueueingTimeMs(), rule.getCount());
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.block.flow.controller;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

import com.alibaba.csp.sentinel.node.Node;
import com.alibaba.csp.sentinel.slots.block.flow.TrafficShapingController;
import com.alibaba.csp.sentinel.util.AssertUtil;
import com.alibaba.csp.sentinel.util.TimeUtil;

/**
* @author Eric Zhao
* @author jialiang.linjl
* @since 2.0
*/
public class ThrottlingController implements TrafficShapingController {

// Refactored from legacy RateLimitController of Sentinel 1.x.

private static final long MS_TO_NS_OFFSET = TimeUnit.MILLISECONDS.toNanos(1);

private final int maxQueueingTimeMs;
private final int statDurationMs;

private final double count;
private final boolean useNanoSeconds;

private final AtomicLong latestPassedTime = new AtomicLong(-1);

public ThrottlingController(int queueingTimeoutMs, double maxCountPerStat) {
this(queueingTimeoutMs, maxCountPerStat, 1000);
}

public ThrottlingController(int queueingTimeoutMs, double maxCountPerStat, int statDurationMs) {
AssertUtil.assertTrue(statDurationMs > 0, "statDurationMs should be positive");
AssertUtil.assertTrue(maxCountPerStat >= 0, "maxCountPerStat should be >= 0");
AssertUtil.assertTrue(queueingTimeoutMs >= 0, "queueingTimeoutMs should be >= 0");
this.maxQueueingTimeMs = queueingTimeoutMs;
this.count = maxCountPerStat;
this.statDurationMs = statDurationMs;
// Use nanoSeconds when durationMs%count != 0 or count/durationMs> 1 (to be accurate)
if (maxCountPerStat > 0) {
this.useNanoSeconds = statDurationMs % Math.round(maxCountPerStat) != 0 || maxCountPerStat / statDurationMs > 1;
} else {
this.useNanoSeconds = false;
}
}

@Override
public boolean canPass(Node node, int acquireCount) {
return canPass(node, acquireCount, false);
}

private boolean checkPassUsingNanoSeconds(int acquireCount, double maxCountPerStat) {
final long maxQueueingTimeNs = maxQueueingTimeMs * MS_TO_NS_OFFSET;
long currentTime = System.nanoTime();
// Calculate the interval between every two requests.
final long costTimeNs = Math.round(1.0d * MS_TO_NS_OFFSET * statDurationMs * acquireCount / maxCountPerStat);

// Expected pass time of this request.
long expectedTime = costTimeNs + latestPassedTime.get();

if (expectedTime <= currentTime) {
// Contention may exist here, but it's okay.
latestPassedTime.set(currentTime);
return true;
} else {
final long curNanos = System.nanoTime();
// Calculate the time to wait.
long waitTime = costTimeNs + latestPassedTime.get() - curNanos;
if (waitTime > maxQueueingTimeNs) {
return false;
}

long oldTime = latestPassedTime.addAndGet(costTimeNs);
waitTime = oldTime - curNanos;
if (waitTime > maxQueueingTimeNs) {
latestPassedTime.addAndGet(-costTimeNs);
return false;
}
// in race condition waitTime may <= 0
if (waitTime > 0) {
sleepNanos(waitTime);
}
return true;
}
}

private boolean checkPassUsingCachedMs(int acquireCount, double maxCountPerStat) {
long currentTime = TimeUtil.currentTimeMillis();
// Calculate the interval between every two requests.
long costTime = Math.round(1.0d * statDurationMs * acquireCount / maxCountPerStat);

// Expected pass time of this request.
long expectedTime = costTime + latestPassedTime.get();

if (expectedTime <= currentTime) {
// Contention may exist here, but it's okay.
latestPassedTime.set(currentTime);
return true;
} else {
// Calculate the time to wait.
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
return false;
}

long oldTime = latestPassedTime.addAndGet(costTime);
waitTime = oldTime - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
// in race condition waitTime may <= 0
if (waitTime > 0) {
sleepMs(waitTime);
}
return true;
}
}

@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// Pass when acquire count is less or equal than 0.
if (acquireCount <= 0) {
return true;
}
// Reject when count is less or equal than 0.
// Otherwise, the costTime will be max of long and waitTime will overflow in some cases.
if (count <= 0) {
return false;
}
if (useNanoSeconds) {
return checkPassUsingNanoSeconds(acquireCount, this.count);
} else {
return checkPassUsingCachedMs(acquireCount, this.count);
}
}

private void sleepMs(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
}
}

private void sleepNanos(long ns) {
LockSupport.parkNanos(ns);
}

}
49 changes: 24 additions & 25 deletions ...controller/RateLimiterControllerTest.java → .../controller/ThrottlingControllerTest.java
100755 → 100644
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -15,26 +15,26 @@
*/
package com.alibaba.csp.sentinel.slots.block.flow.controller;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.csp.sentinel.node.Node;
import com.alibaba.csp.sentinel.util.TimeUtil;

import org.junit.Test;

import com.alibaba.csp.sentinel.util.TimeUtil;
import com.alibaba.csp.sentinel.node.Node;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;

/**
* @author Eric Zhao
* @author jialiang.linjl
*/
public class RateLimiterControllerTest {
public class ThrottlingControllerTest {

@Test
public void testPaceController_normal() throws InterruptedException {
RateLimiterController paceController = new RateLimiterController(500, 10d);
public void testThrottlingControllerNormal() throws InterruptedException {
ThrottlingController paceController = new ThrottlingController(500, 10d);
Node node = mock(Node.class);

long start = TimeUtil.currentTimeMillis();
Expand All @@ -46,12 +46,12 @@ public void testPaceController_normal() throws InterruptedException {
}

@Test
public void testPaceController_timeout() throws InterruptedException {
final RateLimiterController paceController = new RateLimiterController(500, 10d);
public void testThrottlingControllerQueueTimeout() throws InterruptedException {
final ThrottlingController paceController = new ThrottlingController(500, 10d);
final Node node = mock(Node.class);

final AtomicInteger passcount = new AtomicInteger();
final AtomicInteger blockcount = new AtomicInteger();
final AtomicInteger passCount = new AtomicInteger();
final AtomicInteger blockCount = new AtomicInteger();
final CountDownLatch countDown = new CountDownLatch(1);

final AtomicInteger done = new AtomicInteger();
Expand All @@ -62,9 +62,9 @@ public void run() {
boolean pass = paceController.canPass(node, 1);

if (pass) {
passcount.incrementAndGet();
passCount.incrementAndGet();
} else {
blockcount.incrementAndGet();
blockCount.incrementAndGet();
}
done.incrementAndGet();

Expand All @@ -73,21 +73,20 @@ public void run() {
}
}

}, "Thread " + i);
}, "Thread-TestThrottlingControllerQueueTimeout-" + i);
thread.start();
}

countDown.await();
System.out.println("pass:" + passcount.get());
System.out.println("block" + blockcount.get());
System.out.println("done" + done.get());
assertTrue(blockcount.get() > 0);

System.out.println("pass: " + passCount.get());
System.out.println("block: " + blockCount.get());
System.out.println("done: " + done.get());
assertTrue(blockCount.get() > 0);
}

@Test
public void testPaceController_zeroattack() throws InterruptedException {
RateLimiterController paceController = new RateLimiterController(500, 0d);
public void testThrottlingControllerZeroThreshold() throws InterruptedException {
ThrottlingController paceController = new ThrottlingController(500, 0d);
Node node = mock(Node.class);

for (int i = 0; i < 2; i++) {
Expand Down

0 comments on commit 6dc2fdb

Please sign in to comment.