Skip to content

Commit

Permalink
feat: 限流算法示例
Browse files Browse the repository at this point in the history
  • Loading branch information
dunwu committed Jan 22, 2024
1 parent aad3a98 commit 70b3983
Show file tree
Hide file tree
Showing 8 changed files with 415 additions and 0 deletions.
37 changes: 37 additions & 0 deletions codes/java-distributed/java-rate-limit/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>io.github.dunwu.javatech</groupId>
<artifactId>java-rate-limit</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<name>${project.artifactId}</name>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.25</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
<optional>true</optional>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package io.github.dunwu.distributed.ratelimit;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
* 固定时间窗口限流算法
*
* @author <a href="mailto:[email protected]">Zhang Peng</a>
* @date 2024-01-18
*/
public class FixedWindowRateLimiter implements RateLimiter {

/**
* 允许的最大请求数
*/
private final long maxPermits;

/**
* 窗口期时长
*/
private final long periodMillis;

/**
* 窗口期截止时间
*/
private long lastPeriodMillis;

/**
* 请求计数
*/
private AtomicLong count = new AtomicLong(0);

public FixedWindowRateLimiter(long qps) {
this(qps, 1000, TimeUnit.MILLISECONDS);
}

public FixedWindowRateLimiter(long maxPermits, long period, TimeUnit timeUnit) {
this.maxPermits = maxPermits;
this.periodMillis = timeUnit.toMillis(period);
this.lastPeriodMillis = System.currentTimeMillis() + this.periodMillis;
}

@Override
public synchronized boolean tryAcquire(int permits) {
long now = System.currentTimeMillis();
if (lastPeriodMillis <= now) {
this.lastPeriodMillis = now + this.periodMillis;
count = new AtomicLong(0);
}
if (count.get() + permits <= maxPermits) {
count.addAndGet(permits);
return true;
} else {
return false;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package io.github.dunwu.distributed.ratelimit;

import java.util.concurrent.atomic.AtomicLong;

/**
* 漏桶限流算法
*
* @author <a href="mailto:[email protected]">Zhang Peng</a>
* @date 2024-01-18
*/
public class LeakyBucketRateLimiter implements RateLimiter {

/**
* QPS
*/
private final int qps;

/**
* 桶的容量
*/
private final long capacity;

/**
* 计算的起始时间
*/
private long beginTimeMillis;

/**
* 桶中当前的水量
*/
private final AtomicLong waterNum = new AtomicLong(0);

public LeakyBucketRateLimiter(int qps, int capacity) {
this.qps = qps;
this.capacity = capacity;
}

@Override
public synchronized boolean tryAcquire(int permits) {

// 如果桶中没有水,直接放行
if (waterNum.get() == 0) {
beginTimeMillis = System.currentTimeMillis();
waterNum.addAndGet(permits);
return true;
}

// 计算水量
long leakedWaterNum = ((System.currentTimeMillis() - beginTimeMillis) / 1000) * qps;
long currentWaterNum = waterNum.get() - leakedWaterNum;
waterNum.set(Math.max(0, currentWaterNum));

// 重置时间
beginTimeMillis = System.currentTimeMillis();

if (waterNum.get() + permits < capacity) {
waterNum.addAndGet(permits);
return true;
} else {
return false;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.github.dunwu.distributed.ratelimit;

/**
* 限流器
*
* @author <a href="mailto:[email protected]">Zhang Peng</a>
* @date 2024-01-18
*/
public interface RateLimiter {

boolean tryAcquire(int permits);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package io.github.dunwu.distributed.ratelimit;

import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.RandomUtil;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* 限流器示例
*
* @author <a href="mailto:[email protected]">Zhang Peng</a>
* @date 2024-01-18
*/
@Slf4j
public class RateLimiterDemo {

public static void main(String[] args) {

// ============================================================================

int qps = 20;

System.out.println("======================= 固定时间窗口限流算法 =======================");
FixedWindowRateLimiter fixedWindowRateLimiter = new FixedWindowRateLimiter(qps);
testRateLimit(fixedWindowRateLimiter, qps);

System.out.println("======================= 滑动时间窗口限流算法 =======================");
SlidingWindowRateLimiter slidingWindowRateLimiter = new SlidingWindowRateLimiter(qps, 10);
testRateLimit(slidingWindowRateLimiter, qps);

System.out.println("======================= 漏桶限流算法 =======================");
LeakyBucketRateLimiter leakyBucketRateLimiter = new LeakyBucketRateLimiter(qps, 100);
testRateLimit(leakyBucketRateLimiter, qps);

System.out.println("======================= 令牌桶限流算法 =======================");
TokenBucketRateLimiter tokenBucketRateLimiter = new TokenBucketRateLimiter(qps, 100);
testRateLimit(tokenBucketRateLimiter, qps);
}

private static void testRateLimit(RateLimiter rateLimiter, int qps) {

AtomicInteger okNum = new AtomicInteger(0);
AtomicInteger limitNum = new AtomicInteger(0);
ExecutorService executorService = ThreadUtil.newFixedExecutor(10, "限流测试", true);
long beginTime = System.currentTimeMillis();

int threadNum = 4;
final CountDownLatch latch = new CountDownLatch(threadNum);
for (int i = 0; i < threadNum; i++) {
executorService.submit(() -> {
try {
batchRequest(rateLimiter, okNum, limitNum, 1000);
} catch (Exception e) {
log.error("发生异常!", e);
} finally {
latch.countDown();
}
});
}

try {
latch.await(10, TimeUnit.SECONDS);
long endTime = System.currentTimeMillis();
long gap = endTime - beginTime;
log.info("限流 QPS: {} -> 实际结果:耗时 {} ms,{} 次请求成功,{} 次请求被限流,实际 QPS: {}",
qps, gap, okNum.get(), limitNum.get(), okNum.get() * 1000 / gap);
if (okNum.get() == qps) {
log.info("限流符合预期");
}
} catch (Exception e) {
log.error("发生异常!", e);
} finally {
executorService.shutdown();
}
}

private static void batchRequest(RateLimiter rateLimiter, AtomicInteger okNum, AtomicInteger limitNum, int num)
throws InterruptedException {
for (int j = 0; j < num; j++) {
if (rateLimiter.tryAcquire(1)) {
log.info("请求成功");
okNum.getAndIncrement();
} else {
log.info("请求限流");
limitNum.getAndIncrement();
}
TimeUnit.MILLISECONDS.sleep(RandomUtil.randomInt(0, 10));
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package io.github.dunwu.distributed.ratelimit;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
* 滑动时间窗口限流算法
*
* @author <a href="mailto:[email protected]">Zhang Peng</a>
* @date 2024-01-18
*/
public class SlidingWindowRateLimiter implements RateLimiter {

/**
* 允许的最大请求数
*/
private final long maxPermits;

/**
* 窗口期时长
*/
private final long periodMillis;

/**
* 分片窗口期时长
*/
private final long shardPeriodMillis;

/**
* 窗口期截止时间
*/
private long lastPeriodMillis;

/**
* 分片窗口数
*/
private final int shardNum;

/**
* 请求总计数
*/
private final AtomicLong totalCount = new AtomicLong(0);

/**
* 分片窗口计数列表
*/
private final List<AtomicLong> countList = new LinkedList<>();

public SlidingWindowRateLimiter(long qps, int shardNum) {
this(qps, 1000, TimeUnit.MILLISECONDS, shardNum);
}

public SlidingWindowRateLimiter(long maxPermits, long period, TimeUnit timeUnit, int shardNum) {
this.maxPermits = maxPermits;
this.periodMillis = timeUnit.toMillis(period);
this.lastPeriodMillis = System.currentTimeMillis();
this.shardPeriodMillis = timeUnit.toMillis(period) / shardNum;
this.shardNum = shardNum;
for (int i = 0; i < shardNum; i++) {
countList.add(new AtomicLong(0));
}
}

@Override
public synchronized boolean tryAcquire(int permits) {
long now = System.currentTimeMillis();
if (now > lastPeriodMillis) {
for (int shardId = 0; shardId < shardNum; shardId++) {
long shardCount = countList.get(shardId).get();
totalCount.addAndGet(-shardCount);
countList.set(shardId, new AtomicLong(0));
lastPeriodMillis += shardPeriodMillis;
}
}
int shardId = (int) (now % periodMillis / shardPeriodMillis);
if (totalCount.get() + permits <= maxPermits) {
countList.get(shardId).addAndGet(permits);
totalCount.addAndGet(permits);
return true;
} else {
return false;
}
}

}
Loading

0 comments on commit 70b3983

Please sign in to comment.