Skip to content

Commit

Permalink
Prevent propagating exception to top-level
Browse files Browse the repository at this point in the history
  • Loading branch information
e5l committed Jan 13, 2020
1 parent 6365524 commit b50206c
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import io.ktor.network.sockets.Socket
import io.ktor.network.tls.*
import io.ktor.util.*
import io.ktor.util.date.*
import io.ktor.utils.io.*
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
Expand Down Expand Up @@ -61,11 +60,13 @@ internal class Endpoint(
}
}

suspend fun execute(request: HttpRequestData, callContext: CoroutineContext): HttpResponseData =
suspendCancellableCoroutine { continuation ->
val task = RequestTask(request, continuation, callContext)
tasks.offer(task)
}
suspend fun execute(
request: HttpRequestData,
callContext: CoroutineContext
): HttpResponseData = suspendCancellableCoroutine { continuation ->
val task = RequestTask(request, continuation, callContext)
tasks.offer(task)
}

private suspend fun makePipelineRequest(task: RequestTask) {
if (deliveryPoint.offer(task)) return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,17 @@ import java.util.concurrent.atomic.*
* Creates [CoroutineDispatcher] based on thread pool of [threadCount] threads.
*/
@InternalAPI
fun Dispatchers.fixedThreadPoolDispatcher(threadCount: Int, threadName: String = "thread-pool-%d"): CoroutineDispatcher {
fun Dispatchers.fixedThreadPoolDispatcher(
threadCount: Int,
threadName: String = "thread-pool-%d"
): CoroutineDispatcher {
val threadsNum = AtomicInteger(0)
return Executors.newFixedThreadPool(threadCount) {
Thread(it).apply {
isDaemon = true
name = threadName.format(threadsNum.getAndIncrement())

setUncaughtExceptionHandler { _, _ -> }
}
}.asCoroutineDispatcher()
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,18 @@ package io.ktor.network.selector
import io.ktor.util.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import java.io.Closeable
import java.io.*
import java.nio.channels.*
import java.util.concurrent.atomic.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.jvm.*

/**
* Default CIO selector manager implementation
*/
@Suppress("BlockingMethodInNonBlockingContext")
@KtorExperimentalAPI
class ActorSelectorManager(dispatcher: CoroutineContext) : SelectorManagerSupport(), Closeable, CoroutineScope {
class ActorSelectorManager(context: CoroutineContext) : SelectorManagerSupport(), Closeable, CoroutineScope {
@Volatile
private var selectorRef: Selector? = null

Expand All @@ -35,7 +34,7 @@ class ActorSelectorManager(dispatcher: CoroutineContext) : SelectorManagerSuppor

private val mb = LockFreeMPSCQueue<Selectable>()

override val coroutineContext: CoroutineContext = dispatcher + CoroutineName("selector")
override val coroutineContext: CoroutineContext = context + CoroutineName("selector")

init {
launch {
Expand Down Expand Up @@ -143,8 +142,7 @@ class ActorSelectorManager(dispatcher: CoroutineContext) : SelectorManagerSuppor
if (!continuation.resume(Unit)) {
selectWakeup()
}
}
else if (selectable.channel.isOpen) throw ClosedSelectorException()
} else if (selectable.channel.isOpen) throw ClosedSelectorException()
else throw ClosedChannelException()
} catch (t: Throwable) {
cancelAllSuspensions(selectable, t)
Expand Down Expand Up @@ -186,7 +184,8 @@ class ActorSelectorManager(dispatcher: CoroutineContext) : SelectorManagerSuppor
fun resume(value: R): Boolean {
val continuation = ref.getAndSet(null)
if (continuation != null) {
continuation.resume(value) /** we resume unintercepted, see [dispatchIfNeeded] */
continuation.resume(value)
/** we resume unintercepted, see [dispatchIfNeeded] */
return true
}

Expand Down

0 comments on commit b50206c

Please sign in to comment.