From fdbdd781d47aeecbaf8e3ddb29edc43447824420 Mon Sep 17 00:00:00 2001 From: nziebart Date: Fri, 17 Nov 2017 18:21:27 +0000 Subject: [PATCH] [QoS] Guava license (#2703) * guava license * Cleanup: class reference --- .../atlasdb/qos/ratelimit/QosRateLimiter.java | 2 + .../atlasdb/qos/ratelimit/RateLimiter.java | 306 ------------ .../qos/ratelimit/SmoothRateLimiter.java | 154 ------ .../atlasdb/qos/ratelimit/guava/LICENSE | 53 ++ .../qos/ratelimit/guava/RateLimiter.java | 464 ++++++++++++++++++ .../ratelimit/guava/SmoothRateLimiter.java | 401 +++++++++++++++ .../qos/ratelimit/QosRateLimiterTest.java | 2 + 7 files changed, 922 insertions(+), 460 deletions(-) delete mode 100644 qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/RateLimiter.java delete mode 100644 qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/SmoothRateLimiter.java create mode 100644 qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/guava/LICENSE create mode 100644 qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/guava/RateLimiter.java create mode 100644 qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/guava/SmoothRateLimiter.java diff --git a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiter.java b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiter.java index 299a417f371..cb7ab33a8c7 100644 --- a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiter.java +++ b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiter.java @@ -22,6 +22,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.primitives.Ints; +import com.palantir.atlasdb.qos.ratelimit.guava.RateLimiter; +import com.palantir.atlasdb.qos.ratelimit.guava.SmoothRateLimiter; /** * A rate limiter for database queries, based on "units" of expense. This limiter strives to maintain an upper limit on diff --git a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/RateLimiter.java b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/RateLimiter.java deleted file mode 100644 index b53460dd180..00000000000 --- a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/RateLimiter.java +++ /dev/null @@ -1,306 +0,0 @@ -//CHECKSTYLE:OFF -/* - * Copyright (C) 2012 The Guava Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.qos.ratelimit; - -import static java.lang.Math.max; -import static java.util.concurrent.TimeUnit.MICROSECONDS; -import static java.util.concurrent.TimeUnit.SECONDS; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -import java.time.Duration; -import java.util.Locale; -import java.util.Optional; -import java.util.concurrent.TimeUnit; - -import com.google.common.base.Stopwatch; -import com.google.common.util.concurrent.Uninterruptibles; - -/** - * Copied from Guava, because {@link com.palantir.atlasdb.qos.ratelimit.SmoothRateLimiter.SmoothBursty} is a package - * private class. - * - * There are also a few minor but notable modifications: - * 1) {@link #tryAcquire(int, long, TimeUnit)} returns an optional duration rather than a boolean. This is analogous - * to the return value of {@link #acquire()}. - * 2) A new method {@link #steal(int)} was added, to support taking permits without waiting - * 3) Some static constructors were removed. - **/ -public abstract class RateLimiter { - - /** - * The underlying timer; used both to measure elapsed time and sleep as necessary. A separate - * object to facilitate testing. - */ - private final SleepingStopwatch stopwatch; - - // Can't be initialized in the constructor because mocks don't call the constructor. - private volatile Object mutexDoNotUseDirectly; - - private Object mutex() { - Object mutex = mutexDoNotUseDirectly; - if (mutex == null) { - synchronized (this) { - mutex = mutexDoNotUseDirectly; - if (mutex == null) { - mutexDoNotUseDirectly = mutex = new Object(); - } - } - } - return mutex; - } - - RateLimiter(SleepingStopwatch stopwatch) { - this.stopwatch = checkNotNull(stopwatch); - } - - /** - * Updates the stable rate of this {@code RateLimiter}, that is, the {@code permitsPerSecond} - * argument provided in the factory method that constructed the {@code RateLimiter}. Currently - * throttled threads will not be awakened as a result of this invocation, thus they do not - * observe the new rate; only subsequent requests will. - * - *

Note though that, since each request repays (by waiting, if necessary) the cost of the - * previous request, this means that the very next request after an invocation to - * {@code setRate} will not be affected by the new rate; it will pay the cost of the previous - * request, which is in terms of the previous rate. - * - *

The behavior of the {@code RateLimiter} is not modified in any other way, e.g. if the - * {@code RateLimiter} was configured with a warmup period of 20 seconds, it still has a warmup - * period of 20 seconds after this method invocation. - * - * @param permitsPerSecond the new stable rate of this {@code RateLimiter} - * @throws IllegalArgumentException if {@code permitsPerSecond} is negative or zero - */ - public final void setRate(double permitsPerSecond) { - checkArgument( - permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive"); - synchronized (mutex()) { - doSetRate(permitsPerSecond, stopwatch.readMicros()); - } - } - - abstract void doSetRate(double permitsPerSecond, long nowMicros); - - /** - * Returns the stable rate (as {@code permits per seconds}) with which this {@code RateLimiter} is - * configured with. The initial value of this is the same as the {@code permitsPerSecond} argument - * passed in the factory method that produced this {@code RateLimiter}, and it is only updated - * after invocations to {@linkplain #setRate}. - */ - public final double getRate() { - synchronized (mutex()) { - return doGetRate(); - } - } - - abstract double doGetRate(); - - /** - * Acquires a single permit from this {@code RateLimiter}, blocking until the request can be - * granted. Tells the amount of time slept, if any. - * - *

This method is equivalent to {@code acquire(1)}. - * - * @return time spent sleeping to enforce rate, in seconds; 0.0 if not rate-limited - * @since 16.0 (present in 13.0 with {@code void} return type}) - */ - public double acquire() { - return acquire(1); - } - - /** - * Acquires the given number of permits from this {@code RateLimiter}, blocking until the request - * can be granted. Tells the amount of time slept, if any. - * - * @param permits the number of permits to acquire - * @return time spent sleeping to enforce rate, in seconds; 0.0 if not rate-limited - * @throws IllegalArgumentException if the requested number of permits is negative or zero - * @since 16.0 (present in 13.0 with {@code void} return type}) - */ - public double acquire(int permits) { - long microsToWait = reserve(permits); - stopwatch.sleepMicrosUninterruptibly(microsToWait); - return 1.0 * microsToWait / SECONDS.toMicros(1L); - } - - /** - * Immediately steals the given number of permits. This will potentially penalize future callers, but has no - * effect on callers that are already waiting for permits. - */ - public void steal(int permits) { - reserve(permits); - } - - /** - * Reserves the given number of permits from this {@code RateLimiter} for future use, returning - * the number of microseconds until the reservation can be consumed. - * - * @return time in microseconds to wait until the resource can be acquired, never negative - */ - final long reserve(int permits) { - checkPermits(permits); - synchronized (mutex()) { - return reserveAndGetWaitLength(permits, stopwatch.readMicros()); - } - } - - /** - * Acquires a permit from this {@code RateLimiter} if it can be obtained without exceeding the - * specified {@code timeout}, or returns {@code false} immediately (without waiting) if the permit - * would not have been granted before the timeout expired. - * - *

This method is equivalent to {@code tryAcquire(1, timeout, unit)}. - * - * @param timeout the maximum time to wait for the permit. Negative values are treated as zero. - * @param unit the time unit of the timeout argument - * @return {@code true} if the permit was acquired, {@code false} otherwise - * @throws IllegalArgumentException if the requested number of permits is negative or zero - */ - public boolean tryAcquire(long timeout, TimeUnit unit) { - return tryAcquire(1, timeout, unit).isPresent(); - } - - /** - * Acquires permits from this {@link com.google.common.util.concurrent.RateLimiter} if it can be acquired immediately without delay. - * - *

This method is equivalent to {@code tryAcquire(permits, 0, anyUnit)}. - * - * @param permits the number of permits to acquire - * @return {@code true} if the permits were acquired, {@code false} otherwise - * @throws IllegalArgumentException if the requested number of permits is negative or zero - * @since 14.0 - */ - public boolean tryAcquire(int permits) { - return tryAcquire(permits, 0, MICROSECONDS).isPresent(); - } - - /** - * Acquires a permit from this {@link com.google.common.util.concurrent.RateLimiter} if it can be acquired immediately without - * delay. - * - *

This method is equivalent to {@code tryAcquire(1)}. - * - * @return {@code true} if the permit was acquired, {@code false} otherwise - * @since 14.0 - */ - public boolean tryAcquire() { - return tryAcquire(1, 0, MICROSECONDS).isPresent(); - } - - /** - * Acquires the given number of permits from this {@code RateLimiter} if it can be obtained - * without exceeding the specified {@code timeout}, or returns {@code false} immediately (without - * waiting) if the permits would not have been granted before the timeout expired. - * - * @param permits the number of permits to acquire - * @param timeout the maximum time to wait for the permits. Negative values are treated as zero. - * @param unit the time unit of the timeout argument - * @return amount of time waited, if the permits were acquired, empty otherwise - * @throws IllegalArgumentException if the requested number of permits is negative or zero - */ - public Optional tryAcquire(int permits, long timeout, TimeUnit unit) { - long timeoutMicros = max(unit.toMicros(timeout), 0); - checkPermits(permits); - long microsToWait; - synchronized (mutex()) { - long nowMicros = stopwatch.readMicros(); - if (!canAcquire(nowMicros, timeoutMicros)) { - return Optional.empty(); - } else { - microsToWait = reserveAndGetWaitLength(permits, nowMicros); - } - } - stopwatch.sleepMicrosUninterruptibly(microsToWait); - return Optional.of(Duration.ofNanos(TimeUnit.MICROSECONDS.toNanos(microsToWait))); - } - - private boolean canAcquire(long nowMicros, long timeoutMicros) { - return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros; - } - - /** - * Reserves next ticket and returns the wait time that the caller must wait for. - * - * @return the required wait time, never negative - */ - final long reserveAndGetWaitLength(int permits, long nowMicros) { - long momentAvailable = reserveEarliestAvailable(permits, nowMicros); - return max(momentAvailable - nowMicros, 0); - } - - /** - * Returns the earliest time that permits are available (with one caveat). - * - * @return the time that permits are available, or, if permits are available immediately, an - * arbitrary past or present time - */ - abstract long queryEarliestAvailable(long nowMicros); - - /** - * Reserves the requested number of permits and returns the time that those permits can be used - * (with one caveat). - * - * @return the time that the permits may be used, or, if the permits may be used immediately, an - * arbitrary past or present time - */ - abstract long reserveEarliestAvailable(int permits, long nowMicros); - - @Override - public String toString() { - return String.format(Locale.ROOT, "RateLimiter[stableRate=%3.1fqps]", getRate()); - } - - abstract static class SleepingStopwatch { - /** Constructor for use by subclasses. */ - protected SleepingStopwatch() {} - - /* - * We always hold the mutex when calling this. TODO(cpovirk): Is that important? Perhaps we need - * to guarantee that each call to reserveEarliestAvailable, etc. sees a value >= the previous? - * Also, is it OK that we don't hold the mutex when sleeping? - */ - protected abstract long readMicros(); - - protected abstract void sleepMicrosUninterruptibly(long micros); - - public static final SleepingStopwatch createFromSystemTimer() { - return new SleepingStopwatch() { - final Stopwatch stopwatch = Stopwatch.createStarted(); - - @Override - protected long readMicros() { - return stopwatch.elapsed(MICROSECONDS); - } - - @Override - protected void sleepMicrosUninterruptibly(long micros) { - if (micros > 0) { - Uninterruptibles.sleepUninterruptibly(micros, MICROSECONDS); - } - } - }; - } - } - - private static void checkPermits(int permits) { - checkArgument(permits > 0, "Requested permits (%s) must be positive", permits); - } -} - diff --git a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/SmoothRateLimiter.java b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/SmoothRateLimiter.java deleted file mode 100644 index c80f20f1de5..00000000000 --- a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/SmoothRateLimiter.java +++ /dev/null @@ -1,154 +0,0 @@ -// CHECKSTYLE:OFF -/* - * Copyright (C) 2012 The Guava Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.qos.ratelimit; - -import static java.lang.Math.min; -import static java.util.concurrent.TimeUnit.SECONDS; - -/** - * Copied from Guava, because {@link SmoothBursty} is a package private class. - **/ -abstract class SmoothRateLimiter extends RateLimiter { - /** - * This implements a "bursty" RateLimiter, where storedPermits are translated to zero throttling. - * The maximum number of permits that can be saved (when the RateLimiter is unused) is defined in - * terms of time, in this sense: if a RateLimiter is 2qps, and this time is specified as 10 - * seconds, we can save up to 2 * 10 = 20 permits. - */ - static final class SmoothBursty extends SmoothRateLimiter { - /** The work (permits) of how many seconds can be saved up if this RateLimiter is unused? */ - final double maxBurstSeconds; - - SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) { - super(stopwatch); - this.maxBurstSeconds = maxBurstSeconds; - } - - @Override - void doSetRate(double permitsPerSecond, double stableIntervalMicros) { - double oldMaxPermits = this.maxPermits; - maxPermits = maxBurstSeconds * permitsPerSecond; - if (oldMaxPermits == Double.POSITIVE_INFINITY) { - // if we don't special-case this, we would get storedPermits == NaN, below - storedPermits = maxPermits; - } else { - storedPermits = - (oldMaxPermits == 0.0) - ? 0.0 // initial state - : storedPermits * maxPermits / oldMaxPermits; - } - } - - @Override - long storedPermitsToWaitTime(double storedPermits, double permitsToTake) { - return 0L; - } - - @Override - double coolDownIntervalMicros() { - return stableIntervalMicros; - } - } - - /** - * The currently stored permits. - */ - double storedPermits; - - /** - * The maximum number of stored permits. - */ - double maxPermits; - - /** - * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits - * per second has a stable interval of 200ms. - */ - double stableIntervalMicros; - - /** - * The time when the next request (no matter its size) will be granted. After granting a request, - * this is pushed further in the future. Large requests push this further than small requests. - */ - private long nextFreeTicketMicros = 0L; // could be either in the past or future - - private SmoothRateLimiter(SleepingStopwatch stopwatch) { - super(stopwatch); - } - - @Override - final void doSetRate(double permitsPerSecond, long nowMicros) { - resync(nowMicros); - double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond; - this.stableIntervalMicros = stableIntervalMicros; - doSetRate(permitsPerSecond, stableIntervalMicros); - } - - abstract void doSetRate(double permitsPerSecond, double stableIntervalMicros); - - @Override - final double doGetRate() { - return SECONDS.toMicros(1L) / stableIntervalMicros; - } - - @Override - final long queryEarliestAvailable(long nowMicros) { - return nextFreeTicketMicros; - } - - @Override - final long reserveEarliestAvailable(int requiredPermits, long nowMicros) { - resync(nowMicros); - long returnValue = nextFreeTicketMicros; - double storedPermitsToSpend = min(requiredPermits, this.storedPermits); - double freshPermits = requiredPermits - storedPermitsToSpend; - long waitMicros = - storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) - + (long) (freshPermits * stableIntervalMicros); - - this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros; - this.storedPermits -= storedPermitsToSpend; - return returnValue; - } - - /** - * Translates a specified portion of our currently stored permits which we want to spend/acquire, - * into a throttling time. Conceptually, this evaluates the integral of the underlying function we - * use, for the range of [(storedPermits - permitsToTake), storedPermits]. - * - *

This always holds: {@code 0 <= permitsToTake <= storedPermits} - */ - abstract long storedPermitsToWaitTime(double storedPermits, double permitsToTake); - - /** - * Returns the number of microseconds during cool down that we have to wait to get a new permit. - */ - abstract double coolDownIntervalMicros(); - - /** - * Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time. - */ - void resync(long nowMicros) { - // if nextFreeTicket is in the past, resync to now - if (nowMicros > nextFreeTicketMicros) { - double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros(); - storedPermits = min(maxPermits, storedPermits + newPermits); - nextFreeTicketMicros = nowMicros; - } - } -} diff --git a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/guava/LICENSE b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/guava/LICENSE new file mode 100644 index 00000000000..d6aac6c3516 --- /dev/null +++ b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/guava/LICENSE @@ -0,0 +1,53 @@ +Apache License + +Version 2.0, January 2004 + +http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + +"License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document. + +"Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License. + +"Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity. + +"You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License. + +"Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files. + +"Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types. + +"Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below). + +"Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof. + +"Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution." + +"Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions: + +You must give any other recipients of the Work or Derivative Works a copy of this License; and +You must cause any modified files to carry prominent notices stating that You changed the files; and +You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and +If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License. + +You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License. +5. Submission of Contributions. Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS \ No newline at end of file diff --git a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/guava/RateLimiter.java b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/guava/RateLimiter.java new file mode 100644 index 00000000000..59ff887ab81 --- /dev/null +++ b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/guava/RateLimiter.java @@ -0,0 +1,464 @@ +//CHECKSTYLE:OFF +/* + * Copyright (C) 2012 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// CHANGELOG: package name changed +package com.palantir.atlasdb.qos.ratelimit.guava; + +import static java.lang.Math.max; +import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.time.Duration; +import java.util.Locale; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import javax.annotation.concurrent.ThreadSafe; + +import com.google.common.annotations.Beta; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Stopwatch; +import com.google.common.util.concurrent.Uninterruptibles; +import com.palantir.atlasdb.qos.ratelimit.guava.SmoothRateLimiter.SmoothBursty; +import com.palantir.atlasdb.qos.ratelimit.guava.SmoothRateLimiter.SmoothWarmingUp; + +/** + * A rate limiter. Conceptually, a rate limiter distributes permits at a + * configurable rate. Each {@link #acquire()} blocks if necessary until a permit is + * available, and then takes it. Once acquired, permits need not be released. + * + *

Rate limiters are often used to restrict the rate at which some + * physical or logical resource is accessed. This is in contrast to {@link + * java.util.concurrent.Semaphore} which restricts the number of concurrent + * accesses instead of the rate (note though that concurrency and rate are closely related, + * e.g. see Little's Law). + * + *

A {@code RateLimiter} is defined primarily by the rate at which permits + * are issued. Absent additional configuration, permits will be distributed at a + * fixed rate, defined in terms of permits per second. Permits will be distributed + * smoothly, with the delay between individual permits being adjusted to ensure + * that the configured rate is maintained. + * + *

It is possible to configure a {@code RateLimiter} to have a warmup + * period during which time the permits issued each second steadily increases until + * it hits the stable rate. + * + *

As an example, imagine that we have a list of tasks to execute, but we don't want to + * submit more than 2 per second: + *

  {@code
+ *  final RateLimiter rateLimiter = RateLimiter.create(2.0); // rate is "2 permits per second"
+ *  void submitTasks(List tasks, Executor executor) {
+ *    for (Runnable task : tasks) {
+ *      rateLimiter.acquire(); // may wait
+ *      executor.execute(task);
+ *    }
+ *  }
+ *}
+ * + *

As another example, imagine that we produce a stream of data, and we want to cap it + * at 5kb per second. This could be accomplished by requiring a permit per byte, and specifying + * a rate of 5000 permits per second: + *

  {@code
+ *  final RateLimiter rateLimiter = RateLimiter.create(5000.0); // rate = 5000 permits per second
+ *  void submitPacket(byte[] packet) {
+ *    rateLimiter.acquire(packet.length);
+ *    networkService.send(packet);
+ *  }
+ *}
+ * + *

It is important to note that the number of permits requested never + * affects the throttling of the request itself (an invocation to {@code acquire(1)} + * and an invocation to {@code acquire(1000)} will result in exactly the same throttling, if any), + * but it affects the throttling of the next request. I.e., if an expensive task + * arrives at an idle RateLimiter, it will be granted immediately, but it is the next + * request that will experience extra throttling, thus paying for the cost of the expensive + * task. + * + *

Note: {@code RateLimiter} does not provide fairness guarantees. + * + * @author Dimitris Andreou + * @since 13.0 + */ +// TODO(user): switch to nano precision. A natural unit of cost is "bytes", and a micro precision +// would mean a maximum rate of "1MB/s", which might be small in some cases. +@ThreadSafe +@Beta +public abstract class RateLimiter { + /** + * Creates a {@code RateLimiter} with the specified stable throughput, given as + * "permits per second" (commonly referred to as QPS, queries per second). + * + *

The returned {@code RateLimiter} ensures that on average no more than {@code + * permitsPerSecond} are issued during any given second, with sustained requests + * being smoothly spread over each second. When the incoming request rate exceeds + * {@code permitsPerSecond} the rate limiter will release one permit every {@code + * (1.0 / permitsPerSecond)} seconds. When the rate limiter is unused, + * bursts of up to {@code permitsPerSecond} permits will be allowed, with subsequent + * requests being smoothly limited at the stable rate of {@code permitsPerSecond}. + * + * @param permitsPerSecond the rate of the returned {@code RateLimiter}, measured in + * how many permits become available per second + * @throws IllegalArgumentException if {@code permitsPerSecond} is negative or zero + */ + // TODO(user): "This is equivalent to + // {@code createWithCapacity(permitsPerSecond, 1, TimeUnit.SECONDS)}". + public static RateLimiter create(double permitsPerSecond) { + /* + * The default RateLimiter configuration can save the unused permits of up to one second. + * This is to avoid unnecessary stalls in situations like this: A RateLimiter of 1qps, + * and 4 threads, all calling acquire() at these moments: + * + * T0 at 0 seconds + * T1 at 1.05 seconds + * T2 at 2 seconds + * T3 at 3 seconds + * + * Due to the slight delay of T1, T2 would have to sleep till 2.05 seconds, + * and T3 would also have to sleep till 3.05 seconds. + */ + return create(SleepingStopwatch.createFromSystemTimer(), permitsPerSecond); + } + + /* + * TODO(cpovirk): make SleepingStopwatch the last parameter throughout the class so that the + * overloads follow the usual convention: Foo(int), Foo(int, SleepingStopwatch) + */ + @VisibleForTesting + static RateLimiter create(SleepingStopwatch stopwatch, double permitsPerSecond) { + RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */); + rateLimiter.setRate(permitsPerSecond); + return rateLimiter; + } + + /** + * Creates a {@code RateLimiter} with the specified stable throughput, given as + * "permits per second" (commonly referred to as QPS, queries per second), and a + * warmup period, during which the {@code RateLimiter} smoothly ramps up its rate, + * until it reaches its maximum rate at the end of the period (as long as there are enough + * requests to saturate it). Similarly, if the {@code RateLimiter} is left unused for + * a duration of {@code warmupPeriod}, it will gradually return to its "cold" state, + * i.e. it will go through the same warming up process as when it was first created. + * + *

The returned {@code RateLimiter} is intended for cases where the resource that actually + * fulfills the requests (e.g., a remote server) needs "warmup" time, rather than + * being immediately accessed at the stable (maximum) rate. + * + *

The returned {@code RateLimiter} starts in a "cold" state (i.e. the warmup period + * will follow), and if it is left unused for long enough, it will return to that state. + * + * @param permitsPerSecond the rate of the returned {@code RateLimiter}, measured in + * how many permits become available per second + * @param warmupPeriod the duration of the period where the {@code RateLimiter} ramps up its + * rate, before reaching its stable (maximum) rate + * @param unit the time unit of the warmupPeriod argument + * @throws IllegalArgumentException if {@code permitsPerSecond} is negative or zero or + * {@code warmupPeriod} is negative + */ + public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) { + checkArgument(warmupPeriod >= 0, "warmupPeriod must not be negative: %s", warmupPeriod); + return create(SleepingStopwatch.createFromSystemTimer(), permitsPerSecond, warmupPeriod, unit, + 3.0); + } + + @VisibleForTesting + static RateLimiter create( + SleepingStopwatch stopwatch, double permitsPerSecond, long warmupPeriod, TimeUnit unit, + double coldFactor) { + RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor); + rateLimiter.setRate(permitsPerSecond); + return rateLimiter; + } + + /** + * The underlying timer; used both to measure elapsed time and sleep as necessary. A separate + * object to facilitate testing. + */ + private final SleepingStopwatch stopwatch; + + // Can't be initialized in the constructor because mocks don't call the constructor. + private volatile Object mutexDoNotUseDirectly; + + private Object mutex() { + Object mutex = mutexDoNotUseDirectly; + if (mutex == null) { + synchronized (this) { + mutex = mutexDoNotUseDirectly; + if (mutex == null) { + mutexDoNotUseDirectly = mutex = new Object(); + } + } + } + return mutex; + } + + RateLimiter(SleepingStopwatch stopwatch) { + this.stopwatch = checkNotNull(stopwatch); + } + + /** + * Updates the stable rate of this {@code RateLimiter}, that is, the + * {@code permitsPerSecond} argument provided in the factory method that + * constructed the {@code RateLimiter}. Currently throttled threads will not + * be awakened as a result of this invocation, thus they do not observe the new rate; + * only subsequent requests will. + * + *

Note though that, since each request repays (by waiting, if necessary) the cost + * of the previous request, this means that the very next request + * after an invocation to {@code setRate} will not be affected by the new rate; + * it will pay the cost of the previous request, which is in terms of the previous rate. + * + *

The behavior of the {@code RateLimiter} is not modified in any other way, + * e.g. if the {@code RateLimiter} was configured with a warmup period of 20 seconds, + * it still has a warmup period of 20 seconds after this method invocation. + * + * @param permitsPerSecond the new stable rate of this {@code RateLimiter} + * @throws IllegalArgumentException if {@code permitsPerSecond} is negative or zero + */ + public final void setRate(double permitsPerSecond) { + checkArgument( + permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive"); + synchronized (mutex()) { + doSetRate(permitsPerSecond, stopwatch.readMicros()); + } + } + + abstract void doSetRate(double permitsPerSecond, long nowMicros); + + /** + * Returns the stable rate (as {@code permits per seconds}) with which this + * {@code RateLimiter} is configured with. The initial value of this is the same as + * the {@code permitsPerSecond} argument passed in the factory method that produced + * this {@code RateLimiter}, and it is only updated after invocations + * to {@linkplain #setRate}. + */ + public final double getRate() { + synchronized (mutex()) { + return doGetRate(); + } + } + + abstract double doGetRate(); + + /** + * Acquires a single permit from this {@code RateLimiter}, blocking until the + * request can be granted. Tells the amount of time slept, if any. + * + *

This method is equivalent to {@code acquire(1)}. + * + * @return time spent sleeping to enforce rate, in seconds; 0.0 if not rate-limited + * @since 16.0 (present in 13.0 with {@code void} return type}) + */ + public double acquire() { + return acquire(1); + } + + /** + * Acquires the given number of permits from this {@code RateLimiter}, blocking until the + * request can be granted. Tells the amount of time slept, if any. + * + * @param permits the number of permits to acquire + * @return time spent sleeping to enforce rate, in seconds; 0.0 if not rate-limited + * @throws IllegalArgumentException if the requested number of permits is negative or zero + * @since 16.0 (present in 13.0 with {@code void} return type}) + */ + public double acquire(int permits) { + long microsToWait = reserve(permits); + stopwatch.sleepMicrosUninterruptibly(microsToWait); + return 1.0 * microsToWait / SECONDS.toMicros(1L); + } + + /** + * Reserves the given number of permits from this {@code RateLimiter} for future use, returning + * the number of microseconds until the reservation can be consumed. + * + * @return time in microseconds to wait until the resource can be acquired, never negative + */ + final long reserve(int permits) { + checkPermits(permits); + synchronized (mutex()) { + return reserveAndGetWaitLength(permits, stopwatch.readMicros()); + } + } + + /** + * Acquires a permit from this {@code RateLimiter} if it can be obtained + * without exceeding the specified {@code timeout}, or returns {@code false} + * immediately (without waiting) if the permit would not have been granted + * before the timeout expired. + * + *

This method is equivalent to {@code tryAcquire(1, timeout, unit)}. + * + * @param timeout the maximum time to wait for the permit. Negative values are treated as zero. + * @param unit the time unit of the timeout argument + * @return {@code true} if the permit was acquired, {@code false} otherwise + * @throws IllegalArgumentException if the requested number of permits is negative or zero + */ + public boolean tryAcquire(long timeout, TimeUnit unit) { + // CHANGELOG: boolean returned value now inferred from Optional presence + return tryAcquire(1, timeout, unit).isPresent(); + } + + /** + * Acquires permits from this {@link RateLimiter} if it can be acquired immediately without delay. + * + *

+ * This method is equivalent to {@code tryAcquire(permits, 0, anyUnit)}. + * + * @param permits the number of permits to acquire + * @return {@code true} if the permits were acquired, {@code false} otherwise + * @throws IllegalArgumentException if the requested number of permits is negative or zero + * @since 14.0 + */ + public boolean tryAcquire(int permits) { + // CHANGELOG: boolean returned value now inferred from Optional presence + return tryAcquire(permits, 0, MICROSECONDS).isPresent(); + } + + /** + * Acquires a permit from this {@link RateLimiter} if it can be acquired immediately without + * delay. + * + *

+ * This method is equivalent to {@code tryAcquire(1)}. + * + * @return {@code true} if the permit was acquired, {@code false} otherwise + * @since 14.0 + */ + public boolean tryAcquire() { + // CHANGELOG: boolean returned value now inferred from Optional presence + return tryAcquire(1, 0, MICROSECONDS).isPresent(); + } + + /** + * Acquires the given number of permits from this {@code RateLimiter} if it can be obtained + * without exceeding the specified {@code timeout}, or returns {@code false} + * immediately (without waiting) if the permits would not have been granted + * before the timeout expired. + * + * @param permits the number of permits to acquire + * @param timeout the maximum time to wait for the permits. Negative values are treated as zero. + * @param unit the time unit of the timeout argument + * // CHANGELOG: docs changed to reflect different return value + * @return amount of time waited, if the permits were acquired, empty otherwise + * @throws IllegalArgumentException if the requested number of permits is negative or zero + */ + // CHANGELOG: return value changed from boolean to Optional + public Optional tryAcquire(int permits, long timeout, TimeUnit unit) { + long timeoutMicros = max(unit.toMicros(timeout), 0); + checkPermits(permits); + long microsToWait; + synchronized (mutex()) { + long nowMicros = stopwatch.readMicros(); + if (!canAcquire(nowMicros, timeoutMicros)) { + // CHANGELOG: return value changed from false to Optional#empty + return Optional.empty(); + } else { + microsToWait = reserveAndGetWaitLength(permits, nowMicros); + } + } + stopwatch.sleepMicrosUninterruptibly(microsToWait); + // CHANGELOG: return value changed from true to Optional + return Optional.of(Duration.ofNanos(TimeUnit.MICROSECONDS.toNanos(microsToWait))); + } + + // CHANGELOG: new method + /** + * Immediately steals the given number of permits. This will potentially penalize future callers, but has no + * effect on callers that are already waiting for permits. + */ + public void steal(int permits) { + reserve(permits); + } + + private boolean canAcquire(long nowMicros, long timeoutMicros) { + return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros; + } + + /** + * Reserves next ticket and returns the wait time that the caller must wait for. + * + * @return the required wait time, never negative + */ + final long reserveAndGetWaitLength(int permits, long nowMicros) { + long momentAvailable = reserveEarliestAvailable(permits, nowMicros); + return max(momentAvailable - nowMicros, 0); + } + + /** + * Returns the earliest time that permits are available (with one caveat). + * + * @return the time that permits are available, or, if permits are available immediately, an + * arbitrary past or present time + */ + abstract long queryEarliestAvailable(long nowMicros); + + /** + * Reserves the requested number of permits and returns the time that those permits can be used + * (with one caveat). + * + * @return the time that the permits may be used, or, if the permits may be used immediately, an + * arbitrary past or present time + */ + abstract long reserveEarliestAvailable(int permits, long nowMicros); + + @Override + public String toString() { + return String.format(Locale.ROOT, "RateLimiter[stableRate=%3.1fqps]", getRate()); + } + + @VisibleForTesting + // CHANGELOG: modifier changed from package private to public + public abstract static class SleepingStopwatch { + /* + * We always hold the mutex when calling this. TODO(cpovirk): Is that important? Perhaps we need + * to guarantee that each call to reserveEarliestAvailable, etc. sees a value >= the previous? + * Also, is it OK that we don't hold the mutex when sleeping? + */ + // CHANGELOG: modifier changed from package private to public + public abstract long readMicros(); + + abstract void sleepMicrosUninterruptibly(long micros); + + // CHANGELOG: modifier changed from package private to public + public static final SleepingStopwatch createFromSystemTimer() { + return new SleepingStopwatch() { + final Stopwatch stopwatch = Stopwatch.createStarted(); + + @Override + // CHANGELOG: modifier changed from package private to public + public long readMicros() { + return stopwatch.elapsed(MICROSECONDS); + } + + @Override + void sleepMicrosUninterruptibly(long micros) { + if (micros > 0) { + Uninterruptibles.sleepUninterruptibly(micros, MICROSECONDS); + } + } + }; + } + } + + private static int checkPermits(int permits) { + checkArgument(permits > 0, "Requested permits (%s) must be positive", permits); + return permits; + } +} diff --git a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/guava/SmoothRateLimiter.java b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/guava/SmoothRateLimiter.java new file mode 100644 index 00000000000..e8124d6a1db --- /dev/null +++ b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/guava/SmoothRateLimiter.java @@ -0,0 +1,401 @@ +//CHECKSTYLE:OFF +/* + * Copyright (C) 2012 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// CHANGELOG: package name changed +package com.palantir.atlasdb.qos.ratelimit.guava; + +import static java.lang.Math.min; +import static java.util.concurrent.TimeUnit.SECONDS; + +import java.util.concurrent.TimeUnit; + +import com.google.common.math.LongMath; + +// CHANGELOG: modifier changed from package private to public +public abstract class SmoothRateLimiter extends RateLimiter { + /* + * How is the RateLimiter designed, and why? + * + * The primary feature of a RateLimiter is its "stable rate", the maximum rate that + * is should allow at normal conditions. This is enforced by "throttling" incoming + * requests as needed, i.e. compute, for an incoming request, the appropriate throttle time, + * and make the calling thread wait as much. + * + * The simplest way to maintain a rate of QPS is to keep the timestamp of the last + * granted request, and ensure that (1/QPS) seconds have elapsed since then. For example, + * for a rate of QPS=5 (5 tokens per second), if we ensure that a request isn't granted + * earlier than 200ms after the last one, then we achieve the intended rate. + * If a request comes and the last request was granted only 100ms ago, then we wait for + * another 100ms. At this rate, serving 15 fresh permits (i.e. for an acquire(15) request) + * naturally takes 3 seconds. + * + * It is important to realize that such a RateLimiter has a very superficial memory + * of the past: it only remembers the last request. What if the RateLimiter was unused for + * a long period of time, then a request arrived and was immediately granted? + * This RateLimiter would immediately forget about that past underutilization. This may + * result in either underutilization or overflow, depending on the real world consequences + * of not using the expected rate. + * + * Past underutilization could mean that excess resources are available. Then, the RateLimiter + * should speed up for a while, to take advantage of these resources. This is important + * when the rate is applied to networking (limiting bandwidth), where past underutilization + * typically translates to "almost empty buffers", which can be filled immediately. + * + * On the other hand, past underutilization could mean that "the server responsible for + * handling the request has become less ready for future requests", i.e. its caches become + * stale, and requests become more likely to trigger expensive operations (a more extreme + * case of this example is when a server has just booted, and it is mostly busy with getting + * itself up to speed). + * + * To deal with such scenarios, we add an extra dimension, that of "past underutilization", + * modeled by "storedPermits" variable. This variable is zero when there is no + * underutilization, and it can grow up to maxStoredPermits, for sufficiently large + * underutilization. So, the requested permits, by an invocation acquire(permits), + * are served from: + * - stored permits (if available) + * - fresh permits (for any remaining permits) + * + * How this works is best explained with an example: + * + * For a RateLimiter that produces 1 token per second, every second + * that goes by with the RateLimiter being unused, we increase storedPermits by 1. + * Say we leave the RateLimiter unused for 10 seconds (i.e., we expected a request at time + * X, but we are at time X + 10 seconds before a request actually arrives; this is + * also related to the point made in the last paragraph), thus storedPermits + * becomes 10.0 (assuming maxStoredPermits >= 10.0). At that point, a request of acquire(3) + * arrives. We serve this request out of storedPermits, and reduce that to 7.0 (how this is + * translated to throttling time is discussed later). Immediately after, assume that an + * acquire(10) request arriving. We serve the request partly from storedPermits, + * using all the remaining 7.0 permits, and the remaining 3.0, we serve them by fresh permits + * produced by the rate limiter. + * + * We already know how much time it takes to serve 3 fresh permits: if the rate is + * "1 token per second", then this will take 3 seconds. But what does it mean to serve 7 + * stored permits? As explained above, there is no unique answer. If we are primarily + * interested to deal with underutilization, then we want stored permits to be given out + * /faster/ than fresh ones, because underutilization = free resources for the taking. + * If we are primarily interested to deal with overflow, then stored permits could + * be given out /slower/ than fresh ones. Thus, we require a (different in each case) + * function that translates storedPermits to throtting time. + * + * This role is played by storedPermitsToWaitTime(double storedPermits, double permitsToTake). + * The underlying model is a continuous function mapping storedPermits + * (from 0.0 to maxStoredPermits) onto the 1/rate (i.e. intervals) that is effective at the given + * storedPermits. "storedPermits" essentially measure unused time; we spend unused time + * buying/storing permits. Rate is "permits / time", thus "1 / rate = time / permits". + * Thus, "1/rate" (time / permits) times "permits" gives time, i.e., integrals on this + * function (which is what storedPermitsToWaitTime() computes) correspond to minimum intervals + * between subsequent requests, for the specified number of requested permits. + * + * Here is an example of storedPermitsToWaitTime: + * If storedPermits == 10.0, and we want 3 permits, we take them from storedPermits, + * reducing them to 7.0, and compute the throttling for these as a call to + * storedPermitsToWaitTime(storedPermits = 10.0, permitsToTake = 3.0), which will + * evaluate the integral of the function from 7.0 to 10.0. + * + * Using integrals guarantees that the effect of a single acquire(3) is equivalent + * to { acquire(1); acquire(1); acquire(1); }, or { acquire(2); acquire(1); }, etc, + * since the integral of the function in [7.0, 10.0] is equivalent to the sum of the + * integrals of [7.0, 8.0], [8.0, 9.0], [9.0, 10.0] (and so on), no matter + * what the function is. This guarantees that we handle correctly requests of varying weight + * (permits), /no matter/ what the actual function is - so we can tweak the latter freely. + * (The only requirement, obviously, is that we can compute its integrals). + * + * Note well that if, for this function, we chose a horizontal line, at height of exactly + * (1/QPS), then the effect of the function is non-existent: we serve storedPermits at + * exactly the same cost as fresh ones (1/QPS is the cost for each). We use this trick later. + * + * If we pick a function that goes /below/ that horizontal line, it means that we reduce + * the area of the function, thus time. Thus, the RateLimiter becomes /faster/ after a + * period of underutilization. If, on the other hand, we pick a function that + * goes /above/ that horizontal line, then it means that the area (time) is increased, + * thus storedPermits are more costly than fresh permits, thus the RateLimiter becomes + * /slower/ after a period of underutilization. + * + * Last, but not least: consider a RateLimiter with rate of 1 permit per second, currently + * completely unused, and an expensive acquire(100) request comes. It would be nonsensical + * to just wait for 100 seconds, and /then/ start the actual task. Why wait without doing + * anything? A much better approach is to /allow/ the request right away (as if it was an + * acquire(1) request instead), and postpone /subsequent/ requests as needed. In this version, + * we allow starting the task immediately, and postpone by 100 seconds future requests, + * thus we allow for work to get done in the meantime instead of waiting idly. + * + * This has important consequences: it means that the RateLimiter doesn't remember the time + * of the _last_ request, but it remembers the (expected) time of the _next_ request. This + * also enables us to tell immediately (see tryAcquire(timeout)) whether a particular + * timeout is enough to get us to the point of the next scheduling time, since we always + * maintain that. And what we mean by "an unused RateLimiter" is also defined by that + * notion: when we observe that the "expected arrival time of the next request" is actually + * in the past, then the difference (now - past) is the amount of time that the RateLimiter + * was formally unused, and it is that amount of time which we translate to storedPermits. + * (We increase storedPermits with the amount of permits that would have been produced + * in that idle time). So, if rate == 1 permit per second, and arrivals come exactly + * one second after the previous, then storedPermits is _never_ increased -- we would only + * increase it for arrivals _later_ than the expected one second. + */ + + /** + * This implements the following function where coldInterval = coldFactor * stableInterval. + * + * ^ throttling + * | + * cold + / + * interval | /. + * | / . + * | / . <-- "warmup period" is the area of the trapezoid between + * | / . thresholdPermits and maxPermits + * | / . + * | / . + * | / . + * stable +----------/ WARM . + * interval | . UP . + * | . PERIOD. + * | . . + * 0 +----------+-------+--------------> storedPermits + * 0 thresholdPermits maxPermits + * Before going into the details of this particular function, let's keep in mind the basics: + * 1) The state of the RateLimiter (storedPermits) is a vertical line in this figure. + * 2) When the RateLimiter is not used, this goes right (up to maxPermits) + * 3) When the RateLimiter is used, this goes left (down to zero), since if we have storedPermits, + * we serve from those first + * 4) When _unused_, we go right at a constant rate! The rate at which we move to + * the right is chosen as maxPermits / warmupPeriod. This ensures that the time it takes to + * go from 0 to maxPermits is equal to warmupPeriod. + * 5) When _used_, the time it takes, as explained in the introductory class note, is + * equal to the integral of our function, between X permits and X-K permits, assuming + * we want to spend K saved permits. + * + * In summary, the time it takes to move to the left (spend K permits), is equal to the + * area of the function of width == K. + * + * Assuming we have saturated demand, the time to go from maxPermits to thresholdPermits is + * equal to warmupPeriod. And the time to go from thresholdPermits to 0 is + * warmupPeriod/2. (The reason that this is warmupPeriod/2 is to maintain the behavior of + * the original implementation where coldFactor was hard coded as 3.) + * + * It remains to calculate thresholdsPermits and maxPermits. + * + * - The time to go from thresholdPermits to 0 is equal to the integral of the function between + * 0 and thresholdPermits. This is thresholdPermits * stableIntervals. By (5) it is also + * equal to warmupPeriod/2. Therefore + * + * thresholdPermits = 0.5 * warmupPeriod / stableInterval. + * + * - The time to go from maxPermits to thresholdPermits is equal to the integral of the function + * between thresholdPermits and maxPermits. This is the area of the pictured trapezoid, and it + * is equal to 0.5 * (stableInterval + coldInterval) * (maxPermits - thresholdPermits). It is + * also equal to warmupPeriod, so + * + * maxPermits = thresholdPermits + 2 * warmupPeriod / (stableInterval + coldInterval). + */ + static final class SmoothWarmingUp extends SmoothRateLimiter { + private final long warmupPeriodMicros; + /** + * The slope of the line from the stable interval (when permits == 0), to the cold interval + * (when permits == maxPermits) + */ + private double slope; + private double thresholdPermits; + private double coldFactor; + + SmoothWarmingUp( + SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit, double coldFactor) { + super(stopwatch); + this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod); + this.coldFactor = coldFactor; + } + + @Override + void doSetRate(double permitsPerSecond, double stableIntervalMicros) { + double oldMaxPermits = maxPermits; + double coldIntervalMicros = stableIntervalMicros * coldFactor; + thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros; + maxPermits = thresholdPermits + + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros); + slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits); + if (oldMaxPermits == Double.POSITIVE_INFINITY) { + // if we don't special-case this, we would get storedPermits == NaN, below + storedPermits = 0.0; + } else { + storedPermits = (oldMaxPermits == 0.0) + ? maxPermits // initial state is cold + : storedPermits * maxPermits / oldMaxPermits; + } + } + + @Override + long storedPermitsToWaitTime(double storedPermits, double permitsToTake) { + double availablePermitsAboveThreshold = storedPermits - thresholdPermits; + long micros = 0; + // measuring the integral on the right part of the function (the climbing line) + if (availablePermitsAboveThreshold > 0.0) { + double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake); + micros = (long) (permitsAboveThresholdToTake + * (permitsToTime(availablePermitsAboveThreshold) + + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake)) / 2.0); + permitsToTake -= permitsAboveThresholdToTake; + } + // measuring the integral on the left part of the function (the horizontal line) + micros += (stableIntervalMicros * permitsToTake); + return micros; + } + + private double permitsToTime(double permits) { + return stableIntervalMicros + permits * slope; + } + + @Override + double coolDownIntervalMicros() { + return warmupPeriodMicros / maxPermits; + } + } + + /** + * This implements a "bursty" RateLimiter, where storedPermits are translated to + * zero throttling. The maximum number of permits that can be saved (when the RateLimiter is + * unused) is defined in terms of time, in this sense: if a RateLimiter is 2qps, and this + * time is specified as 10 seconds, we can save up to 2 * 10 = 20 permits. + */ + // CHANGELOG: modifier changed from package private to public + public static final class SmoothBursty extends SmoothRateLimiter { + /** The work (permits) of how many seconds can be saved up if this RateLimiter is unused? */ + final double maxBurstSeconds; + + // CHANGELOG: modifier changed from package private to public + public SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) { + super(stopwatch); + this.maxBurstSeconds = maxBurstSeconds; + } + + @Override + void doSetRate(double permitsPerSecond, double stableIntervalMicros) { + double oldMaxPermits = this.maxPermits; + maxPermits = maxBurstSeconds * permitsPerSecond; + if (oldMaxPermits == Double.POSITIVE_INFINITY) { + // if we don't special-case this, we would get storedPermits == NaN, below + storedPermits = maxPermits; + } else { + storedPermits = (oldMaxPermits == 0.0) + ? 0.0 // initial state + : storedPermits * maxPermits / oldMaxPermits; + } + } + + @Override + long storedPermitsToWaitTime(double storedPermits, double permitsToTake) { + return 0L; + } + + @Override + double coolDownIntervalMicros() { + return stableIntervalMicros; + } + } + + /** + * The currently stored permits. + */ + double storedPermits; + + /** + * The maximum number of stored permits. + */ + double maxPermits; + + /** + * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits + * per second has a stable interval of 200ms. + */ + double stableIntervalMicros; + + /** + * The time when the next request (no matter its size) will be granted. After granting a + * request, this is pushed further in the future. Large requests push this further than small + * requests. + */ + private long nextFreeTicketMicros = 0L; // could be either in the past or future + + private SmoothRateLimiter(SleepingStopwatch stopwatch) { + super(stopwatch); + } + + @Override + final void doSetRate(double permitsPerSecond, long nowMicros) { + resync(nowMicros); + double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond; + this.stableIntervalMicros = stableIntervalMicros; + doSetRate(permitsPerSecond, stableIntervalMicros); + } + + abstract void doSetRate(double permitsPerSecond, double stableIntervalMicros); + + @Override + final double doGetRate() { + return SECONDS.toMicros(1L) / stableIntervalMicros; + } + + @Override + final long queryEarliestAvailable(long nowMicros) { + return nextFreeTicketMicros; + } + + @Override + final long reserveEarliestAvailable(int requiredPermits, long nowMicros) { + resync(nowMicros); + long returnValue = nextFreeTicketMicros; + double storedPermitsToSpend = min(requiredPermits, this.storedPermits); + double freshPermits = requiredPermits - storedPermitsToSpend; + long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) + + (long) (freshPermits * stableIntervalMicros); + + try { + this.nextFreeTicketMicros = LongMath.checkedAdd(nextFreeTicketMicros, waitMicros); + } catch (ArithmeticException e) { + this.nextFreeTicketMicros = Long.MAX_VALUE; + } + this.storedPermits -= storedPermitsToSpend; + return returnValue; + } + + /** + * Translates a specified portion of our currently stored permits which we want to + * spend/acquire, into a throttling time. Conceptually, this evaluates the integral + * of the underlying function we use, for the range of + * [(storedPermits - permitsToTake), storedPermits]. + * + *

This always holds: {@code 0 <= permitsToTake <= storedPermits} + */ + abstract long storedPermitsToWaitTime(double storedPermits, double permitsToTake); + + /** + * Returns the number of microseconds during cool down that we have to wait to get a new permit. + */ + abstract double coolDownIntervalMicros(); + + /** + * Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time. + */ + void resync(long nowMicros) { + // if nextFreeTicket is in the past, resync to now + if (nowMicros > nextFreeTicketMicros) { + storedPermits = min(maxPermits, + storedPermits + + (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros()); + nextFreeTicketMicros = nowMicros; + } + } +} diff --git a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiterTest.java b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiterTest.java index c68278e43d3..ec0f58383c2 100644 --- a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiterTest.java +++ b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiterTest.java @@ -27,6 +27,8 @@ import org.junit.Before; import org.junit.Test; +import com.palantir.atlasdb.qos.ratelimit.guava.RateLimiter; + public class QosRateLimiterTest { private static final long START_TIME_MICROS = 0L;