Skip to content

Commit

Permalink
Copy CopyableThreadContextElement when switching context with flowOn (#…
Browse files Browse the repository at this point in the history
…3778)

`flowOn` uses its own undispatched coroutine start when it detects a fast path. Previously, it concatenated the context missing the copy of CopyableThreadContextElement.
Fixed by replacing concatenation with `newCoroutineContext`.

Fixes #3787 

Co-authored-by: Vsevolod Tolstopyatov <[email protected]>
  • Loading branch information
wanyingd1996 and qwwdfsad authored Jun 22, 2023
1 parent f1404c0 commit 1074e33
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ internal abstract class ChannelFlowOperator<S, T>(
// Fast-path: When channel creation is optional (flowOn/flowWith operators without buffer)
if (capacity == Channel.OPTIONAL_CHANNEL) {
val collectContext = coroutineContext
val newContext = collectContext + context // compute resulting collect context
val newContext = collectContext.newCoroutineContext(context) // compute resulting collect context
// #1: If the resulting context happens to be the same as it was -- fallback to plain collect
if (newContext == collectContext)
return flowCollect(collector)
Expand Down
19 changes: 18 additions & 1 deletion kotlinx-coroutines-core/jvm/test/ThreadContextElementTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package kotlinx.coroutines
import org.junit.Test
import kotlin.coroutines.*
import kotlin.test.*
import kotlinx.coroutines.flow.*

class ThreadContextElementTest : TestBase() {

Expand Down Expand Up @@ -37,7 +38,7 @@ class ThreadContextElementTest : TestBase() {
}

@Test
fun testUndispatched()= runTest {
fun testUndispatched() = runTest {
val exceptionHandler = coroutineContext[CoroutineExceptionHandler]!!
val data = MyData()
val element = MyElement(data)
Expand Down Expand Up @@ -191,6 +192,21 @@ class ThreadContextElementTest : TestBase() {

assertEquals(manuallyCaptured, captor.capturees)
}

@Test
fun testThreadLocalFlowOn() = runTest {
val myData = MyData()
myThreadLocal.set(myData)
expect(1)
flow {
assertEquals(myData, myThreadLocal.get())
emit(1)
}
.flowOn(myThreadLocal.asContextElement() + Dispatchers.Default)
.single()
myThreadLocal.set(null)
finish(2)
}
}

class MyData
Expand Down Expand Up @@ -259,6 +275,7 @@ class CopyForChildCoroutineElement(val data: MyData?) : CopyableThreadContextEle
}
}


/**
* Calls [block], setting the value of [this] [ThreadLocal] for the duration of [block].
*
Expand Down
29 changes: 29 additions & 0 deletions kotlinx-coroutines-core/jvm/test/ThreadContextMutableCopiesTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package kotlinx.coroutines

import kotlinx.coroutines.flow.*
import kotlin.coroutines.*
import kotlin.test.*

Expand Down Expand Up @@ -131,4 +132,32 @@ class ThreadContextMutableCopiesTest : TestBase() {
finish(2)
}
}

@Test
fun testDataIsCopiedThroughFlowOnUndispatched() = runTest {
expect(1)
val root = MyMutableElement(ArrayList())
val originalData = root.mutableData
flow {
assertNotSame(originalData, threadLocalData.get())
emit(1)
}
.flowOn(root)
.single()
finish(2)
}

@Test
fun testDataIsCopiedThroughFlowOnDispatched() = runTest {
expect(1)
val root = MyMutableElement(ArrayList())
val originalData = root.mutableData
flow {
assertNotSame(originalData, threadLocalData.get())
emit(1)
}
.flowOn(root + Dispatchers.Default)
.single()
finish(2)
}
}

0 comments on commit 1074e33

Please sign in to comment.