diff --git a/src/jmh/java/javaslang/circuitbreaker/RateLimiterBenchmark.java b/src/jmh/java/javaslang/circuitbreaker/RateLimiterBenchmark.java index fbdc3c13b3..265ff125c3 100644 --- a/src/jmh/java/javaslang/circuitbreaker/RateLimiterBenchmark.java +++ b/src/jmh/java/javaslang/circuitbreaker/RateLimiterBenchmark.java @@ -19,11 +19,12 @@ import java.time.Duration; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; import java.util.function.Supplier; @State(Scope.Benchmark) @OutputTimeUnit(TimeUnit.MICROSECONDS) -@BenchmarkMode(Mode.All) +@BenchmarkMode(Mode.Throughput) public class RateLimiterBenchmark { public static final int FORK_COUNT = 2; @@ -32,7 +33,9 @@ public class RateLimiterBenchmark { private static final int THREAD_COUNT = 2; private RateLimiter semaphoreBasedRateLimiter; - private RateLimiter atomicRateLimiter; + private AtomicRateLimiter atomicRateLimiter; + private AtomicRateLimiter.State state; + private static final Object mutex = new Object(); private Supplier semaphoreGuardedSupplier; private Supplier atomicGuardedSupplier; @@ -46,6 +49,7 @@ public void setUp() { .build(); semaphoreBasedRateLimiter = new SemaphoreBasedRateLimiter("semaphoreBased", rateLimiterConfig); atomicRateLimiter = new AtomicRateLimiter("atomicBased", rateLimiterConfig); + state = atomicRateLimiter.state.get(); Supplier stringSupplier = () -> { Blackhole.consumeCPU(1); @@ -55,6 +59,54 @@ public void setUp() { atomicGuardedSupplier = RateLimiter.decorateSupplier(atomicRateLimiter, stringSupplier); } + @Benchmark + @Threads(value = THREAD_COUNT) + @Warmup(iterations = WARMUP_COUNT) + @Fork(value = FORK_COUNT) + @Measurement(iterations = ITERATION_COUNT) + public void mutex(Blackhole bh) { + synchronized (mutex) { + state = atomicRateLimiter.calculateNextState(Duration.ZERO.toNanos(), state); + } + } + + @Benchmark + @Threads(value = THREAD_COUNT) + @Warmup(iterations = WARMUP_COUNT) + @Fork(value = FORK_COUNT) + @Measurement(iterations = ITERATION_COUNT) + public void atomic(Blackhole bh) { + atomicRateLimiter.state.updateAndGet(state -> { + return atomicRateLimiter.calculateNextState(Duration.ZERO.toNanos(), state); + }); + } + + @Benchmark + @Threads(value = THREAD_COUNT) + @Warmup(iterations = WARMUP_COUNT) + @Fork(value = FORK_COUNT) + @Measurement(iterations = ITERATION_COUNT) + public void atomicBackOf(Blackhole bh) { + AtomicRateLimiter.State prev; + AtomicRateLimiter.State next; + do { + prev = atomicRateLimiter.state.get(); + next = atomicRateLimiter.calculateNextState(Duration.ZERO.toNanos(), prev); + } while (!compareAndSet(prev, next)); + } + + /* + https://arxiv.org/abs/1305.5800 https://dzone.com/articles/wanna-get-faster-wait-bit + */ + public boolean compareAndSet(final AtomicRateLimiter.State current, final AtomicRateLimiter.State next) { + if (atomicRateLimiter.state.compareAndSet(current, next)) { + return true; + } else { + LockSupport.parkNanos(1); + return false; + } + } + @Benchmark @Threads(value = THREAD_COUNT) @Warmup(iterations = WARMUP_COUNT) diff --git a/src/jmh/java/javaslang/circuitbreaker/casWithBackOff.txt b/src/jmh/java/javaslang/circuitbreaker/casWithBackOff.txt new file mode 100644 index 0000000000..8423672aea --- /dev/null +++ b/src/jmh/java/javaslang/circuitbreaker/casWithBackOff.txt @@ -0,0 +1,6 @@ +Benchmark Mode Cnt Score Error Units +RateLimiterBenchmark.atomic thrpt 10 8.016 ± 0.327 ops/us +RateLimiterBenchmark.atomicBackOf thrpt 10 14.104 ± 0.097 ops/us +RateLimiterBenchmark.atomicPermission thrpt 10 7.429 ± 0.299 ops/us +RateLimiterBenchmark.mutex thrpt 10 7.520 ± 0.304 ops/us +RateLimiterBenchmark.semaphoreBasedPermission thrpt 10 17.923 ± 6.043 ops/us diff --git a/src/main/java/javaslang/ratelimiter/internal/AtomicRateLimiter.java b/src/main/java/javaslang/ratelimiter/internal/AtomicRateLimiter.java index 430d7ba841..2053824a0c 100644 --- a/src/main/java/javaslang/ratelimiter/internal/AtomicRateLimiter.java +++ b/src/main/java/javaslang/ratelimiter/internal/AtomicRateLimiter.java @@ -31,7 +31,7 @@ public class AtomicRateLimiter implements RateLimiter { private final long cyclePeriodInNanos; private final int permissionsPerCycle; private final AtomicInteger waitingThreads; - private final AtomicReference state; + public final AtomicReference state; public AtomicRateLimiter(String name, RateLimiterConfig rateLimiterConfig) { @@ -66,7 +66,7 @@ public boolean getPermission(final Duration timeoutDuration) { * @param activeState current state of {@link AtomicRateLimiter} * @return next {@link State} */ - private State calculateNextState(final long timeoutInNanos, final State activeState) { + public State calculateNextState(final long timeoutInNanos, final State activeState) { long currentNanos = currentNanoTime(); long currentCycle = currentNanos / cyclePeriodInNanos; @@ -93,7 +93,7 @@ private State calculateNextState(final long timeoutInNanos, final State activeSt * @param currentCycle current {@link AtomicRateLimiter} cycle * @return nanoseconds to wait for the next permission */ - private long nanosToWaitForPermission(final int availablePermissions, final long currentNanos, final long currentCycle) { + public long nanosToWaitForPermission(final int availablePermissions, final long currentNanos, final long currentCycle) { if (availablePermissions > 0) { return 0L; } else { @@ -114,7 +114,7 @@ private long nanosToWaitForPermission(final int availablePermissions, final long * @param nanosToWait nanoseconds to wait for the next permission * @return new {@link State} with possibly reserved permissions and time to wait */ - private State reservePermissions(final long timeoutInNanos, final long cycle, final int permissions, final long nanosToWait) { + public State reservePermissions(final long timeoutInNanos, final long cycle, final int permissions, final long nanosToWait) { boolean canAcquireInTime = timeoutInNanos >= nanosToWait; int permissionsWithReservation = permissions; if (canAcquireInTime) { @@ -130,7 +130,7 @@ private State reservePermissions(final long timeoutInNanos, final long cycle, fi * @param nanosToWait nanoseconds caller need to wait * @return true if caller was able to wait for nanosToWait without {@link Thread#interrupt} and not exceed timeout */ - private boolean waitForPermissionIfNecessary(final long timeoutInNanos, final long nanosToWait) { + public boolean waitForPermissionIfNecessary(final long timeoutInNanos, final long nanosToWait) { boolean canAcquireImmediately = nanosToWait <= 0; boolean canAcquireInTime = timeoutInNanos >= nanosToWait; @@ -153,7 +153,7 @@ private boolean waitForPermissionIfNecessary(final long timeoutInNanos, final lo * @param nanosToWait nanoseconds caller need to wait * @return true if caller was not {@link Thread#interrupted} while waiting */ - private boolean waitForPermission(final long nanosToWait) { + public boolean waitForPermission(final long nanosToWait) { waitingThreads.incrementAndGet(); long deadline = currentNanoTime() + nanosToWait; boolean wasInterrupted = false; @@ -207,7 +207,7 @@ public AtomicRateLimiterMetrics getMetrics() { * the last {@link AtomicRateLimiter#getPermission(Duration)} call. * */ - private static class State { + public static class State { private final long activeCycle; private final int activePermissions;