Skip to content

Commit

Permalink
Issue ReactiveX#12 CAS with back-off benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
storozhukBM committed Dec 2, 2016
1 parent eb47a8e commit 7fb310c
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 9 deletions.
56 changes: 54 additions & 2 deletions src/jmh/java/javaslang/circuitbreaker/RateLimiterBenchmark.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Supplier;

@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@BenchmarkMode(Mode.All)
@BenchmarkMode(Mode.Throughput)
public class RateLimiterBenchmark {

public static final int FORK_COUNT = 2;
Expand All @@ -32,7 +33,9 @@ public class RateLimiterBenchmark {
private static final int THREAD_COUNT = 2;

private RateLimiter semaphoreBasedRateLimiter;
private RateLimiter atomicRateLimiter;
private AtomicRateLimiter atomicRateLimiter;
private AtomicRateLimiter.State state;
private static final Object mutex = new Object();

private Supplier<String> semaphoreGuardedSupplier;
private Supplier<String> atomicGuardedSupplier;
Expand All @@ -46,6 +49,7 @@ public void setUp() {
.build();
semaphoreBasedRateLimiter = new SemaphoreBasedRateLimiter("semaphoreBased", rateLimiterConfig);
atomicRateLimiter = new AtomicRateLimiter("atomicBased", rateLimiterConfig);
state = atomicRateLimiter.state.get();

Supplier<String> stringSupplier = () -> {
Blackhole.consumeCPU(1);
Expand All @@ -55,6 +59,54 @@ public void setUp() {
atomicGuardedSupplier = RateLimiter.decorateSupplier(atomicRateLimiter, stringSupplier);
}

@Benchmark
@Threads(value = THREAD_COUNT)
@Warmup(iterations = WARMUP_COUNT)
@Fork(value = FORK_COUNT)
@Measurement(iterations = ITERATION_COUNT)
public void mutex(Blackhole bh) {
synchronized (mutex) {
state = atomicRateLimiter.calculateNextState(Duration.ZERO.toNanos(), state);
}
}

@Benchmark
@Threads(value = THREAD_COUNT)
@Warmup(iterations = WARMUP_COUNT)
@Fork(value = FORK_COUNT)
@Measurement(iterations = ITERATION_COUNT)
public void atomic(Blackhole bh) {
atomicRateLimiter.state.updateAndGet(state -> {
return atomicRateLimiter.calculateNextState(Duration.ZERO.toNanos(), state);
});
}

@Benchmark
@Threads(value = THREAD_COUNT)
@Warmup(iterations = WARMUP_COUNT)
@Fork(value = FORK_COUNT)
@Measurement(iterations = ITERATION_COUNT)
public void atomicBackOf(Blackhole bh) {
AtomicRateLimiter.State prev;
AtomicRateLimiter.State next;
do {
prev = atomicRateLimiter.state.get();
next = atomicRateLimiter.calculateNextState(Duration.ZERO.toNanos(), prev);
} while (!compareAndSet(prev, next));
}

/*
https://arxiv.org/abs/1305.5800 https://dzone.com/articles/wanna-get-faster-wait-bit
*/
public boolean compareAndSet(final AtomicRateLimiter.State current, final AtomicRateLimiter.State next) {
if (atomicRateLimiter.state.compareAndSet(current, next)) {
return true;
} else {
LockSupport.parkNanos(1);
return false;
}
}

@Benchmark
@Threads(value = THREAD_COUNT)
@Warmup(iterations = WARMUP_COUNT)
Expand Down
6 changes: 6 additions & 0 deletions src/jmh/java/javaslang/circuitbreaker/casWithBackOff.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Benchmark Mode Cnt Score Error Units
RateLimiterBenchmark.atomic thrpt 10 8.016 ± 0.327 ops/us
RateLimiterBenchmark.atomicBackOf thrpt 10 14.104 ± 0.097 ops/us
RateLimiterBenchmark.atomicPermission thrpt 10 7.429 ± 0.299 ops/us
RateLimiterBenchmark.mutex thrpt 10 7.520 ± 0.304 ops/us
RateLimiterBenchmark.semaphoreBasedPermission thrpt 10 17.923 ± 6.043 ops/us
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class AtomicRateLimiter implements RateLimiter {
private final long cyclePeriodInNanos;
private final int permissionsPerCycle;
private final AtomicInteger waitingThreads;
private final AtomicReference<State> state;
public final AtomicReference<State> state;


public AtomicRateLimiter(String name, RateLimiterConfig rateLimiterConfig) {
Expand Down Expand Up @@ -66,7 +66,7 @@ public boolean getPermission(final Duration timeoutDuration) {
* @param activeState current state of {@link AtomicRateLimiter}
* @return next {@link State}
*/
private State calculateNextState(final long timeoutInNanos, final State activeState) {
public State calculateNextState(final long timeoutInNanos, final State activeState) {
long currentNanos = currentNanoTime();
long currentCycle = currentNanos / cyclePeriodInNanos;

Expand All @@ -93,7 +93,7 @@ private State calculateNextState(final long timeoutInNanos, final State activeSt
* @param currentCycle current {@link AtomicRateLimiter} cycle
* @return nanoseconds to wait for the next permission
*/
private long nanosToWaitForPermission(final int availablePermissions, final long currentNanos, final long currentCycle) {
public long nanosToWaitForPermission(final int availablePermissions, final long currentNanos, final long currentCycle) {
if (availablePermissions > 0) {
return 0L;
} else {
Expand All @@ -114,7 +114,7 @@ private long nanosToWaitForPermission(final int availablePermissions, final long
* @param nanosToWait nanoseconds to wait for the next permission
* @return new {@link State} with possibly reserved permissions and time to wait
*/
private State reservePermissions(final long timeoutInNanos, final long cycle, final int permissions, final long nanosToWait) {
public State reservePermissions(final long timeoutInNanos, final long cycle, final int permissions, final long nanosToWait) {
boolean canAcquireInTime = timeoutInNanos >= nanosToWait;
int permissionsWithReservation = permissions;
if (canAcquireInTime) {
Expand All @@ -130,7 +130,7 @@ private State reservePermissions(final long timeoutInNanos, final long cycle, fi
* @param nanosToWait nanoseconds caller need to wait
* @return true if caller was able to wait for nanosToWait without {@link Thread#interrupt} and not exceed timeout
*/
private boolean waitForPermissionIfNecessary(final long timeoutInNanos, final long nanosToWait) {
public boolean waitForPermissionIfNecessary(final long timeoutInNanos, final long nanosToWait) {
boolean canAcquireImmediately = nanosToWait <= 0;
boolean canAcquireInTime = timeoutInNanos >= nanosToWait;

Expand All @@ -153,7 +153,7 @@ private boolean waitForPermissionIfNecessary(final long timeoutInNanos, final lo
* @param nanosToWait nanoseconds caller need to wait
* @return true if caller was not {@link Thread#interrupted} while waiting
*/
private boolean waitForPermission(final long nanosToWait) {
public boolean waitForPermission(final long nanosToWait) {
waitingThreads.incrementAndGet();
long deadline = currentNanoTime() + nanosToWait;
boolean wasInterrupted = false;
Expand Down Expand Up @@ -207,7 +207,7 @@ public AtomicRateLimiterMetrics getMetrics() {
* the last {@link AtomicRateLimiter#getPermission(Duration)} call.</li>
* </ul>
*/
private static class State {
public static class State {

private final long activeCycle;
private final int activePermissions;
Expand Down

0 comments on commit 7fb310c

Please sign in to comment.