Skip to content

Commit

Permalink
adding ratpack support, closes ReactiveX#74 (ReactiveX#82)
Browse files Browse the repository at this point in the history
* adding ratpack support, closes ReactiveX#74

* fix ratelimiter test configs to account for execution time differences in environments

* adding tests
  • Loading branch information
Dan Maas authored and RobWin committed Mar 31, 2017
1 parent 4dacdd7 commit 987c3f7
Show file tree
Hide file tree
Showing 21 changed files with 1,657 additions and 1 deletion.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
*.iml
.gradle
build
classes
classes
/resilience4j-ratpack/src/test/groovy/io/github/resilience4j/ratpack/annotation/RateLimiterSpec.groovy
7 changes: 7 additions & 0 deletions libraries.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ ext {
metricsVersion = '3.1.2'
vertxVersion = '3.4.1'
springBootVersion = '1.4.3.RELEASE'
ratpackVersion = '1.4.5'
spockVersion = '1.1-groovy-2.4-rc-4'
retrofitVersion = '2.1.0'
prometheusSimpleClientVersion = '0.0.21'

Expand Down Expand Up @@ -44,6 +46,11 @@ ext {
spring_boot_web: "org.springframework.boot:spring-boot-starter-web:${springBootVersion}",
spring_boot_test: "org.springframework.boot:spring-boot-starter-test:${springBootVersion}",

// ratpack addon
ratpack: "io.ratpack:ratpack-guice:${ratpackVersion}",
ratpack_test: "io.ratpack:ratpack-groovy-test:${ratpackVersion}",
spock: "org.spockframework:spock-core:${spockVersion}",

// Retrofit addon
retrofit: "com.squareup.retrofit2:retrofit:${retrofitVersion}",
retrofit_test: "com.squareup.retrofit2:converter-scalars:${retrofitVersion}",
Expand Down
10 changes: 10 additions & 0 deletions resilience4j-ratpack/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
apply plugin: 'groovy'

dependencies {
compile ( libraries.ratpack )
compile project(':resilience4j-circuitbreaker')
compile project(':resilience4j-ratelimiter')
compile project(':resilience4j-retry')
testCompile ( libraries.ratpack_test )
testCompile ( libraries.spock )
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright 2017 Dan Maas
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.resilience4j.ratpack;

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerOpenException;
import io.github.resilience4j.core.StopWatch;
import ratpack.exec.Downstream;
import ratpack.exec.Upstream;
import ratpack.func.Function;

public class CircuitBreakerTransformer<T> implements Function<Upstream<? extends T>, Upstream<T>> {

private final CircuitBreaker circuitBreaker;
private Function<Throwable, ? extends T> recoverer;

private CircuitBreakerTransformer(CircuitBreaker circuitBreaker) {
this.circuitBreaker = circuitBreaker;
}

/**
* Create a new transformer that can be applied to the {@link ratpack.exec.Promise#transform(Function)} method.
* The Promised value will pass through the circuitbreaker, potentially causing it to open if the thresholds
* for the circuit breaker are exceeded.
*
* @param circuitBreaker the circuit breaker to use
* @param <T> the type of object
* @return the transformer
*/
public static <T> CircuitBreakerTransformer<T> of(CircuitBreaker circuitBreaker) {
return new CircuitBreakerTransformer<>(circuitBreaker);
}

/**
* Set a recovery function that will execute when the circuit breaker is open.
*
* @param recoverer the recovery function
* @return the transformer
*/
public CircuitBreakerTransformer<T> recover(Function<Throwable, ? extends T> recoverer) {
this.recoverer = recoverer;
return this;
}

@Override
public Upstream<T> apply(Upstream<? extends T> upstream) throws Exception {
return down -> {
StopWatch stopWatch;
if (circuitBreaker.isCallPermitted()) {
stopWatch = StopWatch.start(circuitBreaker.getName());
upstream.connect(new Downstream<T>() {

@Override
public void success(T value) {
circuitBreaker.onSuccess(stopWatch.stop().getProcessingDuration());
down.success(value);
}

@Override
public void error(Throwable throwable) {
circuitBreaker.onError(stopWatch.stop().getProcessingDuration(), throwable);
try {
if (recoverer != null) {
down.success(recoverer.apply(throwable));
} else {
down.error(throwable);
}
} catch (Throwable t) {
down.error(t);
}
}

@Override
public void complete() {
down.complete();
}
});
} else {
Throwable t = new CircuitBreakerOpenException("CircuitBreaker ${circuitBreaker.name} is open");
if (recoverer != null) {
try {
down.success(recoverer.apply(t));
} catch (Throwable t2) {
down.error(t2);
}
} else {
down.error(t);
}
}
};
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2017 Dan Maas
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.resilience4j.ratpack;

import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import io.github.resilience4j.ratelimiter.RequestNotPermitted;
import ratpack.exec.Downstream;
import ratpack.exec.Upstream;
import ratpack.func.Function;

import java.time.Duration;

public class RateLimiterTransformer<T> implements Function<Upstream<? extends T>, Upstream<T>> {

private final RateLimiter rateLimiter;
private Function<Throwable, ? extends T> recover;

private RateLimiterTransformer(RateLimiter rateLimiter) {
this.rateLimiter = rateLimiter;
}

/**
* Create a new transformer that can be applied to the {@link ratpack.exec.Promise#transform(Function)} method.
* The Promised value will pass through the rateLimiter, potentially causing it to rateLimiter on error.
*
* @param rateLimiter the rateLimiter to use
* @param <T> the type of object
* @return the transformer
*/
public static <T> RateLimiterTransformer<T> of(RateLimiter rateLimiter) {
return new RateLimiterTransformer<>(rateLimiter);
}

/**
* Set a recovery function that will execute when the rateLimiter limit is exceeded.
*
* @param recover the recovery function
* @return the transformer
*/
public RateLimiterTransformer<T> recover(Function<Throwable, ? extends T> recover) {
this.recover = recover;
return this;
}

@Override
public Upstream<T> apply(Upstream<? extends T> upstream) throws Exception {
return down -> {
RateLimiterConfig rateLimiterConfig = rateLimiter.getRateLimiterConfig();
Duration timeoutDuration = rateLimiterConfig.getTimeoutDuration();
boolean permission = rateLimiter.getPermission(timeoutDuration);
if (Thread.interrupted()) {
throw new IllegalStateException("Thread was interrupted during permission wait");
}
if (!permission) {
Throwable t = new RequestNotPermitted("Request not permitted for limiter: " + rateLimiter.getName());
if (recover != null) {
down.success(recover.apply(t));
} else {
down.error(t);
}
} else {
upstream.connect(new Downstream<T>() {

@Override
public void success(T value) {
down.success(value);
}

@Override
public void error(Throwable throwable) {
down.error(throwable);
}

@Override
public void complete() {
down.complete();
}
});
}
};
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2017 Dan Maas
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.resilience4j.ratpack;

import ratpack.func.Function;

public interface RecoveryFunction<O> extends Function<Throwable, O> {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2017 Dan Maas
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.resilience4j.ratpack;

import com.google.inject.matcher.Matchers;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import io.github.resilience4j.ratelimiter.RateLimiterRegistry;
import io.github.resilience4j.ratpack.annotation.CircuitBreaker;
import io.github.resilience4j.ratpack.annotation.RateLimiter;
import io.github.resilience4j.ratpack.internal.CircuitBreakerMethodInterceptor;
import io.github.resilience4j.ratpack.internal.RateLimiterMethodInterceptor;
import ratpack.guice.ConfigurableModule;

public class ResilienceModule extends ConfigurableModule<ResilienceModule.ResilienceConfig> {

@Override
protected void configure() {
bindInterceptor(Matchers.any(), Matchers.annotatedWith(CircuitBreaker.class), injected(new CircuitBreakerMethodInterceptor(getProvider(CircuitBreakerRegistry.class))));
bindInterceptor(Matchers.any(), Matchers.annotatedWith(RateLimiter.class), injected(new RateLimiterMethodInterceptor(getProvider(RateLimiterRegistry.class))));
}

private <T> T injected(T instance) {
requestInjection(instance);
return instance;
}

public static class ResilienceConfig {
private boolean enableMetrics = false;

public ResilienceConfig enableMetrics(boolean enableMetrics) {
this.enableMetrics = enableMetrics;
return this;
}

}
}
Loading

0 comments on commit 987c3f7

Please sign in to comment.