Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add local error consumer hooks support #830

Merged
merged 5 commits into from
Sep 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ protected void hookOnCancel() {
* cancel). The hook is executed in addition to and after {@link #hookOnError(Throwable)},
* {@link #hookOnComplete()} and {@link #hookOnCancel()} hooks, even if these callbacks
* fail. Defaults to doing nothing. A failure of the callback will be caught by
* {@link Operators#onErrorDropped(Throwable)}.
* {@link Operators#onErrorDropped(Throwable, reactor.util.context.Context)}.
*
* @param type the type of termination event that triggered the hook
* ({@link SignalType#ON_ERROR}, {@link SignalType#ON_COMPLETE} or
Expand All @@ -146,7 +146,7 @@ public final void onSubscribe(Subscription s) {
hookOnSubscribe(s);
}
catch (Throwable throwable) {
onError(Operators.onOperatorError(s, throwable));
onError(Operators.onOperatorError(s, throwable, currentContext()));
}
}
}
Expand All @@ -158,7 +158,7 @@ public final void onNext(T value) {
hookOnNext(value);
}
catch (Throwable throwable) {
onError(Operators.onOperatorError(subscription, throwable, value));
onError(Operators.onOperatorError(subscription, throwable, value, currentContext()));
}
}

Expand All @@ -169,7 +169,7 @@ public final void onError(Throwable t) {
if (S.getAndSet(this, Operators.cancelledSubscription()) == Operators
.cancelledSubscription()) {
//already cancelled concurrently
Operators.onErrorDropped(t);
Operators.onErrorDropped(t, currentContext());
return;
}

Expand All @@ -181,7 +181,7 @@ public final void onError(Throwable t) {
if (e != t) {
e.addSuppressed(t);
}
Operators.onErrorDropped(e);
Operators.onErrorDropped(e, currentContext());
}
finally {
safeHookFinally(SignalType.ON_ERROR);
Expand All @@ -198,7 +198,7 @@ public final void onComplete() {
}
catch (Throwable throwable) {
//onError itself will short-circuit due to the CancelledSubscription being push above
hookOnError(Operators.onOperatorError(throwable));
hookOnError(Operators.onOperatorError(throwable, currentContext()));
}
finally {
safeHookFinally(SignalType.ON_COMPLETE);
Expand Down Expand Up @@ -230,7 +230,7 @@ public final void cancel() {
hookOnCancel();
}
catch (Throwable throwable) {
hookOnError(Operators.onOperatorError(subscription, throwable));
hookOnError(Operators.onOperatorError(subscription, throwable, currentContext()));
}
finally {
safeHookFinally(SignalType.CANCEL);
Expand All @@ -243,7 +243,7 @@ void safeHookFinally(SignalType type) {
hookFinally(type);
}
catch (Throwable finallyFailure) {
Operators.onErrorDropped(finallyFailure);
Operators.onErrorDropped(finallyFailure, currentContext());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.reactivestreams.Subscription;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.util.context.Context;

/**
* An iterable that consumes a Publisher in a blocking fashion.
Expand Down Expand Up @@ -221,7 +222,7 @@ public void onNext(T t) {

onError(Operators.onOperatorError(null,
Exceptions.failWithOverflow(Exceptions.BACKPRESSURE_ERROR_QUEUE_FULL),
t));
t, currentContext()));
}
else {
signalConsumer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void onNext(T t) {
DirectInner<T>[] inners = subscribers;

if (inners == TERMINATED) {
Operators.onNextDropped(t);
Operators.onNextDropped(t, currentContext());
return;
}

Expand All @@ -114,7 +114,7 @@ public void onError(Throwable t) {
DirectInner<T>[] inners = subscribers;

if (inners == TERMINATED) {
Operators.onErrorDropped(t);
Operators.onErrorDropped(t, currentContext());
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ else if (m == Fuseable.ASYNC) {
@SuppressWarnings("unchecked")
public void onNext(T t) {
if (done) {
Operators.onNextDropped(t, currentContext());
return;
}

Expand Down Expand Up @@ -268,15 +269,15 @@ public void onNext(T t) {
public void onError(Throwable t) {
Objects.requireNonNull(t, "onError");
if (done) {
Operators.onErrorDropped(t);
Operators.onErrorDropped(t, currentContext());
return;
}
if (Exceptions.addThrowable(ERROR, this, t)) {
done = true;
drain();
}
else {
Operators.onErrorDropped(t);
Operators.onErrorDroppedMulticast(t);
}
}

Expand Down Expand Up @@ -373,8 +374,7 @@ final void drain() {
}
catch (Throwable ex) {
Exceptions.addThrowable(ERROR,
this,
Operators.onOperatorError(s, ex));
this, Operators.onOperatorError(s, ex, currentContext()));
d = true;
v = null;
}
Expand All @@ -398,8 +398,7 @@ final void drain() {
}
catch (Throwable ex) {
Exceptions.addThrowable(ERROR,
this,
Operators.onOperatorError(s, ex));
this, Operators.onOperatorError(s, ex, currentContext()));
d = true;
v = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.stream.Stream;

import org.reactivestreams.Subscription;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.util.concurrent.Queues;
import reactor.util.concurrent.WaitStrategy;
Expand Down Expand Up @@ -427,7 +426,7 @@ final public void onSubscribe(final Subscription s) {
}
}
catch (Throwable t) {
onError(Operators.onOperatorError(s, t));
onError(Operators.onOperatorError(s, t, currentContext()));
}
}
}
Expand All @@ -448,7 +447,7 @@ public final void shutdown() {
requestTaskExecutor.shutdown();
}
catch (Throwable t) {
onError(Operators.onOperatorError(t));
onError(Operators.onOperatorError(t, currentContext()));
}
}

Expand Down Expand Up @@ -527,7 +526,7 @@ public void run() {
}
return;
}
parent.onError(Operators.onOperatorError(t));
parent.onError(Operators.onOperatorError(t, parent.currentContext()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3535,7 +3535,7 @@ public final Flux<T> doOnNext(Consumer<? super T> onNext) {
* receives any request.
* <p>
* Note that non fatal error raised in the callback will not be propagated and
* will simply trigger {@link Operators#onOperatorError(Throwable)}.
* will simply trigger {@link Operators#onOperatorError(Throwable, Context)}.
*
* <p>
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.0.M3/src/docs/marble/doonrequest.png" alt="">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void onSubscribe(Subscription s) {
@Override
public void onNext(T t) {
if (done) {
Operators.onNextDropped(t);
Operators.onNextDropped(t, actual.currentContext());
return;
}

Expand All @@ -140,7 +140,7 @@ public void onNext(T t) {
"The bufferSupplier returned a null buffer");
}
catch (Throwable e) {
onError(Operators.onOperatorError(s, e, t));
onError(Operators.onOperatorError(s, e, t, actual.currentContext()));
return;
}
buffer = b;
Expand All @@ -157,7 +157,7 @@ public void onNext(T t) {
@Override
public void onError(Throwable t) {
if (done) {
Operators.onErrorDropped(t);
Operators.onErrorDropped(t, actual.currentContext());
return;
}
done = true;
Expand Down Expand Up @@ -271,7 +271,7 @@ public void onSubscribe(Subscription s) {
@Override
public void onNext(T t) {
if (done) {
Operators.onNextDropped(t);
Operators.onNextDropped(t, actual.currentContext());
return;
}

Expand All @@ -285,7 +285,7 @@ public void onNext(T t) {
"The bufferSupplier returned a null buffer");
}
catch (Throwable e) {
onError(Operators.onOperatorError(s, e, t));
onError(Operators.onOperatorError(s, e, t, actual.currentContext()));
return;
}

Expand All @@ -306,7 +306,7 @@ public void onNext(T t) {
@Override
public void onError(Throwable t) {
if (done) {
Operators.onErrorDropped(t);
Operators.onErrorDropped(t, actual.currentContext());
return;
}

Expand Down Expand Up @@ -450,7 +450,7 @@ public void onSubscribe(Subscription s) {
@Override
public void onNext(T t) {
if (done) {
Operators.onNextDropped(t);
Operators.onNextDropped(t, actual.currentContext());
return;
}

Expand All @@ -464,7 +464,7 @@ public void onNext(T t) {
"The bufferSupplier returned a null buffer");
}
catch (Throwable e) {
onError(Operators.onOperatorError(s, e, t));
onError(Operators.onOperatorError(s, e, t, actual.currentContext()));
return;
}

Expand Down Expand Up @@ -495,7 +495,7 @@ public void onNext(T t) {
@Override
public void onError(Throwable t) {
if (done) {
Operators.onErrorDropped(t);
Operators.onErrorDropped(t, actual.currentContext());
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void subscribe(CoreSubscriber<? super C> actual) {
"The bufferSupplier returned a null buffer");
}
catch (Throwable e) {
Operators.error(actual, Operators.onOperatorError(e));
Operators.error(actual, Operators.onOperatorError(e, actual.currentContext()));
return;
}

Expand Down Expand Up @@ -167,7 +167,7 @@ public void onNext(T t) {
}
}

Operators.onNextDropped(t);
Operators.onNextDropped(t, actual.currentContext());
}

@Override
Expand All @@ -181,7 +181,7 @@ public void onError(Throwable t) {
actual.onError(t);
return;
}
Operators.onErrorDropped(t);
Operators.onErrorDropped(t, actual.currentContext());
}

@Override
Expand Down Expand Up @@ -242,7 +242,7 @@ void otherError(Throwable t){
actual.onError(t);
return;
}
Operators.onErrorDropped(t);
Operators.onErrorDropped(t, actual.currentContext());
}

void otherNext() {
Expand All @@ -253,7 +253,7 @@ void otherNext() {
"The bufferSupplier returned a null buffer");
}
catch (Throwable e) {
otherError(Operators.onOperatorError(other, e));
otherError(Operators.onOperatorError(other, e, actual.currentContext()));
return;
}

Expand Down Expand Up @@ -281,7 +281,7 @@ boolean emit(C b) {
}
else {
actual.onError(Operators.onOperatorError(this, Exceptions
.failWithOverflow(), b));
.failWithOverflow(), b, actual.currentContext()));

return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void subscribe(CoreSubscriber<? super C> actual) {
"The bufferSupplier returned a null initial buffer");
}
catch (Throwable e) {
Operators.error(actual, Operators.onOperatorError(e));
Operators.error(actual, Operators.onOperatorError(e, actual.currentContext()));
return;
}

Expand Down Expand Up @@ -185,7 +185,7 @@ public void onNext(T t) {
@Override
public boolean tryOnNext(T t) {
if (done) {
Operators.onNextDropped(t);
Operators.onNextDropped(t, actual.currentContext());
return true;
}

Expand All @@ -195,7 +195,7 @@ public boolean tryOnNext(T t) {
match = predicate.test(t);
}
catch (Throwable e) {
onError(Operators.onOperatorError(s, e, t));
onError(Operators.onOperatorError(s, e, t, actual.currentContext()));
return true;
}

Expand Down Expand Up @@ -237,7 +237,7 @@ private C triggerNewBuffer() {
"The bufferSupplier returned a null buffer");
}
catch (Throwable e) {
onError(Operators.onOperatorError(s, e));
onError(Operators.onOperatorError(s, e, actual.currentContext()));
return null;
}

Expand All @@ -261,7 +261,7 @@ public CoreSubscriber<? super C> actual() {
@Override
public void onError(Throwable t) {
if (done) {
Operators.onErrorDropped(t);
Operators.onErrorDropped(t, actual.currentContext());
return;
}
done = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ public void onNext(final T value) {
timespanRegistration = timer.schedule(flushTask, timespan, TimeUnit.MILLISECONDS);
}
catch (RejectedExecutionException ree) {
onError(Operators.onRejectedExecution(ree, this, null, value));
onError(Operators.onRejectedExecution(ree, this, null, value,
actual.currentContext()));
return;
}
}
Expand Down
Loading