Skip to content

Commit

Permalink
ability to local hook for Operators.onRejectedExecution
Browse files Browse the repository at this point in the history
this works by replacing the onOperatorError hook in the context passed
when calling Operators.onOperatorError from onRejectedExecution (if
there is a local hook for the REE-specific key).
  • Loading branch information
simonbasle authored and smaldini committed Sep 5, 2017
1 parent b576f7f commit ac8f299
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 2 deletions.
12 changes: 12 additions & 0 deletions reactor-core/src/main/java/reactor/core/publisher/Operators.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ public abstract class Operators {
*/
public static final String KEY_ON_OPERATOR_ERROR = "reactor.onOperatorError.local";

/**
* A key that can be used to store a sequence-specific {@link Hooks#onOperatorError(BiFunction)}
* hook THAT IS ONLY APPLIED TO Operators{@link #onRejectedExecution(Throwable, Context) onRejectedExecution}
* in a {@link Context}, as a {@link BiFunction BiFunction<Throwable, Object, Throwable>}.
*/
public static final String KEY_ON_REJECTED_EXECUTION = "reactor.onRejectedExecution.local";

/**
* Concurrent addition bound to Long.MAX_VALUE. Any concurrent write will "happen
* before" this operation.
Expand Down Expand Up @@ -470,6 +477,11 @@ public static RuntimeException onRejectedExecution(Throwable original,
@Nullable Throwable suppressed,
@Nullable Object dataSignal,
Context context) {
//we "cheat" to apply the special key for onRejectedExecution in onOperatorError
if (context.hasKey(KEY_ON_REJECTED_EXECUTION)) {
context = context.put(KEY_ON_OPERATOR_ERROR, context.get(KEY_ON_REJECTED_EXECUTION));
}

//FIXME only create REE if original is REE singleton OR there's suppressed OR there's Throwable dataSignal
RejectedExecutionException ree = Exceptions.failWithRejected(original);
if (suppressed != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ public void onOperatorErrorLocal() {
}

@Test
public void onRejectedExecutionWithoutDataSignalErrorLocal() {
public void onRejectedExecutionWithoutDataSignalDelegatesToErrorLocal() {
BiFunction<Throwable, Object, Throwable> localHook = (e, v) ->
new IllegalStateException("boom_" + v, e);
Context c = Context.of(Operators.KEY_ON_OPERATOR_ERROR, localHook);
Expand All @@ -313,7 +313,7 @@ public void onRejectedExecutionWithoutDataSignalErrorLocal() {
}

@Test
public void onRejectedExecutionWithDataSignalErrorLocal() {
public void onRejectedExecutionWithDataSignalDelegatesToErrorLocal() {
BiFunction<Throwable, Object, Throwable> localHook = (e, v) ->
new IllegalStateException("boom_" + v, e);
Context c = Context.of(Operators.KEY_ON_OPERATOR_ERROR, localHook);
Expand All @@ -329,4 +329,27 @@ public void onRejectedExecutionWithDataSignalErrorLocal() {
.hasMessage("Scheduler unavailable")
.hasCause(failure);
}

@Test
public void onRejectedExecutionLocalTakesPrecedenceOverOnOperatorError() {
BiFunction<Throwable, Object, Throwable> localOperatorErrorHook = (e, v) ->
new IllegalStateException("boom_" + v, e);

BiFunction<Throwable, Object, Throwable> localReeHook = (e, v) ->
new IllegalStateException("rejected_" + v, e);
Context c = Context.of(
Operators.KEY_ON_OPERATOR_ERROR, localOperatorErrorHook,
Operators.KEY_ON_REJECTED_EXECUTION, localReeHook);

IllegalArgumentException failure = new IllegalArgumentException("foo");
final Throwable throwable = Operators.onRejectedExecution(failure, null,
null, "bar", c);

assertThat(throwable).isInstanceOf(IllegalStateException.class)
.hasMessage("rejected_bar")
.hasNoSuppressedExceptions();
assertThat(throwable.getCause()).isInstanceOf(RejectedExecutionException.class)
.hasMessage("Scheduler unavailable")
.hasCause(failure);
}
}

0 comments on commit ac8f299

Please sign in to comment.