Skip to content

Commit

Permalink
FixedIntervalRateLimiter support for a shorter refill interval
Browse files Browse the repository at this point in the history
  • Loading branch information
Ray Mattingly committed Mar 22, 2024
1 parent 0763a74 commit 1cf038d
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,65 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
* With this limiter resources will be refilled only after a fixed interval of time.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class FixedIntervalRateLimiter extends RateLimiter {

/**
* The FixedIntervalRateLimiter can be harsh from a latency/backoff perspective, which makes it
* difficult to fully and consistently utilize a quota allowance. By configuring the
* {@link #RATE_LIMITER_REFILL_INTERVAL_MS} to a lower value you will encourage the rate limiter
* to throw smaller wait intervals for requests which may be fulfilled in timeframes shorter than
* the quota's full interval. For example, if you're saturating a 100MB/sec read IO quota with a
* ton of tiny gets, then configuring this to a value like 100ms will ensure that your retry
* backoffs approach ~100ms, rather than 1sec. Be careful not to configure this too low, or you
* may produce a dangerous amount of retry volume.
*/
public static final String RATE_LIMITER_REFILL_INTERVAL_MS =
"hbase.quota.rate.limiter.refill.interval.ms";

private long nextRefillTime = -1L;
private final long refillInterval;

public FixedIntervalRateLimiter() {
this(DEFAULT_TIME_UNIT);
}

public FixedIntervalRateLimiter(long refillInterval) {
super();
Preconditions.checkArgument(getTimeUnitInMillis() >= refillInterval,
String.format("Refill interval %s must be less than or equal to TimeUnit millis %s",
refillInterval, getTimeUnitInMillis()));
this.refillInterval = refillInterval;
}

@Override
public long refill(long limit) {
final long now = EnvironmentEdgeManager.currentTime();
if (nextRefillTime == -1) {
nextRefillTime = now + refillInterval;
return limit;
}
if (now < nextRefillTime) {
return 0;
}
nextRefillTime = now + super.getTimeUnitInMillis();
return limit;
long diff = refillInterval + now - nextRefillTime;
long refills = diff / refillInterval;
nextRefillTime = now + refillInterval;
long refillAmount = refills * getRefillIntervalAdjustedLimit(limit);
return Math.min(limit, refillAmount);
}

@Override
public long getWaitInterval(long limit, long available, long amount) {
// adjust the limit based on the refill interval
limit = getRefillIntervalAdjustedLimit(limit);

if (nextRefillTime == -1) {
return 0;
}
Expand All @@ -62,7 +101,11 @@ public long getWaitInterval(long limit, long available, long amount) {
if (diff % limit == 0) {
extraRefillsNecessary--;
}
return nextRefillInterval + (extraRefillsNecessary * super.getTimeUnitInMillis());
return nextRefillInterval + (extraRefillsNecessary * refillInterval);
}

private long getRefillIntervalAdjustedLimit(long limit) {
return (long) Math.ceil(refillInterval / (double) getTimeUnitInMillis() * limit);
}

// This method is for strictly testing purpose only
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
+ "are mostly synchronized...but to me it looks like they are totally synchronized")
public abstract class RateLimiter {
public static final String QUOTA_RATE_LIMITER_CONF_KEY = "hbase.quota.rate.limiter";
private long tunit = 1000; // Timeunit factor for translating to ms.
public static final long DEFAULT_TIME_UNIT = 1000;

private long tunit = DEFAULT_TIME_UNIT; // Timeunit factor for translating to ms.
private long limit = Long.MAX_VALUE; // The max value available resource units can be refilled to.
private long avail = Long.MAX_VALUE; // Currently available resource units

Expand Down Expand Up @@ -157,7 +159,7 @@ public synchronized long getWaitIntervalMs(final long amount) {
* @param amount the number of required resources, a non-negative number
* @return true if there are enough available resources, otherwise false
*/
private boolean isAvailable(final long amount) {
protected boolean isAvailable(final long amount) {
if (isBypass()) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,17 @@ private TimeBasedLimiter() {
conf.getClass(RateLimiter.QUOTA_RATE_LIMITER_CONF_KEY, AverageIntervalRateLimiter.class)
.getName())
) {
reqsLimiter = new FixedIntervalRateLimiter();
reqSizeLimiter = new FixedIntervalRateLimiter();
writeReqsLimiter = new FixedIntervalRateLimiter();
writeSizeLimiter = new FixedIntervalRateLimiter();
readReqsLimiter = new FixedIntervalRateLimiter();
readSizeLimiter = new FixedIntervalRateLimiter();
reqCapacityUnitLimiter = new FixedIntervalRateLimiter();
writeCapacityUnitLimiter = new FixedIntervalRateLimiter();
readCapacityUnitLimiter = new FixedIntervalRateLimiter();
long refillInterval = conf.getLong(FixedIntervalRateLimiter.RATE_LIMITER_REFILL_INTERVAL_MS,
RateLimiter.DEFAULT_TIME_UNIT);
reqsLimiter = new FixedIntervalRateLimiter(refillInterval);
reqSizeLimiter = new FixedIntervalRateLimiter(refillInterval);
writeReqsLimiter = new FixedIntervalRateLimiter(refillInterval);
writeSizeLimiter = new FixedIntervalRateLimiter(refillInterval);
readReqsLimiter = new FixedIntervalRateLimiter(refillInterval);
readSizeLimiter = new FixedIntervalRateLimiter(refillInterval);
reqCapacityUnitLimiter = new FixedIntervalRateLimiter(refillInterval);
writeCapacityUnitLimiter = new FixedIntervalRateLimiter(refillInterval);
readCapacityUnitLimiter = new FixedIntervalRateLimiter(refillInterval);
} else {
reqsLimiter = new AverageIntervalRateLimiter();
reqSizeLimiter = new AverageIntervalRateLimiter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package org.apache.hadoop.hbase.quotas;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseClassTestRule;
Expand Down Expand Up @@ -427,4 +429,71 @@ public void testLimiterCompensationOverflow() throws InterruptedException {
avgLimiter.consume(-80);
assertEquals(limit, avgLimiter.getAvailable());
}

@Test
public void itRunsFullWithPartialRefillInterval() {
RateLimiter limiter = new FixedIntervalRateLimiter(100);
limiter.set(10, TimeUnit.SECONDS);
assertEquals(0, limiter.getWaitIntervalMs());

// Consume the quota
limiter.consume(10);

// Need to wait 1s to acquire another resource
long waitInterval = limiter.waitInterval(10);
assertTrue(900 < waitInterval);
assertTrue(1000 >= waitInterval);
// We need to wait 2s to acquire more than 10 resources
waitInterval = limiter.waitInterval(20);
assertTrue(1900 < waitInterval);
assertTrue(2000 >= waitInterval);

limiter.setNextRefillTime(limiter.getNextRefillTime() - 1000);
// We've waited the full interval, so we should now have 10
assertEquals(0, limiter.getWaitIntervalMs(10));
assertEquals(0, limiter.waitInterval());
}

@Test
public void itRunsPartialRefillIntervals() {
RateLimiter limiter = new FixedIntervalRateLimiter(100);
limiter.set(10, TimeUnit.SECONDS);
assertEquals(0, limiter.getWaitIntervalMs());

// Consume the quota
limiter.consume(10);

// Need to wait 1s to acquire another resource
long waitInterval = limiter.waitInterval(10);
assertTrue(900 < waitInterval);
assertTrue(1000 >= waitInterval);
// We need to wait 2s to acquire more than 10 resources
waitInterval = limiter.waitInterval(20);
assertTrue(1900 < waitInterval);
assertTrue(2000 >= waitInterval);
// We need to wait 0<=x<=100ms to acquire 1 resource
waitInterval = limiter.waitInterval(1);
assertTrue(0 < waitInterval);
assertTrue(100 >= waitInterval);

limiter.setNextRefillTime(limiter.getNextRefillTime() - 500);
// We've waited half the interval, so we should now have half available
assertEquals(0, limiter.getWaitIntervalMs(5));
assertEquals(0, limiter.waitInterval());
}

@Test
public void itRunsRepeatedPartialRefillIntervals() {
RateLimiter limiter = new FixedIntervalRateLimiter(100);
limiter.set(10, TimeUnit.SECONDS);
assertEquals(0, limiter.getWaitIntervalMs());
// Consume the quota
limiter.consume(10);
for (int i = 0; i < 100; i++) {
limiter.setNextRefillTime(limiter.getNextRefillTime() - 100); // free 1 resource
limiter.consume(1);
assertFalse(limiter.isAvailable(1)); // all resources consumed
assertTrue(limiter.isAvailable(0)); // not negative
}
}
}

0 comments on commit 1cf038d

Please sign in to comment.