Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-28359 Improve quota RateLimiter synchronization #5683

Merged
merged 4 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ public long getReadAvailable() {
return readAvailable;
}

@Override
public long getReadConsumed() {
return readConsumed;
}

@Override
public void addGetResult(final Result result) {
operationSize[OperationType.GET.ordinal()] += QuotaUtil.calculateResultSize(result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,9 @@ public void addMutation(final Mutation mutation) {
public long getReadAvailable() {
return Long.MAX_VALUE;
}

@Override
public long getReadConsumed() {
return 0L;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,15 @@ public enum OperationType {

/** Returns the number of bytes available to read to avoid exceeding the quota */
long getReadAvailable();

/** Returns the number of bytes consumed from the quota by the operation */
long getReadConsumed();

/**
* Returns the maximum result size to be returned by the given operation. This is the greater of
* two numbers: the bytes available, or the bytes already consumed
*/
default long getMaxResultSize() {
return Math.max(getReadAvailable(), getReadConsumed());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@

/**
* Simple rate limiter. Usage Example: // At this point you have a unlimited resource limiter
* RateLimiter limiter = new AverageIntervalRateLimiter(); or new FixedIntervalRateLimiter();
* limiter.set(10, TimeUnit.SECONDS); // set 10 resources/sec while (true) { // call canExecute
* before performing resource consuming operation bool canExecute = limiter.canExecute(); // If
* there are no available resources, wait until one is available if (!canExecute)
* Thread.sleep(limiter.waitInterval()); // ...execute the work and consume the resource...
* limiter.consume(); }
* RateLimiter limiter = new AverageIntervalRateLimiter(); // or new FixedIntervalRateLimiter();
* limiter.set(10, TimeUnit.SECONDS); // set 10 resources/sec while (limiter.getWaitIntervalMs > 0)
* { // wait until waitInterval == 0 Thread.sleep(limiter.getWaitIntervalMs()); } // ...execute the
* work and consume the resource... limiter.consume();
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
Expand Down Expand Up @@ -135,18 +133,31 @@ protected synchronized long getTimeUnitInMillis() {

/**
* Is there at least one resource available to allow execution?
* @return true if there is at least one resource available, otherwise false
* @return the waitInterval to backoff, or 0 if execution is allowed
*/
public boolean canExecute() {
return canExecute(1);
public long getWaitIntervalMs() {
return getWaitIntervalMs(1);
}

/**
* Are there enough available resources to allow execution?
* @param amount the number of required resources, a non-negative number
* @return the waitInterval to backoff, or 0 if execution is allowed
*/
public synchronized long getWaitIntervalMs(final long amount) {
assert amount >= 0;
if (!isAvailable(amount)) {
return waitInterval(amount);
}
return 0;
}

/**
* Are there enough available resources to allow execution?
* @param amount the number of required resources, a non-negative number
* @return true if there are enough available resources, otherwise false
*/
public synchronized boolean canExecute(final long amount) {
private boolean isAvailable(final long amount) {
if (isBypass()) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,43 +141,47 @@ private static void setFromTimedQuota(final RateLimiter limiter, final TimedQuot
public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs,
long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit)
throws RpcThrottlingException {
if (!reqsLimiter.canExecute(writeReqs + readReqs)) {
RpcThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval());
long waitInterval = reqsLimiter.getWaitIntervalMs(writeReqs + readReqs);
if (waitInterval > 0) {
RpcThrottlingException.throwNumRequestsExceeded(waitInterval);
}
if (!reqSizeLimiter.canExecute(estimateWriteSize + estimateReadSize)) {
RpcThrottlingException.throwRequestSizeExceeded(
reqSizeLimiter.waitInterval(estimateWriteSize + estimateReadSize));
waitInterval = reqSizeLimiter.getWaitIntervalMs(estimateWriteSize + estimateReadSize);
if (waitInterval > 0) {
RpcThrottlingException.throwRequestSizeExceeded(waitInterval);
}
if (!reqCapacityUnitLimiter.canExecute(estimateWriteCapacityUnit + estimateReadCapacityUnit)) {
RpcThrottlingException.throwRequestCapacityUnitExceeded(
reqCapacityUnitLimiter.waitInterval(estimateWriteCapacityUnit + estimateReadCapacityUnit));
waitInterval = reqCapacityUnitLimiter
.getWaitIntervalMs(estimateWriteCapacityUnit + estimateReadCapacityUnit);
if (waitInterval > 0) {
RpcThrottlingException.throwRequestCapacityUnitExceeded(waitInterval);
}

if (estimateWriteSize > 0) {
if (!writeReqsLimiter.canExecute(writeReqs)) {
RpcThrottlingException.throwNumWriteRequestsExceeded(writeReqsLimiter.waitInterval());
waitInterval = writeReqsLimiter.getWaitIntervalMs(writeReqs);
if (waitInterval > 0) {
RpcThrottlingException.throwNumWriteRequestsExceeded(waitInterval);
}
if (!writeSizeLimiter.canExecute(estimateWriteSize)) {
RpcThrottlingException
.throwWriteSizeExceeded(writeSizeLimiter.waitInterval(estimateWriteSize));
waitInterval = writeSizeLimiter.getWaitIntervalMs(estimateWriteSize);
if (waitInterval > 0) {
RpcThrottlingException.throwWriteSizeExceeded(waitInterval);
}
if (!writeCapacityUnitLimiter.canExecute(estimateWriteCapacityUnit)) {
RpcThrottlingException.throwWriteCapacityUnitExceeded(
writeCapacityUnitLimiter.waitInterval(estimateWriteCapacityUnit));
waitInterval = writeCapacityUnitLimiter.getWaitIntervalMs(estimateWriteCapacityUnit);
if (waitInterval > 0) {
RpcThrottlingException.throwWriteCapacityUnitExceeded(waitInterval);
}
}

if (estimateReadSize > 0) {
if (!readReqsLimiter.canExecute(readReqs)) {
RpcThrottlingException.throwNumReadRequestsExceeded(readReqsLimiter.waitInterval());
waitInterval = readReqsLimiter.getWaitIntervalMs(readReqs);
if (waitInterval > 0) {
RpcThrottlingException.throwNumReadRequestsExceeded(waitInterval);
}
if (!readSizeLimiter.canExecute(estimateReadSize)) {
RpcThrottlingException
.throwReadSizeExceeded(readSizeLimiter.waitInterval(estimateReadSize));
waitInterval = readSizeLimiter.getWaitIntervalMs(estimateReadSize);
if (waitInterval > 0) {
RpcThrottlingException.throwReadSizeExceeded(waitInterval);
}
if (!readCapacityUnitLimiter.canExecute(estimateReadCapacityUnit)) {
RpcThrottlingException.throwReadCapacityUnitExceeded(
readCapacityUnitLimiter.waitInterval(estimateReadCapacityUnit));
waitInterval = readCapacityUnitLimiter.getWaitIntervalMs(estimateReadCapacityUnit);
if (waitInterval > 0) {
RpcThrottlingException.throwReadCapacityUnitExceeded(waitInterval);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
// doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are
// deferred/batched
List<ClientProtos.Action> mutations = null;
long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize());
IOException sizeIOE = null;
ClientProtos.ResultOrException.Builder resultOrExceptionBuilder =
ResultOrException.newBuilder();
Expand Down Expand Up @@ -3609,7 +3609,7 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
}
RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
// now let's do the real scan.
long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize());
RegionScanner scanner = rsh.s;
// this is the limit of rows for this scan, if we the number of rows reach this value, we will
// close the scanner.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ public void testBBSGet() throws Exception {
doPuts(10_000, FAMILY, QUALIFIER, table);
TEST_UTIL.flush(TABLE_NAME);

// Add ~10 block/min limit
// Add ~10 block/sec limit
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.READ_SIZE,
Math.round(10.1 * blockSize), TimeUnit.MINUTES));
Math.round(10.1 * blockSize), TimeUnit.SECONDS));
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);

// should execute at max 10 requests
Expand All @@ -132,10 +132,10 @@ public void testBBSScan() throws Exception {
doPuts(10_000, FAMILY, QUALIFIER, table);
TEST_UTIL.flush(TABLE_NAME);

// Add 1 block/min limit.
// Add 1 block/sec limit.
// This should only allow 1 scan per minute, because we estimate 1 block per scan
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, blockSize,
TimeUnit.MINUTES));
TimeUnit.SECONDS));
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
waitMinuteQuota();

Expand All @@ -148,9 +148,9 @@ public void testBBSScan() throws Exception {
testTraffic(() -> doScans(100, table), 100, 0);
testTraffic(() -> doScans(100, table), 100, 0);

// Add ~3 block/min limit. This should support >1 scans
// Add ~3 block/sec limit. This should support >1 scans
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE,
Math.round(3.1 * blockSize), TimeUnit.MINUTES));
Math.round(3.1 * blockSize), TimeUnit.SECONDS));
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);

// should execute some requests, but not all
Expand All @@ -174,10 +174,10 @@ public void testBBSMultiGet() throws Exception {
doPuts(rowCount, FAMILY, QUALIFIER, table);
TEST_UTIL.flush(TABLE_NAME);

// Add 1 block/min limit.
// Add 1 block/sec limit.
// This should only allow 1 multiget per minute, because we estimate 1 block per multiget
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, blockSize,
TimeUnit.MINUTES));
TimeUnit.SECONDS));
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
waitMinuteQuota();

Expand All @@ -190,9 +190,9 @@ public void testBBSMultiGet() throws Exception {
testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0);
testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0);

// Add ~100 block/min limit
// Add ~100 block/sec limit
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE,
Math.round(100.1 * blockSize), TimeUnit.MINUTES));
Math.round(100.1 * blockSize), TimeUnit.SECONDS));
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);

// should execute approximately 10 batches of 10 requests
Expand All @@ -211,7 +211,7 @@ public void testBBSMultiGet() throws Exception {

private void testTraffic(Callable<Long> trafficCallable, long expectedSuccess, long marginOfError)
throws Exception {
TEST_UTIL.waitFor(90_000, () -> {
TEST_UTIL.waitFor(5_000, () -> {
long actualSuccess;
try {
actualSuccess = trafficCallable.call();
Expand Down
Loading