From bbfc98faefef532b013abc46e9330db09d1e2ac6 Mon Sep 17 00:00:00 2001 From: bstorozhuk Date: Wed, 30 Nov 2016 20:55:11 +0200 Subject: [PATCH] Issue #12 TimeBasedRateLimiter and AtomicRateLimiter implementations + benchmarks --- build.gradle | 12 +- gradlew | 10 +- .../CircuitBreakerBenchmark.java | 80 +++--- ...016-i7-I7-5557U-OSX-Java-1.8.0_112-x64.txt | 41 ++++ .../circuitbreaker/RateLimiterBenchmark.java | 89 +++++++ .../javaslang/ratelimiter/RateLimiter.java | 97 ++++---- .../ratelimiter/RateLimiterConfig.java | 2 +- .../internal/AtomicRateLimiter.java | 232 ++++++++++++++++++ .../internal/InMemoryRateLimiterRegistry.java | 4 +- ...pl.java => SemaphoreBasedRateLimiter.java} | 8 +- .../internal/TimeBasedRateLimiter.java | 178 ++++++++++++++ .../InMemoryRateLimiterRegistryTest.java | 1 + .../SemaphoreBasedRateLimiterImplTest.java | 26 +- .../internal/TimeBasedRateLimiterTest.java | 53 ++++ 14 files changed, 710 insertions(+), 123 deletions(-) create mode 100644 src/jmh/java/javaslang/circuitbreaker/30-Nov-2016-i7-I7-5557U-OSX-Java-1.8.0_112-x64.txt create mode 100644 src/jmh/java/javaslang/circuitbreaker/RateLimiterBenchmark.java create mode 100644 src/main/java/javaslang/ratelimiter/internal/AtomicRateLimiter.java rename src/main/java/javaslang/ratelimiter/internal/{SemaphoreBasedRateLimiterImpl.java => SemaphoreBasedRateLimiter.java} (93%) create mode 100644 src/main/java/javaslang/ratelimiter/internal/TimeBasedRateLimiter.java create mode 100644 src/test/java/javaslang/ratelimiter/internal/TimeBasedRateLimiterTest.java diff --git a/build.gradle b/build.gradle index 43a0235f64..f9544a0373 100644 --- a/build.gradle +++ b/build.gradle @@ -2,11 +2,14 @@ buildscript { repositories { jcenter() mavenCentral() + maven { + url "https://plugins.gradle.org/m2/" + } } dependencies { classpath 'org.kt3k.gradle.plugin:coveralls-gradle-plugin:2.0.1' classpath 'com.jfrog.bintray.gradle:gradle-bintray-plugin:1.2' - classpath 'me.champeau.gradle:jmh-gradle-plugin:0.2.0' + classpath 'me.champeau.gradle:jmh-gradle-plugin:0.3.1' classpath 'com.github.jengelman.gradle.plugins:shadow:1.2.0' classpath "org.asciidoctor:asciidoctor-gradle-plugin:1.5.3" classpath "org.ajoberstar:gradle-git:1.3.2" @@ -50,12 +53,7 @@ repositories { } jmh { - benchmarkMode = 'all' - jmhVersion = '1.11.2' - fork = 1 - threads = 10 - iterations = 2 - warmupIterations = 2 + jmhVersion = '1.17' include='' } diff --git a/gradlew b/gradlew index 91a7e269e1..9d82f78915 100755 --- a/gradlew +++ b/gradlew @@ -42,11 +42,6 @@ case "`uname`" in ;; esac -# For Cygwin, ensure paths are in UNIX format before anything is touched. -if $cygwin ; then - [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"` -fi - # Attempt to set APP_HOME # Resolve links: $0 may be a link PRG="$0" @@ -61,9 +56,9 @@ while [ -h "$PRG" ] ; do fi done SAVED="`pwd`" -cd "`dirname \"$PRG\"`/" >&- +cd "`dirname \"$PRG\"`/" >/dev/null APP_HOME="`pwd -P`" -cd "$SAVED" >&- +cd "$SAVED" >/dev/null CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar @@ -114,6 +109,7 @@ fi if $cygwin ; then APP_HOME=`cygpath --path --mixed "$APP_HOME"` CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + JAVACMD=`cygpath --unix "$JAVACMD"` # We build the pattern for arguments to be converted via cygpath ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` diff --git a/src/jmh/java/io/github/robwin/circuitbreaker/CircuitBreakerBenchmark.java b/src/jmh/java/io/github/robwin/circuitbreaker/CircuitBreakerBenchmark.java index d05d7c43ac..cb8c476ba9 100644 --- a/src/jmh/java/io/github/robwin/circuitbreaker/CircuitBreakerBenchmark.java +++ b/src/jmh/java/io/github/robwin/circuitbreaker/CircuitBreakerBenchmark.java @@ -18,46 +18,40 @@ */ package io.github.robwin.circuitbreaker; -import org.openjdk.jmh.annotations.*; - -import java.time.Duration; -import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; - -@State(Scope.Benchmark) -@OutputTimeUnit(TimeUnit.MILLISECONDS) -@BenchmarkMode(Mode.Throughput) -public class CircuitBreakerBenchmark { - - private CircuitBreaker circuitBreaker; - private Supplier supplier; - private static final int ITERATION_COUNT = 10; - private static final int WARMUP_COUNT = 10; - private static final int THREAD_COUNT = 10; - - @Setup - public void setUp() { - CircuitBreakerRegistry circuitBreakerRegistry = CircuitBreakerRegistry.of(CircuitBreakerConfig.custom() - .failureRateThreshold(1) - .waitDurationInOpenState(Duration.ofSeconds(1)) - .build()); - circuitBreaker = circuitBreakerRegistry.circuitBreaker("testCircuitBreaker"); - - supplier = CircuitBreaker.decorateSupplier(() -> { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - e.printStackTrace(); - } - return "Hello Benchmark"; - }, circuitBreaker); - } - - @Benchmark - @Threads(value = THREAD_COUNT) - @Warmup(iterations = WARMUP_COUNT) - @Measurement(iterations = ITERATION_COUNT) - public String invokeSupplier(){ - return supplier.get(); - } -} +//@State(Scope.Benchmark) +//@OutputTimeUnit(TimeUnit.MILLISECONDS) +//@BenchmarkMode(Mode.Throughput) +//public class CircuitBreakerBenchmark { +// +// private CircuitBreaker circuitBreaker; +// private Supplier supplier; +// private static final int ITERATION_COUNT = 10; +// private static final int WARMUP_COUNT = 10; +// private static final int THREAD_COUNT = 10; +// +// @Setup +// public void setUp() { +// CircuitBreakerRegistry circuitBreakerRegistry = CircuitBreakerRegistry.of(CircuitBreakerConfig.custom() +// .failureRateThreshold(1) +// .waitDurationInOpenState(Duration.ofSeconds(1)) +// .build()); +// circuitBreaker = circuitBreakerRegistry.circuitBreaker("testCircuitBreaker"); +// +// supplier = CircuitBreaker.decorateSupplier(() -> { +// try { +// Thread.sleep(100); +// } catch (InterruptedException e) { +// e.printStackTrace(); +// } +// return "Hello Benchmark"; +// }, circuitBreaker); +// } +// +// @Benchmark +// @Threads(value = THREAD_COUNT) +// @Warmup(iterations = WARMUP_COUNT) +// @Measurement(iterations = ITERATION_COUNT) +// public String invokeSupplier(){ +// return supplier.get(); +// } +//} diff --git a/src/jmh/java/javaslang/circuitbreaker/30-Nov-2016-i7-I7-5557U-OSX-Java-1.8.0_112-x64.txt b/src/jmh/java/javaslang/circuitbreaker/30-Nov-2016-i7-I7-5557U-OSX-Java-1.8.0_112-x64.txt new file mode 100644 index 0000000000..c9c091354e --- /dev/null +++ b/src/jmh/java/javaslang/circuitbreaker/30-Nov-2016-i7-I7-5557U-OSX-Java-1.8.0_112-x64.txt @@ -0,0 +1,41 @@ +Benchmark Mode Cnt Score Error Units + +RateLimiterBenchmark.atomicPermission thrpt 10 7.274 ± 0.132 ops/us +RateLimiterBenchmark.semaphoreBasedPermission thrpt 10 17.335 ± 3.441 ops/us +RateLimiterBenchmark.timeBasedPermission thrpt 10 3.522 ± 0.495 ops/us + +RateLimiterBenchmark.atomicPermission avgt 10 0.294 ± 0.038 us/op +RateLimiterBenchmark.semaphoreBasedPermission avgt 10 0.120 ± 0.018 us/op +RateLimiterBenchmark.timeBasedPermission avgt 10 0.562 ± 0.045 us/op + +RateLimiterBenchmark.atomicPermission sample 535765 1.480 ± 0.036 us/op +RateLimiterBenchmark.atomicPermission:atomicPermission·p0.00 sample 0.040 us/op +RateLimiterBenchmark.atomicPermission:atomicPermission·p0.50 sample 0.383 us/op +RateLimiterBenchmark.atomicPermission:atomicPermission·p0.90 sample 4.288 us/op +RateLimiterBenchmark.atomicPermission:atomicPermission·p0.95 sample 7.368 us/op +RateLimiterBenchmark.atomicPermission:atomicPermission·p0.99 sample 14.080 us/op +RateLimiterBenchmark.atomicPermission:atomicPermission·p0.999 sample 18.048 us/op +RateLimiterBenchmark.atomicPermission:atomicPermission·p0.9999 sample 58.449 us/op +RateLimiterBenchmark.atomicPermission:atomicPermission·p1.00 sample 1654.784 us/op +RateLimiterBenchmark.semaphoreBasedPermission sample 635614 0.166 ± 0.010 us/op +RateLimiterBenchmark.semaphoreBasedPermission:semaphoreBasedPermission·p0.00 sample 0.001 us/op +RateLimiterBenchmark.semaphoreBasedPermission:semaphoreBasedPermission·p0.50 sample 0.135 us/op +RateLimiterBenchmark.semaphoreBasedPermission:semaphoreBasedPermission·p0.90 sample 0.219 us/op +RateLimiterBenchmark.semaphoreBasedPermission:semaphoreBasedPermission·p0.95 sample 0.236 us/op +RateLimiterBenchmark.semaphoreBasedPermission:semaphoreBasedPermission·p0.99 sample 0.333 us/op +RateLimiterBenchmark.semaphoreBasedPermission:semaphoreBasedPermission·p0.999 sample 2.468 us/op +RateLimiterBenchmark.semaphoreBasedPermission:semaphoreBasedPermission·p0.9999 sample 15.519 us/op +RateLimiterBenchmark.semaphoreBasedPermission:semaphoreBasedPermission·p1.00 sample 1372.160 us/op +RateLimiterBenchmark.timeBasedPermission sample 553560 0.800 ± 0.053 us/op +RateLimiterBenchmark.timeBasedPermission:timeBasedPermission·p0.00 sample 0.054 us/op +RateLimiterBenchmark.timeBasedPermission:timeBasedPermission·p0.50 sample 0.550 us/op +RateLimiterBenchmark.timeBasedPermission:timeBasedPermission·p0.90 sample 0.749 us/op +RateLimiterBenchmark.timeBasedPermission:timeBasedPermission·p0.95 sample 0.826 us/op +RateLimiterBenchmark.timeBasedPermission:timeBasedPermission·p0.99 sample 8.256 us/op +RateLimiterBenchmark.timeBasedPermission:timeBasedPermission·p0.999 sample 33.920 us/op +RateLimiterBenchmark.timeBasedPermission:timeBasedPermission·p0.9999 sample 160.221 us/op +RateLimiterBenchmark.timeBasedPermission:timeBasedPermission·p1.00 sample 5742.592 us/op + +RateLimiterBenchmark.atomicPermission ss 10 17.140 ± 5.640 us/op +RateLimiterBenchmark.semaphoreBasedPermission ss 10 9.724 ± 4.602 us/op +RateLimiterBenchmark.timeBasedPermission ss 10 26.875 ± 10.869 us/op diff --git a/src/jmh/java/javaslang/circuitbreaker/RateLimiterBenchmark.java b/src/jmh/java/javaslang/circuitbreaker/RateLimiterBenchmark.java new file mode 100644 index 0000000000..80cdcc75a8 --- /dev/null +++ b/src/jmh/java/javaslang/circuitbreaker/RateLimiterBenchmark.java @@ -0,0 +1,89 @@ +package javaslang.circuitbreaker; + +import javaslang.ratelimiter.RateLimiter; +import javaslang.ratelimiter.RateLimiterConfig; +import javaslang.ratelimiter.internal.AtomicRateLimiter; +import javaslang.ratelimiter.internal.SemaphoreBasedRateLimiter; +import javaslang.ratelimiter.internal.TimeBasedRateLimiter; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +@State(Scope.Benchmark) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@BenchmarkMode(Mode.All) +public class RateLimiterBenchmark { + + public static final int FORK_COUNT = 2; + private static final int WARMUP_COUNT = 10; + private static final int ITERATION_COUNT = 5; + private static final int THREAD_COUNT = 2; + + private RateLimiter semaphoreBasedRateLimiter; + private RateLimiter timeBasedRateLimiter; + private RateLimiter atomicRateLimiter; + + private Supplier semaphoreGuardedSupplier; + private Supplier timeGuardedSupplier; + private Supplier atomicGuardedSupplier; + + @Setup + public void setUp() { + RateLimiterConfig rateLimiterConfig = RateLimiterConfig.builder() + .limitForPeriod(Integer.MAX_VALUE) + .limitRefreshPeriod(Duration.ofNanos(10)) + .timeoutDuration(Duration.ofSeconds(5)) + .build(); + semaphoreBasedRateLimiter = new SemaphoreBasedRateLimiter("semaphoreBased", rateLimiterConfig); + timeBasedRateLimiter = new TimeBasedRateLimiter("timeBased", rateLimiterConfig); + atomicRateLimiter = new AtomicRateLimiter("atomicBased", rateLimiterConfig); + + Supplier stringSupplier = () -> { + Blackhole.consumeCPU(1); + return "Hello Benchmark"; + }; + semaphoreGuardedSupplier = RateLimiter.decorateSupplier(semaphoreBasedRateLimiter, stringSupplier); + timeGuardedSupplier = RateLimiter.decorateSupplier(timeBasedRateLimiter, stringSupplier); + atomicGuardedSupplier = RateLimiter.decorateSupplier(atomicRateLimiter, stringSupplier); + } + + @Benchmark + @Threads(value = THREAD_COUNT) + @Warmup(iterations = WARMUP_COUNT) + @Fork(value = FORK_COUNT) + @Measurement(iterations = ITERATION_COUNT) + public String semaphoreBasedPermission() { + return semaphoreGuardedSupplier.get(); + } + + @Benchmark + @Threads(value = THREAD_COUNT) + @Warmup(iterations = WARMUP_COUNT) + @Fork(value = FORK_COUNT) + @Measurement(iterations = ITERATION_COUNT) + public String timeBasedPermission() { + return timeGuardedSupplier.get(); + } + + @Benchmark + @Threads(value = THREAD_COUNT) + @Warmup(iterations = WARMUP_COUNT) + @Fork(value = FORK_COUNT) + @Measurement(iterations = ITERATION_COUNT) + public String atomicPermission() { + return atomicGuardedSupplier.get(); + } +} \ No newline at end of file diff --git a/src/main/java/javaslang/ratelimiter/RateLimiter.java b/src/main/java/javaslang/ratelimiter/RateLimiter.java index 1a5ff9c2b2..a70eaaaea1 100644 --- a/src/main/java/javaslang/ratelimiter/RateLimiter.java +++ b/src/main/java/javaslang/ratelimiter/RateLimiter.java @@ -13,50 +13,6 @@ */ public interface RateLimiter { - /** - * Acquires a permission from this rate limiter, blocking until one is - * available. - *

