Skip to content

Commit

Permalink
Issue ReactiveX#137: Refactored Retry so that it can be shared and us…
Browse files Browse the repository at this point in the history
…ed for mu… (ReactiveX#138)

* Issue ReactiveX#137: Refactored Retry so that it can be shared and used for multiple requests.
  • Loading branch information
RobWin authored May 19, 2017
1 parent 6f854a5 commit 2f71963
Show file tree
Hide file tree
Showing 38 changed files with 1,130 additions and 895 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import java.util.function.Supplier;

/**
* A Bulkhead instance is thread-safe can be used to decorate multiple requests.
*
* A {@link Bulkhead} represent an entity limiting the amount of parallel operations. It does not assume nor does it mandate usage
* of any particular concurrency and/or io model. These details are left for the client to manage. This bulkhead, depending on the
* underlying concurrency/io model can be used to shed load, and, where it makes sense, limit resource use (i.e. limit amount of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package io.github.resilience4j.cache;

import io.github.resilience4j.cache.event.CacheEvent;
import io.github.resilience4j.cache.internal.CacheContext;
import io.github.resilience4j.cache.internal.CacheImpl;
import io.reactivex.Flowable;
import io.vavr.CheckedFunction0;
import io.vavr.CheckedFunction1;
Expand Down Expand Up @@ -72,7 +72,7 @@ public interface Cache<K, V> {
*/
static <K,V> Cache<K,V> of(javax.cache.Cache<K, V> cache){
Objects.requireNonNull(cache, "Cache must not be null");
return new CacheContext<>(cache);
return new CacheImpl<>(cache);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Supplier;

public class CacheContext<K, V> implements Cache<K,V> {
public class CacheImpl<K, V> implements Cache<K,V> {

private static final Logger LOG = LoggerFactory.getLogger(CacheContext.class);
private static final Logger LOG = LoggerFactory.getLogger(CacheImpl.class);

private final javax.cache.Cache<K, V> cache;
private final FlowableProcessor<CacheEvent> eventPublisher;
private final CacheMetrics metrics;

public CacheContext(javax.cache.Cache<K, V> cache) {
public CacheImpl(javax.cache.Cache<K, V> cache) {
this.cache = cache;
PublishProcessor<CacheEvent> publisher = PublishProcessor.create();
this.eventPublisher = publisher.toSerialized();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import java.util.function.Supplier;

/**
* A CircuitBreaker instance is thread-safe can be used to decorate multiple requests.
*
* A {@link CircuitBreaker} manages the state of a backend system.
* The CircuitBreaker is implemented via a finite state machine with three states: CLOSED, OPEN and HALF_OPEN.
* The CircuitBreaker does not know anything about the backend’s state by itself, but uses the information provided by the decorators via
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,19 @@ RetryConfig config = RetryConfig.custom()
.maxAttempts(3)
.waitDuration(Duration.ofMillis(500))
.build();
Retry retryContext = Retry.of("id", config);
Retry retry = Retry.of("id", config);
----

In order to create a custom `Retry` context, you can use the Retry context builder. You can configure the maximum number of retry attempts and the wait duration between successive attempts. Furthermore, you can configure a custom Predicate which evaluates if an exception should trigger a retry.
In order to create a custom-configured `Retry`, you can use the RetryConfig builder. You can configure the maximum number of retry attempts and the wait duration between successive attempts. Furthermore, you can configure a custom Predicate which evaluates if an exception should trigger a retry.

[source,java]
----
RetryConfig config = RetryConfig.custom()
.maxAttempts(2)
.waitDurationInOpenState(Duration.ofMillis(100))
.retryOnException(throwable -> Match.of(throwable)
.whenType(WebServiceException.class).then(false)
.otherwise(true).get())
.retryOnException(throwable -> API.Match(throwable).of(
API.Case($(Predicates.instanceOf(WebServiceException.class)), true),
API.Case($(), false)))
.build();
----

Expand All @@ -39,9 +39,9 @@ HelloWorldService helloWorldService = mock(HelloWorldService.class);
given(helloWorldService.sayHelloWorld()).willThrow(new WebServiceException("BAM!"));
// Create a Retry with default configuration
Retry retryContext = Retry.ofDefaults("id");
Retry retry = Retry.ofDefaults("id");
// Decorate the invocation of the HelloWorldService
CheckedFunction0<String> retryableSupplier = Retry.decorateCheckedSupplier(retryContext, helloWorldService::sayHelloWorld);
CheckedFunction0<String> retryableSupplier = Retry.decorateCheckedSupplier(retry, helloWorldService::sayHelloWorld);
// When I invoke the function
Try<String> result = Try.of(retryableSupplier).recover((throwable) -> "Hello world from recovery function");
Expand Down Expand Up @@ -69,9 +69,9 @@ The RetryContext emits a stream of RetryEvents to any Observer/Consumer who subs

[source,java]
----
Retry retryContext = Retry.ofDefaults("id");
Retry retry = Retry.ofDefaults("id");
CircularEventConsumer<RetryEvent> circularEventConsumer = new CircularEventConsumer<>(10);
retryContext.getEventStream()
retry.getEventStream()
.subscribe(circularEventConsumer);
List<RetryEvent> bufferedEvents = circularEventConsumer.getBufferedEvents();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
*/
package io.github.resilience4j.metrics;

import static com.codahale.metrics.MetricRegistry.name;
import static java.util.Objects.requireNonNull;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
Expand All @@ -31,12 +28,20 @@

import java.util.Map;

import static com.codahale.metrics.MetricRegistry.name;
import static java.util.Objects.requireNonNull;

/**
* An adapter which exports {@link CircuitBreaker.Metrics} as Dropwizard Metrics Gauges.
*/
public class CircuitBreakerMetrics implements MetricSet {

private static final String DEFAULT_PREFIX = "resilience4j.circuitbreaker";
public static final String SUCCESSFUL = "successful";
public static final String FAILED = "failed";
public static final String NOT_PERMITTED = "not_permitted";
public static final String BUFFERED = "buffered";
public static final String BUFFERED_MAX = "buffered_max";
private final MetricRegistry metricRegistry = new MetricRegistry();

private CircuitBreakerMetrics(Iterable<CircuitBreaker> circuitBreakers) {
Expand All @@ -50,15 +55,15 @@ private CircuitBreakerMetrics(String prefix, Iterable<CircuitBreaker> circuitBre
String name = circuitBreaker.getName();
CircuitBreaker.Metrics metrics = circuitBreaker.getMetrics();

metricRegistry.register(name(prefix, name, "successful"),
metricRegistry.register(name(prefix, name, SUCCESSFUL),
(Gauge<Integer>) metrics::getNumberOfSuccessfulCalls);
metricRegistry.register(name(prefix, name, "failed"),
metricRegistry.register(name(prefix, name, FAILED),
(Gauge<Integer>) metrics::getNumberOfFailedCalls);
metricRegistry.register(name(prefix, name, "not_permitted"),
metricRegistry.register(name(prefix, name, NOT_PERMITTED),
(Gauge<Long>) metrics::getNumberOfNotPermittedCalls);
metricRegistry.register(name(prefix, name, "buffered"),
metricRegistry.register(name(prefix, name, BUFFERED),
(Gauge<Integer>) metrics::getNumberOfBufferedCalls);
metricRegistry.register(name(prefix, name, "buffered_max"),
metricRegistry.register(name(prefix, name, BUFFERED_MAX),
(Gauge<Integer>) metrics::getMaxNumberOfBufferedCalls);
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
*/
public class RetryMetrics implements MetricSet {

public static final String SUCCESSFUL_CALLS_WITHOUT_RETRY = "successful_calls_without_retry";
public static final String SUCCESSFUL_CALLS_WITH_RETRY = "successful_calls_with_retry";
public static final String FAILED_CALLS_WITHOUT_RETRY = "failed_calls_without_retry";
public static final String FAILED_CALLS_WITH_RETRY = "failed_calls_with_retry";
private final MetricRegistry metricRegistry = new MetricRegistry();
private static final String DEFAULT_PREFIX = "resilience4j.retry";

Expand All @@ -29,9 +33,14 @@ private RetryMetrics(String prefix, Iterable<Retry> retries){
String name = retry.getName();
Retry.Metrics metrics = retry.getMetrics();

metricRegistry.register(name(prefix, name, "retry_max_ratio"),
new RetryRatio(metrics.getNumAttempts(), metrics.getMaxAttempts()));

metricRegistry.register(name(prefix, name, SUCCESSFUL_CALLS_WITHOUT_RETRY),
(Gauge<Long>) metrics::getNumberOfSuccessfulCallsWithoutRetryAttempt);
metricRegistry.register(name(prefix, name, SUCCESSFUL_CALLS_WITH_RETRY),
(Gauge<Long>) metrics::getNumberOfSuccessfulCallsWithRetryAttempt);
metricRegistry.register(name(prefix, name, FAILED_CALLS_WITHOUT_RETRY),
(Gauge<Long>) metrics::getNumberOfFailedCallsWithoutRetryAttempt);
metricRegistry.register(name(prefix, name, FAILED_CALLS_WITH_RETRY),
(Gauge<Long>) metrics::getNumberOfFailedCallsWithRetryAttempt);
});
}

Expand Down
Loading

0 comments on commit 2f71963

Please sign in to comment.