From 334567e24384dfbd9f8263fa04555ba7839fb7f4 Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Thu, 15 Feb 2024 09:24:41 -0500 Subject: [PATCH 1/4] fix RateLimiter canExecute and waitInterval synchronization --- .../hadoop/hbase/quotas/RateLimiter.java | 20 ++++- .../hadoop/hbase/quotas/TimeBasedLimiter.java | 52 ++++++------ .../quotas/TestBlockBytesScannedQuota.java | 22 ++--- .../hadoop/hbase/quotas/TestRateLimiter.java | 83 +++++++++---------- 4 files changed, 97 insertions(+), 80 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java index bda60ffa690a..5cd37b26d160 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java @@ -135,18 +135,32 @@ protected synchronized long getTimeUnitInMillis() { /** * Is there at least one resource available to allow execution? - * @return true if there is at least one resource available, otherwise false + * @return the waitInterval to backoff, or 0 if execution is allowed */ - public boolean canExecute() { + public long canExecute() { return canExecute(1); } + /** + * Are there enough available resources to allow execution? + * @param amount the number of required resources, a non-negative number + * @return the waitInterval to backoff, or 0 if execution is allowed + */ + public synchronized long canExecute(final long amount) { + assert amount >= 0; + long waitInterval = waitInterval(amount); + if (isAvailable(amount) || waitInterval == 0) { + return 0; + } + return waitInterval; + } + /** * Are there enough available resources to allow execution? * @param amount the number of required resources, a non-negative number * @return true if there are enough available resources, otherwise false */ - public synchronized boolean canExecute(final long amount) { + private synchronized boolean isAvailable(final long amount) { if (isBypass()) { return true; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java index d7eb0e537a39..025f6aa5fa71 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java @@ -141,43 +141,47 @@ private static void setFromTimedQuota(final RateLimiter limiter, final TimedQuot public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit) throws RpcThrottlingException { - if (!reqsLimiter.canExecute(writeReqs + readReqs)) { - RpcThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval()); + long waitInterval = reqsLimiter.canExecute(writeReqs + readReqs); + if (waitInterval > 0) { + RpcThrottlingException.throwNumRequestsExceeded(waitInterval); } - if (!reqSizeLimiter.canExecute(estimateWriteSize + estimateReadSize)) { - RpcThrottlingException.throwRequestSizeExceeded( - reqSizeLimiter.waitInterval(estimateWriteSize + estimateReadSize)); + waitInterval = reqSizeLimiter.canExecute(estimateWriteSize + estimateReadSize); + if (waitInterval > 0) { + RpcThrottlingException.throwRequestSizeExceeded(waitInterval); } - if (!reqCapacityUnitLimiter.canExecute(estimateWriteCapacityUnit + estimateReadCapacityUnit)) { - RpcThrottlingException.throwRequestCapacityUnitExceeded( - reqCapacityUnitLimiter.waitInterval(estimateWriteCapacityUnit + estimateReadCapacityUnit)); + waitInterval = + reqCapacityUnitLimiter.canExecute(estimateWriteCapacityUnit + estimateReadCapacityUnit); + if (waitInterval > 0) { + RpcThrottlingException.throwRequestCapacityUnitExceeded(waitInterval); } if (estimateWriteSize > 0) { - if (!writeReqsLimiter.canExecute(writeReqs)) { - RpcThrottlingException.throwNumWriteRequestsExceeded(writeReqsLimiter.waitInterval()); + waitInterval = writeReqsLimiter.canExecute(writeReqs); + if (waitInterval > 0) { + RpcThrottlingException.throwNumWriteRequestsExceeded(waitInterval); } - if (!writeSizeLimiter.canExecute(estimateWriteSize)) { - RpcThrottlingException - .throwWriteSizeExceeded(writeSizeLimiter.waitInterval(estimateWriteSize)); + waitInterval = writeSizeLimiter.canExecute(estimateWriteSize); + if (waitInterval > 0) { + RpcThrottlingException.throwWriteSizeExceeded(waitInterval); } - if (!writeCapacityUnitLimiter.canExecute(estimateWriteCapacityUnit)) { - RpcThrottlingException.throwWriteCapacityUnitExceeded( - writeCapacityUnitLimiter.waitInterval(estimateWriteCapacityUnit)); + waitInterval = writeCapacityUnitLimiter.canExecute(estimateWriteCapacityUnit); + if (waitInterval > 0) { + RpcThrottlingException.throwWriteCapacityUnitExceeded(waitInterval); } } if (estimateReadSize > 0) { - if (!readReqsLimiter.canExecute(readReqs)) { - RpcThrottlingException.throwNumReadRequestsExceeded(readReqsLimiter.waitInterval()); + waitInterval = readReqsLimiter.canExecute(readReqs); + if (waitInterval > 0) { + RpcThrottlingException.throwNumReadRequestsExceeded(waitInterval); } - if (!readSizeLimiter.canExecute(estimateReadSize)) { - RpcThrottlingException - .throwReadSizeExceeded(readSizeLimiter.waitInterval(estimateReadSize)); + waitInterval = readSizeLimiter.canExecute(estimateReadSize); + if (waitInterval > 0) { + RpcThrottlingException.throwReadSizeExceeded(waitInterval); } - if (!readCapacityUnitLimiter.canExecute(estimateReadCapacityUnit)) { - RpcThrottlingException.throwReadCapacityUnitExceeded( - readCapacityUnitLimiter.waitInterval(estimateReadCapacityUnit)); + waitInterval = readCapacityUnitLimiter.canExecute(estimateReadCapacityUnit); + if (waitInterval > 0) { + RpcThrottlingException.throwReadCapacityUnitExceeded(waitInterval); } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java index e27ba123381c..5de9a2d1a900 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java @@ -103,9 +103,9 @@ public void testBBSGet() throws Exception { doPuts(10_000, FAMILY, QUALIFIER, table); TEST_UTIL.flush(TABLE_NAME); - // Add ~10 block/min limit + // Add ~10 block/sec limit admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.READ_SIZE, - Math.round(10.1 * blockSize), TimeUnit.MINUTES)); + Math.round(10.1 * blockSize), TimeUnit.SECONDS)); triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); // should execute at max 10 requests @@ -132,10 +132,10 @@ public void testBBSScan() throws Exception { doPuts(10_000, FAMILY, QUALIFIER, table); TEST_UTIL.flush(TABLE_NAME); - // Add 1 block/min limit. + // Add 1 block/sec limit. // This should only allow 1 scan per minute, because we estimate 1 block per scan admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, blockSize, - TimeUnit.MINUTES)); + TimeUnit.SECONDS)); triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); waitMinuteQuota(); @@ -148,9 +148,9 @@ public void testBBSScan() throws Exception { testTraffic(() -> doScans(100, table), 100, 0); testTraffic(() -> doScans(100, table), 100, 0); - // Add ~3 block/min limit. This should support >1 scans + // Add ~3 block/sec limit. This should support >1 scans admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, - Math.round(3.1 * blockSize), TimeUnit.MINUTES)); + Math.round(3.1 * blockSize), TimeUnit.SECONDS)); triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); // should execute some requests, but not all @@ -174,10 +174,10 @@ public void testBBSMultiGet() throws Exception { doPuts(rowCount, FAMILY, QUALIFIER, table); TEST_UTIL.flush(TABLE_NAME); - // Add 1 block/min limit. + // Add 1 block/sec limit. // This should only allow 1 multiget per minute, because we estimate 1 block per multiget admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, blockSize, - TimeUnit.MINUTES)); + TimeUnit.SECONDS)); triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); waitMinuteQuota(); @@ -190,9 +190,9 @@ public void testBBSMultiGet() throws Exception { testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0); testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0); - // Add ~100 block/min limit + // Add ~100 block/sec limit admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, - Math.round(100.1 * blockSize), TimeUnit.MINUTES)); + Math.round(100.1 * blockSize), TimeUnit.SECONDS)); triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); // should execute approximately 10 batches of 10 requests @@ -211,7 +211,7 @@ public void testBBSMultiGet() throws Exception { private void testTraffic(Callable trafficCallable, long expectedSuccess, long marginOfError) throws Exception { - TEST_UTIL.waitFor(90_000, () -> { + TEST_UTIL.waitFor(5_000, () -> { long actualSuccess; try { actualSuccess = trafficCallable.call(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java index 49df937f7c5c..16d1025b88a8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java @@ -18,8 +18,7 @@ package org.apache.hadoop.hbase.quotas; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNotEquals; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -71,7 +70,7 @@ private void testWaitInterval(final TimeUnit timeUnit, final long limit, // consume all the available resources, one request at the time. // the wait interval should be 0 for (int i = 0; i < (limit - 1); ++i) { - assertTrue(limiter.canExecute()); + assertEquals(0, limiter.canExecute()); limiter.consume(); long waitInterval = limiter.waitInterval(); assertEquals(0, waitInterval); @@ -81,7 +80,7 @@ private void testWaitInterval(final TimeUnit timeUnit, final long limit, // There is one resource available, so we should be able to // consume it without waiting. limiter.setNextRefillTime(limiter.getNextRefillTime() - nowTs); - assertTrue(limiter.canExecute()); + assertEquals(0, limiter.canExecute()); assertEquals(0, limiter.waitInterval()); limiter.consume(); // No more resources are available, we should wait for at least an interval. @@ -94,7 +93,7 @@ private void testWaitInterval(final TimeUnit timeUnit, final long limit, // artificially go into the past to prove that when too early we should fail. long temp = nowTs + 500; limiter.setNextRefillTime(limiter.getNextRefillTime() + temp); - assertFalse(limiter.canExecute()); + assertNotEquals(0, limiter.canExecute()); // Roll back the nextRefillTime set to continue further testing limiter.setNextRefillTime(limiter.getNextRefillTime() - temp); } @@ -107,7 +106,7 @@ public void testOverconsumptionAverageIntervalRefillStrategy() { // 10 resources are available, but we need to consume 20 resources // Verify that we have to wait at least 1.1sec to have 1 resource available - assertTrue(limiter.canExecute()); + assertEquals(0, limiter.canExecute()); limiter.consume(20); // We consumed twice the quota. Need to wait 1s to get back to 0, then another 100ms for the 1 assertEquals(1100, limiter.waitInterval(1)); @@ -116,10 +115,10 @@ public void testOverconsumptionAverageIntervalRefillStrategy() { // Verify that after 1sec we need to wait for another 0.1sec to get a resource available limiter.setNextRefillTime(limiter.getNextRefillTime() - 1000); - assertFalse(limiter.canExecute(1)); + assertNotEquals(0, limiter.canExecute(1)); limiter.setNextRefillTime(limiter.getNextRefillTime() - 100); // We've waited the full 1.1sec, should now have 1 available - assertTrue(limiter.canExecute(1)); + assertEquals(0, limiter.canExecute(1)); assertEquals(0, limiter.waitInterval()); } @@ -138,7 +137,7 @@ public long currentTime() { } }; EnvironmentEdgeManager.injectEdge(edge); - assertTrue(limiter.canExecute()); + assertEquals(0, limiter.canExecute()); // 10 resources are available, but we need to consume 20 resources limiter.consume(20); // We over-consumed by 10. Since this is a fixed interval refill, where @@ -149,10 +148,10 @@ public long currentTime() { // Verify that after 1sec also no resource should be available limiter.setNextRefillTime(limiter.getNextRefillTime() - 1000); - assertFalse(limiter.canExecute()); + assertNotEquals(0, limiter.canExecute()); // Verify that after total 2sec the 10 resource is available limiter.setNextRefillTime(limiter.getNextRefillTime() - 1000); - assertTrue(limiter.canExecute()); + assertEquals(0, limiter.canExecute()); assertEquals(0, limiter.waitInterval()); } @@ -161,12 +160,12 @@ public void testFixedIntervalResourceAvailability() throws Exception { RateLimiter limiter = new FixedIntervalRateLimiter(); limiter.set(10, TimeUnit.SECONDS); - assertTrue(limiter.canExecute(10)); + assertEquals(0, limiter.canExecute(10)); limiter.consume(3); assertEquals(7, limiter.getAvailable()); - assertFalse(limiter.canExecute(10)); + assertNotEquals(0, limiter.canExecute(10)); limiter.setNextRefillTime(limiter.getNextRefillTime() - 1000); - assertTrue(limiter.canExecute(10)); + assertEquals(0, limiter.canExecute(10)); assertEquals(10, limiter.getAvailable()); } @@ -182,7 +181,7 @@ public void testLimiterBySmallerRate() throws InterruptedException { limiter.setNextRefillTime(limiter.getNextRefillTime() - 500); for (int i = 0; i < 3; i++) { // 6 resources/sec < limit, so limiter.canExecute(nowTs, lastTs) should be true - assertEquals(true, limiter.canExecute()); + assertEquals(limiter.canExecute(), 0); limiter.consume(); } } @@ -237,7 +236,7 @@ public int testCanExecuteByRate(RateLimiter limiter, int rate) { int count = 0; while ((request++) < rate) { limiter.setNextRefillTime(limiter.getNextRefillTime() - limiter.getTimeUnitInMillis() / rate); - if (limiter.canExecute()) { + if (limiter.canExecute() == 0) { count++; limiter.consume(); } @@ -317,28 +316,28 @@ public void testUnconfiguredLimiters() throws InterruptedException { assertEquals(limit, avgLimiter.getAvailable()); assertEquals(limit, fixLimiter.getAvailable()); - assertTrue(avgLimiter.canExecute(limit)); + assertEquals(0, avgLimiter.canExecute(limit)); avgLimiter.consume(limit); - assertTrue(fixLimiter.canExecute(limit)); + assertEquals(0, fixLimiter.canExecute(limit)); fixLimiter.consume(limit); // Make sure that available is Long.MAX_VALUE - assertTrue(limit == avgLimiter.getAvailable()); - assertTrue(limit == fixLimiter.getAvailable()); + assertEquals(limit, avgLimiter.getAvailable()); + assertEquals(limit, fixLimiter.getAvailable()); // after 100 millseconds, it should be able to execute limit as well testEdge.incValue(100); - assertTrue(avgLimiter.canExecute(limit)); + assertEquals(0, avgLimiter.canExecute(limit)); avgLimiter.consume(limit); - assertTrue(fixLimiter.canExecute(limit)); + assertEquals(0, fixLimiter.canExecute(limit)); fixLimiter.consume(limit); // Make sure that available is Long.MAX_VALUE - assertTrue(limit == avgLimiter.getAvailable()); - assertTrue(limit == fixLimiter.getAvailable()); + assertEquals(limit, avgLimiter.getAvailable()); + assertEquals(limit, fixLimiter.getAvailable()); EnvironmentEdgeManager.reset(); } @@ -358,39 +357,39 @@ public void testExtremeLimiters() throws InterruptedException { assertEquals(limit, avgLimiter.getAvailable()); assertEquals(limit, fixLimiter.getAvailable()); - assertTrue(avgLimiter.canExecute(limit / 2)); + assertEquals(0, avgLimiter.canExecute(limit / 2)); avgLimiter.consume(limit / 2); - assertTrue(fixLimiter.canExecute(limit / 2)); + assertEquals(0, fixLimiter.canExecute(limit / 2)); fixLimiter.consume(limit / 2); // Make sure that available is whatever left - assertTrue((limit - (limit / 2)) == avgLimiter.getAvailable()); - assertTrue((limit - (limit / 2)) == fixLimiter.getAvailable()); + assertEquals((limit - (limit / 2)), avgLimiter.getAvailable()); + assertEquals((limit - (limit / 2)), fixLimiter.getAvailable()); // after 100 millseconds, both should not be able to execute the limit testEdge.incValue(100); - assertFalse(avgLimiter.canExecute(limit)); - assertFalse(fixLimiter.canExecute(limit)); + assertNotEquals(0, avgLimiter.canExecute(limit)); + assertNotEquals(0, fixLimiter.canExecute(limit)); // after 500 millseconds, average interval limiter should be able to execute the limit testEdge.incValue(500); - assertTrue(avgLimiter.canExecute(limit)); - assertFalse(fixLimiter.canExecute(limit)); + assertEquals(0, avgLimiter.canExecute(limit)); + assertNotEquals(0, fixLimiter.canExecute(limit)); // Make sure that available is correct - assertTrue(limit == avgLimiter.getAvailable()); - assertTrue((limit - (limit / 2)) == fixLimiter.getAvailable()); + assertEquals(limit, avgLimiter.getAvailable()); + assertEquals((limit - (limit / 2)), fixLimiter.getAvailable()); // after 500 millseconds, both should be able to execute testEdge.incValue(500); - assertTrue(avgLimiter.canExecute(limit)); - assertTrue(fixLimiter.canExecute(limit)); + assertEquals(0, avgLimiter.canExecute(limit)); + assertEquals(0, fixLimiter.canExecute(limit)); // Make sure that available is Long.MAX_VALUE - assertTrue(limit == avgLimiter.getAvailable()); - assertTrue(limit == fixLimiter.getAvailable()); + assertEquals(limit, avgLimiter.getAvailable()); + assertEquals(limit, fixLimiter.getAvailable()); EnvironmentEdgeManager.reset(); } @@ -413,19 +412,19 @@ public void testLimiterCompensationOverflow() throws InterruptedException { assertEquals(limit, avgLimiter.getAvailable()); // The initial guess is that 100 bytes. - assertTrue(avgLimiter.canExecute(guessNumber)); + assertEquals(0, avgLimiter.canExecute(guessNumber)); avgLimiter.consume(guessNumber); // Make sure that available is whatever left - assertTrue((limit - guessNumber) == avgLimiter.getAvailable()); + assertEquals((limit - guessNumber), avgLimiter.getAvailable()); // Manually set avil to simulate that another thread call canExecute(). // It is simulated by consume(). avgLimiter.consume(-80); - assertTrue((limit - guessNumber + 80) == avgLimiter.getAvailable()); + assertEquals((limit - guessNumber + 80), avgLimiter.getAvailable()); // Now thread1 compensates 80 avgLimiter.consume(-80); - assertTrue(limit == avgLimiter.getAvailable()); + assertEquals(limit, avgLimiter.getAvailable()); } } From 11cd56cc1f3777edaf3f94b2165781e2d60b34c2 Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Thu, 15 Feb 2024 15:22:41 -0500 Subject: [PATCH 2/4] cleanup --- .../java/org/apache/hadoop/hbase/quotas/RateLimiter.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java index 5cd37b26d160..b3835ad86d50 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java @@ -148,11 +148,10 @@ public long canExecute() { */ public synchronized long canExecute(final long amount) { assert amount >= 0; - long waitInterval = waitInterval(amount); - if (isAvailable(amount) || waitInterval == 0) { - return 0; + if (!isAvailable(amount)) { + return waitInterval(amount); } - return waitInterval; + return 0; } /** From abe1a329b45b124fc69e83049166495765526cb2 Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Fri, 16 Feb 2024 14:01:52 -0500 Subject: [PATCH 3/4] fix underrepresented read availability --- .../org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java index 4b89e18a8021..baf08b25d872 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java @@ -131,7 +131,8 @@ public void close() { @Override public long getReadAvailable() { - return readAvailable; + // at this point we've grabbed some quota, so we should use at least that + return Math.max(readAvailable, readConsumed); } @Override From 6d4abbf35f171a180e0961ecb9edd0a0fb33c37d Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Sun, 18 Feb 2024 10:19:48 -0500 Subject: [PATCH 4/4] PR feedback: new method, no private sync, naming --- .../hbase/quotas/DefaultOperationQuota.java | 8 ++- .../hbase/quotas/NoopOperationQuota.java | 5 ++ .../hadoop/hbase/quotas/OperationQuota.java | 11 ++++ .../hadoop/hbase/quotas/RateLimiter.java | 18 +++---- .../hadoop/hbase/quotas/TimeBasedLimiter.java | 20 +++---- .../hbase/regionserver/RSRpcServices.java | 4 +- .../hadoop/hbase/quotas/TestRateLimiter.java | 54 +++++++++---------- 7 files changed, 69 insertions(+), 51 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java index baf08b25d872..a4ff8b2a859e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java @@ -131,8 +131,12 @@ public void close() { @Override public long getReadAvailable() { - // at this point we've grabbed some quota, so we should use at least that - return Math.max(readAvailable, readConsumed); + return readAvailable; + } + + @Override + public long getReadConsumed() { + return readConsumed; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java index 71fc169d671f..b64429d9adc8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java @@ -68,4 +68,9 @@ public void addMutation(final Mutation mutation) { public long getReadAvailable() { return Long.MAX_VALUE; } + + @Override + public long getReadConsumed() { + return 0L; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java index ffc3cd50825c..bedad5e98673 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java @@ -80,4 +80,15 @@ public enum OperationType { /** Returns the number of bytes available to read to avoid exceeding the quota */ long getReadAvailable(); + + /** Returns the number of bytes consumed from the quota by the operation */ + long getReadConsumed(); + + /** + * Returns the maximum result size to be returned by the given operation. This is the greater of + * two numbers: the bytes available, or the bytes already consumed + */ + default long getMaxResultSize() { + return Math.max(getReadAvailable(), getReadConsumed()); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java index b3835ad86d50..5c69ad5d6cd5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java @@ -23,12 +23,10 @@ /** * Simple rate limiter. Usage Example: // At this point you have a unlimited resource limiter - * RateLimiter limiter = new AverageIntervalRateLimiter(); or new FixedIntervalRateLimiter(); - * limiter.set(10, TimeUnit.SECONDS); // set 10 resources/sec while (true) { // call canExecute - * before performing resource consuming operation bool canExecute = limiter.canExecute(); // If - * there are no available resources, wait until one is available if (!canExecute) - * Thread.sleep(limiter.waitInterval()); // ...execute the work and consume the resource... - * limiter.consume(); } + * RateLimiter limiter = new AverageIntervalRateLimiter(); // or new FixedIntervalRateLimiter(); + * limiter.set(10, TimeUnit.SECONDS); // set 10 resources/sec while (limiter.getWaitIntervalMs > 0) + * { // wait until waitInterval == 0 Thread.sleep(limiter.getWaitIntervalMs()); } // ...execute the + * work and consume the resource... limiter.consume(); */ @InterfaceAudience.Private @InterfaceStability.Evolving @@ -137,8 +135,8 @@ protected synchronized long getTimeUnitInMillis() { * Is there at least one resource available to allow execution? * @return the waitInterval to backoff, or 0 if execution is allowed */ - public long canExecute() { - return canExecute(1); + public long getWaitIntervalMs() { + return getWaitIntervalMs(1); } /** @@ -146,7 +144,7 @@ public long canExecute() { * @param amount the number of required resources, a non-negative number * @return the waitInterval to backoff, or 0 if execution is allowed */ - public synchronized long canExecute(final long amount) { + public synchronized long getWaitIntervalMs(final long amount) { assert amount >= 0; if (!isAvailable(amount)) { return waitInterval(amount); @@ -159,7 +157,7 @@ public synchronized long canExecute(final long amount) { * @param amount the number of required resources, a non-negative number * @return true if there are enough available resources, otherwise false */ - private synchronized boolean isAvailable(final long amount) { + private boolean isAvailable(final long amount) { if (isBypass()) { return true; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java index 025f6aa5fa71..8ae2cae01881 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java @@ -141,45 +141,45 @@ private static void setFromTimedQuota(final RateLimiter limiter, final TimedQuot public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit) throws RpcThrottlingException { - long waitInterval = reqsLimiter.canExecute(writeReqs + readReqs); + long waitInterval = reqsLimiter.getWaitIntervalMs(writeReqs + readReqs); if (waitInterval > 0) { RpcThrottlingException.throwNumRequestsExceeded(waitInterval); } - waitInterval = reqSizeLimiter.canExecute(estimateWriteSize + estimateReadSize); + waitInterval = reqSizeLimiter.getWaitIntervalMs(estimateWriteSize + estimateReadSize); if (waitInterval > 0) { RpcThrottlingException.throwRequestSizeExceeded(waitInterval); } - waitInterval = - reqCapacityUnitLimiter.canExecute(estimateWriteCapacityUnit + estimateReadCapacityUnit); + waitInterval = reqCapacityUnitLimiter + .getWaitIntervalMs(estimateWriteCapacityUnit + estimateReadCapacityUnit); if (waitInterval > 0) { RpcThrottlingException.throwRequestCapacityUnitExceeded(waitInterval); } if (estimateWriteSize > 0) { - waitInterval = writeReqsLimiter.canExecute(writeReqs); + waitInterval = writeReqsLimiter.getWaitIntervalMs(writeReqs); if (waitInterval > 0) { RpcThrottlingException.throwNumWriteRequestsExceeded(waitInterval); } - waitInterval = writeSizeLimiter.canExecute(estimateWriteSize); + waitInterval = writeSizeLimiter.getWaitIntervalMs(estimateWriteSize); if (waitInterval > 0) { RpcThrottlingException.throwWriteSizeExceeded(waitInterval); } - waitInterval = writeCapacityUnitLimiter.canExecute(estimateWriteCapacityUnit); + waitInterval = writeCapacityUnitLimiter.getWaitIntervalMs(estimateWriteCapacityUnit); if (waitInterval > 0) { RpcThrottlingException.throwWriteCapacityUnitExceeded(waitInterval); } } if (estimateReadSize > 0) { - waitInterval = readReqsLimiter.canExecute(readReqs); + waitInterval = readReqsLimiter.getWaitIntervalMs(readReqs); if (waitInterval > 0) { RpcThrottlingException.throwNumReadRequestsExceeded(waitInterval); } - waitInterval = readSizeLimiter.canExecute(estimateReadSize); + waitInterval = readSizeLimiter.getWaitIntervalMs(estimateReadSize); if (waitInterval > 0) { RpcThrottlingException.throwReadSizeExceeded(waitInterval); } - waitInterval = readCapacityUnitLimiter.canExecute(estimateReadCapacityUnit); + waitInterval = readCapacityUnitLimiter.getWaitIntervalMs(estimateReadCapacityUnit); if (waitInterval > 0) { RpcThrottlingException.throwReadCapacityUnitExceeded(waitInterval); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 0538b9706e89..69d98048fc71 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -716,7 +716,7 @@ private List doNonAtomicRegionMutation(final HRegion region, // doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are // deferred/batched List mutations = null; - long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); + long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize()); IOException sizeIOE = null; ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = ResultOrException.newBuilder(); @@ -3609,7 +3609,7 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque } RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null); // now let's do the real scan. - long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); + long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize()); RegionScanner scanner = rsh.s; // this is the limit of rows for this scan, if we the number of rows reach this value, we will // close the scanner. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java index 16d1025b88a8..ae9b96d7a6c7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java @@ -70,7 +70,7 @@ private void testWaitInterval(final TimeUnit timeUnit, final long limit, // consume all the available resources, one request at the time. // the wait interval should be 0 for (int i = 0; i < (limit - 1); ++i) { - assertEquals(0, limiter.canExecute()); + assertEquals(0, limiter.getWaitIntervalMs()); limiter.consume(); long waitInterval = limiter.waitInterval(); assertEquals(0, waitInterval); @@ -80,7 +80,7 @@ private void testWaitInterval(final TimeUnit timeUnit, final long limit, // There is one resource available, so we should be able to // consume it without waiting. limiter.setNextRefillTime(limiter.getNextRefillTime() - nowTs); - assertEquals(0, limiter.canExecute()); + assertEquals(0, limiter.getWaitIntervalMs()); assertEquals(0, limiter.waitInterval()); limiter.consume(); // No more resources are available, we should wait for at least an interval. @@ -93,7 +93,7 @@ private void testWaitInterval(final TimeUnit timeUnit, final long limit, // artificially go into the past to prove that when too early we should fail. long temp = nowTs + 500; limiter.setNextRefillTime(limiter.getNextRefillTime() + temp); - assertNotEquals(0, limiter.canExecute()); + assertNotEquals(0, limiter.getWaitIntervalMs()); // Roll back the nextRefillTime set to continue further testing limiter.setNextRefillTime(limiter.getNextRefillTime() - temp); } @@ -106,7 +106,7 @@ public void testOverconsumptionAverageIntervalRefillStrategy() { // 10 resources are available, but we need to consume 20 resources // Verify that we have to wait at least 1.1sec to have 1 resource available - assertEquals(0, limiter.canExecute()); + assertEquals(0, limiter.getWaitIntervalMs()); limiter.consume(20); // We consumed twice the quota. Need to wait 1s to get back to 0, then another 100ms for the 1 assertEquals(1100, limiter.waitInterval(1)); @@ -115,10 +115,10 @@ public void testOverconsumptionAverageIntervalRefillStrategy() { // Verify that after 1sec we need to wait for another 0.1sec to get a resource available limiter.setNextRefillTime(limiter.getNextRefillTime() - 1000); - assertNotEquals(0, limiter.canExecute(1)); + assertNotEquals(0, limiter.getWaitIntervalMs(1)); limiter.setNextRefillTime(limiter.getNextRefillTime() - 100); // We've waited the full 1.1sec, should now have 1 available - assertEquals(0, limiter.canExecute(1)); + assertEquals(0, limiter.getWaitIntervalMs(1)); assertEquals(0, limiter.waitInterval()); } @@ -137,7 +137,7 @@ public long currentTime() { } }; EnvironmentEdgeManager.injectEdge(edge); - assertEquals(0, limiter.canExecute()); + assertEquals(0, limiter.getWaitIntervalMs()); // 10 resources are available, but we need to consume 20 resources limiter.consume(20); // We over-consumed by 10. Since this is a fixed interval refill, where @@ -148,10 +148,10 @@ public long currentTime() { // Verify that after 1sec also no resource should be available limiter.setNextRefillTime(limiter.getNextRefillTime() - 1000); - assertNotEquals(0, limiter.canExecute()); + assertNotEquals(0, limiter.getWaitIntervalMs()); // Verify that after total 2sec the 10 resource is available limiter.setNextRefillTime(limiter.getNextRefillTime() - 1000); - assertEquals(0, limiter.canExecute()); + assertEquals(0, limiter.getWaitIntervalMs()); assertEquals(0, limiter.waitInterval()); } @@ -160,12 +160,12 @@ public void testFixedIntervalResourceAvailability() throws Exception { RateLimiter limiter = new FixedIntervalRateLimiter(); limiter.set(10, TimeUnit.SECONDS); - assertEquals(0, limiter.canExecute(10)); + assertEquals(0, limiter.getWaitIntervalMs(10)); limiter.consume(3); assertEquals(7, limiter.getAvailable()); - assertNotEquals(0, limiter.canExecute(10)); + assertNotEquals(0, limiter.getWaitIntervalMs(10)); limiter.setNextRefillTime(limiter.getNextRefillTime() - 1000); - assertEquals(0, limiter.canExecute(10)); + assertEquals(0, limiter.getWaitIntervalMs(10)); assertEquals(10, limiter.getAvailable()); } @@ -181,7 +181,7 @@ public void testLimiterBySmallerRate() throws InterruptedException { limiter.setNextRefillTime(limiter.getNextRefillTime() - 500); for (int i = 0; i < 3; i++) { // 6 resources/sec < limit, so limiter.canExecute(nowTs, lastTs) should be true - assertEquals(limiter.canExecute(), 0); + assertEquals(limiter.getWaitIntervalMs(), 0); limiter.consume(); } } @@ -236,7 +236,7 @@ public int testCanExecuteByRate(RateLimiter limiter, int rate) { int count = 0; while ((request++) < rate) { limiter.setNextRefillTime(limiter.getNextRefillTime() - limiter.getTimeUnitInMillis() / rate); - if (limiter.canExecute() == 0) { + if (limiter.getWaitIntervalMs() == 0) { count++; limiter.consume(); } @@ -316,10 +316,10 @@ public void testUnconfiguredLimiters() throws InterruptedException { assertEquals(limit, avgLimiter.getAvailable()); assertEquals(limit, fixLimiter.getAvailable()); - assertEquals(0, avgLimiter.canExecute(limit)); + assertEquals(0, avgLimiter.getWaitIntervalMs(limit)); avgLimiter.consume(limit); - assertEquals(0, fixLimiter.canExecute(limit)); + assertEquals(0, fixLimiter.getWaitIntervalMs(limit)); fixLimiter.consume(limit); // Make sure that available is Long.MAX_VALUE @@ -329,10 +329,10 @@ public void testUnconfiguredLimiters() throws InterruptedException { // after 100 millseconds, it should be able to execute limit as well testEdge.incValue(100); - assertEquals(0, avgLimiter.canExecute(limit)); + assertEquals(0, avgLimiter.getWaitIntervalMs(limit)); avgLimiter.consume(limit); - assertEquals(0, fixLimiter.canExecute(limit)); + assertEquals(0, fixLimiter.getWaitIntervalMs(limit)); fixLimiter.consume(limit); // Make sure that available is Long.MAX_VALUE @@ -357,10 +357,10 @@ public void testExtremeLimiters() throws InterruptedException { assertEquals(limit, avgLimiter.getAvailable()); assertEquals(limit, fixLimiter.getAvailable()); - assertEquals(0, avgLimiter.canExecute(limit / 2)); + assertEquals(0, avgLimiter.getWaitIntervalMs(limit / 2)); avgLimiter.consume(limit / 2); - assertEquals(0, fixLimiter.canExecute(limit / 2)); + assertEquals(0, fixLimiter.getWaitIntervalMs(limit / 2)); fixLimiter.consume(limit / 2); // Make sure that available is whatever left @@ -370,13 +370,13 @@ public void testExtremeLimiters() throws InterruptedException { // after 100 millseconds, both should not be able to execute the limit testEdge.incValue(100); - assertNotEquals(0, avgLimiter.canExecute(limit)); - assertNotEquals(0, fixLimiter.canExecute(limit)); + assertNotEquals(0, avgLimiter.getWaitIntervalMs(limit)); + assertNotEquals(0, fixLimiter.getWaitIntervalMs(limit)); // after 500 millseconds, average interval limiter should be able to execute the limit testEdge.incValue(500); - assertEquals(0, avgLimiter.canExecute(limit)); - assertNotEquals(0, fixLimiter.canExecute(limit)); + assertEquals(0, avgLimiter.getWaitIntervalMs(limit)); + assertNotEquals(0, fixLimiter.getWaitIntervalMs(limit)); // Make sure that available is correct assertEquals(limit, avgLimiter.getAvailable()); @@ -384,8 +384,8 @@ public void testExtremeLimiters() throws InterruptedException { // after 500 millseconds, both should be able to execute testEdge.incValue(500); - assertEquals(0, avgLimiter.canExecute(limit)); - assertEquals(0, fixLimiter.canExecute(limit)); + assertEquals(0, avgLimiter.getWaitIntervalMs(limit)); + assertEquals(0, fixLimiter.getWaitIntervalMs(limit)); // Make sure that available is Long.MAX_VALUE assertEquals(limit, avgLimiter.getAvailable()); @@ -412,7 +412,7 @@ public void testLimiterCompensationOverflow() throws InterruptedException { assertEquals(limit, avgLimiter.getAvailable()); // The initial guess is that 100 bytes. - assertEquals(0, avgLimiter.canExecute(guessNumber)); + assertEquals(0, avgLimiter.getWaitIntervalMs(guessNumber)); avgLimiter.consume(guessNumber); // Make sure that available is whatever left