Skip to content

Commit

Permalink
Fixed ratpackk method interceptor and simplifying other method interc…
Browse files Browse the repository at this point in the history
…eptors (ReactiveX#522)
  • Loading branch information
Dan Maas authored and RobWin committed Jun 26, 2019
1 parent c147b33 commit d6470ba
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,21 +84,21 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
io.github.resilience4j.bulkhead.Bulkhead bulkhead = registry.bulkhead(annotation.name());
Class<?> returnType = invocation.getMethod().getReturnType();
if (Promise.class.isAssignableFrom(returnType)) {
Promise<?> result = (Promise<?>) invocation.proceed();
Promise<?> result = (Promise<?>) proceed(invocation);
if (result != null) {
BulkheadTransformer transformer = BulkheadTransformer.of(bulkhead).recover(fallbackMethod);
result = result.transform(transformer);
}
return result;
} else if (Flux.class.isAssignableFrom(returnType)) {
Flux<?> result = (Flux<?>) invocation.proceed();
Flux<?> result = (Flux<?>) proceed(invocation);
if (result != null) {
BulkheadOperator operator = BulkheadOperator.of(bulkhead);
result = fallbackMethod.onErrorResume(result.transform(operator));
}
return result;
} else if (Mono.class.isAssignableFrom(returnType)) {
Mono<?> result = (Mono<?>) invocation.proceed();
Mono<?> result = (Mono<?>) proceed(invocation);
if (result != null) {
BulkheadOperator operator = BulkheadOperator.of(bulkhead);
result = fallbackMethod.onErrorResume(result.transform(operator));
Expand All @@ -107,7 +107,7 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
} else if (CompletionStage.class.isAssignableFrom(returnType)) {
final CompletableFuture promise = new CompletableFuture<>();
if (bulkhead.tryAcquirePermission()) {
CompletionStage<?> result = (CompletionStage<?>) invocation.proceed();
CompletionStage<?> result = (CompletionStage<?>) proceed(invocation);
if (result != null) {
result.whenComplete((value, throwable) -> {
bulkhead.onComplete();
Expand All @@ -124,21 +124,17 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
}
return promise;
} else {
boolean permission = bulkhead.tryAcquirePermission();
if (!permission) {
Throwable t = new BulkheadFullException(bulkhead);
return fallbackMethod.apply(t);
}
try {
if (Thread.interrupted()) {
throw new IllegalStateException("Thread was interrupted during permission wait");
}
return invocation.proceed();
} catch (Exception e) {
return fallbackMethod.apply(e);
} finally {
bulkhead.onComplete();
}
return handleProceedWithException(invocation, bulkhead, fallbackMethod);
}
}


@Nullable
private Object handleProceedWithException(MethodInvocation invocation, io.github.resilience4j.bulkhead.Bulkhead bulkhead, RecoveryFunction<?> recoveryFunction) throws Throwable {
try {
return io.github.resilience4j.bulkhead.Bulkhead.decorateCheckedSupplier(bulkhead, invocation::proceed).apply();
} catch (Throwable throwable) {
return recoveryFunction.apply(throwable);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,7 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
}
return promise;
} else {
try {
return proceed(invocation, breaker);
} catch (Throwable throwable) {
return fallbackMethod.apply(throwable);
}
return handleProceedWithException(invocation, breaker, fallbackMethod);
}
}

Expand Down Expand Up @@ -163,4 +159,12 @@ private Object proceed(MethodInvocation invocation, io.github.resilience4j.circu
return result;
}

@Nullable
private Object handleProceedWithException(MethodInvocation invocation, io.github.resilience4j.circuitbreaker.CircuitBreaker breaker, RecoveryFunction<?> recoveryFunction) throws Throwable {
try {
return io.github.resilience4j.circuitbreaker.CircuitBreaker.decorateCheckedSupplier(breaker, invocation::proceed).apply();
} catch (Throwable throwable) {
return recoveryFunction.apply(throwable);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import io.github.resilience4j.ratpack.recovery.RecoveryFunction;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import ratpack.exec.Promise;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.lang.reflect.Method;
import java.util.Arrays;
Expand All @@ -15,6 +18,30 @@

public abstract class AbstractMethodInterceptor implements MethodInterceptor {

@Nullable
protected Object proceed(MethodInvocation invocation) throws Throwable {
Class<?> returnType = invocation.getMethod().getReturnType();
Object result;
try {
result = invocation.proceed();
} catch (Exception e) {
if (Promise.class.isAssignableFrom(returnType)) {
return Promise.error(e);
} else if (Flux.class.isAssignableFrom(returnType)) {
return Flux.error(e);
} else if (Mono.class.isAssignableFrom(returnType)) {
return Mono.error(e);
} else if (CompletionStage.class.isAssignableFrom(returnType)) {
CompletableFuture<?> future = new CompletableFuture<>();
future.completeExceptionally(e);
return future;
} else {
throw e;
}
}
return result;
}

@SuppressWarnings("unchecked")
protected void completeFailedFuture(Throwable throwable, RecoveryFunction<?> fallbackMethod, CompletableFuture promise) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,29 +84,29 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
io.github.resilience4j.ratelimiter.RateLimiter rateLimiter = registry.rateLimiter(annotation.name());
Class<?> returnType = invocation.getMethod().getReturnType();
if (Promise.class.isAssignableFrom(returnType)) {
Promise<?> result = (Promise<?>) proceed(invocation, rateLimiter, fallbackMethod);
Promise<?> result = (Promise<?>) proceed(invocation);
if (result != null) {
RateLimiterTransformer transformer = RateLimiterTransformer.of(rateLimiter).recover(fallbackMethod);
result = result.transform(transformer);
}
return result;
} else if (Flux.class.isAssignableFrom(returnType)) {
Flux<?> result = (Flux<?>) proceed(invocation, rateLimiter, fallbackMethod);
Flux<?> result = (Flux<?>) proceed(invocation);
if (result != null) {
RateLimiterOperator operator = RateLimiterOperator.of(rateLimiter);
result = fallbackMethod.onErrorResume(result.transform(operator));
}
return result;
} else if (Mono.class.isAssignableFrom(returnType)) {
Mono<?> result = (Mono<?>) proceed(invocation, rateLimiter, fallbackMethod);
Mono<?> result = (Mono<?>) proceed(invocation);
if (result != null) {
RateLimiterOperator operator = RateLimiterOperator.of(rateLimiter);
result = fallbackMethod.onErrorResume(result.transform(operator));
}
return result;
} else if (CompletionStage.class.isAssignableFrom(returnType)) {
if (rateLimiter.acquirePermission()) {
return proceed(invocation, rateLimiter, fallbackMethod);
return proceed(invocation);
} else {
final CompletableFuture promise = new CompletableFuture<>();
Throwable t = new RequestNotPermitted(rateLimiter);
Expand All @@ -118,28 +118,13 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
}
}

@Nullable
private Object proceed(MethodInvocation invocation, io.github.resilience4j.ratelimiter.RateLimiter rateLimiter, RecoveryFunction<?> recoveryFunction) throws Throwable {
Object result;
try {
result = invocation.proceed();
} catch (Exception e) {
result = handleProceedWithException(invocation, rateLimiter, recoveryFunction);
}
return result;
}

@Nullable
private Object handleProceedWithException(MethodInvocation invocation, io.github.resilience4j.ratelimiter.RateLimiter rateLimiter, RecoveryFunction<?> recoveryFunction) throws Throwable {
boolean permission = rateLimiter.acquirePermission();
if (Thread.interrupted()) {
throw new IllegalStateException("Thread was interrupted during permission wait");
}
if (!permission) {
Throwable t = new RequestNotPermitted(rateLimiter);
try {
return io.github.resilience4j.ratelimiter.RateLimiter.decorateCheckedSupplier(rateLimiter, invocation::proceed).apply();
} catch (Throwable t) {
return recoveryFunction.apply(t);
}
return invocation.proceed();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,14 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
.orElse(new DefaultRecoveryFunction<>());
Class<?> returnType = invocation.getMethod().getReturnType();
if (Promise.class.isAssignableFrom(returnType)) {
Promise<?> result = (Promise<?>) proceed(invocation, retry, fallbackMethod);
Promise<?> result = (Promise<?>) proceed(invocation);
if (result != null) {
RetryTransformer transformer = RetryTransformer.of(retry).recover(fallbackMethod);
result = result.transform(transformer);
}
return result;
} else if (Flux.class.isAssignableFrom(returnType)) {
Flux<?> result = (Flux<?>) proceed(invocation, retry, fallbackMethod);
Flux<?> result = (Flux<?>) proceed(invocation);
if (result != null) {
RetryTransformer transformer = RetryTransformer.of(retry).recover(fallbackMethod);
final Flux<?> temp = result;
Expand All @@ -103,7 +103,7 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
}
return result;
} else if (Mono.class.isAssignableFrom(returnType)) {
Mono<?> result = (Mono<?>) proceed(invocation, retry, fallbackMethod);
Mono<?> result = (Mono<?>) proceed(invocation);
if (result != null) {
RetryTransformer transformer = RetryTransformer.of(retry).recover(fallbackMethod);
final Mono<?> temp = result;
Expand All @@ -116,10 +116,10 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
return result;
}
else if (CompletionStage.class.isAssignableFrom(returnType)) {
CompletionStage stage = (CompletionStage) proceed(invocation, retry, fallbackMethod);
CompletionStage stage = (CompletionStage) proceed(invocation);
return executeCompletionStage(invocation, stage, retry.context(), fallbackMethod);
} else {
return proceed(invocation, retry, fallbackMethod);
return handleProceedWithException(invocation, retry, fallbackMethod);
}
}

Expand All @@ -145,29 +145,11 @@ private CompletionStage<?> executeCompletionStage(MethodInvocation invocation, C
}

@Nullable
private Object proceed(MethodInvocation invocation, io.github.resilience4j.retry.Retry retry, RecoveryFunction<?> recoveryFunction) throws Throwable {
io.github.resilience4j.retry.Retry.Context context = retry.context();
private Object handleProceedWithException(MethodInvocation invocation, io.github.resilience4j.retry.Retry retry, RecoveryFunction<?> recoveryFunction) throws Throwable {
try {
Object result = invocation.proceed();
context.onSuccess();
return result;
} catch (Exception e) {
// exception thrown, we know a direct value was attempted to be returned
Object result;
context.onError(e);
while (true) {
try {
result = invocation.proceed();
context.onSuccess();
return result;
} catch (Exception e1) {
try {
context.onError(e1);
} catch (Exception e2) {
return recoveryFunction.apply(e2);
}
}
}
return io.github.resilience4j.retry.Retry.decorateCheckedSupplier(retry, invocation::proceed).apply();
} catch (Throwable t) {
return recoveryFunction.apply(t);
}
}

Expand Down

0 comments on commit d6470ba

Please sign in to comment.