- *

If the current thread is {@linkplain Thread#interrupt interrupted} - * while waiting for a permit then it won't throw {@linkplain InterruptedException}, - * but its interrupt status will be set. - * - * @return {@code true} if a permit was acquired and {@code false} - * if waiting time elapsed before a permit was acquired - */ - boolean getPermission(Duration timeoutDuration); - - /** - * Get the name of this RateLimiter - * - * @return the name of this RateLimiter - */ - String getName(); - - /** - * Get the RateLimiterConfig of this RateLimiter. - * - * @return the RateLimiterConfig of this RateLimiter - */ - RateLimiterConfig getRateLimiterConfig(); - - /** - * Get the Metrics of this RateLimiter. - * - * @return the Metrics of this RateLimiter - */ - Metrics getMetrics(); - - interface Metrics { - /** - * Returns an estimate of the number of threads waiting for permission - * in this JVM process. - * - * @return estimate of the number of threads waiting for permission. - */ - int getNumberOfWaitingThreads(); - } - /** * Creates a supplier which is restricted by a RateLimiter. * @@ -167,18 +123,67 @@ static Function decorateFunction(RateLimiter rateLimiter, Function< return decoratedFunction; } + /** * Will wait for permission within default timeout duration. - * Throws {@link RequestNotPermitted} if waiting time elapsed before a permit was acquired. * * @param rateLimiter the RateLimiter to get permission from + * @throws RequestNotPermitted if waiting time elapsed before a permit was acquired. + * @throws IllegalStateException if thread was interrupted during permission wait */ - static void waitForPermission(final RateLimiter rateLimiter) { + static void waitForPermission(final RateLimiter rateLimiter) throws IllegalStateException, RequestNotPermitted { RateLimiterConfig rateLimiterConfig = rateLimiter.getRateLimiterConfig(); Duration timeoutDuration = rateLimiterConfig.getTimeoutDuration(); boolean permission = rateLimiter.getPermission(timeoutDuration); + if (Thread.interrupted()) { + throw new IllegalStateException("Thread was interrupted during permission wait"); + } if (!permission) { throw new RequestNotPermitted("Request not permitted for limiter: " + rateLimiter.getName()); } } + + /** + * Acquires a permission from this rate limiter, blocking until one is + * available. + *

+ *

If the current thread is {@linkplain Thread#interrupt interrupted} + * while waiting for a permit then it won't throw {@linkplain InterruptedException}, + * but its interrupt status will be set. + * + * @return {@code true} if a permit was acquired and {@code false} + * if waiting time elapsed before a permit was acquired + */ + boolean getPermission(Duration timeoutDuration); + + /** + * Get the name of this RateLimiter + * + * @return the name of this RateLimiter + */ + String getName(); + + /** + * Get the RateLimiterConfig of this RateLimiter. + * + * @return the RateLimiterConfig of this RateLimiter + */ + RateLimiterConfig getRateLimiterConfig(); + + /** + * Get the Metrics of this RateLimiter. + * + * @return the Metrics of this RateLimiter + */ + Metrics getMetrics(); + + interface Metrics { + /** + * Returns an estimate of the number of threads waiting for permission + * in this JVM process. + * + * @return estimate of the number of threads waiting for permission. + */ + int getNumberOfWaitingThreads(); + } } diff --git a/src/main/java/javaslang/ratelimiter/RateLimiterConfig.java b/src/main/java/javaslang/ratelimiter/RateLimiterConfig.java index 17e2ccd3f3..21fa3d93b8 100644 --- a/src/main/java/javaslang/ratelimiter/RateLimiterConfig.java +++ b/src/main/java/javaslang/ratelimiter/RateLimiterConfig.java @@ -8,7 +8,7 @@ public class RateLimiterConfig { private static final String TIMEOUT_DURATION_MUST_NOT_BE_NULL = "TimeoutDuration must not be null"; private static final String LIMIT_REFRESH_PERIOD_MUST_NOT_BE_NULL = "LimitRefreshPeriod must not be null"; - private static final Duration ACCEPTABLE_REFRESH_PERIOD = Duration.ofNanos(500L); // TODO: use jmh to find real one + private static final Duration ACCEPTABLE_REFRESH_PERIOD = Duration.ofNanos(1L); // TODO: use jmh to find real one private final Duration timeoutDuration; private final Duration limitRefreshPeriod; diff --git a/src/main/java/javaslang/ratelimiter/internal/AtomicRateLimiter.java b/src/main/java/javaslang/ratelimiter/internal/AtomicRateLimiter.java new file mode 100644 index 0000000000..d0cb61ec56 --- /dev/null +++ b/src/main/java/javaslang/ratelimiter/internal/AtomicRateLimiter.java @@ -0,0 +1,232 @@ +package javaslang.ratelimiter.internal; + +import static java.lang.Long.min; +import static java.lang.System.nanoTime; +import static java.lang.Thread.currentThread; +import static java.util.concurrent.locks.LockSupport.parkNanos; + +import javaslang.ratelimiter.RateLimiter; +import javaslang.ratelimiter.RateLimiterConfig; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +/** + * {@link AtomicRateLimiter} splits all nanoseconds from the start of epoch into cycles. + *

Each cycle has duration of {@link RateLimiterConfig#limitRefreshPeriod} in nanoseconds. + *

+ *

By contract on start of each cycle {@link AtomicRateLimiter} should + * set {@link State#activePermissions} to {@link RateLimiterConfig#limitForPeriod}. + * For the {@link AtomicRateLimiter} callers it is really looks so, but under the hood there is + * some optimisations that will skip this refresh if {@link AtomicRateLimiter} is not used actively. + *

+ *

All {@link AtomicRateLimiter} updates are atomic and state is encapsulated in {@link AtomicReference} to + * {@link AtomicRateLimiter.State} + */ +public class AtomicRateLimiter implements RateLimiter { + + private final String name; + private final RateLimiterConfig rateLimiterConfig; + private final long cyclePeriodInNanos; + private final int permissionsPerCycle; + private final AtomicInteger waitingThreads; + private final AtomicReference state; + + + public AtomicRateLimiter(String name, RateLimiterConfig rateLimiterConfig) { + this.name = name; + this.rateLimiterConfig = rateLimiterConfig; + + cyclePeriodInNanos = rateLimiterConfig.getLimitRefreshPeriod().toNanos(); + permissionsPerCycle = rateLimiterConfig.getLimitForPeriod(); + + waitingThreads = new AtomicInteger(0); + long activeCycle = nanoTime() / cyclePeriodInNanos; + int activePermissions = permissionsPerCycle; + state = new AtomicReference<>(new State(activeCycle, activePermissions, 0)); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean getPermission(final Duration timeoutDuration) { + long timeoutInNanos = timeoutDuration.toNanos(); + State modifiedState = state.updateAndGet( + activeState -> calculateNextState(timeoutInNanos, activeState) + ); + return waitForPermissionIfNecessary(timeoutInNanos, modifiedState.nanosToWait); + } + + /** + * A side-effect-free function that can calculate next {@link State} from current. + * It determines time duration that you should wait for permission and reserves it for you, + * if you'll be able to wait long enough. + * + * @param timeoutInNanos max time that caller can wait for permission in nanoseconds + * @param activeState current state of {@link AtomicRateLimiter} + * @return next {@link State} + */ + private State calculateNextState(final long timeoutInNanos, final State activeState) { + long currentNanos = nanoTime(); + long currentCycle = currentNanos / cyclePeriodInNanos; + + long nextCycle = activeState.activeCycle; + int nextPermissions = activeState.activePermissions; + if (nextCycle != currentCycle) { + long elapsedCycles = currentCycle - nextCycle; + long accumulatedPermissions = elapsedCycles * permissionsPerCycle; + nextCycle = currentCycle; + nextPermissions = (int) min(nextPermissions + accumulatedPermissions, permissionsPerCycle); + } + long nextNanosToWait = nanosToWaitForPermission(nextPermissions, currentNanos, currentCycle); + State nextState = reservePermissions(timeoutInNanos, nextCycle, nextPermissions, nextNanosToWait); + return nextState; + } + + /** + * Calculates time to wait for next permission as + * [time to the next cycle] + [duration of full cycles until reserved permissions expire] + * + * @param availablePermissions currently available permissions, can be negative if some permissions have been reserved + * @param currentNanos current time in nanoseconds + * @param currentCycle current {@link AtomicRateLimiter} cycle + * @return nanoseconds to wait for the next permission + */ + private long nanosToWaitForPermission(final int availablePermissions, final long currentNanos, final long currentCycle) { + if (availablePermissions > 0) { + return 0L; + } else { + long nextCycleTimeInNanos = (currentCycle + 1) * cyclePeriodInNanos; + long nanosToNextCycle = nextCycleTimeInNanos - currentNanos; + int fullCyclesToWait = (-availablePermissions) / permissionsPerCycle; + return (fullCyclesToWait * cyclePeriodInNanos) + nanosToNextCycle; + } + } + + /** + * Determines whether caller can acquire permission before timeout or not and then creates corresponding {@link State}. + * Reserves permissions only if caller can successfully wait for permission. + * + * @param timeoutInNanos max time that caller can wait for permission in nanoseconds + * @param cycle cycle for new {@link State} + * @param permissions permissions for new {@link State} + * @param nanosToWait nanoseconds to wait for the next permission + * @return new {@link State} with possibly reserved permissions and time to wait + */ + private State reservePermissions(final long timeoutInNanos, final long cycle, final int permissions, final long nanosToWait) { + boolean canAcquireInTime = timeoutInNanos >= nanosToWait; + int permissionsWithReservation = permissions; + if (canAcquireInTime) { + permissionsWithReservation--; + } + return new State(cycle, permissionsWithReservation, nanosToWait); + } + + /** + * If nanosToWait is bigger than 0 it tries to park {@link Thread} for nanosToWait but not longer then timeoutInNanos. + * + * @param timeoutInNanos max time that caller can wait + * @param nanosToWait nanoseconds caller need to wait + * @return true if caller was able to wait for nanosToWait without {@link Thread#interrupt} and not exceed timeout + */ + private boolean waitForPermissionIfNecessary(final long timeoutInNanos, final long nanosToWait) { + boolean canAcquireImmediately = nanosToWait <= 0; + boolean canAcquireInTime = timeoutInNanos >= nanosToWait; + + if (canAcquireImmediately) { + return true; + } + if (canAcquireInTime) { + return waitForPermission(nanosToWait); + } + waitForPermission(timeoutInNanos); + return false; + } + + /** + * Parks {@link Thread} for nanosToWait. + * + * @param nanosToWait nanoseconds caller need to wait + * @return true if caller was not {@link Thread#interrupted} while waiting + */ + private boolean waitForPermission(final long nanosToWait) { + waitingThreads.incrementAndGet(); + long deadline = nanoTime() + nanosToWait; + while (nanoTime() < deadline || currentThread().isInterrupted()) { + long sleepBlockDuration = deadline - nanoTime(); + parkNanos(sleepBlockDuration); + } + waitingThreads.decrementAndGet(); + return !currentThread().isInterrupted(); + } + + /** + * {@inheritDoc} + */ + @Override + public String getName() { + return name; + } + + /** + * {@inheritDoc} + */ + @Override + public RateLimiterConfig getRateLimiterConfig() { + return rateLimiterConfig; + } + + /** + * {@inheritDoc} + */ + @Override + public Metrics getMetrics() { + return new AtomicRateLimiterMetrics(); + } + + /** + *

{@link AtomicRateLimiter.State} represents immutable state of {@link AtomicRateLimiter} where: + *

    + *
  • activeCycle - {@link AtomicRateLimiter} cycle number that was used + * by the last {@link AtomicRateLimiter#getPermission(Duration)} call.
  • + *

    + *

  • activePermissions - count of available permissions after + * last the last {@link AtomicRateLimiter#getPermission(Duration)} call. + * Can be negative if some permissions where reserved.
  • + *

    + *

  • nanosToWait - count of nanoseconds to wait for permission for + * the last {@link AtomicRateLimiter#getPermission(Duration)} call.
  • + *
+ */ + private static class State { + private final long activeCycle; + private final int activePermissions; + private final long nanosToWait; + + public State(final long activeCycle, final int activePermissions, final long nanosToWait) { + this.activeCycle = activeCycle; + this.activePermissions = activePermissions; + this.nanosToWait = nanosToWait; + } + } + + /** + * Enhanced {@link Metrics} with some implementation specific details + */ + public final class AtomicRateLimiterMetrics implements Metrics { + private AtomicRateLimiterMetrics() { + } + + /** + * {@inheritDoc} + * + * @return + */ + @Override + public int getNumberOfWaitingThreads() { + return waitingThreads.get(); + } + } +} diff --git a/src/main/java/javaslang/ratelimiter/internal/InMemoryRateLimiterRegistry.java b/src/main/java/javaslang/ratelimiter/internal/InMemoryRateLimiterRegistry.java index 2a74e591af..1dd10a2760 100644 --- a/src/main/java/javaslang/ratelimiter/internal/InMemoryRateLimiterRegistry.java +++ b/src/main/java/javaslang/ratelimiter/internal/InMemoryRateLimiterRegistry.java @@ -48,7 +48,7 @@ public RateLimiter rateLimiter(final String name, final RateLimiterConfig rateLi requireNonNull(rateLimiterConfig, CONFIG_MUST_NOT_BE_NULL); return rateLimiters.computeIfAbsent( name, - limitName -> new SemaphoreBasedRateLimiterImpl(name, rateLimiterConfig) + limitName -> new SemaphoreBasedRateLimiter(name, rateLimiterConfig) ); } @@ -64,7 +64,7 @@ public RateLimiter rateLimiter(final String name, final Supplier { RateLimiterConfig rateLimiterConfig = rateLimiterConfigSupplier.get(); requireNonNull(rateLimiterConfig, CONFIG_MUST_NOT_BE_NULL); - return new SemaphoreBasedRateLimiterImpl(limitName, rateLimiterConfig); + return new SemaphoreBasedRateLimiter(limitName, rateLimiterConfig); } ); } diff --git a/src/main/java/javaslang/ratelimiter/internal/SemaphoreBasedRateLimiterImpl.java b/src/main/java/javaslang/ratelimiter/internal/SemaphoreBasedRateLimiter.java similarity index 93% rename from src/main/java/javaslang/ratelimiter/internal/SemaphoreBasedRateLimiterImpl.java rename to src/main/java/javaslang/ratelimiter/internal/SemaphoreBasedRateLimiter.java index 832c9af529..63f5628979 100644 --- a/src/main/java/javaslang/ratelimiter/internal/SemaphoreBasedRateLimiterImpl.java +++ b/src/main/java/javaslang/ratelimiter/internal/SemaphoreBasedRateLimiter.java @@ -18,7 +18,7 @@ * and scheduler that will refresh permissions * after each {@link RateLimiterConfig#limitRefreshPeriod}. */ -public class SemaphoreBasedRateLimiterImpl implements RateLimiter { +public class SemaphoreBasedRateLimiter implements RateLimiter { private static final String NAME_MUST_NOT_BE_NULL = "Name must not be null"; private static final String CONFIG_MUST_NOT_BE_NULL = "RateLimiterConfig must not be null"; @@ -35,7 +35,7 @@ public class SemaphoreBasedRateLimiterImpl implements RateLimiter { * @param name the name of the RateLimiter * @param rateLimiterConfig The RateLimiter configuration. */ - public SemaphoreBasedRateLimiterImpl(final String name, final RateLimiterConfig rateLimiterConfig) { + public SemaphoreBasedRateLimiter(final String name, final RateLimiterConfig rateLimiterConfig) { this(name, rateLimiterConfig, null); } @@ -46,8 +46,8 @@ public SemaphoreBasedRateLimiterImpl(final String name, final RateLimiterConfig * @param rateLimiterConfig The RateLimiter configuration. * @param scheduler executor that will refresh permissions */ - public SemaphoreBasedRateLimiterImpl(String name, RateLimiterConfig rateLimiterConfig, - ScheduledExecutorService scheduler) { + public SemaphoreBasedRateLimiter(String name, RateLimiterConfig rateLimiterConfig, + ScheduledExecutorService scheduler) { this.name = requireNonNull(name, NAME_MUST_NOT_BE_NULL); this.rateLimiterConfig = requireNonNull(rateLimiterConfig, CONFIG_MUST_NOT_BE_NULL); diff --git a/src/main/java/javaslang/ratelimiter/internal/TimeBasedRateLimiter.java b/src/main/java/javaslang/ratelimiter/internal/TimeBasedRateLimiter.java new file mode 100644 index 0000000000..cca265114f --- /dev/null +++ b/src/main/java/javaslang/ratelimiter/internal/TimeBasedRateLimiter.java @@ -0,0 +1,178 @@ +package javaslang.ratelimiter.internal; + +import static java.lang.System.nanoTime; +import static java.lang.Thread.currentThread; +import static java.util.concurrent.locks.LockSupport.parkNanos; + +import javaslang.ratelimiter.RateLimiter; +import javaslang.ratelimiter.RateLimiterConfig; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; + +/** + * @author bstorozhuk + */ +public class TimeBasedRateLimiter implements RateLimiter { + + private final String name; + private final RateLimiterConfig rateLimiterConfig; + private final long cyclePeriodInNanos; + private final int permissionsPerCycle; + private final ReentrantLock lock; + private final AtomicInteger waitingThreads; + private long activeCycle; + private volatile int activePermissions; + + public TimeBasedRateLimiter(String name, RateLimiterConfig rateLimiterConfig) { + this.name = name; + this.rateLimiterConfig = rateLimiterConfig; + + cyclePeriodInNanos = rateLimiterConfig.getLimitRefreshPeriod().toNanos(); + permissionsPerCycle = rateLimiterConfig.getLimitForPeriod(); + + activeCycle = nanoTime() / cyclePeriodInNanos; + waitingThreads = new AtomicInteger(0); + lock = new ReentrantLock(false); + + activePermissions = permissionsPerCycle; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean getPermission(final Duration timeoutDuration) { + Supplier permissionSupplier = () -> { + long currentNanos = nanoTime(); + long currentCycle = currentNanos / cyclePeriodInNanos; +// System.out.println(MessageFormat.format( +// "Thread {0}: START activeCycle={1}; permissions={2}; currentNanos={3}; currentCycle={4};", +// currentThread().getId(), activeCycle, activePermissions, currentNanos, currentCycle) +// ); + if (activeCycle != currentCycle) { + refreshLimiterState(currentCycle); + } + return acquirePermission(currentNanos, timeoutDuration); + }; + return executeConcurrently(permissionSupplier); + } + + private void refreshLimiterState(final long currentCycle) { + assert lock.isHeldByCurrentThread(); + activeCycle = currentCycle; + activePermissions = Integer.min(activePermissions + permissionsPerCycle, permissionsPerCycle); +// System.out.println(MessageFormat.format( +// "Thread {0}: AFTER REFRESH activeCycle={1}; permissions={2}; currentCycle={3};", +// currentThread().getId(), activeCycle, activePermissions, currentCycle) +// ); + } + + private boolean acquirePermission(final long currentNanos, final Duration timeoutDuration) { + assert lock.isHeldByCurrentThread(); + long currentCycle = currentNanos / cyclePeriodInNanos; + long timeoutInNanos = timeoutDuration.toNanos(); + long nanosToWait = nanosToWaitForPermission(currentNanos, currentCycle); + if (timeoutInNanos < nanosToWait) { + waitForPermission(timeoutInNanos); + return false; + } + activePermissions--; + if (nanosToWait <= 0) { +// System.out.println(MessageFormat.format( +// "Thread {0}: ACQUIRE IMMEDIATELY activeCycle={1}; permissions={2}; currentCycle={3}; nanosToWait={4};", +// currentThread().getId(), activeCycle, activePermissions, currentCycle, nanosToWait) +// ); + return true; + } + lock.unlock(); + return waitForPermission(nanosToWait); + } + + private long nanosToWaitForPermission(final long currentNanos, final long currentCycle) { + if (activePermissions > 0) { + return 0L; + } + long nextCycleTimeInNanos = (currentCycle + 1) * cyclePeriodInNanos; + long nanosToNextCycle = nextCycleTimeInNanos - currentNanos; + int fullCyclesToWait = (-activePermissions) / permissionsPerCycle; + return (fullCyclesToWait * cyclePeriodInNanos) + nanosToNextCycle; + } + + private boolean waitForPermission(final long nanosToWait) { +// System.out.println(MessageFormat.format( +// "Thread {0}: WAIT activeCycle={1}; permissions={2}; nanosToWait={3};", +// currentThread().getId(), activeCycle, activePermissions, nanosToWait) +// ); + waitingThreads.incrementAndGet(); + long deadline = nanoTime() + nanosToWait; + while (nanoTime() < deadline || currentThread().isInterrupted()) { + long sleepBlockDuration = deadline - nanoTime(); + parkNanos(sleepBlockDuration); + } + waitingThreads.decrementAndGet(); + return !currentThread().isInterrupted(); + } + + private T executeConcurrently(final Supplier permissionSupplier) { + lock.lock(); + try { + return permissionSupplier.get(); + } finally { + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } + } + + @Override + public String getName() { + return name; + } + + @Override + public RateLimiterConfig getRateLimiterConfig() { + return rateLimiterConfig; + } + + @Override + public Metrics getMetrics() { + return null; + } + + /** + * Enhanced {@link Metrics} with some implementation specific details + */ + public final class TimeBasedRateLimiterMetrics implements Metrics { + private TimeBasedRateLimiterMetrics() { + } + + /** + * {@inheritDoc} + * + * @return + */ + @Override + public int getNumberOfWaitingThreads() { + return waitingThreads.get(); + } + + /** + * Returns the estimated time in nanos to wait for permission. + *

+ *

This method is typically used for debugging and testing purposes. + * + * @return the estimated time in nanos to wait for permission. + */ + public long nanosToWait() { + long currentNanos = nanoTime(); + long currentCycle = currentNanos / cyclePeriodInNanos; + if (currentCycle == activeCycle) { + return 0; + } + return nanosToWaitForPermission(currentNanos, currentCycle); + } + } +} diff --git a/src/test/java/javaslang/ratelimiter/internal/InMemoryRateLimiterRegistryTest.java b/src/test/java/javaslang/ratelimiter/internal/InMemoryRateLimiterRegistryTest.java index ba8ca819a9..ce0aa4621f 100644 --- a/src/test/java/javaslang/ratelimiter/internal/InMemoryRateLimiterRegistryTest.java +++ b/src/test/java/javaslang/ratelimiter/internal/InMemoryRateLimiterRegistryTest.java @@ -18,6 +18,7 @@ import java.util.function.Supplier; +@SuppressWarnings("unchecked") public class InMemoryRateLimiterRegistryTest { private static final int LIMIT = 50; diff --git a/src/test/java/javaslang/ratelimiter/internal/SemaphoreBasedRateLimiterImplTest.java b/src/test/java/javaslang/ratelimiter/internal/SemaphoreBasedRateLimiterImplTest.java index 24da434548..485290dabb 100644 --- a/src/test/java/javaslang/ratelimiter/internal/SemaphoreBasedRateLimiterImplTest.java +++ b/src/test/java/javaslang/ratelimiter/internal/SemaphoreBasedRateLimiterImplTest.java @@ -62,7 +62,7 @@ public void init() { public void rateLimiterCreationWithProvidedScheduler() throws Exception { ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class); RateLimiterConfig configSpy = spy(config); - SemaphoreBasedRateLimiterImpl limit = new SemaphoreBasedRateLimiterImpl("test", configSpy, scheduledExecutorService); + SemaphoreBasedRateLimiter limit = new SemaphoreBasedRateLimiter("test", configSpy, scheduledExecutorService); ArgumentCaptor refreshLimitRunnableCaptor = ArgumentCaptor.forClass(Runnable.class); verify(scheduledExecutorService) @@ -93,7 +93,7 @@ public void rateLimiterCreationWithProvidedScheduler() throws Exception { @Test public void rateLimiterCreationWithDefaultScheduler() throws Exception { - SemaphoreBasedRateLimiterImpl limit = new SemaphoreBasedRateLimiterImpl("test", config); + SemaphoreBasedRateLimiter limit = new SemaphoreBasedRateLimiter("test", config); awaitImpatiently().atMost(FIVE_HUNDRED_MILLISECONDS) .until(() -> limit.getPermission(ZERO), equalTo(false)); awaitImpatiently().atMost(110, TimeUnit.MILLISECONDS) @@ -104,10 +104,10 @@ public void rateLimiterCreationWithDefaultScheduler() throws Exception { public void getPermissionAndMetrics() throws Exception { ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class); RateLimiterConfig configSpy = spy(config); - SemaphoreBasedRateLimiterImpl limit = new SemaphoreBasedRateLimiterImpl("test", configSpy, scheduledExecutorService); - SemaphoreBasedRateLimiterImpl.SemaphoreBasedRateLimiterMetrics detailedMetrics = limit.getDetailedMetrics(); + SemaphoreBasedRateLimiter limit = new SemaphoreBasedRateLimiter("test", configSpy, scheduledExecutorService); + SemaphoreBasedRateLimiter.SemaphoreBasedRateLimiterMetrics detailedMetrics = limit.getDetailedMetrics(); - SynchronousQueue synchronousQueue = new SynchronousQueue(); + SynchronousQueue synchronousQueue = new SynchronousQueue<>(); Thread thread = new Thread(() -> { run(() -> { for (int i = 0; i < LIMIT; i++) { @@ -151,7 +151,7 @@ public void getPermissionAndMetrics() throws Exception { public void getPermissionInterruption() throws Exception { ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class); RateLimiterConfig configSpy = spy(config); - SemaphoreBasedRateLimiterImpl limit = new SemaphoreBasedRateLimiterImpl("test", configSpy, scheduledExecutorService); + SemaphoreBasedRateLimiter limit = new SemaphoreBasedRateLimiter("test", configSpy, scheduledExecutorService); limit.getPermission(ZERO); limit.getPermission(ZERO); @@ -177,14 +177,14 @@ public void getPermissionInterruption() throws Exception { @Test public void getName() throws Exception { ScheduledExecutorService scheduler = mock(ScheduledExecutorService.class); - SemaphoreBasedRateLimiterImpl limit = new SemaphoreBasedRateLimiterImpl("test", config, scheduler); + SemaphoreBasedRateLimiter limit = new SemaphoreBasedRateLimiter("test", config, scheduler); then(limit.getName()).isEqualTo("test"); } @Test public void getMetrics() throws Exception { ScheduledExecutorService scheduler = mock(ScheduledExecutorService.class); - SemaphoreBasedRateLimiterImpl limit = new SemaphoreBasedRateLimiterImpl("test", config, scheduler); + SemaphoreBasedRateLimiter limit = new SemaphoreBasedRateLimiter("test", config, scheduler); RateLimiter.Metrics metrics = limit.getMetrics(); then(metrics.getNumberOfWaitingThreads()).isEqualTo(0); } @@ -192,15 +192,15 @@ public void getMetrics() throws Exception { @Test public void getRateLimiterConfig() throws Exception { ScheduledExecutorService scheduler = mock(ScheduledExecutorService.class); - SemaphoreBasedRateLimiterImpl limit = new SemaphoreBasedRateLimiterImpl("test", config, scheduler); + SemaphoreBasedRateLimiter limit = new SemaphoreBasedRateLimiter("test", config, scheduler); then(limit.getRateLimiterConfig()).isEqualTo(config); } @Test public void getDetailedMetrics() throws Exception { ScheduledExecutorService scheduler = mock(ScheduledExecutorService.class); - SemaphoreBasedRateLimiterImpl limit = new SemaphoreBasedRateLimiterImpl("test", config, scheduler); - SemaphoreBasedRateLimiterImpl.SemaphoreBasedRateLimiterMetrics metrics = limit.getDetailedMetrics(); + SemaphoreBasedRateLimiter limit = new SemaphoreBasedRateLimiter("test", config, scheduler); + SemaphoreBasedRateLimiter.SemaphoreBasedRateLimiterMetrics metrics = limit.getDetailedMetrics(); then(metrics.getNumberOfWaitingThreads()).isEqualTo(0); then(metrics.getAvailablePermits()).isEqualTo(2); } @@ -209,13 +209,13 @@ public void getDetailedMetrics() throws Exception { public void constructionWithNullName() throws Exception { exception.expect(NullPointerException.class); exception.expectMessage(NAME_MUST_NOT_BE_NULL); - new SemaphoreBasedRateLimiterImpl(null, config, null); + new SemaphoreBasedRateLimiter(null, config, null); } @Test public void constructionWithNullConfig() throws Exception { exception.expect(NullPointerException.class); exception.expectMessage(CONFIG_MUST_NOT_BE_NULL); - new SemaphoreBasedRateLimiterImpl("test", null, null); + new SemaphoreBasedRateLimiter("test", null, null); } } \ No newline at end of file diff --git a/src/test/java/javaslang/ratelimiter/internal/TimeBasedRateLimiterTest.java b/src/test/java/javaslang/ratelimiter/internal/TimeBasedRateLimiterTest.java new file mode 100644 index 0000000000..f45a2b5dbb --- /dev/null +++ b/src/test/java/javaslang/ratelimiter/internal/TimeBasedRateLimiterTest.java @@ -0,0 +1,53 @@ +package javaslang.ratelimiter.internal; + +import javaslang.ratelimiter.RateLimiter; +import javaslang.ratelimiter.RateLimiterConfig; +import org.junit.Test; + +import java.time.Duration; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * @author bstorozhuk + */ +public class TimeBasedRateLimiterTest { + + public static final int N_THREADS = 4; + public static final AtomicLong counter = new AtomicLong(0); + public static final AtomicBoolean required = new AtomicBoolean(false); + + @Test + public void test() throws Exception { + RateLimiterConfig config = RateLimiterConfig.builder() + .limitForPeriod(10) + .limitRefreshPeriod(Duration.ofMillis(500)) + .timeoutDuration(Duration.ZERO) + .build(); + RateLimiter limiter = new AtomicRateLimiter("test", config); + + Runnable guarded = () -> { + if (limiter.getPermission(Duration.ofSeconds(10))) { + counter.incrementAndGet(); + } + }; + + ExecutorService pool = Executors.newFixedThreadPool(N_THREADS); + for (int i = 0; i < N_THREADS; i++) { + pool.execute(() -> { + while (true) { + if (required.get()) { + guarded.run(); + } + } + }); + } + required.set(true); + Thread.sleep(2200); + required.set(false); + System.out.println("COUNTER: " + counter); + pool.shutdownNow(); + } +} \ No newline at end of file