Skip to content

Commit

Permalink
Issue ReactiveX#531: Refactored TimeLimiter to emit Events.
Browse files Browse the repository at this point in the history
  • Loading branch information
RobWin committed Aug 23, 2019
1 parent 6be60e8 commit ab93819
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public void acquirePermission() {

@Override
public void onError(long duration, TimeUnit durationUnit, Throwable throwable) {
// Handle the case if the completable future throw CompletionException wrapping the original exception
// Handle the case if the completable future throws a CompletionException wrapping the original exception
// where original exception is the the one to retry not the CompletionException.
if (throwable instanceof CompletionException) {
Throwable cause = throwable.getCause();
Expand Down Expand Up @@ -278,9 +278,9 @@ private boolean shouldPublishEvents(CircuitBreakerEvent event) {
private void publishEventIfPossible(CircuitBreakerEvent event) {
if(shouldPublishEvents(event)) {
if (eventProcessor.hasConsumers()) {
LOG.debug("Event {} published: {}", event.getEventType(), event);
try{
eventProcessor.consumeEvent(event);
LOG.debug("Event {} published: {}", event.getEventType(), event);
}catch (Throwable t){
LOG.warn("Failed to handle event {}", event.getEventType(), t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import io.github.resilience4j.core.EventConsumer;
import io.github.resilience4j.timelimiter.event.TimeLimiterEvent;
import io.github.resilience4j.timelimiter.event.TimeLimiterOnFailureEvent;
import io.github.resilience4j.timelimiter.event.TimeLimiterOnErrorEvent;
import io.github.resilience4j.timelimiter.event.TimeLimiterOnSuccessEvent;
import io.github.resilience4j.timelimiter.event.TimeLimiterOnTimeoutEvent;
import io.github.resilience4j.timelimiter.internal.TimeLimiterImpl;
Expand Down Expand Up @@ -107,9 +107,19 @@ default <T, F extends Future<T>> T executeFutureSupplier(Supplier<F> futureSuppl

EventPublisher getEventPublisher();

/**
* Records a successful call.
*
* This method must be invoked when a call was successful.
*/
void onSuccess();

void onError(Exception exception);
/**
* Records a failed call.
* This method must be invoked when a call failed.
* @param throwable The throwable which must be recorded
*/
void onError(Throwable throwable);

/**
* An EventPublisher which can be used to register event consumers.
Expand All @@ -118,7 +128,7 @@ interface EventPublisher extends io.github.resilience4j.core.EventPublisher<Time

EventPublisher onSuccess(EventConsumer<TimeLimiterOnSuccessEvent> eventConsumer);

EventPublisher onFailure(EventConsumer<TimeLimiterOnFailureEvent> eventConsumer);
EventPublisher onError(EventConsumer<TimeLimiterOnErrorEvent> eventConsumer);

EventPublisher onTimeout(EventConsumer<TimeLimiterOnTimeoutEvent> eventConsumer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,6 @@
*/
public interface TimeLimiterEvent {

static TimeLimiterEvent of(String name, Type eventType) {
switch (eventType) {
case SUCCESS:
return new TimeLimiterOnSuccessEvent(name);
case TIMEOUT:
return new TimeLimiterOnTimeoutEvent(name);
default:
return new TimeLimiterOnFailureEvent(name);
}
}

String getTimeLimiterName();

Type getEventType();
Expand All @@ -45,6 +34,6 @@ static TimeLimiterEvent of(String name, Type eventType) {
enum Type {
SUCCESS,
TIMEOUT,
FAILURE
ERROR
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,25 @@
*/
package io.github.resilience4j.timelimiter.event;

public class TimeLimiterOnFailureEvent extends AbstractTimeLimiterEvent {
public class TimeLimiterOnErrorEvent extends AbstractTimeLimiterEvent {

public TimeLimiterOnFailureEvent(String timeLimiterName) {
super(timeLimiterName, Type.FAILURE);
private final Throwable throwable;

public TimeLimiterOnErrorEvent(String timeLimiterName, Throwable throwable) {
super(timeLimiterName, Type.ERROR);
this.throwable = throwable;
}

public Throwable getThrowable() {
return throwable;
}

@Override
public String toString() {
return String.format("%s: TimeLimiter '%s' recorded an error: '%s'",
getCreationTime(),
getTimeLimiterName(),
getThrowable().toString());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,11 @@ public TimeLimiterOnSuccessEvent(String timeLimiterName) {
super(timeLimiterName, Type.SUCCESS);
}

@Override
public String toString() {
return String.format("%s: TimeLimiter '%s' recorded a successful call.",
getCreationTime(),
getTimeLimiterName());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,11 @@ public TimeLimiterOnTimeoutEvent(String timeLimiterName) {
super(timeLimiterName, Type.TIMEOUT);
}

@Override
public String toString() {
return String.format("%s: TimeLimiter '%s' recorded a timeout exception.",
getCreationTime(),
getTimeLimiterName());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import io.github.resilience4j.core.EventProcessor;
import io.github.resilience4j.timelimiter.TimeLimiter;
import io.github.resilience4j.timelimiter.event.TimeLimiterEvent;
import io.github.resilience4j.timelimiter.event.TimeLimiterOnFailureEvent;
import io.github.resilience4j.timelimiter.event.TimeLimiterOnErrorEvent;
import io.github.resilience4j.timelimiter.event.TimeLimiterOnSuccessEvent;
import io.github.resilience4j.timelimiter.event.TimeLimiterOnTimeoutEvent;

Expand All @@ -41,8 +41,8 @@ public TimeLimiter.EventPublisher onSuccess(EventConsumer<TimeLimiterOnSuccessEv
}

@Override
public TimeLimiter.EventPublisher onFailure(EventConsumer<TimeLimiterOnFailureEvent> onOnFailureEventConsumer) {
registerConsumer(TimeLimiterOnFailureEvent.class.getSimpleName(), onOnFailureEventConsumer);
public TimeLimiter.EventPublisher onError(EventConsumer<TimeLimiterOnErrorEvent> onOnFailureEventConsumer) {
registerConsumer(TimeLimiterOnErrorEvent.class.getSimpleName(), onOnFailureEventConsumer);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
import io.github.resilience4j.timelimiter.TimeLimiter;
import io.github.resilience4j.timelimiter.TimeLimiterConfig;
import io.github.resilience4j.timelimiter.event.TimeLimiterEvent;
import io.github.resilience4j.timelimiter.event.TimeLimiterOnErrorEvent;
import io.github.resilience4j.timelimiter.event.TimeLimiterOnSuccessEvent;
import io.github.resilience4j.timelimiter.event.TimeLimiterOnTimeoutEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
Expand All @@ -13,6 +18,8 @@

public class TimeLimiterImpl implements TimeLimiter {

private static final Logger LOG = LoggerFactory.getLogger(TimeLimiterImpl.class);

private String name;
private final TimeLimiterConfig timeLimiterConfig;
private final TimeLimiterEventProcessor eventProcessor;
Expand All @@ -38,11 +45,12 @@ public <T, F extends Future<T>> Callable<T> decorateFutureSupplier(Supplier<F> f
}
throw e;
} catch (ExecutionException e) {
onError(e);
Throwable t = e.getCause();
if (t == null) {
onError(e);
throw e;
}
onError(t);
if (t instanceof Error) {
throw (Error) t;
}
Expand All @@ -51,14 +59,6 @@ public <T, F extends Future<T>> Callable<T> decorateFutureSupplier(Supplier<F> f
};
}

private void publishTimeLimiterEvent(TimeLimiterEvent.Type eventType) {
if (!eventProcessor.hasConsumers()) {
return;
}
eventProcessor.consumeEvent(TimeLimiterEvent.of(name, eventType));

}

@Override
public TimeLimiterConfig getTimeLimiterConfig() {
return timeLimiterConfig;
Expand All @@ -71,15 +71,30 @@ public EventPublisher getEventPublisher() {

@Override
public void onSuccess() {
publishTimeLimiterEvent(TimeLimiterEvent.Type.SUCCESS);
if (!eventProcessor.hasConsumers()) {
return;
}
publishEvent(new TimeLimiterOnSuccessEvent(name));
}

@Override
public void onError(Exception e) {
if (e instanceof TimeoutException) {
publishTimeLimiterEvent(TimeLimiterEvent.Type.TIMEOUT);
public void onError(Throwable throwable) {
if (!eventProcessor.hasConsumers()) {
return;
}
if (throwable instanceof TimeoutException) {
publishEvent(new TimeLimiterOnTimeoutEvent(name));
} else {
publishTimeLimiterEvent(TimeLimiterEvent.Type.FAILURE);
publishEvent(new TimeLimiterOnErrorEvent(name, throwable));
}
}

private void publishEvent(TimeLimiterEvent event) {
try{
eventProcessor.consumeEvent(event);
LOG.debug("Event {} published: {}", event.getEventType(), event);
}catch (Throwable t){
LOG.warn("Failed to handle event {}", event.getEventType(), t);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package io.github.resilience4j.timelimiter;

import io.github.resilience4j.timelimiter.event.TimeLimiterEvent;
import io.github.resilience4j.timelimiter.event.TimeLimiterOnErrorEvent;
import io.vavr.control.Try;

import java.time.Duration;
Expand Down Expand Up @@ -54,7 +55,7 @@ public void shouldReturnTheSameConsumer() {
public void shouldConsumeOnSuccessEvent() throws Exception {
TimeLimiter timeLimiter = TimeLimiter.of(NEVER);
timeLimiter.getEventPublisher()
.onSuccess(this::logEventType);
.onSuccess(event -> logger.info(event.getEventType().toString()));
Supplier<CompletableFuture<String>> futureSupplier = () ->
CompletableFuture.completedFuture("Hello world");

Expand All @@ -68,7 +69,7 @@ public void shouldConsumeOnSuccessEvent() throws Exception {
public void shouldConsumeOnTimeoutEvent() {
TimeLimiter timeLimiter = TimeLimiter.of(NEVER);
timeLimiter.getEventPublisher()
.onTimeout(this::logEventType);
.onTimeout(event -> logger.info(event.getEventType().toString()));
Supplier<CompletableFuture<String>> futureSupplier = () ->
CompletableFuture.supplyAsync(this::fail);

Expand All @@ -78,20 +79,16 @@ public void shouldConsumeOnTimeoutEvent() {
}

@Test
public void shouldConsumeOnFailureEvent() {
public void shouldConsumeOnErrorEvent() {
TimeLimiter timeLimiter = TimeLimiter.of(Duration.ofSeconds(1));
timeLimiter.getEventPublisher()
.onFailure(this::logEventType);
.onError(event -> logger.info(event.getEventType().toString() + " " + event.getThrowable().toString()));
Supplier<CompletableFuture<String>> futureSupplier = () ->
CompletableFuture.supplyAsync(this::fail);

Try.ofCallable(timeLimiter.decorateFutureSupplier(futureSupplier));

then(logger).should(times(1)).info("FAILURE");
}

private void logEventType(TimeLimiterEvent event) {
logger.info(event.getEventType().toString());
then(logger).should(times(1)).info("ERROR java.lang.RuntimeException");
}

private String fail() {
Expand Down

0 comments on commit ab93819

Please sign in to comment.