Skip to content

Commit

Permalink
Consistently handle exceptions in reactive streams
Browse files Browse the repository at this point in the history
* Fixed `PublisherCoroutine` and `rxObservable` ignoring
  cancellations.
* Fatal exceptions are not treated in a special manner by us
  anymore. Instead, we follow the requirement in the reactive
  streams specification that, in case some method of `Subscriber`
  throws, that subscriber MUST be considered cancelled, and the
  exception MUST be reported in some place other than `onError`.
* Fixed `trySend` sometimes throwing in `PublisherCoroutine` and
  `rxObservable`.
* When an exception happens inside a cancellation handler, we now
  consistently throw the original exception passed to the handler,
  with the new exception added as suppressed.
  • Loading branch information
dkhalanskyjb committed Apr 13, 2021
1 parent 12f4dbc commit ed1cf6d
Show file tree
Hide file tree
Showing 22 changed files with 421 additions and 341 deletions.
22 changes: 21 additions & 1 deletion reactive/kotlinx-coroutines-jdk9/test/IntegrationTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,24 @@ class IntegrationTest(
assertEquals(n, last)
}

}
}

internal suspend inline fun <reified E: Throwable> assertCallsExceptionHandlerWith(
crossinline operation: suspend (CoroutineExceptionHandler) -> Unit): E
{
val caughtExceptions = mutableListOf<Throwable>()
val exceptionHandler = object: AbstractCoroutineContextElement(CoroutineExceptionHandler),
CoroutineExceptionHandler
{
override fun handleException(context: CoroutineContext, exception: Throwable) {
caughtExceptions += exception
}
}
return withContext(exceptionHandler) {
operation(exceptionHandler)
caughtExceptions.single().let {
assertTrue(it is E, it.toString())
it
}
}
}
69 changes: 35 additions & 34 deletions reactive/kotlinx-coroutines-jdk9/test/PublishTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -123,42 +123,43 @@ class PublishTest : TestBase() {

@Test
fun testOnNextError() = runTest {
val latch = CompletableDeferred<Unit>()
expect(1)
val publisher = flowPublish(currentDispatcher()) {
expect(4)
try {
send("OK")
} catch(e: Throwable) {
expect(6)
assert(e is TestException)
}
assertCallsExceptionHandlerWith<TestException> { exceptionHandler ->
val publisher = flowPublish(currentDispatcher() + exceptionHandler) {
expect(4)
try {
send("OK")
} catch(e: Throwable) {
expect(6)
assert(e is TestException)
latch.complete(Unit)
}
}
expect(2)
publisher.subscribe(object : JFlow.Subscriber<String> {
override fun onComplete() {
expectUnreached()
}

override fun onSubscribe(s: JFlow.Subscription) {
expect(3)
s.request(1)
}

override fun onNext(t: String) {
expect(5)
assertEquals("OK", t)
throw TestException()
}

override fun onError(t: Throwable) {
expectUnreached()
}
})
latch.await()
}
expect(2)
val latch = CompletableDeferred<Unit>()
publisher.subscribe(object : JFlow.Subscriber<String> {
override fun onComplete() {
expectUnreached()
}

override fun onSubscribe(s: JFlow.Subscription) {
expect(3)
s.request(1)
}

override fun onNext(t: String) {
expect(5)
assertEquals("OK", t)
throw TestException()
}

override fun onError(t: Throwable) {
expect(7)
assert(t is TestException)
latch.complete(Unit)
}
})
latch.await()
finish(8)
finish(7)
}

@Test
Expand Down
153 changes: 90 additions & 63 deletions reactive/kotlinx-coroutines-reactive/src/Publish.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import kotlinx.coroutines.selects.*
import kotlinx.coroutines.sync.*
import org.reactivestreams.*
import kotlin.coroutines.*
import kotlin.internal.*

/**
* Creates cold reactive [Publisher] that runs a given [block] in a coroutine.
Expand Down Expand Up @@ -74,29 +73,27 @@ public class PublisherCoroutine<in T>(
private val _nRequested = atomic(0L) // < 0 when closed (CLOSED or SIGNALLED)

@Volatile
private var cancelled = false // true when Subscription.cancel() is invoked
private var cancelled = false // true after Subscription.cancel() is invoked

override val isClosedForSend: Boolean get() = isCompleted
override fun close(cause: Throwable?): Boolean = cancelCoroutine(cause)
override fun invokeOnClose(handler: (Throwable?) -> Unit): Nothing =
throw UnsupportedOperationException("PublisherCoroutine doesn't support invokeOnClose")

override fun trySend(element: T): ChannelResult<Unit> {
if (!mutex.tryLock()) return ChannelResult.failure()
doLockedNext(element)
return ChannelResult.success(Unit)
}
// TODO: will throw if `null` is passed -- is throwing this kind of programmer-induced errors okay?
override fun trySend(element: T): ChannelResult<Unit> =
if (!mutex.tryLock()) {
ChannelResult.failure()
} else {
when (val throwable = doLockedNext(element)) {
null -> ChannelResult.success(Unit)
else -> ChannelResult.closed(throwable)
}
}

public override suspend fun send(element: T) {
// fast-path -- try send without suspension
if (offer(element)) return
// slow-path does suspend
return sendSuspend(element)
}

private suspend fun sendSuspend(element: T) {
mutex.lock()
doLockedNext(element)
doLockedNext(element)?.let { throw it }
}

override val onSend: SelectClause2<T, SendChannel<T>>
Expand All @@ -106,13 +103,13 @@ public class PublisherCoroutine<in T>(
@Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
override fun <R> registerSelectClause2(select: SelectInstance<R>, element: T, block: suspend (SendChannel<T>) -> R) {
mutex.onLock.registerSelectClause2(select, null) {
doLockedNext(element)
doLockedNext(element)?.let { throw it }
block(this)
}
}

/*
* This code is not trivial because of the two properties:
* This code is not trivial because of the following properties:
* 1. It ensures conformance to the reactive specification that mandates that onXXX invocations should not
* be concurrent. It uses Mutex to protect all onXXX invocation and ensure conformance even when multiple
* coroutines are invoking `send` function.
Expand All @@ -121,27 +118,60 @@ public class PublisherCoroutine<in T>(
* globally-scoped coroutine that is invoking `send` outside of this context. Without extra precaution this may
* lead to `onNext` that is concurrent with `onComplete/onError`, so that is why signalling for
* `onComplete/onError` is also done under the same mutex.
* 3. The reactive specification forbids emitting more elements than requested, so `onNext` is forbidden until the
* subscriber actually requests some elements. This is implemented by the mutex being locked when emitting
* elements is not permitted (`_nRequested.value == 0`).
*/

// assert: mutex.isLocked()
private fun doLockedNext(elem: T) {
// check if already closed for send, note that isActive becomes false as soon as cancel() is invoked,
// because the job is cancelled, so this check also ensure conformance to the reactive specification's
// requirement that after cancellation requested we don't call onXXX
/**
* Attempts to emit a value to the subscriber and, if back-pressure permits this, unlock the mutex.
*
* Requires that the caller has locked the mutex before this invocation.
*
* If the channel is closed, returns the corresponding [Throwable]; otherwise, returns `null` to denote success.
*
* @throws NullPointerException if the passed element is `null`
*/
private fun doLockedNext(elem: T): Throwable? {
if (elem == null) {
throw NullPointerException("Can not emit null")
}
/** This guards against the case when the caller of this function managed to lock the mutex not because some
* elements were requested--and thus it is permitted to call `onNext`--but because the channel was closed.
*
* It may look like there is a race condition here between `isActive` and a concurrent cancellation, but it's
* okay for a cancellation to happen during `onNext`, as the reactive spec only requires that we *eventually*
* stop signalling the subscriber. */
if (!isActive) {
unlockAndCheckCompleted()
throw getCancellationException()
return getCancellationException()
}
// notify subscriber
// notify the subscriber
try {
subscriber.onNext(elem)
} catch (e: Throwable) {
// If onNext fails with exception, then we cancel coroutine (with this exception) and then rethrow it
// to abort the corresponding send/offer invocation. From the standpoint of coroutines machinery,
// this failure is essentially equivalent to a failure of a child coroutine.
cancelCoroutine(e)
} catch (cause: Throwable) {
/** The reactive streams spec forbids the subscribers from throwing from [Subscriber.onNext] unless the
* element is `null`, which we check not to be the case. Therefore, we report this exception to the handler
* for uncaught exceptions and consider the subscription cancelled, as mandated by
* https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#2.13.
*
* Some reactive implementations, like RxJava or Reactor, are known to throw from [Subscriber.onNext] if the
* execution encounters an exception they consider to be "fatal", like [VirtualMachineError] or
* [ThreadDeath]. Us using the handler for the undeliverable exceptions to signal "fatal" exceptions is
* inconsistent with RxJava and Reactor, which attempt to bubble the exception up the call chain as soon as
* possible. However, we can't do much better here, as simply throwing from all methods indiscriminately
* would violate the contracts we place on them. */
cancelled = true
val causeDelivered = close(cause)
unlockAndCheckCompleted()
throw e
return if (causeDelivered) {
// `cause` is the reason this channel is closed
cause
} else {
// Someone else closed the channel during `onNext`. We report `cause` as an undeliverable exception.
exceptionOnCancelHandler(cause, context)
getCancellationException()
}
}
// now update nRequested
while (true) { // lock-free loop on nRequested
Expand All @@ -152,12 +182,13 @@ public class PublisherCoroutine<in T>(
if (_nRequested.compareAndSet(current, updated)) {
if (updated == 0L) {
// return to keep locked due to back-pressure
return
return null
}
break // unlock if updated > 0
}
}
unlockAndCheckCompleted()
return null
}

private fun unlockAndCheckCompleted() {
Expand All @@ -177,38 +208,31 @@ public class PublisherCoroutine<in T>(
// assert: mutex.isLocked() & isCompleted
private fun doLockedSignalCompleted(cause: Throwable?, handled: Boolean) {
try {
if (_nRequested.value >= CLOSED) {
_nRequested.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
// Specification requires that after cancellation requested we don't call onXXX
if (cancelled) {
// If the parent had failed to handle our exception, then we must not lose this exception
if (cause != null && !handled) exceptionOnCancelHandler(cause, context)
return
}

if (_nRequested.value == SIGNALLED)
return
_nRequested.value = SIGNALLED // we'll signal onError/onCompleted (the final state, so no CAS needed)
// Specification requires that after the cancellation is requested we eventually stop calling onXXX
if (cancelled) {
// If the parent had failed to handle our exception, then we must not lose this exception
if (cause != null && !handled) exceptionOnCancelHandler(cause, context)
return
}
if (cause == null) {
try {
if (cause != null && cause !is CancellationException) {
/*
* Reactive frameworks have two types of exceptions: regular and fatal.
* Regular are passed to onError.
* Fatal can be passed to onError, but even the standard implementations **can just swallow it** (e.g. see #1297).
* Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether
* the cause will be handled by onError (and moreover, it depends on whether a fatal exception was
* thrown by subscriber or upstream).
* To make behaviour consistent and least surprising, we always handle fatal exceptions
* by coroutines machinery, anyway, they should not be present in regular program flow,
* thus our goal here is just to expose it as soon as possible.
*/
subscriber.onError(cause)
if (!handled && cause.isFatal()) {
exceptionOnCancelHandler(cause, context)
}
} else {
subscriber.onComplete()
}
subscriber.onComplete()
} catch (e: Throwable) {
handleCoroutineException(context, e)
}
} else {
try {
// This can't be the cancellation exception from `cancel`, as then `cancelled` would be `true`.
subscriber.onError(cause)
} catch (e: Throwable) {
if (e !== cause) {
cause.addSuppressed(e)
}
handleCoroutineException(context, cause)
}
}
} finally {
mutex.unlock()
Expand All @@ -217,20 +241,25 @@ public class PublisherCoroutine<in T>(

override fun request(n: Long) {
if (n <= 0) {
// Specification requires IAE for n <= 0
// Specification requires to call onError with IAE for n <= 0
cancelCoroutine(IllegalArgumentException("non-positive subscription request $n"))
return
}
while (true) { // lock-free loop for nRequested
val cur = _nRequested.value
if (cur < 0) return // already closed for send, ignore requests
if (cur < 0) return // already closed for send, ignore requests, as mandated by the reactive streams spec
var upd = cur + n
if (upd < 0 || n == Long.MAX_VALUE)
upd = Long.MAX_VALUE
if (cur == upd) return // nothing to do
if (_nRequested.compareAndSet(cur, upd)) {
// unlock the mutex when we don't have back-pressure anymore
if (cur == 0L) {
/** In a sense, after a successful CAS, it is this invocation, not the coroutine itself, that owns
* the lock, given that `upd` is necessarily strictly positive. Thus, no other operation has the
* right to lower the value on [_nRequested], it can only grow or become [CLOSED]. Therefore, it is
* impossible for any other operations to assume that they own the lock without actually acquiring
* it. */
unlockAndCheckCompleted()
}
return
Expand Down Expand Up @@ -271,8 +300,6 @@ public class PublisherCoroutine<in T>(
cancelled = true
super.cancel(null)
}

private fun Throwable.isFatal() = this is VirtualMachineError || this is ThreadDeath || this is LinkageError
}

@Deprecated(
Expand Down
Loading

0 comments on commit ed1cf6d

Please sign in to comment.