-
Notifications
You must be signed in to change notification settings - Fork 923
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Handle exceptions from Subscribers #2553
Conversation
Motivation: An exception raised by a `Subscriber`'s `onNext()` or other handler methods can cause an unexpected connection drop. We should handle it. Modifications: - Catch the exception thrown by `onSubscribe()` and `onNext()` from `Subscriber`s. - Aabort the stream or call `onError` with the cause. - Catch the exception thrown by `onComplete()` and `onError()`, and log it. Result: - Close line#2475 - The connection is not closed by unhandled exceptions from `Subscriber`s anymore.
Codecov Report
@@ Coverage Diff @@
## master #2553 +/- ##
===========================================
- Coverage 73.64% 73.4% -0.25%
- Complexity 10865 10947 +82
===========================================
Files 952 958 +6
Lines 41581 42108 +527
Branches 5219 5268 +49
===========================================
+ Hits 30622 30908 +286
- Misses 8290 8491 +201
- Partials 2669 2709 +40
Continue to review full report at Codecov.
|
core/src/main/java/com/linecorp/armeria/common/stream/AbstractStreamMessage.java
Outdated
Show resolved
Hide resolved
try { | ||
lateSubscriber.onSubscribe(NoopSubscription.INSTANCE); | ||
lateSubscriber.onError(cause); | ||
} catch (Exception e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Global comment: Should we catch a Throwable
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reactive-streams/reactive-streams-jvm#107 (comment)
I think they are guiding not to catch the fatal ones. 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there are Error
that aren't fatal, so better to catch Throwable
and we may need a propagateIfFatal
like in zipkin
// Taken from RxJava throwIfFatal, which was taken from scala
public static void propagateIfFatal(Throwable t) {
if (t instanceof VirtualMachineError) {
throw (VirtualMachineError) t;
} else if (t instanceof ThreadDeath) {
throw (ThreadDeath) t;
} else if (t instanceof LinkageError) {
throw (LinkageError) t;
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, let me add this method to Exceptions
. Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we don't have to follow the Reactive Streams TCK, just FYI, which also regards InterruptedException
as fatal.
https://github.com/reactive-streams/reactive-streams-jvm/blob/master/tck/src/main/java/org/reactivestreams/tck/flow/support/NonFatal.java#L35
It is copied from Scala code.
https://github.com/scala/scala/blob/v2.13.1/src/library/scala/util/control/NonFatal.scala#L41
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yes, I saw it from ReactiveX/RxJava#748 (comment)
and I think they intentionally omit that. But I didn't think about it deeply. 😆
core/src/main/java/com/linecorp/armeria/common/stream/DeferredStreamMessage.java
Outdated
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/common/stream/DeferredStreamMessage.java
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/common/stream/FixedStreamMessage.java
Outdated
Show resolved
Hide resolved
import io.netty.buffer.UnpooledByteBufAllocator; | ||
import io.netty.util.concurrent.ImmediateEventExecutor; | ||
|
||
class SubscriberThrowingExceptionTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we should move the tests to StreamMessageVerification
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, they don't have the TCK for this case and this is violating the spec, so the test will fail if they have, I think. 🤔
So it's like more for our implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
StreamMessageVerification
is our class, so we can perhaps test the failure scenarios there?
try { | ||
lateSubscriber.onSubscribe(NoopSubscription.INSTANCE); | ||
lateSubscriber.onError(cause); | ||
} catch (Exception e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there are Error
that aren't fatal, so better to catch Throwable
and we may need a propagateIfFatal
like in zipkin
// Taken from RxJava throwIfFatal, which was taken from scala
public static void propagateIfFatal(Throwable t) {
if (t instanceof VirtualMachineError) {
throw (VirtualMachineError) t;
} else if (t instanceof ThreadDeath) {
throw (ThreadDeath) t;
} else if (t instanceof LinkageError) {
throw (LinkageError) t;
}
}
core/src/main/java/com/linecorp/armeria/common/stream/AbstractStreamMessage.java
Outdated
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/common/stream/AbstractStreamMessage.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, @minwoox !
@@ -0,0 +1,317 @@ | |||
/* | |||
* Copyright 2016 LINE Corporation |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2020
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
😅
* under the License. | ||
*/ | ||
/* | ||
* Copyright (c) 2016-present, RxJava Contributors. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you update NOTICE.txt
and licenses/
directory?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, I thought we already had it.
core/src/main/java/com/linecorp/armeria/common/stream/DeferredStreamMessage.java
Outdated
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/common/stream/DeferredStreamMessage.java
Outdated
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/common/util/CompositeException.java
Outdated
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/common/util/CompositeException.java
Outdated
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/common/util/Exceptions.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work! @minwoox
Thanks a lot for reviewing! |
Motivation: An exception raised by a `Subscriber`'s `onNext()` or other handler methods can cause an unexpected connection drop. We should handle it. Modifications: - Catch the exception thrown by `onSubscribe()` and `onNext()` from `Subscriber`s. - Aabort the stream or call `onError` with the cause. - Catch the exception thrown by `onComplete()` and `onError()`, and log it. - Fork `CompositeException` from RxJava. Result: - Close line#2475 - The connection is not closed by unhandled exceptions from `Subscriber`s anymore.
Motivation:
An exception raised by a
Subscriber
'sonNext()
or other handler methods can cause an unexpected connection drop.We should handle it.
Modifications:
onSubscribe()
andonNext()
fromSubscriber
s.onError
with the cause.onComplete()
andonError()
, and log it.CompositeException
from RxJava.Result:
Subscriber
#2475Subscriber
s anymore.