Skip to content

Commit

Permalink
Merge pull request ReactiveX#23 from storozhukBM/master
Browse files Browse the repository at this point in the history
Issue ReactiveX#12 Implement a Rate Limiter
  • Loading branch information
RobWin authored Dec 4, 2016
2 parents cb1b4b0 + ba07383 commit 35ef836
Show file tree
Hide file tree
Showing 20 changed files with 2,301 additions and 24 deletions.
67 changes: 64 additions & 3 deletions README.adoc
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
= Fault tolerance library designed for functional programming
:author: Robert Winkler
:author: Robert Winkler and Bohdan Storozhuk
:hardbreaks:

image:https://travis-ci.org/RobWin/javaslang-circuitbreaker.svg?branch=master["Build Status", link="https://travis-ci.org/RobWin/javaslang-circuitbreaker"] image:https://coveralls.io/repos/RobWin/javaslang-circuitbreaker/badge.svg["Coverage Status", link="https://coveralls.io/r/RobWin/javaslang-circuitbreaker"] image:https://api.bintray.com/packages/robwin/maven/javaslang-circuitbreaker/images/download.svg[link="https://bintray.com/robwin/maven/javaslang-circuitbreaker/_latestVersion"] image:http://img.shields.io/badge/license-ASF2-blue.svg["Apache License 2", link="http://www.apache.org/licenses/LICENSE-2.0.txt"]
Expand All @@ -9,7 +9,7 @@ This library is a lightweight, easy-to-use fault tolerance library inspired by h
To highlight a few differences to Netflix Hystrix:

In Hystrix calls to external systems have to be wrapped in a HystrixCommand. This library, in contrast, provides higher-order functions to decorate any function, lambda expression or method reference with a http://martinfowler.com/bliki/CircuitBreaker.html[Circuit Breaker]. Furthermore, the library provides a custom RxJava operator to decorate any `Observable` or `Flowable` with a Circuit Breaker.
In the following I call the higher-order functions decorators. The decorators return an enhanced version of your function. Furthermore, the library provides a decorator to retry failed functions. You can stack more than one decorator on any given function. That means, you can combine a Retry decorator with a CircuitBreaker decorator. Any decorated function can be executed synchronously or asynchronously by using a CompletableFuture or RxJava.
In the following I call the higher-order functions decorators. The decorators return an enhanced version of your function. Furthermore, the library provides few additional decorators to retry failed functions and limit call frequency. You can stack more than one decorator on any given function. That means, you can combine a RateLimiter and Retry decorators with a CircuitBreaker decorator. Any decorated function can be executed synchronously or asynchronously by using a CompletableFuture or RxJava.
However, features such as Request Caching, Request Collapsing, Response Time Metrics and Bulk Heading are not in the scope of this library. But Request Caching can be easily implemented by a custom higher-order function and JCache. And Response Time Metrics using http://metrics.dropwizard.io/[Dropwizard Metrics].

== Examples
Expand Down Expand Up @@ -42,6 +42,27 @@ Observable.fromCallable(backendService::doSomething)
.lift(CircuitBreakerOperator.of(circuitBreaker))
----

The following example shows how restrict the calling rate of some method to be not higher than 1 req/sec.

[source,java]
----
// Create a RateLimiter
RateLimiterConfig config = RateLimiterConfig.builder()
.timeoutDuration(Duration.ofMillis(100))
.limitRefreshPeriod(Duration.ofSeconds(1))
.limitForPeriod(1)
.build();
RateLimiter rateLimiter = RateLimiter.of("backendName", config);
// Decorate your call to BackendService.doSomething()
Try.CheckedRunnable restrictedCall = RateLimiter
.decorateCheckedRunnable(rateLimiter, backendService::doSomething);
Try.run(restrictedCall)
.andThenTry(restrictedCall)
.onFailure((RequestNotPermitted throwable) -> LOG.info("Wait before call it again :)"));
----

== Consume published CircuitBreakerEvents

The CircuitBreaker publishes a stream of CircuitBreakerEvents to any Observer/Consumer who subscribes. An event can be a state transition or a recorded error. This library uses RxJava to to provide this functionality. If you want to consume events, you have to subscribe to the event stream. This library provides a consumer `CircuitBreakerEventConsumer` which can be used to store events in a circular buffer with a fixed capacity. You can use RxJava to filter certain events.
Expand Down Expand Up @@ -70,11 +91,29 @@ int bufferedCalls = metrics.getNumberOfBufferedCalls();
int failedCalls = metrics.getNumberOfFailedCalls();
----

== Monitor RateLimiter metrics

The RateLimiter provides simple an interface to monitor the current limiter.
Also AtomicRateLimiter has some enhanced Metrics with some implementation specific details.

