Skip to content

Commit

Permalink
Issue ReactiveX#384: Added higher-order functions to chain resultHand…
Browse files Browse the repository at this point in the history
…ler and e… (ReactiveX#415)
  • Loading branch information
RobWin authored Apr 8, 2019
1 parent 945d6d1 commit 0389530
Show file tree
Hide file tree
Showing 8 changed files with 383 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
package io.github.resilience4j.circuitbreaker;

import io.github.resilience4j.test.HelloWorldService;
import io.vavr.*;
import io.vavr.CheckedConsumer;
import io.vavr.CheckedFunction0;
import io.vavr.CheckedFunction1;
import io.vavr.CheckedRunnable;
import io.vavr.control.Try;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -30,14 +33,16 @@
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.time.Duration;
import java.util.concurrent.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import static io.vavr.API.*;
import static io.vavr.API.$;
import static io.vavr.Predicates.*;
import static io.vavr.Predicates.instanceOf;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package io.github.resilience4j.core;

import java.util.concurrent.Callable;
import java.util.function.BiFunction;
import java.util.function.Function;

public class CallableUtils {

private CallableUtils(){}

/**
* Returns a composed function that first applies the Callable and then applies
* the resultHandler.
*
* @param <T> return type of callable
* @param <R> return type of handler
* @param resultHandler the function applied after callable
* @return a function composed of supplier and resultHandler
*/
public static <T, R> Callable<R> andThen(Callable<T> callable, Function<T, R> resultHandler){
return () -> resultHandler.apply(callable.call());
}

/**
* Returns a composed function that first applies the Callable and then applies
* {@linkplain BiFunction} {@code after} to the result.
*
* @param <T> return type of callable
* @param <R> return type of handler
* @param handler the function applied after callable
* @return a function composed of supplier and handler
*/
public static <T, R> Callable<R> andThen(Callable<T> callable, BiFunction<T, Exception, R> handler){
return () -> {
try{
T result = callable.call();
return handler.apply(result, null);
}catch (Exception exception){
return handler.apply(null, exception);
}
};
}

/**
* Returns a composed function that first applies the Callable and then applies
* either the resultHandler or exceptionHandler.
*
* @param <T> return type of callable
* @param <R> return type of resultHandler and exceptionHandler
* @param resultHandler the function applied after callable was successful
* @param exceptionHandler the function applied after callable has failed
* @return a function composed of supplier and handler
*/
public static <T, R> Callable<R> andThen(Callable<T> callable, Function<T, R> resultHandler, Function<Exception, R> exceptionHandler){
return () -> {
try{
T result = callable.call();
return resultHandler.apply(result);
}catch (Exception exception){
return exceptionHandler.apply(exception);
}
};
}

/**
* Returns a composed function that first executes the Callable and optionally recovers from an exception.
*
* @param <T> return type of after
* @param exceptionHandler the exception handler
* @return a function composed of callable and exceptionHandler
*/
public static <T> Callable<T> recover(Callable<T> callable, Function<Exception, T> exceptionHandler){
return () -> {
try{
return callable.call();
}catch (Exception exception){
return exceptionHandler.apply(exception);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@

public class EventProcessor<T> implements EventPublisher<T> {

protected volatile boolean consumerRegistered;
@Nullable private volatile EventConsumer<T> onEventConsumer;
private boolean consumerRegistered;
@Nullable private EventConsumer<T> onEventConsumer;
private ConcurrentMap<Class<? extends T>, EventConsumer<Object>> eventConsumers = new ConcurrentHashMap<>();

public boolean hasConsumers(){
return consumerRegistered;
}

@SuppressWarnings("unchecked")
public <E extends T> void registerConsumer(Class<? extends E> eventType, EventConsumer<E> eventConsumer){
public synchronized <E extends T> void registerConsumer(Class<? extends E> eventType, EventConsumer<E> eventConsumer){
consumerRegistered = true;
eventConsumers.put(eventType, (EventConsumer<Object>) eventConsumer);
}
Expand All @@ -58,7 +58,7 @@ public <E extends T> boolean processEvent(E event) {
}

@Override
public void onEvent(@Nullable EventConsumer<T> onEventConsumer) {
public synchronized void onEvent(@Nullable EventConsumer<T> onEventConsumer) {
consumerRegistered = true;
this.onEventConsumer = onEventConsumer;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package io.github.resilience4j.core;

import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

public class SupplierUtils {

private SupplierUtils(){}

/**
* Returns a composed function that first applies the Supplier and then applies
* the resultHandler.
*
* @param <T> return type of callable
* @param <R> return type of handler
* @param resultHandler the function applied after supplier
* @return a function composed of supplier and resultHandler
*/
public static <T, R> Supplier<R> andThen(Supplier<T> supplier, Function<T, R> resultHandler){
return () -> resultHandler.apply(supplier.get());
}

/**
* Returns a composed function that first applies the Supplier and then applies
* {@linkplain BiFunction} {@code after} to the result.
*
* @param <T> return type of after
* @param handler the function applied after supplier
* @return a function composed of supplier and handler
*/
public static <T, R> Supplier<R> andThen(Supplier<T> supplier, BiFunction<T, Exception, R> handler){
return () -> {
try{
T result = supplier.get();
return handler.apply(result, null);
}catch (Exception exception){
return handler.apply(null, exception);
}
};
}

/**
* Returns a composed function that first executes the Supplier and optionally recovers from an exception.
*
* @param <T> return type of after
* @param exceptionHandler the exception handler
* @return a function composed of supplier and exceptionHandler
*/
public static <T> Supplier<T> recover(Supplier<T> supplier, Function<Exception, T> exceptionHandler){
return () -> {
try{
return supplier.get();
}catch (Exception exception){
return exceptionHandler.apply(exception);
}
};
}

/**
* Returns a composed function that first applies the Supplier and then applies
* either the resultHandler or exceptionHandler.
*
* @param <T> return type of after
* @param resultHandler the function applied after Supplier was successful
* @param exceptionHandler the function applied after Supplier has failed
* @return a function composed of supplier and handler
*/
public static <T, R> Supplier<R> andThen(Supplier<T> supplier, Function<T, R> resultHandler, Function<Exception, R> exceptionHandler){
return () -> {
try{
T result = supplier.get();
return resultHandler.apply(result);
}catch (Exception exception){
return exceptionHandler.apply(exception);
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package io.github.resilience4j.core;

import org.junit.Test;

import javax.xml.ws.WebServiceException;
import java.io.IOException;
import java.util.concurrent.Callable;

import static org.assertj.core.api.Assertions.assertThat;

public class CallableUtilsTest {

@Test
public void shouldChainCallableAndResultHandler() throws Exception {

Callable<String> supplier = () -> "BLA";
//When
Callable<String> callableWithRecovery = CallableUtils.andThen(supplier, result -> "Bla");

String result = callableWithRecovery.call();

//Then
assertThat(result).isEqualTo("Bla");
}


@Test
public void shouldChainCallableAndRecoverFromException() throws Exception {

Callable<String> callable = () -> {
throw new IOException("BAM!");
};
//When
Callable<String> callableWithRecovery = CallableUtils.andThen(callable, (result, ex) -> "Bla");

String result = callableWithRecovery.call();

//Then
assertThat(result).isEqualTo("Bla");
}

@Test
public void shouldChainCallableAndRecoverWithErrorHandler() throws Exception {

Callable<String> callable = () -> {
throw new IOException("BAM!");
};
//When
Callable<String> callableWithRecovery = CallableUtils.andThen(callable, (result) -> result, ex -> "Bla");

String result = callableWithRecovery.call();

//Then
assertThat(result).isEqualTo("Bla");
}

@Test
public void shouldRecoverCallableFromException() throws Exception {

Callable<String> callable = () -> {
throw new IOException("BAM!");
};
//When
Callable<String> callableWithRecovery = CallableUtils.recover(callable, (ex) -> "Bla");

String result = callableWithRecovery.call();

//Then
assertThat(result).isEqualTo("Bla");
}

@Test(expected = WebServiceException.class)
public void shouldRethrowException() throws Exception {

Callable<String> callable = () -> {
throw new IOException("BAM!");
};
//When
Callable<String> callableWithRecovery = CallableUtils.recover(callable, (ex) -> {
throw new WebServiceException();
});

callableWithRecovery.call();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package io.github.resilience4j.core;

import org.junit.Test;

import javax.xml.ws.WebServiceException;
import java.util.function.Supplier;

import static org.assertj.core.api.Assertions.assertThat;

public class SupplierUtilsTest {

@Test
public void shouldChainSupplierAndResultHandler() {

Supplier<String> supplier = () -> "BLA";
//When
Supplier<String> supplierWithRecovery = SupplierUtils.andThen(supplier, result -> "Bla");

String result = supplierWithRecovery.get();

//Then
assertThat(result).isEqualTo("Bla");
}

@Test
public void shouldChainSupplierAndRecoverWithHandler() {

Supplier<String> supplier = () -> {
throw new RuntimeException("BAM!");
};
//When
Supplier<String> supplierWithRecovery = SupplierUtils.andThen(supplier, (result, ex) -> "Bla");

String result = supplierWithRecovery.get();

//Then
assertThat(result).isEqualTo("Bla");
}

@Test
public void shouldChainSupplierAndRecoverWithErrorHandler() {

Supplier<String> supplier = () -> {
throw new RuntimeException("BAM!");
};
//When
Supplier<String> supplierWithRecovery = SupplierUtils.andThen(supplier, (result) -> result, ex -> "Bla");

String result = supplierWithRecovery.get();

//Then
assertThat(result).isEqualTo("Bla");
}


@Test
public void shouldRecoverSupplierFromException() {

Supplier<String> supplier = () -> {
throw new RuntimeException("BAM!");
};
//When
Supplier<String> supplierWithRecovery = SupplierUtils.recover(supplier, (ex) -> "Bla");

String result = supplierWithRecovery.get();

//Then
assertThat(result).isEqualTo("Bla");
}

@Test(expected = WebServiceException.class)
public void shouldRethrowException() {

Supplier<String> supplier = () -> {
throw new RuntimeException("BAM!");
};
//When
Supplier<String> supplierWithRecovery = SupplierUtils.recover(supplier, (ex) -> {
throw new WebServiceException();
});

supplierWithRecovery.get();
}
}
Loading

0 comments on commit 0389530

Please sign in to comment.