From b50206cf0b5674e5e2f4b02351c49238da6f8cf2 Mon Sep 17 00:00:00 2001 From: Leonid Stashevsky Date: Mon, 13 Jan 2020 14:04:10 +0300 Subject: [PATCH] Prevent propagating exception to top-level Fix #1356 #1237 --- .../jvm/src/io/ktor/client/engine/cio/Endpoint.kt | 13 +++++++------ .../ktor/client/utils/CoroutineDispatcherUtils.kt | 7 ++++++- .../ktor/network/selector/ActorSelectorManager.kt | 13 ++++++------- 3 files changed, 19 insertions(+), 14 deletions(-) diff --git a/ktor-client/ktor-client-cio/jvm/src/io/ktor/client/engine/cio/Endpoint.kt b/ktor-client/ktor-client-cio/jvm/src/io/ktor/client/engine/cio/Endpoint.kt index e4dbe41c5f5..e42ee18f429 100644 --- a/ktor-client/ktor-client-cio/jvm/src/io/ktor/client/engine/cio/Endpoint.kt +++ b/ktor-client/ktor-client-cio/jvm/src/io/ktor/client/engine/cio/Endpoint.kt @@ -10,7 +10,6 @@ import io.ktor.network.sockets.Socket import io.ktor.network.tls.* import io.ktor.util.* import io.ktor.util.date.* -import io.ktor.utils.io.* import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel @@ -61,11 +60,13 @@ internal class Endpoint( } } - suspend fun execute(request: HttpRequestData, callContext: CoroutineContext): HttpResponseData = - suspendCancellableCoroutine { continuation -> - val task = RequestTask(request, continuation, callContext) - tasks.offer(task) - } + suspend fun execute( + request: HttpRequestData, + callContext: CoroutineContext + ): HttpResponseData = suspendCancellableCoroutine { continuation -> + val task = RequestTask(request, continuation, callContext) + tasks.offer(task) + } private suspend fun makePipelineRequest(task: RequestTask) { if (deliveryPoint.offer(task)) return diff --git a/ktor-client/ktor-client-core/jvm/src/io/ktor/client/utils/CoroutineDispatcherUtils.kt b/ktor-client/ktor-client-core/jvm/src/io/ktor/client/utils/CoroutineDispatcherUtils.kt index 78ac9e78549..8782d4c2ee9 100644 --- a/ktor-client/ktor-client-core/jvm/src/io/ktor/client/utils/CoroutineDispatcherUtils.kt +++ b/ktor-client/ktor-client-core/jvm/src/io/ktor/client/utils/CoroutineDispatcherUtils.kt @@ -13,12 +13,17 @@ import java.util.concurrent.atomic.* * Creates [CoroutineDispatcher] based on thread pool of [threadCount] threads. */ @InternalAPI -fun Dispatchers.fixedThreadPoolDispatcher(threadCount: Int, threadName: String = "thread-pool-%d"): CoroutineDispatcher { +fun Dispatchers.fixedThreadPoolDispatcher( + threadCount: Int, + threadName: String = "thread-pool-%d" +): CoroutineDispatcher { val threadsNum = AtomicInteger(0) return Executors.newFixedThreadPool(threadCount) { Thread(it).apply { isDaemon = true name = threadName.format(threadsNum.getAndIncrement()) + + setUncaughtExceptionHandler { _, _ -> } } }.asCoroutineDispatcher() } diff --git a/ktor-network/jvm/src/io/ktor/network/selector/ActorSelectorManager.kt b/ktor-network/jvm/src/io/ktor/network/selector/ActorSelectorManager.kt index fd4e2050e00..de69144b305 100644 --- a/ktor-network/jvm/src/io/ktor/network/selector/ActorSelectorManager.kt +++ b/ktor-network/jvm/src/io/ktor/network/selector/ActorSelectorManager.kt @@ -7,19 +7,18 @@ package io.ktor.network.selector import io.ktor.util.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* -import java.io.Closeable +import java.io.* import java.nio.channels.* import java.util.concurrent.atomic.* import kotlin.coroutines.* import kotlin.coroutines.intrinsics.* -import kotlin.jvm.* /** * Default CIO selector manager implementation */ @Suppress("BlockingMethodInNonBlockingContext") @KtorExperimentalAPI -class ActorSelectorManager(dispatcher: CoroutineContext) : SelectorManagerSupport(), Closeable, CoroutineScope { +class ActorSelectorManager(context: CoroutineContext) : SelectorManagerSupport(), Closeable, CoroutineScope { @Volatile private var selectorRef: Selector? = null @@ -35,7 +34,7 @@ class ActorSelectorManager(dispatcher: CoroutineContext) : SelectorManagerSuppor private val mb = LockFreeMPSCQueue() - override val coroutineContext: CoroutineContext = dispatcher + CoroutineName("selector") + override val coroutineContext: CoroutineContext = context + CoroutineName("selector") init { launch { @@ -143,8 +142,7 @@ class ActorSelectorManager(dispatcher: CoroutineContext) : SelectorManagerSuppor if (!continuation.resume(Unit)) { selectWakeup() } - } - else if (selectable.channel.isOpen) throw ClosedSelectorException() + } else if (selectable.channel.isOpen) throw ClosedSelectorException() else throw ClosedChannelException() } catch (t: Throwable) { cancelAllSuspensions(selectable, t) @@ -186,7 +184,8 @@ class ActorSelectorManager(dispatcher: CoroutineContext) : SelectorManagerSuppor fun resume(value: R): Boolean { val continuation = ref.getAndSet(null) if (continuation != null) { - continuation.resume(value) /** we resume unintercepted, see [dispatchIfNeeded] */ + continuation.resume(value) + /** we resume unintercepted, see [dispatchIfNeeded] */ return true }