[source,java]
----
RateLimiter limit;
RateLimiter.Metrics metrics = limit.getMetrics();
int numberOfThreadsWaitingForPermission = metrics.getNumberOfWaitingThreads();
// Estimates count of available permissions. Can be negative if some permissions where reserved.
int availablePermissions = metrics.getAvailablePermissions();
AtomicRateLimiter atomicLimiter;
// Estimated time duration in nanos to wait for the next permission
long nanosToWaitForPermission = atomicLimiter.getNanosToWait();
----

== Usage Guide

For more details see http://robwin.github.io/javaslang-circuitbreaker/0.7.1/[User Guide].

== Implementation details
== CircuitBreaker implementation details

The CircuitBreaker is implemented via a finite state machine with three states: `CLOSED`, `OPEN` and `HALF_OPEN`.

Expand Down Expand Up @@ -112,6 +151,28 @@ For example, if the size of the Ring Buffer is 10, then at least 10 calls must e
After the time duration has elapsed, the CircuitBreaker state changes from `OPEN` to `HALF_OPEN` and allows calls to see if the backend is still unavailable or has become available again. The CircuitBreaker uses another (configurable) Ring Bit Buffer to evaluate the failure rate in the `HALF_OPEN` state. If the failure rate is above the configured threshold, the state changes back to `OPEN`. If the failure rate is below or equal to the threshold, the state changes back to `CLOSED`.
`CircuitBreaker::onError(exception)` checks if the exception should be recorded as a failure or should be ignored. You can configure a custom `Predicate` which decides whether an exception should be recorded as a failure. The default Predicate records all exceptions as a failure.

== RateLimiter implementation details
Conceptually `RateLimiter` splits all nanoseconds from the start of epoch into cycles.
Each cycle has duration configured by `RateLimiterConfig.limitRefreshPeriod`.
By contract on start of each cycle `RateLimiter` should set `activePermissions` to `RateLimiterConfig.limitForPeriod`.
For the `RateLimiter` callers it is really looks so, but for example `AtomicRateLimiter` implementation has
some optimisations under the hood that will skip this refresh if `RateLimiter` is not used actively.

image::src/docs/asciidoc/images/rate_limiter.png[Rate Limiter]

The default implementation of `RateLimiter` is `AtomicRateLimiter` it manages state via `AtomicReference`.
`AtomicRateLimiter.State` is completely immutable and has the folowing fields:

* `activeCycle` - cycle number that was used by the last call.
* `activePermissions` - count of available permissions after the last call.
Can be negative if some permissions where reserved.
* `nanosToWait` - count of nanoseconds to wait for permission for the last call.

`AtomicRateLimiter` is also very fast on i7-5557U processor and with x64 Java-1.8.0_112
it takes only `143±1 [ns]` to acquire permission.
So you can easily restrict not ony network calls but your local in-memory operations too.


== Companies who use javaslang-circuitbreaker

