Skip to content

Commit

Permalink
Issue ReactiveX#135 CompletionStage support in RateLimiter (ReactiveX…
Browse files Browse the repository at this point in the history
…#136)

* Issue ReactiveX#135 CompletionStage support in RateLimiter

* Issue ReactiveX#135 codacy issue fix

* Issue ReactiveX#135 codacy issue fix

* Issue ReactiveX#135 Decorators interface support
  • Loading branch information
storozhukBM authored and RobWin committed May 14, 2017
1 parent ec5abeb commit 62c1bd1
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,11 @@ public DecorateCompletionStage<T> withBulkhead(Bulkhead bulkhead) {
return this;
}

public DecorateCompletionStage<T> withRateLimiter(RateLimiter rateLimiter) {
stageSupplier = RateLimiter.decorateCompletionStage(rateLimiter, stageSupplier);
return this;
}

public Supplier<CompletionStage<T>> decorate() {
return stageSupplier;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
package io.github.resilience4j.circuitbreaker.operator;


import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerOpenException;
import io.github.resilience4j.core.StopWatch;
Expand All @@ -28,10 +33,6 @@
import io.reactivex.SingleObserver;
import io.reactivex.SingleOperator;
import io.reactivex.disposables.Disposable;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -102,9 +103,7 @@ private final class CircuitBreakerSubscriber implements Subscriber<T>, Subscript
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
if (LOG.isDebugEnabled()) {
LOG.info("onSubscribe");
}
LOG.debug("onSubscribe");
if (circuitBreaker.isCallPermitted()) {
stopWatch = StopWatch.start(circuitBreaker.getName());
childSubscriber.onSubscribe(this);
Expand All @@ -121,9 +120,7 @@ public void onSubscribe(Subscription subscription) {
*/
@Override
public void onNext(T event) {
if (LOG.isDebugEnabled()) {
LOG.info("onNext: {}", event);
}
LOG.debug("onNext: {}", event);
if (!isCancelled()) {
childSubscriber.onNext(event);
}
Expand All @@ -134,9 +131,7 @@ public void onNext(T event) {
*/
@Override
public void onError(Throwable e) {
if (LOG.isDebugEnabled()) {
LOG.info("onError", e);
}
LOG.debug("onError", e);
if (!isCancelled()) {
circuitBreaker.onError(stopWatch.stop().getProcessingDuration().toNanos(), e);
childSubscriber.onError(e);
Expand All @@ -149,9 +144,7 @@ public void onError(Throwable e) {
*/
@Override
public void onComplete() {
if (LOG.isDebugEnabled()) {
LOG.info("onComplete");
}
LOG.debug("onComplete");
if (!isCancelled()) {
circuitBreaker.onSuccess(stopWatch.stop().getProcessingDuration().toNanos());
childSubscriber.onComplete();
Expand Down Expand Up @@ -199,9 +192,7 @@ private final class CircuitBreakerObserver implements Observer<T>, Disposable {
@Override
public void onSubscribe(Disposable disposable) {
this.disposable = disposable;
if (LOG.isDebugEnabled()) {
LOG.info("onSubscribe");
}
LOG.debug("onSubscribe");
if (circuitBreaker.isCallPermitted()) {
stopWatch = StopWatch.start(circuitBreaker.getName());
childObserver.onSubscribe(this);
Expand All @@ -218,9 +209,7 @@ public void onSubscribe(Disposable disposable) {
*/
@Override
public void onNext(T event) {
if (LOG.isDebugEnabled()) {
LOG.info("onNext: {}", event);
}
LOG.debug("onNext: {}", event);
if (!isDisposed()) {
childObserver.onNext(event);
}
Expand All @@ -231,9 +220,7 @@ public void onNext(T event) {
*/
@Override
public void onError(Throwable e) {
if (LOG.isDebugEnabled()) {
LOG.info("onError", e);
}
LOG.debug("onError", e);
if (!isDisposed()) {
circuitBreaker.onError(stopWatch.stop().getProcessingDuration().toNanos(), e);
childObserver.onError(e);
Expand All @@ -245,9 +232,7 @@ public void onError(Throwable e) {
*/
@Override
public void onComplete() {
if (LOG.isDebugEnabled()) {
LOG.info("onComplete");
}
LOG.debug("onComplete");
if (!isDisposed()) {
circuitBreaker.onSuccess(stopWatch.stop().getProcessingDuration().toNanos());
childObserver.onComplete();
Expand Down Expand Up @@ -292,9 +277,7 @@ private class CircuitBreakerSingleObserver implements SingleObserver<T>, Disposa
@Override
public void onSubscribe(Disposable disposable) {
this.disposable = disposable;
if (LOG.isDebugEnabled()) {
LOG.info("onSubscribe");
}
LOG.debug("onSubscribe");
if (circuitBreaker.isCallPermitted()) {
stopWatch = StopWatch.start(circuitBreaker.getName());
childObserver.onSubscribe(this);
Expand All @@ -311,9 +294,7 @@ public void onSubscribe(Disposable disposable) {
*/
@Override
public void onError(Throwable e) {
if (LOG.isDebugEnabled()) {
LOG.info("onError", e);
}
LOG.debug("onError", e);
if (!isDisposed()) {
circuitBreaker.onError(stopWatch.stop().getProcessingDuration().toNanos(), e);
childObserver.onError(e);
Expand All @@ -325,9 +306,7 @@ public void onError(Throwable e) {
*/
@Override
public void onSuccess(T value) {
if (LOG.isDebugEnabled()) {
LOG.info("onComplete");
}
LOG.debug("onComplete");
if (!isDisposed()) {
circuitBreaker.onSuccess(stopWatch.stop().getProcessingDuration().toNanos());
childObserver.onSuccess(value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -69,6 +71,37 @@ static RateLimiter ofDefaults(String name) {
return new AtomicRateLimiter(name, RateLimiterConfig.ofDefaults());
}

/**
* Returns a supplier which is decorated by a rateLimiter.
*
* @param rateLimiter the rateLimiter
* @param supplier the original supplier
* @param <T> the type of the returned CompletionStage's result
* @return a supplier which is decorated by a RateLimiter.
*/
static <T> Supplier<CompletionStage<T>> decorateCompletionStage(RateLimiter rateLimiter, Supplier<CompletionStage<T>> supplier) {
return () -> {

final CompletableFuture<T> promise = new CompletableFuture<>();
try {
waitForPermission(rateLimiter);
supplier.get()
.whenComplete(
(result, throwable) -> {
if (throwable != null) {
promise.completeExceptionally(throwable);
} else {
promise.complete(result);
}
}
);
} catch (Throwable throwable) {
promise.completeExceptionally(throwable);
}
return promise;
};
}

/**
* Creates a supplier which is restricted by a RateLimiter.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@

package io.github.resilience4j.ratelimiter.operator;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RequestNotPermitted;
import io.reactivex.FlowableOperator;
Expand All @@ -24,10 +29,6 @@
import io.reactivex.SingleObserver;
import io.reactivex.SingleOperator;
import io.reactivex.disposables.Disposable;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -92,9 +93,7 @@ private final class RateLimiterSubscriber implements Subscriber<T>, Subscription
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
if (LOG.isDebugEnabled()) {
LOG.info("onSubscribe");
}
LOG.debug("onSubscribe");
if (rateLimiter.getPermission(rateLimiter.getRateLimiterConfig().getTimeoutDuration())) {
childSubscriber.onSubscribe(this);
} else {
Expand All @@ -109,9 +108,7 @@ public void onSubscribe(Subscription subscription) {
*/
@Override
public void onNext(T event) {
if (LOG.isDebugEnabled()) {
LOG.info("onNext: {}", event);
}
LOG.debug("onNext: {}", event);
if (!isCancelled()) {
if (rateLimiter.getPermission(rateLimiter.getRateLimiterConfig().getTimeoutDuration())) {
childSubscriber.onNext(event);
Expand All @@ -127,9 +124,7 @@ public void onNext(T event) {
*/
@Override
public void onError(Throwable e) {
if (LOG.isDebugEnabled()) {
LOG.info("onError", e);
}
LOG.debug("onError", e);
if (!isCancelled()) {
childSubscriber.onError(e);

Expand All @@ -141,9 +136,7 @@ public void onError(Throwable e) {
*/
@Override
public void onComplete() {
if (LOG.isDebugEnabled()) {
LOG.info("onComplete");
}
LOG.debug("onComplete");
if (!isCancelled()) {
childSubscriber.onComplete();
}
Expand Down Expand Up @@ -189,9 +182,7 @@ private final class RateLimiterObserver implements Observer<T>, Disposable {
@Override
public void onSubscribe(Disposable disposable) {
this.disposable = disposable;
if (LOG.isDebugEnabled()) {
LOG.info("onSubscribe");
}
LOG.debug("onSubscribe");
if (rateLimiter.getPermission(rateLimiter.getRateLimiterConfig().getTimeoutDuration())) {
childObserver.onSubscribe(this);
} else {
Expand All @@ -206,9 +197,7 @@ public void onSubscribe(Disposable disposable) {
*/
@Override
public void onNext(T event) {
if (LOG.isDebugEnabled()) {
LOG.info("onNext: {}", event);
}
LOG.debug("onNext: {}", event);
if (!isDisposed()) {
if (rateLimiter.getPermission(rateLimiter.getRateLimiterConfig().getTimeoutDuration())) {
childObserver.onNext(event);
Expand All @@ -224,9 +213,7 @@ public void onNext(T event) {
*/
@Override
public void onError(Throwable e) {
if (LOG.isDebugEnabled()) {
LOG.info("onError", e);
}
LOG.debug("onError", e);
if (!isDisposed()) {
childObserver.onError(e);
}
Expand All @@ -237,9 +224,7 @@ public void onError(Throwable e) {
*/
@Override
public void onComplete() {
if (LOG.isDebugEnabled()) {
LOG.info("onComplete");
}
LOG.debug("onComplete");
if (!isDisposed()) {
childObserver.onComplete();
}
Expand Down Expand Up @@ -282,9 +267,7 @@ private class RateLimiterSingleObserver implements SingleObserver<T>, Disposable
@Override
public void onSubscribe(Disposable disposable) {
this.disposable = disposable;
if (LOG.isDebugEnabled()) {
LOG.info("onSubscribe");
}
LOG.debug("onSubscribe");
if (rateLimiter.getPermission(rateLimiter.getRateLimiterConfig().getTimeoutDuration())) {
childObserver.onSubscribe(this);
} else {
Expand All @@ -299,9 +282,7 @@ public void onSubscribe(Disposable disposable) {
*/
@Override
public void onError(Throwable e) {
if (LOG.isDebugEnabled()) {
LOG.info("onError", e);
}
LOG.debug("onError", e);
if (!isDisposed()) {
childObserver.onError(e);
}
Expand All @@ -312,9 +293,7 @@ public void onError(Throwable e) {
*/
@Override
public void onSuccess(T value) {
if (LOG.isDebugEnabled()) {
LOG.info("onComplete");
}
LOG.debug("onComplete");
if (!isDisposed()) {
childObserver.onSuccess(value);
}
Expand Down
Loading

0 comments on commit 62c1bd1

Please sign in to comment.