diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java index 4b89e18a8021..a4ff8b2a859e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java @@ -134,6 +134,11 @@ public long getReadAvailable() { return readAvailable; } + @Override + public long getReadConsumed() { + return readConsumed; + } + @Override public void addGetResult(final Result result) { operationSize[OperationType.GET.ordinal()] += QuotaUtil.calculateResultSize(result); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java index 71fc169d671f..b64429d9adc8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java @@ -68,4 +68,9 @@ public void addMutation(final Mutation mutation) { public long getReadAvailable() { return Long.MAX_VALUE; } + + @Override + public long getReadConsumed() { + return 0L; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java index ffc3cd50825c..bedad5e98673 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java @@ -80,4 +80,15 @@ public enum OperationType { /** Returns the number of bytes available to read to avoid exceeding the quota */ long getReadAvailable(); + + /** Returns the number of bytes consumed from the quota by the operation */ + long getReadConsumed(); + + /** + * Returns the maximum result size to be returned by the given operation. This is the greater of + * two numbers: the bytes available, or the bytes already consumed + */ + default long getMaxResultSize() { + return Math.max(getReadAvailable(), getReadConsumed()); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java index bda60ffa690a..5c69ad5d6cd5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java @@ -23,12 +23,10 @@ /** * Simple rate limiter. Usage Example: // At this point you have a unlimited resource limiter - * RateLimiter limiter = new AverageIntervalRateLimiter(); or new FixedIntervalRateLimiter(); - * limiter.set(10, TimeUnit.SECONDS); // set 10 resources/sec while (true) { // call canExecute - * before performing resource consuming operation bool canExecute = limiter.canExecute(); // If - * there are no available resources, wait until one is available if (!canExecute) - * Thread.sleep(limiter.waitInterval()); // ...execute the work and consume the resource... - * limiter.consume(); } + * RateLimiter limiter = new AverageIntervalRateLimiter(); // or new FixedIntervalRateLimiter(); + * limiter.set(10, TimeUnit.SECONDS); // set 10 resources/sec while (limiter.getWaitIntervalMs > 0) + * { // wait until waitInterval == 0 Thread.sleep(limiter.getWaitIntervalMs()); } // ...execute the + * work and consume the resource... limiter.consume(); */ @InterfaceAudience.Private @InterfaceStability.Evolving @@ -135,10 +133,23 @@ protected synchronized long getTimeUnitInMillis() { /** * Is there at least one resource available to allow execution? - * @return true if there is at least one resource available, otherwise false + * @return the waitInterval to backoff, or 0 if execution is allowed */ - public boolean canExecute() { - return canExecute(1); + public long getWaitIntervalMs() { + return getWaitIntervalMs(1); + } + + /** + * Are there enough available resources to allow execution? + * @param amount the number of required resources, a non-negative number + * @return the waitInterval to backoff, or 0 if execution is allowed + */ + public synchronized long getWaitIntervalMs(final long amount) { + assert amount >= 0; + if (!isAvailable(amount)) { + return waitInterval(amount); + } + return 0; } /** @@ -146,7 +157,7 @@ public boolean canExecute() { * @param amount the number of required resources, a non-negative number * @return true if there are enough available resources, otherwise false */ - public synchronized boolean canExecute(final long amount) { + private boolean isAvailable(final long amount) { if (isBypass()) { return true; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java index d7eb0e537a39..8ae2cae01881 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java @@ -141,43 +141,47 @@ private static void setFromTimedQuota(final RateLimiter limiter, final TimedQuot public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit) throws RpcThrottlingException { - if (!reqsLimiter.canExecute(writeReqs + readReqs)) { - RpcThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval()); + long waitInterval = reqsLimiter.getWaitIntervalMs(writeReqs + readReqs); + if (waitInterval > 0) { + RpcThrottlingException.throwNumRequestsExceeded(waitInterval); } - if (!reqSizeLimiter.canExecute(estimateWriteSize + estimateReadSize)) { - RpcThrottlingException.throwRequestSizeExceeded( - reqSizeLimiter.waitInterval(estimateWriteSize + estimateReadSize)); + waitInterval = reqSizeLimiter.getWaitIntervalMs(estimateWriteSize + estimateReadSize); + if (waitInterval > 0) { + RpcThrottlingException.throwRequestSizeExceeded(waitInterval); } - if (!reqCapacityUnitLimiter.canExecute(estimateWriteCapacityUnit + estimateReadCapacityUnit)) { - RpcThrottlingException.throwRequestCapacityUnitExceeded( - reqCapacityUnitLimiter.waitInterval(estimateWriteCapacityUnit + estimateReadCapacityUnit)); + waitInterval = reqCapacityUnitLimiter + .getWaitIntervalMs(estimateWriteCapacityUnit + estimateReadCapacityUnit); + if (waitInterval > 0) { + RpcThrottlingException.throwRequestCapacityUnitExceeded(waitInterval); } if (estimateWriteSize > 0) { - if (!writeReqsLimiter.canExecute(writeReqs)) { - RpcThrottlingException.throwNumWriteRequestsExceeded(writeReqsLimiter.waitInterval()); + waitInterval = writeReqsLimiter.getWaitIntervalMs(writeReqs); + if (waitInterval > 0) { + RpcThrottlingException.throwNumWriteRequestsExceeded(waitInterval); } - if (!writeSizeLimiter.canExecute(estimateWriteSize)) { - RpcThrottlingException - .throwWriteSizeExceeded(writeSizeLimiter.waitInterval(estimateWriteSize)); + waitInterval = writeSizeLimiter.getWaitIntervalMs(estimateWriteSize); + if (waitInterval > 0) { + RpcThrottlingException.throwWriteSizeExceeded(waitInterval); } - if (!writeCapacityUnitLimiter.canExecute(estimateWriteCapacityUnit)) { - RpcThrottlingException.throwWriteCapacityUnitExceeded( - writeCapacityUnitLimiter.waitInterval(estimateWriteCapacityUnit)); + waitInterval = writeCapacityUnitLimiter.getWaitIntervalMs(estimateWriteCapacityUnit); + if (waitInterval > 0) { + RpcThrottlingException.throwWriteCapacityUnitExceeded(waitInterval); } } if (estimateReadSize > 0) { - if (!readReqsLimiter.canExecute(readReqs)) { - RpcThrottlingException.throwNumReadRequestsExceeded(readReqsLimiter.waitInterval()); + waitInterval = readReqsLimiter.getWaitIntervalMs(readReqs); + if (waitInterval > 0) { + RpcThrottlingException.throwNumReadRequestsExceeded(waitInterval); } - if (!readSizeLimiter.canExecute(estimateReadSize)) { - RpcThrottlingException - .throwReadSizeExceeded(readSizeLimiter.waitInterval(estimateReadSize)); + waitInterval = readSizeLimiter.getWaitIntervalMs(estimateReadSize); + if (waitInterval > 0) { + RpcThrottlingException.throwReadSizeExceeded(waitInterval); } - if (!readCapacityUnitLimiter.canExecute(estimateReadCapacityUnit)) { - RpcThrottlingException.throwReadCapacityUnitExceeded( - readCapacityUnitLimiter.waitInterval(estimateReadCapacityUnit)); + waitInterval = readCapacityUnitLimiter.getWaitIntervalMs(estimateReadCapacityUnit); + if (waitInterval > 0) { + RpcThrottlingException.throwReadCapacityUnitExceeded(waitInterval); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 4926aa30c8a4..25b229fd0c4e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -716,7 +716,7 @@ private List doNonAtomicRegionMutation(final HRegion region, // doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are // deferred/batched List mutations = null; - long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); + long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize()); IOException sizeIOE = null; ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = ResultOrException.newBuilder(); @@ -3611,7 +3611,7 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque } RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null); // now let's do the real scan. - long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); + long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize()); RegionScanner scanner = rsh.s; // this is the limit of rows for this scan, if we the number of rows reach this value, we will // close the scanner. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java index e27ba123381c..5de9a2d1a900 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java @@ -103,9 +103,9 @@ public void testBBSGet() throws Exception { doPuts(10_000, FAMILY, QUALIFIER, table); TEST_UTIL.flush(TABLE_NAME); - // Add ~10 block/min limit + // Add ~10 block/sec limit admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.READ_SIZE, - Math.round(10.1 * blockSize), TimeUnit.MINUTES)); + Math.round(10.1 * blockSize), TimeUnit.SECONDS)); triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); // should execute at max 10 requests @@ -132,10 +132,10 @@ public void testBBSScan() throws Exception { doPuts(10_000, FAMILY, QUALIFIER, table); TEST_UTIL.flush(TABLE_NAME); - // Add 1 block/min limit. + // Add 1 block/sec limit. // This should only allow 1 scan per minute, because we estimate 1 block per scan admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, blockSize, - TimeUnit.MINUTES)); + TimeUnit.SECONDS)); triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); waitMinuteQuota(); @@ -148,9 +148,9 @@ public void testBBSScan() throws Exception { testTraffic(() -> doScans(100, table), 100, 0); testTraffic(() -> doScans(100, table), 100, 0); - // Add ~3 block/min limit. This should support >1 scans + // Add ~3 block/sec limit. This should support >1 scans admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, - Math.round(3.1 * blockSize), TimeUnit.MINUTES)); + Math.round(3.1 * blockSize), TimeUnit.SECONDS)); triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); // should execute some requests, but not all @@ -174,10 +174,10 @@ public void testBBSMultiGet() throws Exception { doPuts(rowCount, FAMILY, QUALIFIER, table); TEST_UTIL.flush(TABLE_NAME); - // Add 1 block/min limit. + // Add 1 block/sec limit. // This should only allow 1 multiget per minute, because we estimate 1 block per multiget admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, blockSize, - TimeUnit.MINUTES)); + TimeUnit.SECONDS)); triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); waitMinuteQuota(); @@ -190,9 +190,9 @@ public void testBBSMultiGet() throws Exception { testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0); testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0); - // Add ~100 block/min limit + // Add ~100 block/sec limit admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, - Math.round(100.1 * blockSize), TimeUnit.MINUTES)); + Math.round(100.1 * blockSize), TimeUnit.SECONDS)); triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); // should execute approximately 10 batches of 10 requests @@ -211,7 +211,7 @@ public void testBBSMultiGet() throws Exception { private void testTraffic(Callable trafficCallable, long expectedSuccess, long marginOfError) throws Exception { - TEST_UTIL.waitFor(90_000, () -> { + TEST_UTIL.waitFor(5_000, () -> { long actualSuccess; try { actualSuccess = trafficCallable.call(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java index 49df937f7c5c..ae9b96d7a6c7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java @@ -18,8 +18,7 @@ package org.apache.hadoop.hbase.quotas; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNotEquals; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -71,7 +70,7 @@ private void testWaitInterval(final TimeUnit timeUnit, final long limit, // consume all the available resources, one request at the time. // the wait interval should be 0 for (int i = 0; i < (limit - 1); ++i) { - assertTrue(limiter.canExecute()); + assertEquals(0, limiter.getWaitIntervalMs()); limiter.consume(); long waitInterval = limiter.waitInterval(); assertEquals(0, waitInterval); @@ -81,7 +80,7 @@ private void testWaitInterval(final TimeUnit timeUnit, final long limit, // There is one resource available, so we should be able to // consume it without waiting. limiter.setNextRefillTime(limiter.getNextRefillTime() - nowTs); - assertTrue(limiter.canExecute()); + assertEquals(0, limiter.getWaitIntervalMs()); assertEquals(0, limiter.waitInterval()); limiter.consume(); // No more resources are available, we should wait for at least an interval. @@ -94,7 +93,7 @@ private void testWaitInterval(final TimeUnit timeUnit, final long limit, // artificially go into the past to prove that when too early we should fail. long temp = nowTs + 500; limiter.setNextRefillTime(limiter.getNextRefillTime() + temp); - assertFalse(limiter.canExecute()); + assertNotEquals(0, limiter.getWaitIntervalMs()); // Roll back the nextRefillTime set to continue further testing limiter.setNextRefillTime(limiter.getNextRefillTime() - temp); } @@ -107,7 +106,7 @@ public void testOverconsumptionAverageIntervalRefillStrategy() { // 10 resources are available, but we need to consume 20 resources // Verify that we have to wait at least 1.1sec to have 1 resource available - assertTrue(limiter.canExecute()); + assertEquals(0, limiter.getWaitIntervalMs()); limiter.consume(20); // We consumed twice the quota. Need to wait 1s to get back to 0, then another 100ms for the 1 assertEquals(1100, limiter.waitInterval(1)); @@ -116,10 +115,10 @@ public void testOverconsumptionAverageIntervalRefillStrategy() { // Verify that after 1sec we need to wait for another 0.1sec to get a resource available limiter.setNextRefillTime(limiter.getNextRefillTime() - 1000); - assertFalse(limiter.canExecute(1)); + assertNotEquals(0, limiter.getWaitIntervalMs(1)); limiter.setNextRefillTime(limiter.getNextRefillTime() - 100); // We've waited the full 1.1sec, should now have 1 available - assertTrue(limiter.canExecute(1)); + assertEquals(0, limiter.getWaitIntervalMs(1)); assertEquals(0, limiter.waitInterval()); } @@ -138,7 +137,7 @@ public long currentTime() { } }; EnvironmentEdgeManager.injectEdge(edge); - assertTrue(limiter.canExecute()); + assertEquals(0, limiter.getWaitIntervalMs()); // 10 resources are available, but we need to consume 20 resources limiter.consume(20); // We over-consumed by 10. Since this is a fixed interval refill, where @@ -149,10 +148,10 @@ public long currentTime() { // Verify that after 1sec also no resource should be available limiter.setNextRefillTime(limiter.getNextRefillTime() - 1000); - assertFalse(limiter.canExecute()); + assertNotEquals(0, limiter.getWaitIntervalMs()); // Verify that after total 2sec the 10 resource is available limiter.setNextRefillTime(limiter.getNextRefillTime() - 1000); - assertTrue(limiter.canExecute()); + assertEquals(0, limiter.getWaitIntervalMs()); assertEquals(0, limiter.waitInterval()); } @@ -161,12 +160,12 @@ public void testFixedIntervalResourceAvailability() throws Exception { RateLimiter limiter = new FixedIntervalRateLimiter(); limiter.set(10, TimeUnit.SECONDS); - assertTrue(limiter.canExecute(10)); + assertEquals(0, limiter.getWaitIntervalMs(10)); limiter.consume(3); assertEquals(7, limiter.getAvailable()); - assertFalse(limiter.canExecute(10)); + assertNotEquals(0, limiter.getWaitIntervalMs(10)); limiter.setNextRefillTime(limiter.getNextRefillTime() - 1000); - assertTrue(limiter.canExecute(10)); + assertEquals(0, limiter.getWaitIntervalMs(10)); assertEquals(10, limiter.getAvailable()); } @@ -182,7 +181,7 @@ public void testLimiterBySmallerRate() throws InterruptedException { limiter.setNextRefillTime(limiter.getNextRefillTime() - 500); for (int i = 0; i < 3; i++) { // 6 resources/sec < limit, so limiter.canExecute(nowTs, lastTs) should be true - assertEquals(true, limiter.canExecute()); + assertEquals(limiter.getWaitIntervalMs(), 0); limiter.consume(); } } @@ -237,7 +236,7 @@ public int testCanExecuteByRate(RateLimiter limiter, int rate) { int count = 0; while ((request++) < rate) { limiter.setNextRefillTime(limiter.getNextRefillTime() - limiter.getTimeUnitInMillis() / rate); - if (limiter.canExecute()) { + if (limiter.getWaitIntervalMs() == 0) { count++; limiter.consume(); } @@ -317,28 +316,28 @@ public void testUnconfiguredLimiters() throws InterruptedException { assertEquals(limit, avgLimiter.getAvailable()); assertEquals(limit, fixLimiter.getAvailable()); - assertTrue(avgLimiter.canExecute(limit)); + assertEquals(0, avgLimiter.getWaitIntervalMs(limit)); avgLimiter.consume(limit); - assertTrue(fixLimiter.canExecute(limit)); + assertEquals(0, fixLimiter.getWaitIntervalMs(limit)); fixLimiter.consume(limit); // Make sure that available is Long.MAX_VALUE - assertTrue(limit == avgLimiter.getAvailable()); - assertTrue(limit == fixLimiter.getAvailable()); + assertEquals(limit, avgLimiter.getAvailable()); + assertEquals(limit, fixLimiter.getAvailable()); // after 100 millseconds, it should be able to execute limit as well testEdge.incValue(100); - assertTrue(avgLimiter.canExecute(limit)); + assertEquals(0, avgLimiter.getWaitIntervalMs(limit)); avgLimiter.consume(limit); - assertTrue(fixLimiter.canExecute(limit)); + assertEquals(0, fixLimiter.getWaitIntervalMs(limit)); fixLimiter.consume(limit); // Make sure that available is Long.MAX_VALUE - assertTrue(limit == avgLimiter.getAvailable()); - assertTrue(limit == fixLimiter.getAvailable()); + assertEquals(limit, avgLimiter.getAvailable()); + assertEquals(limit, fixLimiter.getAvailable()); EnvironmentEdgeManager.reset(); } @@ -358,39 +357,39 @@ public void testExtremeLimiters() throws InterruptedException { assertEquals(limit, avgLimiter.getAvailable()); assertEquals(limit, fixLimiter.getAvailable()); - assertTrue(avgLimiter.canExecute(limit / 2)); + assertEquals(0, avgLimiter.getWaitIntervalMs(limit / 2)); avgLimiter.consume(limit / 2); - assertTrue(fixLimiter.canExecute(limit / 2)); + assertEquals(0, fixLimiter.getWaitIntervalMs(limit / 2)); fixLimiter.consume(limit / 2); // Make sure that available is whatever left - assertTrue((limit - (limit / 2)) == avgLimiter.getAvailable()); - assertTrue((limit - (limit / 2)) == fixLimiter.getAvailable()); + assertEquals((limit - (limit / 2)), avgLimiter.getAvailable()); + assertEquals((limit - (limit / 2)), fixLimiter.getAvailable()); // after 100 millseconds, both should not be able to execute the limit testEdge.incValue(100); - assertFalse(avgLimiter.canExecute(limit)); - assertFalse(fixLimiter.canExecute(limit)); + assertNotEquals(0, avgLimiter.getWaitIntervalMs(limit)); + assertNotEquals(0, fixLimiter.getWaitIntervalMs(limit)); // after 500 millseconds, average interval limiter should be able to execute the limit testEdge.incValue(500); - assertTrue(avgLimiter.canExecute(limit)); - assertFalse(fixLimiter.canExecute(limit)); + assertEquals(0, avgLimiter.getWaitIntervalMs(limit)); + assertNotEquals(0, fixLimiter.getWaitIntervalMs(limit)); // Make sure that available is correct - assertTrue(limit == avgLimiter.getAvailable()); - assertTrue((limit - (limit / 2)) == fixLimiter.getAvailable()); + assertEquals(limit, avgLimiter.getAvailable()); + assertEquals((limit - (limit / 2)), fixLimiter.getAvailable()); // after 500 millseconds, both should be able to execute testEdge.incValue(500); - assertTrue(avgLimiter.canExecute(limit)); - assertTrue(fixLimiter.canExecute(limit)); + assertEquals(0, avgLimiter.getWaitIntervalMs(limit)); + assertEquals(0, fixLimiter.getWaitIntervalMs(limit)); // Make sure that available is Long.MAX_VALUE - assertTrue(limit == avgLimiter.getAvailable()); - assertTrue(limit == fixLimiter.getAvailable()); + assertEquals(limit, avgLimiter.getAvailable()); + assertEquals(limit, fixLimiter.getAvailable()); EnvironmentEdgeManager.reset(); } @@ -413,19 +412,19 @@ public void testLimiterCompensationOverflow() throws InterruptedException { assertEquals(limit, avgLimiter.getAvailable()); // The initial guess is that 100 bytes. - assertTrue(avgLimiter.canExecute(guessNumber)); + assertEquals(0, avgLimiter.getWaitIntervalMs(guessNumber)); avgLimiter.consume(guessNumber); // Make sure that available is whatever left - assertTrue((limit - guessNumber) == avgLimiter.getAvailable()); + assertEquals((limit - guessNumber), avgLimiter.getAvailable()); // Manually set avil to simulate that another thread call canExecute(). // It is simulated by consume(). avgLimiter.consume(-80); - assertTrue((limit - guessNumber + 80) == avgLimiter.getAvailable()); + assertEquals((limit - guessNumber + 80), avgLimiter.getAvailable()); // Now thread1 compensates 80 avgLimiter.consume(-80); - assertTrue(limit == avgLimiter.getAvailable()); + assertEquals(limit, avgLimiter.getAvailable()); } }