* Deutsche Telekom
Expand Down
20 changes: 12 additions & 8 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ buildscript {
repositories {
jcenter()
mavenCentral()
maven {
url "https://plugins.gradle.org/m2/"
}
}
dependencies {
classpath 'org.kt3k.gradle.plugin:coveralls-gradle-plugin:2.0.1'
classpath 'com.jfrog.bintray.gradle:gradle-bintray-plugin:1.2'
classpath 'me.champeau.gradle:jmh-gradle-plugin:0.2.0'
classpath 'me.champeau.gradle:jmh-gradle-plugin:0.3.1'
classpath 'com.github.jengelman.gradle.plugins:shadow:1.2.0'
classpath "org.asciidoctor:asciidoctor-gradle-plugin:1.5.3"
classpath "org.ajoberstar:gradle-git:1.3.2"
Expand Down Expand Up @@ -50,12 +53,7 @@ repositories {
}

jmh {
benchmarkMode = 'all'
jmhVersion = '1.11.2'
fork = 1
threads = 10
iterations = 2
warmupIterations = 2
jmhVersion = '1.17'
include=''
}

Expand All @@ -65,12 +63,18 @@ dependencies {
compile "io.reactivex.rxjava2:rxjava:2.0.1"
compile "org.slf4j:slf4j-api:1.7.13"
compile "javax.cache:cache-api:1.0.0"

testCompile "io.dropwizard.metrics:metrics-core:3.1.2"
testCompile "junit:junit:4.11"
testCompile "org.assertj:assertj-core:3.0.0"
testCompile "ch.qos.logback:logback-classic:0.9.26"
testCompile "io.dropwizard.metrics:metrics-healthchecks:3.1.2"
testCompile "org.mockito:mockito-all:1.10.19"
testCompile "org.mockito:mockito-core:1.10.19"
testCompile "org.powermock:powermock:1.6.6"
testCompile "org.powermock:powermock-api-mockito:1.6.6"
testCompile "org.powermock:powermock-module-junit4:1.6.6"
testCompile "com.jayway.awaitility:awaitility:1.7.0"

jmh "ch.qos.logback:logback-classic:0.9.26"
}

Expand Down
Binary file added src/docs/asciidoc/images/rate_limiter.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
48 changes: 46 additions & 2 deletions src/docs/asciidoc/introduction.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ This library is a lightweight, easy-to-use fault tolerance library inspired by h
To highlight a few differences to Netflix Hystrix:

In Hystrix calls to external systems have to be wrapped in a HystrixCommand. This library, in contrast, provides higher-order functions to decorate any function, lambda expression or method reference with a http://martinfowler.com/bliki/CircuitBreaker.html[Circuit Breaker]. Furthermore, the library provides a custom RxJava operator to decorate any `Observable` or `Flowable` with a Circuit Breaker.
In the following I call the higher-order functions decorators. The decorators return an enhanced version of your function. Furthermore, the library provides a decorator to retry failed functions. You can stack more than one decorator on any given function. That means, you can combine a Retry decorator with a CircuitBreaker decorator. Any decorated function can be invoked synchronously or asynchronously by using a CompletableFuture or RxJava.
In the following I call the higher-order functions decorators. The decorators return an enhanced version of your function. Furthermore, the library provides few additional decorators to retry failed functions and limit call frequency. You can stack more than one decorator on any given function. That means, you can combine a Retry decorator with a CircuitBreaker decorator. Any decorated function can be invoked synchronously or asynchronously by using a CompletableFuture or RxJava.
However, features such as Request Caching, Request Collapsing and Bulk Heading are not in the scope of this library.

[source,java]
Expand Down Expand Up @@ -33,6 +33,29 @@ Observable.fromCallable(helloWorldService::sayHelloWorld)
.lift(CircuitBreakerOperator.of(circuitBreaker))
----

The following example shows how restrict the calling rate of some method to be not higher than 1 req/sec.

[source,java]
----
// Create a RateLimiter
RateLimiterConfig config = RateLimiterConfig.builder()
.timeoutDuration(Duration.ofMillis(100))
.limitRefreshPeriod(Duration.ofSeconds(1))
.limitForPeriod(1)
.build();
RateLimiter rateLimiter = RateLimiter.of("backendName", config);
// Decorate your call to BackendService.doSomething()
Try.CheckedRunnable restrictedCall = RateLimiter
.decorateCheckedRunnable(rateLimiter, backendService::doSomething);
Try.run(restrictedCall)
.andThenTry(restrictedCall)
.onFailure((RequestNotPermitted throwable) -> LOG.info("Wait before call it again :)"));
----

=== CircuitBreaker implementation details

The CircuitBreaker is implemented via a finite state machine with three states: `CLOSED`, `OPEN` and `HALF_OPEN`.

image::images/state_machine.jpg[]
Expand Down Expand Up @@ -78,4 +101,25 @@ CircuitBreakerEventConsumer ringBuffer = new CircuitBreakerEventConsumer(10);
circuitBreaker.getEventStream()
.filter(event -> event.getEventType() == Type.ERROR)
.subscribe(ringBuffer);
----
----

=== RateLimiter implementation details
Conceptually `RateLimiter` splits all nanoseconds from the start of epoch into cycles.
Each cycle has duration configured by `RateLimiterConfig.limitRefreshPeriod`.
By contract on start of each cycle `RateLimiter` should set `activePermissions` to `RateLimiterConfig.limitForPeriod`.
For the `RateLimiter` callers it is really looks so, but for example `AtomicRateLimiter` implementation has
some optimisations under the hood that will skip this refresh if `RateLimiter` is not used actively.

image::images/rate_limiter.png[Rate Limiter]

The default implementation of `RateLimiter` is `AtomicRateLimiter` it manages state via `AtomicReference`.
`AtomicRateLimiter.State` is completely immutable and has the folowing fields:

* `activeCycle` - cycle number that was used by the last call.
* `activePermissions` - count of available permissions after the last call.
Can be negative if some permissions where reserved.
* `nanosToWait` - count of nanoseconds to wait for permission for the last call.

`AtomicRateLimiter` is also very fast on i7-5557U processor and with x64 Java-1.8.0_112
it takes only `143±1 [ns]` to acquire permission.
So you can easily restrict not ony network calls but your local in-memory operations too.
61 changes: 61 additions & 0 deletions src/docs/asciidoc/usage_guide.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -182,3 +182,64 @@ CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("testName");
Observable.fromCallable(helloWorldService::sayHelloWorld)
.lift(CircuitBreakerOperator.of(circuitBreaker))
----

=== Create a RateLimiter

The RateLimiter API is very similar to CircuitBreaker.
So it also have in-memory RateLimiterRegistry and RateLimiterConfig where you can configure:

* the period of limit refresh, after each period rate limiter sets its permissions count to `limitForPeriod` value.
* the permissions limit for refresh period.
* the default wait for permission duration.

[source,java]
----
// For example you want to restrict the calling rate of some method to be not higher than 10 req/ms.
RateLimiterConfig config = RateLimiterConfig.builder()
.limitRefreshPeriod(Duration.ofMillis(1))
.limitForPeriod(10)
.timeoutDuration(Duration.ofMillis(25))
.build();
// Create registry
RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.of(config);
// Use registry
RateLimiter reateLimiterWithDefaultConfig = rateLimiterRegistry.rateLimiter("backend");
RateLimiter reateLimiterWithCustomConfig = rateLimiterRegistry.rateLimiter("backend#2", config);
// Or create RateLimiter directly
RateLimiter rateLimiter = RateLimiter.of("NASDAQ :-)", config);
----

=== Use a RateLimiter
As you can gues RateLimiter has all sort of higher order decorator functions just like CircuitBreaker.

[source,java]
----
// Decorate your call to BackendService.doSomething()
Try.CheckedRunnable restrictedCall = RateLimiter
.decorateCheckedRunnable(rateLimiter, backendService::doSomething);
Try.run(restrictedCall)
.andThenTry(restrictedCall)
.onFailure((RequestNotPermitted throwable) -> LOG.info("Wait before call it again :)"));
----

== Monitor RateLimiter metrics

The RateLimiter provides simple an interface to monitor the current limiter.
Also AtomicRateLimiter has some enhanced Metrics with some implementation specific details.

[source,java]
----
RateLimiter limit;
RateLimiter.Metrics metrics = limit.getMetrics();
int numberOfThreadsWaitingForPermission = metrics.getNumberOfWaitingThreads();
// Estimates count of available permissions. Can be negative if some permissions where reserved.
int availablePermissions = metrics.getAvailablePermissions();
AtomicRateLimiter atomicLimiter;
// Estimated time duration in nanos to wait for the next permission
long nanosToWaitForPermission = atomicLimiter.getNanosToWait();
----
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,22 @@

@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Mode.Throughput)
@BenchmarkMode(Mode.All)
public class CircuitBreakerBenchmark {

private CircuitBreaker circuitBreaker;
private Supplier<String> supplier;
private static final int ITERATION_COUNT = 10;
private static final int WARMUP_COUNT = 10;
private static final int ITERATION_COUNT = 2;
private static final int WARMUP_COUNT = 2;
private static final int THREAD_COUNT = 10;
public static final int FORK_COUNT = 1;

@Setup
public void setUp() {
CircuitBreakerRegistry circuitBreakerRegistry = CircuitBreakerRegistry.of(CircuitBreakerConfig.custom()
.failureRateThreshold(1)
.waitDurationInOpenState(Duration.ofSeconds(1))
.build());
.failureRateThreshold(1)
.waitDurationInOpenState(Duration.ofSeconds(1))
.build());
circuitBreaker = circuitBreakerRegistry.circuitBreaker("testCircuitBreaker");

supplier = CircuitBreaker.decorateSupplier(() -> {
Expand All @@ -54,10 +55,11 @@ public void setUp() {
}

@Benchmark
@Fork(value = FORK_COUNT)
@Threads(value = THREAD_COUNT)
@Warmup(iterations = WARMUP_COUNT)
@Measurement(iterations = ITERATION_COUNT)
public String invokeSupplier(){
public String invokeSupplier() {
return supplier.get();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,22 @@

@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Mode.Throughput)
@BenchmarkMode(Mode.All)
public class RingBitSetBenachmark {

private RingBitSet ringBitSet;
private static final int ITERATION_COUNT = 10;
private static final int WARMUP_COUNT = 10;
private static final int ITERATION_COUNT = 2;
private static final int WARMUP_COUNT = 2;
private static final int THREAD_COUNT = 10;
public static final int FORK_COUNT = 1;

@Setup
public void setUp() {
ringBitSet = new RingBitSet(1000);
}

@Benchmark
@Fork(value = FORK_COUNT)
@Threads(value = THREAD_COUNT)
@Warmup(iterations = WARMUP_COUNT)
@Measurement(iterations = ITERATION_COUNT)
Expand Down
Loading

0 comments on commit 35ef836

Please sign in to comment.