Skip to content

Commit

Permalink
Removed default Scheduler from MonoRateLimiter and FluxRateLimiter. T…
Browse files Browse the repository at this point in the history
…he user should can use subscribeOn or publishOn whenever he wants.
  • Loading branch information
RobWin committed May 21, 2019
1 parent 8e0a940 commit f102a93
Show file tree
Hide file tree
Showing 9 changed files with 18 additions and 60 deletions.
10 changes: 0 additions & 10 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,6 @@ before_install:
after_success:
- test -n $SONAR_TOKEN && ./gradlew sonarqube
- test $TRAVIS_BRANCH = "master" && test -n $BINTRAY_USER && test -n $PUBLISH && ./gradlew artifactoryPublish -PbintrayUsername="${BINTRAY_USER}" -PbintrayApiKey="${BINTRAY_KEY}"
- test $TRAVIS_BRANCH = "master" && test -n $GITHUB_TOKEN && test -n $PUBLISH && ./gradlew asciidoctor
deploy:
provider: pages
skip_cleanup: true
github_token: $GITHUB_TOKEN # Set in the settings page of your repository, as a secure variable
keep_history: true
local_dir: resilience4j-documentation/build/asciidoc/html5
on:
branch: master
condition: $PUBLISH = true
before_cache:
- rm -f $HOME/.gradle/caches/modules-2/modules-2.lock
- rm -fr $HOME/.gradle/caches/*/plugin-resolution/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import ratpack.exec.Promise;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.Optional;
Expand Down Expand Up @@ -93,14 +92,14 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
} else if (Flux.class.isAssignableFrom(returnType)) {
Flux<?> result = (Flux<?>) proceed(invocation, rateLimiter, fallbackMethod);
if (result != null) {
RateLimiterOperator operator = RateLimiterOperator.of(rateLimiter, Schedulers.immediate());
RateLimiterOperator operator = RateLimiterOperator.of(rateLimiter);
result = fallbackMethod.onErrorResume(result.transform(operator));
}
return result;
} else if (Mono.class.isAssignableFrom(returnType)) {
Mono<?> result = (Mono<?>) proceed(invocation, rateLimiter, fallbackMethod);
if (result != null) {
RateLimiterOperator operator = RateLimiterOperator.of(rateLimiter, Schedulers.immediate());
RateLimiterOperator operator = RateLimiterOperator.of(rateLimiter);
result = fallbackMethod.onErrorResume(result.transform(operator));
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,22 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Operators;
import reactor.core.scheduler.Scheduler;

import java.time.Duration;

class FluxRateLimiter<T> extends FluxOperator<T, T> {

private final RateLimiter rateLimiter;
private final Scheduler scheduler;

FluxRateLimiter(Flux<? extends T> source, RateLimiter rateLimiter,
Scheduler scheduler) {
FluxRateLimiter(Flux<? extends T> source, RateLimiter rateLimiter) {
super(source);
this.rateLimiter = rateLimiter;
this.scheduler = scheduler;
}

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
if(rateLimiter.acquirePermission(Duration.ZERO)){
source.publishOn(scheduler)
.subscribe(new RateLimiterSubscriber<>(actual));
source.subscribe(new RateLimiterSubscriber<>(actual));
}else{
Operators.error(actual, new RequestNotPermitted(rateLimiter));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,21 @@
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoOperator;
import reactor.core.publisher.Operators;
import reactor.core.scheduler.Scheduler;

import java.time.Duration;

class MonoRateLimiter<T> extends MonoOperator<T, T> {
private final RateLimiter rateLimiter;
private final Scheduler scheduler;

MonoRateLimiter(Mono<? extends T> source, RateLimiter rateLimiter,
Scheduler scheduler) {
MonoRateLimiter(Mono<? extends T> source, RateLimiter rateLimiter) {
super(source);
this.rateLimiter = rateLimiter;
this.scheduler = scheduler;
}

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
if(rateLimiter.acquirePermission(Duration.ZERO)){
source.publishOn(scheduler)
.subscribe(new RateLimiterSubscriber<>(actual));
source.subscribe(new RateLimiterSubscriber<>(actual));
}else{
Operators.error(actual, new RequestNotPermitted(rateLimiter));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.util.function.UnaryOperator;

Expand All @@ -34,11 +32,9 @@
*/
public class RateLimiterOperator<T> implements UnaryOperator<Publisher<T>> {
private final RateLimiter rateLimiter;
private final Scheduler scheduler;

private RateLimiterOperator(RateLimiter rateLimiter, Scheduler scheduler) {
private RateLimiterOperator(RateLimiter rateLimiter) {
this.rateLimiter = rateLimiter;
this.scheduler = scheduler;
}

/**
Expand All @@ -49,27 +45,15 @@ private RateLimiterOperator(RateLimiter rateLimiter, Scheduler scheduler) {
* @return a RateLimiterOperator
*/
public static <T> RateLimiterOperator<T> of(RateLimiter rateLimiter) {
return of(rateLimiter, Schedulers.parallel());
}

/**
* Creates a RateLimiterOperator.
*
* @param <T> the value type of the upstream and downstream
* @param rateLimiter the Rate limiter
* @param scheduler the {@link Scheduler} where to publish
* @return a RateLimiterOperator
*/
public static <T> RateLimiterOperator<T> of(RateLimiter rateLimiter, Scheduler scheduler) {
return new RateLimiterOperator<>(rateLimiter, scheduler);
return new RateLimiterOperator<>(rateLimiter);
}

@Override
public Publisher<T> apply(Publisher<T> publisher) {
if (publisher instanceof Mono) {
return new MonoRateLimiter<>((Mono<? extends T>) publisher, rateLimiter, scheduler);
return new MonoRateLimiter<>((Mono<? extends T>) publisher, rateLimiter);
} else if (publisher instanceof Flux) {
return new FluxRateLimiter<>((Flux<? extends T>) publisher, rateLimiter, scheduler);
return new FluxRateLimiter<>((Flux<? extends T>) publisher, rateLimiter);
}

throw new IllegalStateException("Publisher of type <" + publisher.getClass().getSimpleName()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

import java.io.IOException;
Expand Down Expand Up @@ -108,7 +107,7 @@ public void shouldEmitErrorWithCircuitBreakerOpenExceptionEvenWhenErrorDuringSub
Flux.error(new IOException("BAM!"))
.compose(CircuitBreakerOperator.of(circuitBreaker))
.compose(BulkheadOperator.of(bulkhead))
.compose(RateLimiterOperator.of(rateLimiter, Schedulers.immediate()))
.compose(RateLimiterOperator.of(rateLimiter))
).expectError(CallNotPermittedException.class)
.verify(Duration.ofSeconds(1));
}
Expand All @@ -120,7 +119,7 @@ public void shouldEmitErrorWithCircuitBreakerOpenExceptionEvenWhenErrorNotOnSubs
Flux.error(new IOException("BAM!"), true)
.compose(CircuitBreakerOperator.of(circuitBreaker))
.compose(BulkheadOperator.of(bulkhead))
.compose(RateLimiterOperator.of(rateLimiter, Schedulers.immediate()))
.compose(RateLimiterOperator.of(rateLimiter))
).expectError(CallNotPermittedException.class)
.verify(Duration.ofSeconds(1));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.junit.Test;
import org.mockito.Mockito;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

import java.io.IOException;
Expand Down Expand Up @@ -69,7 +68,7 @@ public void shouldEmitRequestNotPermittedException() {

StepVerifier.create(
Flux.just("Event")
.compose(RateLimiterOperator.of(rateLimiter, Schedulers.immediate())))
.compose(RateLimiterOperator.of(rateLimiter)))
.expectSubscription()
.expectError(RequestNotPermitted.class)
.verify(Duration.ofSeconds(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.junit.Test;
import org.mockito.Mockito;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

import java.io.IOException;
Expand Down Expand Up @@ -79,7 +78,7 @@ public void shouldEmitRequestNotPermittedExceptionEvenWhenErrorDuringSubscribe()

StepVerifier.create(
Mono.error(new IOException("BAM!"))
.compose(RateLimiterOperator.of(rateLimiter, Schedulers.immediate())))
.compose(RateLimiterOperator.of(rateLimiter)))
.expectError(RequestNotPermitted.class)
.verify(Duration.ofSeconds(1));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@
*/
package io.github.resilience4j.ratelimiter.configure;

import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.reactor.ratelimiter.operator.RateLimiterOperator;
import org.aspectj.lang.ProceedingJoinPoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.reactor.ratelimiter.operator.RateLimiterOperator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/**
* the Reactor RateLimiter logic support for the spring AOP
Expand Down Expand Up @@ -57,10 +55,10 @@ public Object handle(ProceedingJoinPoint proceedingJoinPoint, RateLimiter rateLi
Object returnValue = proceedingJoinPoint.proceed();
if (Flux.class.isAssignableFrom(returnValue.getClass())) {
Flux<?> fluxReturnValue = (Flux<?>) returnValue;
return fluxReturnValue.compose(RateLimiterOperator.of(rateLimiter, Schedulers.immediate()));
return fluxReturnValue.compose(RateLimiterOperator.of(rateLimiter));
} else if (Mono.class.isAssignableFrom(returnValue.getClass())) {
Mono<?> monoReturnValue = (Mono<?>) returnValue;
return monoReturnValue.compose(RateLimiterOperator.of(rateLimiter, Schedulers.immediate()));
return monoReturnValue.compose(RateLimiterOperator.of(rateLimiter));
} else {
logger.error("Unsupported type for Reactor rateLimiter {}", returnValue.getClass().getTypeName());
throw new IllegalArgumentException("Not Supported type for the rateLimiter in Reactor :" + returnValue.getClass().getName());
Expand Down

0 comments on commit f102a93

Please sign in to comment.