Skip to content

Commit

Permalink
Handle exceptions from Subscribers (line#2553)
Browse files Browse the repository at this point in the history
Motivation:
An exception raised by a `Subscriber`'s `onNext()` or other handler methods can cause an unexpected connection drop.
We should handle it.

Modifications:
- Catch the exception thrown by `onSubscribe()` and `onNext()` from `Subscriber`s.
  - Aabort the stream or call `onError` with the cause.
- Catch the exception thrown by `onComplete()` and `onError()`, and log it.
- Fork `CompositeException` from RxJava.

Result:
- Close line#2475
- The connection is not closed by unhandled exceptions from `Subscriber`s anymore.
  • Loading branch information
minwoox authored Mar 10, 2020
1 parent d3e1f9f commit a0699b9
Show file tree
Hide file tree
Showing 19 changed files with 1,078 additions and 142 deletions.
10 changes: 10 additions & 0 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ This product contains a modified part of Netty, distributed by Netty.io:
* License: licenses/LICENSE.netty.al20.txt (Apache License v2.0)
* Homepage: https://netty.io/

This product contains a modified part of RxJava, distributed by RxJava Contributors:

* License: licenses/LICENSE.rxjava.al20.txt (Apache License v2.0)
* Homepage: https://github.com/ReactiveX/RxJava

This product contains a modified part of Spring Boot, distributed by Pivotal Software, Inc:

* License: licenses/LICENSE.spring.al20.txt (Apache License v2.0)
Expand Down Expand Up @@ -277,6 +282,11 @@ This product depends on Retrofit, distributed by Square:
* License: licenses/LICENSE.retrofit.al20.txt (Apache License v2.0)
* Homepage: https://square.github.io/retrofit/

This product depends on RxJava, distributed by RxJava Contributors:

* License: licenses/LICENSE.rxjava.al20.txt (Apache License v2.0)
* Homepage: https://github.com/ReactiveX/RxJava

This product depends on SLF4J, distributed by QOS.ch:

* License: licenses/LICENSE.slf4j.mit.txt (MIT License)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,6 @@ public void onSubscribe(Subscription subscription) {

@Override
public void onNext(HttpObject obj) {
// There's no chance that an exception is raised from the underlying logic, so we don't do
// try catch not to propagate the exception to the publisher.
// https://github.com/reactive-streams/reactive-streams-jvm#2.13

if (closeable.isClosing()) {
subscription.cancel();
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,6 @@ public final void onComplete() {

@Override
public final void onNext(HttpObject o) {
// There's no chance that an exception is raised from the underlying logic, so we don't do try catch not
// to propagate the exception to the publisher.
// https://github.com/reactive-streams/reactive-streams-jvm#2.13

if (o instanceof HttpHeaders) {
onHeaders((HttpHeaders) o);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.linecorp.armeria.common.stream.StreamMessageUtil.abortedOrLate;
import static com.linecorp.armeria.common.stream.StreamMessageUtil.containsNotifyCancellation;
import static com.linecorp.armeria.common.stream.StreamMessageUtil.containsWithPooledObjects;
import static com.linecorp.armeria.common.util.Exceptions.throwIfFatal;
import static java.util.Objects.requireNonNull;

import java.util.List;
Expand All @@ -28,10 +29,13 @@

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.MoreObjects;
import com.spotify.futures.CompletableFutures;

import com.linecorp.armeria.common.util.CompositeException;
import com.linecorp.armeria.common.util.EventLoopCheckingFuture;
import com.linecorp.armeria.internal.common.util.PooledObjects;

Expand All @@ -40,6 +44,8 @@

abstract class AbstractStreamMessage<T> implements StreamMessage<T> {

static final Logger logger = LoggerFactory.getLogger(AbstractStreamMessage.class);

static final CloseEvent SUCCESSFUL_CLOSE = new CloseEvent(null);
static final CloseEvent CANCELLED_CLOSE = new CloseEvent(CancelledSubscriptionException.INSTANCE);
static final CloseEvent ABORTED_CLOSE = new CloseEvent(AbortedStreamException.INSTANCE);
Expand Down Expand Up @@ -145,16 +151,24 @@ static void failLateSubscriber(SubscriptionImpl subscription, Subscriber<?> late
final Throwable cause = abortedOrLate(oldSubscriber);

if (subscription.needsDirectInvocation()) {
lateSubscriber.onSubscribe(NoopSubscription.INSTANCE);
lateSubscriber.onError(cause);
handleLateSubscriber(lateSubscriber, cause);
} else {
subscription.executor().execute(() -> {
lateSubscriber.onSubscribe(NoopSubscription.INSTANCE);
lateSubscriber.onError(cause);
handleLateSubscriber(lateSubscriber, cause);
});
}
}

private static void handleLateSubscriber(Subscriber<?> lateSubscriber, Throwable cause) {
try {
lateSubscriber.onSubscribe(NoopSubscription.INSTANCE);
lateSubscriber.onError(cause);
} catch (Throwable t) {
throwIfFatal(t);
logger.warn("Subscriber should not throw an exception. subscriber: {}", lateSubscriber, t);
}
}

T prepareObjectForNotification(SubscriptionImpl subscription, T o) {
ReferenceCountUtil.touch(o);
onRemoval(o);
Expand Down Expand Up @@ -231,7 +245,8 @@ boolean cancelRequested() {
@Override
public void request(long n) {
if (n <= 0) {
invokeOnError(new IllegalArgumentException(
// Just abort the publisher so subscriber().onError(e) is called and resources are cleaned up.
publisher.abort(new IllegalArgumentException(
"n: " + n + " (expected: > 0, see Reactive Streams specification rule 3.9)"));
return;
}
Expand All @@ -245,14 +260,6 @@ public void cancel() {
publisher.cancel();
}

private void invokeOnError(Throwable cause) {
if (needsDirectInvocation()) {
subscriber.onError(cause);
} else {
executor.execute(() -> subscriber.onError(cause));
}
}

// We directly run callbacks for event loops if we're already on the loop, which applies to the vast
// majority of cases.
boolean needsDirectInvocation() {
Expand Down Expand Up @@ -291,16 +298,25 @@ void notifySubscriber(SubscriptionImpl subscription, CompletableFuture<?> comple
if (cause == null) {
try {
subscriber.onComplete();
} finally {
completionFuture.complete(null);
} catch (Throwable t) {
completionFuture.completeExceptionally(t);
throwIfFatal(t);
logger.warn("Subscriber.onComplete() should not raise an exception. subscriber: {}",
subscriber, t);
}
} else {
try {
if (subscription.notifyCancellation || !(cause instanceof CancelledSubscriptionException)) {
subscriber.onError(cause);
}
} finally {
completionFuture.completeExceptionally(cause);
} catch (Throwable t) {
final Exception composite = new CompositeException(t, cause);
completionFuture.completeExceptionally(composite);
throwIfFatal(t);
logger.warn("Subscriber.onError() should not raise an exception. subscriber: {}",
subscriber, composite);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.linecorp.armeria.common.stream;

import static com.linecorp.armeria.common.util.Exceptions.throwIfFatal;
import static java.util.Objects.requireNonNull;

import java.util.Queue;
Expand All @@ -27,6 +28,8 @@

import org.jctools.queues.MpscChunkedArrayQueue;
import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.linecorp.armeria.common.util.UnstableApi;

Expand Down Expand Up @@ -66,6 +69,8 @@
@UnstableApi
public class DefaultStreamMessage<T> extends AbstractStreamMessageAndWriter<T> {

private static final Logger logger = LoggerFactory.getLogger(DefaultStreamMessage.class);

@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<DefaultStreamMessage, SubscriptionImpl>
subscriptionUpdater = AtomicReferenceFieldUpdater.newUpdater(
Expand Down Expand Up @@ -118,18 +123,32 @@ SubscriptionImpl subscribe(SubscriptionImpl subscription) {

final Subscriber<Object> subscriber = subscription.subscriber();
if (subscription.needsDirectInvocation()) {
invokedOnSubscribe = true;
subscriber.onSubscribe(subscription);
subscribe(subscription, subscriber);
} else {
subscription.executor().execute(() -> {
invokedOnSubscribe = true;
subscriber.onSubscribe(subscription);
subscribe(subscription, subscriber);
});
}

return subscription;
}

private void subscribe(SubscriptionImpl subscription, Subscriber<Object> subscriber) {
try {
invokedOnSubscribe = true;
subscriber.onSubscribe(subscription);
} catch (Throwable t) {
if (setState(State.OPEN, State.CLEANUP) || setState(State.CLOSED, State.CLEANUP)) {
notifySubscriberOfCloseEvent(subscription, newCloseEvent(t));
throwIfFatal(t);
} else {
throwIfFatal(t);
logger.warn("Subscriber.onSubscribe() should not raise an exception. subscriber: {}",
subscriber, t);
}
}
}

@Override
public void abort() {
abort0(AbortedStreamException.get());
Expand Down Expand Up @@ -328,7 +347,7 @@ private void notifySubscriber0() {

for (;;) {
if (state == State.CLEANUP) {
cleanup();
cleanupObjects();
return;
}

Expand Down Expand Up @@ -375,6 +394,17 @@ private boolean notifySubscriberWithElements(SubscriptionImpl subscription) {
try {
o = prepareObjectForNotification(subscription, o);
subscriber.onNext(o);
} catch (Throwable t) {
if (setState(State.OPEN, State.CLEANUP) || setState(State.CLOSED, State.CLEANUP)) {
notifySubscriberOfCloseEvent(subscription, newCloseEvent(t));
throwIfFatal(t);
} else {
throwIfFatal(t);
logger.warn("Subscriber.onNext({}) should not raise an exception. subscriber: {}",
o, subscriber, t);
}

return false;
} finally {
inOnNext = false;
}
Expand Down Expand Up @@ -435,7 +465,7 @@ private boolean setState(State oldState, State newState) {
return stateUpdater.compareAndSet(this, oldState, newState);
}

private void cleanup() {
private void cleanupObjects() {
Throwable cause = null;
for (;;) {
final Object e = queue.poll();
Expand Down
Loading

0 comments on commit a0699b9

Please sign in to comment.