Skip to content

Commit

Permalink
Issue ReactiveX#12 TimeBasedRateLimiter and AtomicRateLimiter impleme…
Browse files Browse the repository at this point in the history
…ntations + benchmarks
  • Loading branch information
storozhukBM committed Dec 2, 2016
1 parent 5a30b38 commit bbfc98f
Show file tree
Hide file tree
Showing 14 changed files with 710 additions and 123 deletions.
12 changes: 5 additions & 7 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ buildscript {
repositories {
jcenter()
mavenCentral()
maven {
url "https://plugins.gradle.org/m2/"
}
}
dependencies {
classpath 'org.kt3k.gradle.plugin:coveralls-gradle-plugin:2.0.1'
classpath 'com.jfrog.bintray.gradle:gradle-bintray-plugin:1.2'
classpath 'me.champeau.gradle:jmh-gradle-plugin:0.2.0'
classpath 'me.champeau.gradle:jmh-gradle-plugin:0.3.1'
classpath 'com.github.jengelman.gradle.plugins:shadow:1.2.0'
classpath "org.asciidoctor:asciidoctor-gradle-plugin:1.5.3"
classpath "org.ajoberstar:gradle-git:1.3.2"
Expand Down Expand Up @@ -50,12 +53,7 @@ repositories {
}

jmh {
benchmarkMode = 'all'
jmhVersion = '1.11.2'
fork = 1
threads = 10
iterations = 2
warmupIterations = 2
jmhVersion = '1.17'
include=''
}

Expand Down
10 changes: 3 additions & 7 deletions gradlew
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ case "`uname`" in
;;
esac

# For Cygwin, ensure paths are in UNIX format before anything is touched.
if $cygwin ; then
[ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
fi

# Attempt to set APP_HOME
# Resolve links: $0 may be a link
PRG="$0"
Expand All @@ -61,9 +56,9 @@ while [ -h "$PRG" ] ; do
fi
done
SAVED="`pwd`"
cd "`dirname \"$PRG\"`/" >&-
cd "`dirname \"$PRG\"`/" >/dev/null
APP_HOME="`pwd -P`"
cd "$SAVED" >&-
cd "$SAVED" >/dev/null

CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar

Expand Down Expand Up @@ -114,6 +109,7 @@ fi
if $cygwin ; then
APP_HOME=`cygpath --path --mixed "$APP_HOME"`
CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
JAVACMD=`cygpath --unix "$JAVACMD"`

# We build the pattern for arguments to be converted via cygpath
ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,46 +18,40 @@
*/
package io.github.robwin.circuitbreaker;

import org.openjdk.jmh.annotations.*;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Mode.Throughput)
public class CircuitBreakerBenchmark {

private CircuitBreaker circuitBreaker;
private Supplier<String> supplier;
private static final int ITERATION_COUNT = 10;
private static final int WARMUP_COUNT = 10;
private static final int THREAD_COUNT = 10;

@Setup
public void setUp() {
CircuitBreakerRegistry circuitBreakerRegistry = CircuitBreakerRegistry.of(CircuitBreakerConfig.custom()
.failureRateThreshold(1)
.waitDurationInOpenState(Duration.ofSeconds(1))
.build());
circuitBreaker = circuitBreakerRegistry.circuitBreaker("testCircuitBreaker");

supplier = CircuitBreaker.decorateSupplier(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello Benchmark";
}, circuitBreaker);
}

@Benchmark
@Threads(value = THREAD_COUNT)
@Warmup(iterations = WARMUP_COUNT)
@Measurement(iterations = ITERATION_COUNT)
public String invokeSupplier(){
return supplier.get();
}
}
//@State(Scope.Benchmark)
//@OutputTimeUnit(TimeUnit.MILLISECONDS)
//@BenchmarkMode(Mode.Throughput)
//public class CircuitBreakerBenchmark {
//
// private CircuitBreaker circuitBreaker;
// private Supplier<String> supplier;
// private static final int ITERATION_COUNT = 10;
// private static final int WARMUP_COUNT = 10;
// private static final int THREAD_COUNT = 10;
//
// @Setup
// public void setUp() {
// CircuitBreakerRegistry circuitBreakerRegistry = CircuitBreakerRegistry.of(CircuitBreakerConfig.custom()
// .failureRateThreshold(1)
// .waitDurationInOpenState(Duration.ofSeconds(1))
// .build());
// circuitBreaker = circuitBreakerRegistry.circuitBreaker("testCircuitBreaker");
//
// supplier = CircuitBreaker.decorateSupplier(() -> {
// try {
// Thread.sleep(100);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// return "Hello Benchmark";
// }, circuitBreaker);
// }
//
// @Benchmark
// @Threads(value = THREAD_COUNT)
// @Warmup(iterations = WARMUP_COUNT)
// @Measurement(iterations = ITERATION_COUNT)
// public String invokeSupplier(){
// return supplier.get();
// }
//}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
Benchmark Mode Cnt Score Error Units

RateLimiterBenchmark.atomicPermission thrpt 10 7.274 ± 0.132 ops/us
RateLimiterBenchmark.semaphoreBasedPermission thrpt 10 17.335 ± 3.441 ops/us
RateLimiterBenchmark.timeBasedPermission thrpt 10 3.522 ± 0.495 ops/us

RateLimiterBenchmark.atomicPermission avgt 10 0.294 ± 0.038 us/op
RateLimiterBenchmark.semaphoreBasedPermission avgt 10 0.120 ± 0.018 us/op
RateLimiterBenchmark.timeBasedPermission avgt 10 0.562 ± 0.045 us/op

RateLimiterBenchmark.atomicPermission sample 535765 1.480 ± 0.036 us/op
RateLimiterBenchmark.atomicPermission:atomicPermission·p0.00 sample 0.040 us/op
RateLimiterBenchmark.atomicPermission:atomicPermission·p0.50 sample 0.383 us/op
RateLimiterBenchmark.atomicPermission:atomicPermission·p0.90 sample 4.288 us/op
RateLimiterBenchmark.atomicPermission:atomicPermission·p0.95 sample 7.368 us/op
RateLimiterBenchmark.atomicPermission:atomicPermission·p0.99 sample 14.080 us/op
RateLimiterBenchmark.atomicPermission:atomicPermission·p0.999 sample 18.048 us/op
RateLimiterBenchmark.atomicPermission:atomicPermission·p0.9999 sample 58.449 us/op
RateLimiterBenchmark.atomicPermission:atomicPermission·p1.00 sample 1654.784 us/op
RateLimiterBenchmark.semaphoreBasedPermission sample 635614 0.166 ± 0.010 us/op
RateLimiterBenchmark.semaphoreBasedPermission:semaphoreBasedPermission·p0.00 sample 0.001 us/op
RateLimiterBenchmark.semaphoreBasedPermission:semaphoreBasedPermission·p0.50 sample 0.135 us/op
RateLimiterBenchmark.semaphoreBasedPermission:semaphoreBasedPermission·p0.90 sample 0.219 us/op
RateLimiterBenchmark.semaphoreBasedPermission:semaphoreBasedPermission·p0.95 sample 0.236 us/op
RateLimiterBenchmark.semaphoreBasedPermission:semaphoreBasedPermission·p0.99 sample 0.333 us/op
RateLimiterBenchmark.semaphoreBasedPermission:semaphoreBasedPermission·p0.999 sample 2.468 us/op
RateLimiterBenchmark.semaphoreBasedPermission:semaphoreBasedPermission·p0.9999 sample 15.519 us/op
RateLimiterBenchmark.semaphoreBasedPermission:semaphoreBasedPermission·p1.00 sample 1372.160 us/op
RateLimiterBenchmark.timeBasedPermission sample 553560 0.800 ± 0.053 us/op
RateLimiterBenchmark.timeBasedPermission:timeBasedPermission·p0.00 sample 0.054 us/op
RateLimiterBenchmark.timeBasedPermission:timeBasedPermission·p0.50 sample 0.550 us/op
RateLimiterBenchmark.timeBasedPermission:timeBasedPermission·p0.90 sample 0.749 us/op
RateLimiterBenchmark.timeBasedPermission:timeBasedPermission·p0.95 sample 0.826 us/op
RateLimiterBenchmark.timeBasedPermission:timeBasedPermission·p0.99 sample 8.256 us/op
RateLimiterBenchmark.timeBasedPermission:timeBasedPermission·p0.999 sample 33.920 us/op
RateLimiterBenchmark.timeBasedPermission:timeBasedPermission·p0.9999 sample 160.221 us/op
RateLimiterBenchmark.timeBasedPermission:timeBasedPermission·p1.00 sample 5742.592 us/op

RateLimiterBenchmark.atomicPermission ss 10 17.140 ± 5.640 us/op
RateLimiterBenchmark.semaphoreBasedPermission ss 10 9.724 ± 4.602 us/op
RateLimiterBenchmark.timeBasedPermission ss 10 26.875 ± 10.869 us/op
89 changes: 89 additions & 0 deletions src/jmh/java/javaslang/circuitbreaker/RateLimiterBenchmark.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package javaslang.circuitbreaker;

import javaslang.ratelimiter.RateLimiter;
import javaslang.ratelimiter.RateLimiterConfig;
import javaslang.ratelimiter.internal.AtomicRateLimiter;
import javaslang.ratelimiter.internal.SemaphoreBasedRateLimiter;
import javaslang.ratelimiter.internal.TimeBasedRateLimiter;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@BenchmarkMode(Mode.All)
public class RateLimiterBenchmark {

public static final int FORK_COUNT = 2;
private static final int WARMUP_COUNT = 10;
private static final int ITERATION_COUNT = 5;
private static final int THREAD_COUNT = 2;

private RateLimiter semaphoreBasedRateLimiter;
private RateLimiter timeBasedRateLimiter;
private RateLimiter atomicRateLimiter;

private Supplier<String> semaphoreGuardedSupplier;
private Supplier<String> timeGuardedSupplier;
private Supplier<String> atomicGuardedSupplier;

@Setup
public void setUp() {
RateLimiterConfig rateLimiterConfig = RateLimiterConfig.builder()
.limitForPeriod(Integer.MAX_VALUE)
.limitRefreshPeriod(Duration.ofNanos(10))
.timeoutDuration(Duration.ofSeconds(5))
.build();
semaphoreBasedRateLimiter = new SemaphoreBasedRateLimiter("semaphoreBased", rateLimiterConfig);
timeBasedRateLimiter = new TimeBasedRateLimiter("timeBased", rateLimiterConfig);
atomicRateLimiter = new AtomicRateLimiter("atomicBased", rateLimiterConfig);

Supplier<String> stringSupplier = () -> {
Blackhole.consumeCPU(1);
return "Hello Benchmark";
};
semaphoreGuardedSupplier = RateLimiter.decorateSupplier(semaphoreBasedRateLimiter, stringSupplier);
timeGuardedSupplier = RateLimiter.decorateSupplier(timeBasedRateLimiter, stringSupplier);
atomicGuardedSupplier = RateLimiter.decorateSupplier(atomicRateLimiter, stringSupplier);
}

@Benchmark
@Threads(value = THREAD_COUNT)
@Warmup(iterations = WARMUP_COUNT)
@Fork(value = FORK_COUNT)
@Measurement(iterations = ITERATION_COUNT)
public String semaphoreBasedPermission() {
return semaphoreGuardedSupplier.get();
}

@Benchmark
@Threads(value = THREAD_COUNT)
@Warmup(iterations = WARMUP_COUNT)
@Fork(value = FORK_COUNT)
@Measurement(iterations = ITERATION_COUNT)
public String timeBasedPermission() {
return timeGuardedSupplier.get();
}

@Benchmark
@Threads(value = THREAD_COUNT)
@Warmup(iterations = WARMUP_COUNT)
@Fork(value = FORK_COUNT)
@Measurement(iterations = ITERATION_COUNT)
public String atomicPermission() {
return atomicGuardedSupplier.get();
}
}
97 changes: 51 additions & 46 deletions src/main/java/javaslang/ratelimiter/RateLimiter.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,50 +13,6 @@
*/
public interface RateLimiter {

/**
* Acquires a permission from this rate limiter, blocking until one is
* available.
* <p>
* <p>If the current thread is {@linkplain Thread#interrupt interrupted}
* while waiting for a permit then it won't throw {@linkplain InterruptedException},
* but its interrupt status will be set.
*
* @return {@code true} if a permit was acquired and {@code false}
* if waiting time elapsed before a permit was acquired
*/
boolean getPermission(Duration timeoutDuration);

/**
* Get the name of this RateLimiter
*
* @return the name of this RateLimiter
*/
String getName();

/**
* Get the RateLimiterConfig of this RateLimiter.
*
* @return the RateLimiterConfig of this RateLimiter
*/
RateLimiterConfig getRateLimiterConfig();

/**
* Get the Metrics of this RateLimiter.
*
* @return the Metrics of this RateLimiter
*/
Metrics getMetrics();

interface Metrics {
/**
* Returns an estimate of the number of threads waiting for permission
* in this JVM process.
*
* @return estimate of the number of threads waiting for permission.
*/
int getNumberOfWaitingThreads();
}

/**
* Creates a supplier which is restricted by a RateLimiter.
*
Expand Down Expand Up @@ -167,18 +123,67 @@ static <T, R> Function<T, R> decorateFunction(RateLimiter rateLimiter, Function<
return decoratedFunction;
}


/**
* Will wait for permission within default timeout duration.
* Throws {@link RequestNotPermitted} if waiting time elapsed before a permit was acquired.
*
* @param rateLimiter the RateLimiter to get permission from
* @throws RequestNotPermitted if waiting time elapsed before a permit was acquired.
* @throws IllegalStateException if thread was interrupted during permission wait
*/
static void waitForPermission(final RateLimiter rateLimiter) {
static void waitForPermission(final RateLimiter rateLimiter) throws IllegalStateException, RequestNotPermitted {
RateLimiterConfig rateLimiterConfig = rateLimiter.getRateLimiterConfig();
Duration timeoutDuration = rateLimiterConfig.getTimeoutDuration();
boolean permission = rateLimiter.getPermission(timeoutDuration);
if (Thread.interrupted()) {
throw new IllegalStateException("Thread was interrupted during permission wait");
}
if (!permission) {
throw new RequestNotPermitted("Request not permitted for limiter: " + rateLimiter.getName());
}
}

/**
* Acquires a permission from this rate limiter, blocking until one is
* available.
* <p>
* <p>If the current thread is {@linkplain Thread#interrupt interrupted}
* while waiting for a permit then it won't throw {@linkplain InterruptedException},
* but its interrupt status will be set.
*
* @return {@code true} if a permit was acquired and {@code false}
* if waiting time elapsed before a permit was acquired
*/
boolean getPermission(Duration timeoutDuration);

/**
* Get the name of this RateLimiter
*
* @return the name of this RateLimiter
*/
String getName();

/**
* Get the RateLimiterConfig of this RateLimiter.
*
* @return the RateLimiterConfig of this RateLimiter
*/
RateLimiterConfig getRateLimiterConfig();

/**
* Get the Metrics of this RateLimiter.
*
* @return the Metrics of this RateLimiter
*/
Metrics getMetrics();

interface Metrics {
/**
* Returns an estimate of the number of threads waiting for permission
* in this JVM process.
*
* @return estimate of the number of threads waiting for permission.
*/
int getNumberOfWaitingThreads();
}
}
Loading

0 comments on commit bbfc98f

Please sign in to comment.