From cfee25a840f790381ea5049067f2a240808ba824 Mon Sep 17 00:00:00 2001 From: nziebart Date: Mon, 20 Nov 2017 13:53:24 +0000 Subject: [PATCH] [QoS] live reload (#2710) * live reload and logging * millis * checkpoint * fix tests * comments * checkstyle --- .../atlasdb/factory/TransactionManagers.java | 10 +-- qos-service-impl/build.gradle | 3 +- .../atlasdb/qos/client/AtlasDbQosClient.java | 9 +- .../atlasdb/qos/config/QosLimitsConfig.java | 8 +- .../atlasdb/qos/metrics/QosMetrics.java | 25 ++++++ .../atlasdb/qos/ratelimit/QosRateLimiter.java | 69 ++++++++++----- .../qos/ratelimit/QosRateLimiters.java | 12 +-- .../qos/client/AtlasDbQosClientTest.java | 25 +++++- .../qos/ratelimit/QosRateLimiterTest.java | 84 +++++++++++++++---- 9 files changed, 189 insertions(+), 56 deletions(-) diff --git a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java index cf1450fb8eb..94359348491 100644 --- a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java +++ b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java @@ -99,6 +99,7 @@ import com.palantir.atlasdb.transaction.service.TransactionService; import com.palantir.atlasdb.transaction.service.TransactionServices; import com.palantir.atlasdb.util.AtlasDbMetrics; +import com.palantir.atlasdb.util.JavaSuppliers; import com.palantir.leader.LeaderElectionService; import com.palantir.leader.PingableLeader; import com.palantir.leader.proxy.AwaitingLeadershipProxy; @@ -309,7 +310,7 @@ SerializableTransactionManager serializable() { java.util.function.Supplier runtimeConfigSupplier = () -> runtimeConfigSupplier().get().orElse(defaultRuntime); - QosClient qosClient = getQosClient(runtimeConfigSupplier.get().qos()); + QosClient qosClient = getQosClient(JavaSuppliers.compose(conf -> conf.qos(), runtimeConfigSupplier)); ServiceDiscoveringAtlasSupplier atlasFactory = new ServiceDiscoveringAtlasSupplier( @@ -403,11 +404,10 @@ SerializableTransactionManager serializable() { return transactionManager; } - private QosClient getQosClient(QosClientConfig config) { - // TODO(nziebart): create a RefreshingRateLimiter + private QosClient getQosClient(Supplier config) { QosRateLimiters rateLimiters = QosRateLimiters.create( - config.limits(), - config.maxBackoffSleepTime().toMilliseconds()); + JavaSuppliers.compose(conf -> conf.maxBackoffSleepTime().toMilliseconds(), config), + JavaSuppliers.compose(QosClientConfig::limits, config)); return AtlasDbQosClient.create(rateLimiters); } diff --git a/qos-service-impl/build.gradle b/qos-service-impl/build.gradle index 91fe725b45c..75c1f1db3d6 100644 --- a/qos-service-impl/build.gradle +++ b/qos-service-impl/build.gradle @@ -15,12 +15,13 @@ jacocoTestReport { check.dependsOn integTest dependencies { - compile (project(":qos-service-api")); compile (project(":atlasdb-client")) { exclude group: 'com.squareup.okhttp3' exclude group: 'com.google.guava' } + compile (project(":qos-service-api")); + processor project(":atlasdb-processors") processor group: 'org.immutables', name: 'value' testCompile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml' diff --git a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/client/AtlasDbQosClient.java b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/client/AtlasDbQosClient.java index 6e4624c8ee0..6e556655329 100644 --- a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/client/AtlasDbQosClient.java +++ b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/client/AtlasDbQosClient.java @@ -15,6 +15,7 @@ */ package com.palantir.atlasdb.qos.client; +import java.time.Duration; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -65,7 +66,13 @@ private T execute( QosRateLimiter rateLimiter, Consumer weightMetric) throws E { long estimatedNumBytes = weigher.estimate().numBytes(); - rateLimiter.consumeWithBackoff(estimatedNumBytes); + try { + Duration waitTime = rateLimiter.consumeWithBackoff(estimatedNumBytes); + metrics.recordBackoffMicros(TimeUnit.NANOSECONDS.toMicros(waitTime.toNanos())); + } catch (RuntimeException ex) { // TODO(nziebart): use rate limited exception here + metrics.recordRateLimitedException(); + throw ex; + } Stopwatch timer = Stopwatch.createStarted(ticker); diff --git a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/config/QosLimitsConfig.java b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/config/QosLimitsConfig.java index e932632ddaa..c1ed99ed922 100644 --- a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/config/QosLimitsConfig.java +++ b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/config/QosLimitsConfig.java @@ -29,13 +29,13 @@ public abstract class QosLimitsConfig { public static final QosLimitsConfig DEFAULT_NO_LIMITS = ImmutableQosLimitsConfig.builder().build(); @Value.Default - public int readBytesPerSecond() { - return Integer.MAX_VALUE; + public long readBytesPerSecond() { + return Long.MAX_VALUE; } @Value.Default - public int writeBytesPerSecond() { - return Integer.MAX_VALUE; + public long writeBytesPerSecond() { + return Long.MAX_VALUE; } } diff --git a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/metrics/QosMetrics.java b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/metrics/QosMetrics.java index d8ee536efb2..9d84764630e 100644 --- a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/metrics/QosMetrics.java +++ b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/metrics/QosMetrics.java @@ -16,12 +16,19 @@ package com.palantir.atlasdb.qos.metrics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.codahale.metrics.Meter; import com.palantir.atlasdb.qos.QueryWeight; import com.palantir.atlasdb.util.MetricsManager; +import com.palantir.logsafe.SafeArg; // TODO(nziebart): needs tests public class QosMetrics { + + private static final Logger log = LoggerFactory.getLogger(QosMetrics.class); + private final MetricsManager metricsManager = new MetricsManager(); private final Meter readRequestCount; @@ -34,6 +41,8 @@ public class QosMetrics { private final Meter writeTime; private final Meter rowsWritten; + private final Meter backoffTime; + private final Meter rateLimitedExceptions; public QosMetrics() { readRequestCount = metricsManager.registerMeter(QosMetrics.class, "numReadRequests"); @@ -45,6 +54,9 @@ public QosMetrics() { bytesWritten = metricsManager.registerMeter(QosMetrics.class, "bytesWritten"); writeTime = metricsManager.registerMeter(QosMetrics.class, "writeTime"); rowsWritten = metricsManager.registerMeter(QosMetrics.class, "rowsWritten"); + + backoffTime = metricsManager.registerMeter(QosMetrics.class, "backoffTime"); + rateLimitedExceptions = metricsManager.registerMeter(QosMetrics.class, "rateLimitedExceptions"); } public void recordRead(QueryWeight weight) { @@ -61,4 +73,17 @@ public void recordWrite(QueryWeight weight) { rowsWritten.mark(weight.numDistinctRows()); } + public void recordBackoffMicros(long backoffTimeMicros) { + if (backoffTimeMicros > 0) { + log.info("Backing off for {} micros", SafeArg.of("backoffTimeMicros", backoffTimeMicros)); + backoffTime.mark(backoffTimeMicros); + } + } + + public void recordRateLimitedException() { + log.info("Rate limit exceeded and backoff time would be more than the configured maximum. " + + "Throwing a throttling exception"); + rateLimitedExceptions.mark(); + } + } diff --git a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiter.java b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiter.java index cb7ab33a8c7..a6ae75bb1c3 100644 --- a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiter.java +++ b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiter.java @@ -19,11 +19,16 @@ import java.time.Duration; import java.util.Optional; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.primitives.Ints; import com.palantir.atlasdb.qos.ratelimit.guava.RateLimiter; import com.palantir.atlasdb.qos.ratelimit.guava.SmoothRateLimiter; +import com.palantir.logsafe.SafeArg; /** * A rate limiter for database queries, based on "units" of expense. This limiter strives to maintain an upper limit on @@ -35,31 +40,30 @@ */ public class QosRateLimiter { - private static final double MAX_BURST_SECONDS = 5; - private static final double UNLIMITED_RATE = Double.MAX_VALUE; + private static final Logger log = LoggerFactory.getLogger(QosRateLimiter.class); + + private static final long MAX_BURST_SECONDS = 5; + + private final Supplier maxBackoffTimeMillis; + private final Supplier unitsPerSecond; + private final RateLimiter.SleepingStopwatch stopwatch; - private final long maxBackoffTimeMillis; - private RateLimiter rateLimiter; + private volatile RateLimiter rateLimiter; + private volatile long currentRate; - public static QosRateLimiter create(long maxBackoffTimeMillis) { - return new QosRateLimiter(RateLimiter.SleepingStopwatch.createFromSystemTimer(), maxBackoffTimeMillis); + public static QosRateLimiter create(Supplier maxBackoffTimeMillis, Supplier unitsPerSecond) { + return new QosRateLimiter(RateLimiter.SleepingStopwatch.createFromSystemTimer(), maxBackoffTimeMillis, + unitsPerSecond); } @VisibleForTesting - QosRateLimiter(RateLimiter.SleepingStopwatch stopwatch, long maxBackoffTimeMillis) { - rateLimiter = new SmoothRateLimiter.SmoothBursty( - stopwatch, - MAX_BURST_SECONDS); - - rateLimiter.setRate(UNLIMITED_RATE); + QosRateLimiter(RateLimiter.SleepingStopwatch stopwatch, Supplier maxBackoffTimeMillis, + Supplier unitsPerSecond) { + this.stopwatch = stopwatch; + this.unitsPerSecond = unitsPerSecond; this.maxBackoffTimeMillis = maxBackoffTimeMillis; - } - /** - * Update the allowed rate, in units per second. - */ - public void updateRate(double unitsPerSecond) { - rateLimiter.setRate(unitsPerSecond); + createRateLimiterAtomically(); } /** @@ -69,9 +73,11 @@ public void updateRate(double unitsPerSecond) { * @return the amount of time slept for, if any */ public Duration consumeWithBackoff(long estimatedNumUnits) { + updateRateIfNeeded(); + Optional waitTime = rateLimiter.tryAcquire( Ints.saturatedCast(estimatedNumUnits), // TODO(nziebart): deal with longs - maxBackoffTimeMillis, + maxBackoffTimeMillis.get(), TimeUnit.MILLISECONDS); if (!waitTime.isPresent()) { @@ -81,6 +87,31 @@ public Duration consumeWithBackoff(long estimatedNumUnits) { return waitTime.get(); } + /** + * The RateLimiter's rate requires a lock acquisition to read, and is returned as a double. To avoid + * overhead and double comparisons, we maintain the current rate ourselves. + */ + private void updateRateIfNeeded() { + if (currentRate != unitsPerSecond.get()) { + createRateLimiterAtomically(); + } + } + + /** + * Guava's RateLimiter has strange behavior around updating the rate. Namely, if you set the rate very small and ask + * for a large number of permits, you will end up having to wait until that small rate is satisfied before acquiring + * more, even if you update the rate to something very large. So, we just create a new rate limiter if the rate + * changes. + */ + private synchronized void createRateLimiterAtomically() { + currentRate = unitsPerSecond.get(); + rateLimiter = new SmoothRateLimiter.SmoothBursty(stopwatch, MAX_BURST_SECONDS); + rateLimiter.setRate(currentRate); + + // TODO(nziebart): distinguish between read/write rate limiters + log.info("Units per second set to {}", SafeArg.of("unitsPerSecond", currentRate)); + } + /** * Records an adjustment to the original estimate of units consumed passed to {@link #consumeWithBackoff}. This * should be called after a query returns, when the exact number of units consumed is known. This value may be diff --git a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiters.java b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiters.java index a60696dc649..8b7d8753dd2 100644 --- a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiters.java +++ b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiters.java @@ -16,6 +16,8 @@ package com.palantir.atlasdb.qos.ratelimit; +import java.util.function.Supplier; + import org.immutables.value.Value; import com.palantir.atlasdb.qos.config.QosLimitsConfig; @@ -23,12 +25,12 @@ @Value.Immutable public interface QosRateLimiters { - static QosRateLimiters create(QosLimitsConfig config, long maxBackoffSleepTimeMillis) { - QosRateLimiter readLimiter = QosRateLimiter.create(maxBackoffSleepTimeMillis); - readLimiter.updateRate(config.readBytesPerSecond()); + static QosRateLimiters create(Supplier maxBackoffSleepTimeMillis, Supplier config) { + QosRateLimiter readLimiter = QosRateLimiter.create(maxBackoffSleepTimeMillis, + () -> config.get().readBytesPerSecond()); - QosRateLimiter writeLimiter = QosRateLimiter.create(maxBackoffSleepTimeMillis); - writeLimiter.updateRate(config.writeBytesPerSecond()); + QosRateLimiter writeLimiter = QosRateLimiter.create(maxBackoffSleepTimeMillis, + () -> config.get().writeBytesPerSecond()); return ImmutableQosRateLimiters.builder() .read(readLimiter) diff --git a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/client/AtlasDbQosClientTest.java b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/client/AtlasDbQosClientTest.java index 2e347874c22..588fe549380 100644 --- a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/client/AtlasDbQosClientTest.java +++ b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/client/AtlasDbQosClientTest.java @@ -25,6 +25,8 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import java.time.Duration; + import org.junit.Before; import org.junit.Test; @@ -76,6 +78,8 @@ public void setUp() { when(weigher.weighSuccess(any(), anyLong())).thenReturn(ACTUAL_WEIGHT); when(weigher.weighFailure(any(), anyLong())).thenReturn(ACTUAL_WEIGHT); + when(readLimiter.consumeWithBackoff(anyLong())).thenReturn(Duration.ZERO); + when(writeLimiter.consumeWithBackoff(anyLong())).thenReturn(Duration.ZERO); } @Test @@ -92,7 +96,6 @@ public void recordsReadMetrics() throws TestCheckedException { qosClient.executeRead(() -> "foo", weigher); verify(metrics).recordRead(ACTUAL_WEIGHT); - verifyNoMoreInteractions(metrics); } @Test @@ -116,7 +119,6 @@ public void recordsWriteMetrics() throws TestCheckedException { qosClient.executeWrite(() -> null, weigher); verify(metrics).recordWrite(ACTUAL_WEIGHT); - verifyNoMoreInteractions(metrics); } @Test @@ -127,7 +129,6 @@ public void recordsReadMetricsOnFailure() throws TestCheckedException { }, weigher)).isInstanceOf(TestCheckedException.class); verify(metrics).recordRead(ACTUAL_WEIGHT); - verifyNoMoreInteractions(metrics); } @Test @@ -138,7 +139,6 @@ public void recordsWriteMetricsOnFailure() throws TestCheckedException { }, weigher)).isInstanceOf(TestCheckedException.class); verify(metrics).recordWrite(ACTUAL_WEIGHT); - verifyNoMoreInteractions(metrics); } @Test @@ -163,5 +163,22 @@ public void propagatesCheckedExceptions() throws TestCheckedException { }, weigher)).isInstanceOf(TestCheckedException.class); } + @Test + public void recordsBackoffTime() { + when(readLimiter.consumeWithBackoff(anyLong())).thenReturn(Duration.ofMillis(1_100)); + qosClient.executeRead(() -> "foo", weigher); + + verify(metrics).recordBackoffMicros(1_100_000); + } + + @Test + public void recordsBackoffExceptions() { + // TODO(nziebart): use rate limited exception here + when(readLimiter.consumeWithBackoff(anyLong())).thenThrow(new RuntimeException("rate limited")); + assertThatThrownBy(() -> qosClient.executeRead(() -> "foo", weigher)).isInstanceOf(RuntimeException.class); + + verify(metrics).recordRateLimitedException(); + } + static class TestCheckedException extends Exception {} } diff --git a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiterTest.java b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiterTest.java index ec0f58383c2..103d98fe3b9 100644 --- a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiterTest.java +++ b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiterTest.java @@ -23,6 +23,7 @@ import java.time.Duration; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.junit.Before; import org.junit.Test; @@ -32,34 +33,60 @@ public class QosRateLimiterTest { private static final long START_TIME_MICROS = 0L; - private static final long MAX_BACKOFF_TIME_MILLIS = 10_000; + private static final Supplier MAX_BACKOFF_TIME_MILLIS = () -> 10_000L; RateLimiter.SleepingStopwatch stopwatch = mock(RateLimiter.SleepingStopwatch.class); - QosRateLimiter limiter = new QosRateLimiter(stopwatch, MAX_BACKOFF_TIME_MILLIS); + Supplier currentRate = mock(Supplier.class); + QosRateLimiter limiter; @Before public void before() { when(stopwatch.readMicros()).thenReturn(START_TIME_MICROS); + when(currentRate.get()).thenReturn(10L); + + limiter = new QosRateLimiter(stopwatch, MAX_BACKOFF_TIME_MILLIS, currentRate); } @Test - public void doesNotLimitIfNoLimitIsSet() { + public void doesNotLimitIfLimitIsVeryHigh() { + when(currentRate.get()).thenReturn(Long.MAX_VALUE); + assertThat(limiter.consumeWithBackoff(Integer.MAX_VALUE)).isEqualTo(Duration.ZERO); assertThat(limiter.consumeWithBackoff(Integer.MAX_VALUE)).isEqualTo(Duration.ZERO); assertThat(limiter.consumeWithBackoff(Integer.MAX_VALUE)).isEqualTo(Duration.ZERO); } @Test - public void limitsBySleepingIfTimeIsReasonable() { - limiter.updateRate(10); + public void limitsOnlyWhenConsumptionExceedsLimit() { + when(currentRate.get()).thenReturn(100L); + limiter.consumeWithBackoff(1); // set the current time + + tickMillis(500); + + assertThat(limiter.consumeWithBackoff(25L)).isEqualTo(Duration.ZERO); + assertThat(limiter.consumeWithBackoff(25L)).isEqualTo(Duration.ZERO); + + tickMillis(500); + + assertThat(limiter.consumeWithBackoff(20L)).isEqualTo(Duration.ZERO); + tickMillis(500); + + assertThat(limiter.consumeWithBackoff(20L)).isEqualTo(Duration.ZERO); + assertThat(limiter.consumeWithBackoff(20L)).isEqualTo(Duration.ZERO); + assertThat(limiter.consumeWithBackoff(40L)).isEqualTo(Duration.ZERO); + + assertThat(limiter.consumeWithBackoff(40L)).isGreaterThan(Duration.ZERO); + } + + @Test + public void limitsBySleepingIfTimeIsReasonable() { assertThat(limiter.consumeWithBackoff(100)).isEqualTo(Duration.ZERO); assertThat(limiter.consumeWithBackoff(1)).isGreaterThan(Duration.ZERO); } @Test public void limitsByThrowingIfSleepTimeIsTooGreat() { - limiter.updateRate(10); limiter.consumeWithBackoff(1_000); assertThatThrownBy(() -> limiter.consumeWithBackoff(100)) @@ -68,8 +95,7 @@ public void limitsByThrowingIfSleepTimeIsTooGreat() { @Test public void doesNotThrowIfMaxBackoffTimeIsVeryLarge() { - QosRateLimiter limiterWithLargeBackoffLimit = new QosRateLimiter(stopwatch, Long.MAX_VALUE); - limiterWithLargeBackoffLimit.updateRate(10); + QosRateLimiter limiterWithLargeBackoffLimit = new QosRateLimiter(stopwatch, () -> Long.MAX_VALUE, () -> 10L); limiterWithLargeBackoffLimit.consumeWithBackoff(1_000_000_000); limiterWithLargeBackoffLimit.consumeWithBackoff(1_000_000_000); @@ -77,30 +103,27 @@ public void doesNotThrowIfMaxBackoffTimeIsVeryLarge() { @Test public void consumingAdditionalUnitsPenalizesFutureCallers() { - limiter.updateRate(10); - limiter.consumeWithBackoff(1); - limiter.recordAdjustment(100); + limiter.recordAdjustment(25); assertThat(limiter.consumeWithBackoff(1)).isGreaterThan(Duration.ZERO); } @Test public void canConsumeBurstUnits() { - limiter.updateRate(10); limiter.consumeWithBackoff(100); // simulate 30 seconds passing with no consumption - when(stopwatch.readMicros()).thenReturn(TimeUnit.SECONDS.toMicros(30)); + tickMillis(30_000); assertThat(limiter.consumeWithBackoff(10)).isEqualTo(Duration.ZERO); - assertThat(limiter.consumeWithBackoff(10)).isEqualTo(Duration.ZERO); + assertThat(limiter.consumeWithBackoff(20)).isEqualTo(Duration.ZERO); assertThat(limiter.consumeWithBackoff(10)).isEqualTo(Duration.ZERO); } @Test public void canConsumeImmediatelyAgainAfterBackoff() { - limiter.updateRate(10); + when(currentRate.get()).thenReturn(10L); limiter.consumeWithBackoff(100); Duration timeWaited = limiter.consumeWithBackoff(20); @@ -113,11 +136,38 @@ public void canConsumeImmediatelyAgainAfterBackoff() { @Test public void sleepTimeIsSensible() { - limiter.updateRate(10); - limiter.consumeWithBackoff(100); + limiter.consumeWithBackoff(50); assertThat(limiter.consumeWithBackoff(20)).isEqualTo(Duration.ofSeconds(5)); assertThat(limiter.consumeWithBackoff(20)).isEqualTo(Duration.ofSeconds(7)); } + @Test + public void canUpdateRate() { + // baseline + limiter.consumeWithBackoff(20); + assertThat(limiter.consumeWithBackoff(20)).isGreaterThan(Duration.ZERO); + + // increase to a large rate + when(currentRate.get()).thenReturn(1000000L); + limiter.consumeWithBackoff(1); + tickMillis(1); + + assertThat(limiter.consumeWithBackoff(50)).isEqualTo(Duration.ZERO); + assertThat(limiter.consumeWithBackoff(500)).isEqualTo(Duration.ZERO); + + // decrease to small rate + when(currentRate.get()).thenReturn(10L); + tickMillis(1000); + + limiter.consumeWithBackoff(1); + limiter.consumeWithBackoff(20); + assertThat(limiter.consumeWithBackoff(20)).isGreaterThan(Duration.ZERO); + } + + private void tickMillis(long millis) { + long now = stopwatch.readMicros(); + when(stopwatch.readMicros()).thenReturn(now + millis * 1_000); + } + }