From 4b0edacd48b041f379853f43f2c4a9989d68a428 Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Wed, 14 Dec 2022 14:04:38 +0100 Subject: [PATCH 01/15] Track if shift leaks outside of scope, and raise a different exception --- .../arrow/core/continuations/EagerEffect.kt | 16 ++-- .../kotlin/arrow/core/continuations/Effect.kt | 73 ++++++++++++------- .../core/continuations/EagerEffectSpec.kt | 9 +++ .../arrow/core/continuations/EffectSpec.kt | 30 +++++--- 4 files changed, 87 insertions(+), 41 deletions(-) diff --git a/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/continuations/EagerEffect.kt b/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/continuations/EagerEffect.kt index e5a7269cb54..c86e54bd075 100644 --- a/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/continuations/EagerEffect.kt +++ b/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/continuations/EagerEffect.kt @@ -166,6 +166,7 @@ public fun eagerEffect(f: suspend EagerEffectScope.() -> A): EagerEffe private class DefaultEagerEffect(private val f: suspend EagerEffectScope.() -> A) : EagerEffect { override fun fold(recover: (R) -> B, transform: (A) -> B): B { val token = Token() + val isActive = AtomicRef(true) val eagerEffectScope = object : EagerEffectScope { // Shift away from this Continuation by intercepting it, and completing it with @@ -180,12 +181,15 @@ private class DefaultEagerEffect(private val f: suspend EagerEffectScope Any?) + if (isActive.get()) throw Eager(token, r, recover as (Any?) -> Any?) + else throw ShiftLeakedException() } return try { - suspend { transform(f(eagerEffectScope)) } - .startCoroutineUninterceptedOrReturn(Continuation(EmptyCoroutineContext) { result -> + suspend { + val res = f(eagerEffectScope).also { isActive.set(false) } + transform(res) + }.startCoroutineUninterceptedOrReturn(Continuation(EmptyCoroutineContext) { result -> result.getOrElse { throwable -> if (throwable is Eager && token == throwable.token) { throwable.recover(throwable.shifted) as B @@ -193,8 +197,10 @@ private class DefaultEagerEffect(private val f: suspend EagerEffectScope - * [Writing a program with Effect](#writing-a-program-with-effect) - * [Handling errors](#handling-errors) - * [Structured Concurrency](#structured-concurrency) - * [Arrow Fx Coroutines](#arrow-fx-coroutines) - * [parZip](#parzip) - * [parTraverse](#partraverse) - * [raceN](#racen) - * [bracketCase / Resource](#bracketcase--resource) - * [KotlinX](#kotlinx) - * [withContext](#withcontext) - * [async](#async) - * [launch](#launch) - * [Strange edge cases](#strange-edge-cases) + * [Writing a program with Effect](#writing-a-program-with-effect) + * [Handling errors](#handling-errors) + * [Structured Concurrency](#structured-concurrency) + * [Arrow Fx Coroutines](#arrow-fx-coroutines) + * [parZip](#parzip) + * [parTraverse](#partraverse) + * [raceN](#racen) + * [bracketCase / Resource](#bracketcase--resource) + * [KotlinX](#kotlinx) + * [withContext](#withcontext) + * [async](#async) + * [launch](#launch) + * [Strange edge cases](#strange-edge-cases) * * @@ -716,7 +716,7 @@ internal class FoldContinuation( private val error: suspend (Throwable) -> B, private val parent: Continuation, ) : Continuation, Token(), EffectScope { - + constructor(ignored: Token, context: CoroutineContext, parent: Continuation) : this(context, { throw it }, parent) constructor( ignored: Token, @@ -724,9 +724,12 @@ internal class FoldContinuation( error: suspend (Throwable) -> B, parent: Continuation, ) : this(context, error, parent) - + lateinit var recover: suspend (R) -> Any? - + + // Add AtomicBoolean to arrow-atomic + val isActive: AtomicRef = AtomicRef(true) + // Shift away from this Continuation by intercepting it, and completing it with // ShiftCancellationException // This is needed because this function will never yield a result, @@ -739,8 +742,9 @@ internal class FoldContinuation( // CancellationException and thus effectively recovering from the cancellation/shift. // This means try/catch is also capable of recovering from monadic errors. // See: EffectSpec - try/catch tests - throw Suspend(this, r, recover as suspend (Any?) -> Any?) - + if (isActive.get()) throw Suspend(this, r, recover as suspend (Any?) -> Any?) + else throw ShiftLeakedException() + // In contrast to `createCoroutineUnintercepted this doesn't create a new ContinuationImpl private fun (suspend () -> B).startCoroutineUnintercepted() { try { @@ -753,15 +757,20 @@ internal class FoldContinuation( parent.resumeWithException(e) } } - + override fun resumeWith(result: Result) { result.fold(parent::resume) { throwable -> when { - throwable is Suspend && this === throwable.token -> + throwable is Suspend && this === throwable.token -> { + isActive.set(false) suspend { throwable.recover(throwable.shifted) as B }.startCoroutineUnintercepted() - + } + throwable is Suspend -> parent.resumeWith(result) - else -> suspend { error(throwable.nonFatalOrThrow()) }.startCoroutineUnintercepted() + else -> { + isActive.set(false) + suspend { error(throwable.nonFatalOrThrow()) }.startCoroutineUnintercepted() + } } } } @@ -800,12 +809,12 @@ internal class FoldContinuation( public fun effect(f: suspend EffectScope.() -> A): Effect = DefaultEffect(f) private class DefaultEffect(val f: suspend EffectScope.() -> A) : Effect { - + override suspend fun fold( recover: suspend (shifted: R) -> B, transform: suspend (value: A) -> B, ): B = fold({ throw it }, recover, transform) - + // We create a `Token` for fold Continuation, so we can properly differentiate between nested folds override suspend fun fold( error: suspend (error: Throwable) -> B, @@ -816,14 +825,19 @@ private class DefaultEffect(val f: suspend EffectScope.() -> A) : Effec val shift = FoldContinuation(cont.context, error, cont) shift.recover = recover try { - val fold: suspend EffectScope.() -> B = { transform(f(this)) } + val fold: suspend EffectScope.() -> B = { + val res = f(this).also { shift.isActive.set(false) } + transform(res) + } fold.startCoroutineUninterceptedOrReturn(shift, shift) } catch (e: Suspend) { if (shift === e.token) { + shift.isActive.set(false) val f: suspend () -> B = { e.recover(e.shifted) as B } f.startCoroutineUninterceptedOrReturn(cont) } else throw e } catch (e: Throwable) { + shift.isActive.set(false) val f: suspend () -> B = { error(e.nonFatalOrThrow()) } f.startCoroutineUninterceptedOrReturn(cont) } @@ -831,3 +845,12 @@ private class DefaultEffect(val f: suspend EffectScope.() -> A) : Effec } public suspend fun Effect.merge(): A = fold(::identity, ::identity) + +public class ShiftLeakedException : RuntimeException( + """ + "shift or bind occurred outside of its DSL scope, and the DSL scoped operator was leaked, this is kind of usage is incorrect. + Make sure all calls to shift or bind occur within the lifecycle of effect { }, either { } or similar. + + See: ... for additional information. + """.trimIndent() +) diff --git a/arrow-libs/core/arrow-core/src/commonTest/kotlin/arrow/core/continuations/EagerEffectSpec.kt b/arrow-libs/core/arrow-core/src/commonTest/kotlin/arrow/core/continuations/EagerEffectSpec.kt index 7c32aeae4c4..fbe5a9ce49b 100644 --- a/arrow-libs/core/arrow-core/src/commonTest/kotlin/arrow/core/continuations/EagerEffectSpec.kt +++ b/arrow-libs/core/arrow-core/src/commonTest/kotlin/arrow/core/continuations/EagerEffectSpec.kt @@ -5,6 +5,7 @@ import arrow.core.identity import arrow.core.left import arrow.core.right import io.kotest.assertions.fail +import io.kotest.assertions.throwables.shouldThrow import io.kotest.core.spec.style.StringSpec import io.kotest.matchers.shouldBe import io.kotest.property.Arb @@ -134,4 +135,12 @@ class EagerEffectSpec : StringSpec({ }.runCont() } shouldBe Either.Left(e) } + + "shift leaked results in ShiftLeakException" { + shouldThrow { + effect { + suspend { shift("failure") } + }.fold(::println) { it.invoke() } + } + } }) diff --git a/arrow-libs/core/arrow-core/src/commonTest/kotlin/arrow/core/continuations/EffectSpec.kt b/arrow-libs/core/arrow-core/src/commonTest/kotlin/arrow/core/continuations/EffectSpec.kt index af23c30c529..7cbe2d01f0a 100644 --- a/arrow-libs/core/arrow-core/src/commonTest/kotlin/arrow/core/continuations/EffectSpec.kt +++ b/arrow-libs/core/arrow-core/src/commonTest/kotlin/arrow/core/continuations/EffectSpec.kt @@ -272,14 +272,14 @@ class EffectSpec : }.runCont() } shouldBe Either.Left(e) } - + "#2760 - dispatching in nested Effect blocks does not make the nested Continuation to hang" { checkAll(Arb.string()) { msg -> fun failure(): Effect = effect { withContext(Dispatchers.Default) {} shift(Failure(msg)) } - + effect { failure().bind() 1 @@ -289,30 +289,30 @@ class EffectSpec : ) shouldBe Failure(msg) } } - + "#2779 - handleErrorWith does not make nested Continuations hang" { checkAll(Arb.string()) { error -> val failed: Effect = effect { withContext(Dispatchers.Default) {} shift(error) } - + val newError: Effect, Int> = failed.handleErrorWith { str -> effect { shift(str.reversed().toList()) } } - + newError.toEither() shouldBe Either.Left(error.reversed().toList()) } } - + "#2779 - bind nested in fold does not make nested Continuations hang" { checkAll(Arb.string()) { error -> val failed: Effect = effect { withContext(Dispatchers.Default) {} shift(error) } - + val newError: Effect, Int> = effect { failed.fold({ r -> @@ -321,11 +321,11 @@ class EffectSpec : }.bind() }, ::identity) } - + newError.toEither() shouldBe Either.Left(error.reversed().toList()) } } - + "Can handle thrown exceptions" { checkAll(Arb.string().suspend(), Arb.string().suspend()) { msg, fallback -> effect { @@ -337,7 +337,7 @@ class EffectSpec : ) shouldBe fallback() } } - + "Can shift from thrown exceptions" { checkAll(Arb.string().suspend(), Arb.string().suspend()) { msg, fallback -> effect { @@ -351,7 +351,7 @@ class EffectSpec : }.runCont() shouldBe fallback() } } - + "Can throw from thrown exceptions" { checkAll(Arb.string().suspend(), Arb.string().suspend()) { msg, fallback -> shouldThrow { @@ -365,6 +365,14 @@ class EffectSpec : }.message shouldBe fallback() } } + + "shift leaked results in ShiftLeakException" { + shouldThrow { + effect { + suspend { shift("failure") } + }.fold(::println) { f -> f() } + } + } }) private data class Failure(val msg: String) From ed794228aad51b9879f7afec72e8c9664eee9da4 Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Tue, 3 Jan 2023 14:46:28 +0100 Subject: [PATCH 02/15] Update docs --- .../kotlin/arrow/core/continuations/Effect.kt | 143 +++++++++++++----- .../examples/example-effect-guide-11.kt | 8 +- .../examples/example-effect-guide-12.kt | 5 +- .../examples/example-effect-guide-13.kt | 13 -- .../examples/example-effect-guide-14.kt | 22 +++ 5 files changed, 134 insertions(+), 57 deletions(-) create mode 100644 arrow-libs/core/arrow-core/src/jvmTest/kotlin/examples/example-effect-guide-14.kt diff --git a/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/continuations/Effect.kt b/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/continuations/Effect.kt index a0147b7a179..7fd964c4d81 100644 --- a/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/continuations/Effect.kt +++ b/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/continuations/Effect.kt @@ -26,19 +26,19 @@ import kotlin.coroutines.resumeWithException * * - * [Writing a program with Effect](#writing-a-program-with-effect) - * [Handling errors](#handling-errors) - * [Structured Concurrency](#structured-concurrency) - * [Arrow Fx Coroutines](#arrow-fx-coroutines) - * [parZip](#parzip) - * [parTraverse](#partraverse) - * [raceN](#racen) - * [bracketCase / Resource](#bracketcase--resource) - * [KotlinX](#kotlinx) - * [withContext](#withcontext) - * [async](#async) - * [launch](#launch) - * [Strange edge cases](#strange-edge-cases) + * [Writing a program with Effect](#writing-a-program-with-effect) + * [Handling errors](#handling-errors) + * [Structured Concurrency](#structured-concurrency) + * [Arrow Fx Coroutines](#arrow-fx-coroutines) + * [parZip](#parzip) + * [parTraverse](#partraverse) + * [raceN](#racen) + * [bracketCase / Resource](#bracketcase--resource) + * [KotlinX](#kotlinx) + * [withContext](#withcontext) + * [async](#async) + * [launch](#launch) + * [Leaking `shift`](#leaking-shift) * * @@ -418,7 +418,7 @@ import kotlin.coroutines.resumeWithException * * ### KotlinX * #### withContext - * It's always safe to call `shift` from `withContext` since it runs in place, so it has no way of leaking `shift`. + * It's always safe to call `shift` from `withContext` since it runs _in place_, so it has no way of leaking `shift`. * When `shift` is called from within `withContext` it will cancel all `Job`s running inside the `CoroutineScope` of `withContext`. * * + * ```text + * ErrorA + * ``` + * + * The example here will always print `ErrorA`, but never `ErrorB`. This is because `fa` is awaited first, and when it's `shifts` it will cancel `fb`. + * If instead we used `awaitAll`, then it would print `ErrorA` or `ErrorB` due to both `fa` and `fb` being awaited in parallel. * * #### launch * + * It's **not allowed** to call `shift` from within `launch`, this is because `launch` creates a separate process. + * Any calls to `shift` inside of `launch` will be ignored by `effect`, and result in an exception being thrown. + * * + * ```text + * 45 + * ``` + * + * As you can see from the output, the `effect` block is still executed, but the `shift` calls inside `launch` are ignored. * - * #### Strange edge cases + * #### Leaking `shift` * - * **NOTE** - * Capturing `shift` into a lambda, and leaking it outside of `Effect` to be invoked outside will yield unexpected results. - * Below we capture `shift` from inside the DSL, and then invoke it outside its context `EffectScope`. + * **IMPORTANT:** Capturing `shift` and leaking it outside of `effect { }` and invoking it outside its scope will yield unexpected results. + * + * Below an example of the capturing of `shift` inside a `suspend lambda`, and then invoking it outside its `effect { }` scope. * * @@ -553,21 +571,65 @@ import kotlin.coroutines.resumeWithException * suspend { shift("error") } * }.fold({ }, { leakedShift -> leakedShift.invoke() }) * ``` + * * - * The same violation is possible in all DSLs in Kotlin, including Structured Concurrency. + * When we invoke `leakedShift` outside of `effect { }` a special `ShiftLeakedException` is thrown to improve the debugging experience. + * The message clearly states that `shift` was leaked outside its scope, and the stacktrace will point to the exact location where `shift` was captured. + * In this case in line `9` of `example-effect-guide-13.kt`, which is stated in the second line of the stacktrace: `invokeSuspend(example-effect-guide-13.kt:9)`. * + * ```text + * Exception in thread "main" arrow.core.continuations.ShiftLeakedException: + * shift or bind was called outside of its DSL scope, and the DSL Scoped operator was leaked + * This is kind of usage is incorrect, make sure all calls to shift or bind occur within the lifecycle of effect { }, either { } or similar builders. + * + * See: ... for additional information. + * at arrow.core.continuations.FoldContinuation.shift(Effect.kt:770) + * at arrow.core.examples.exampleEffectGuide13.Example_effect_guide_13Kt$main$2$1.invokeSuspend(example-effect-guide-13.kt:9) + * at arrow.core.examples.exampleEffectGuide13.Example_effect_guide_13Kt$main$2$1.invoke(example-effect-guide-13.kt) + * ``` + * + * An example with KotlinX Coroutines launch. Which can _concurrently_ leak `shift` outside of its scope. + * In this case by _delaying_ the invocation of `shift` by `3.seconds`, + * we can see that the `ShiftLeakedException` is again thrown when `shift` is invoked. + * + * + * * ```kotlin - * val leakedAsync = coroutineScope Deferred> { - * suspend { - * async { - * println("I am never going to run, until I get called invoked from outside") + * coroutineScope { + * effect { + * launch { + * delay(3.seconds) + * shift("error") * } - * } + * 1 + * }.fold(::println, ::println) * } - * - * leakedAsync.invoke().await() * ``` - * + * ```text + * 1 + * Exception in thread "main" arrow.core.continuations.ShiftLeakedException: + * shift or bind was called outside of its DSL scope, and the DSL Scoped operator was leaked + * This is kind of usage is incorrect, make sure all calls to shift or bind occur within the lifecycle of effect { }, either { } or similar builders. + * + * See: ... for additional information. + * at arrow.core.continuations.FoldContinuation.shift(Effect.kt:780) + * at arrow.core.examples.exampleEffectGuide14.Example_effect_guide_14Kt$main$1$1$1$1.invokeSuspend(example-effect-guide-14.kt:17) + * at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) <13 internal lines> + * at arrow.core.examples.exampleEffectGuide14.Example_effect_guide_14Kt.main(example-effect-guide-14.kt:11) + * at arrow.core.examples.exampleEffectGuide14.Example_effect_guide_14Kt.main(example-effect-guide-14.kt) + * ``` + * */ public interface Effect { /** @@ -846,11 +908,12 @@ private class DefaultEffect(val f: suspend EffectScope.() -> A) : Effec public suspend fun Effect.merge(): A = fold(::identity, ::identity) -public class ShiftLeakedException : RuntimeException( +public class ShiftLeakedException : IllegalStateException( """ - "shift or bind occurred outside of its DSL scope, and the DSL scoped operator was leaked, this is kind of usage is incorrect. - Make sure all calls to shift or bind occur within the lifecycle of effect { }, either { } or similar. - - See: ... for additional information. - """.trimIndent() + + shift or bind was called outside of its DSL scope, and the DSL Scoped operator was leaked + This is kind of usage is incorrect, make sure all calls to shift or bind occur within the lifecycle of effect { }, either { } or similar builders. + + See: ... for additional information. + """.trimIndent() ) diff --git a/arrow-libs/core/arrow-core/src/jvmTest/kotlin/examples/example-effect-guide-11.kt b/arrow-libs/core/arrow-core/src/jvmTest/kotlin/examples/example-effect-guide-11.kt index 2ba4c626c4a..dbfd80371d3 100644 --- a/arrow-libs/core/arrow-core/src/jvmTest/kotlin/examples/example-effect-guide-11.kt +++ b/arrow-libs/core/arrow-core/src/jvmTest/kotlin/examples/example-effect-guide-11.kt @@ -15,6 +15,12 @@ suspend fun main() { val fa = async { shift(errorA) } val fb = async { shift(errorB) } fa.await() + fb.await() - }.fold({ error -> error shouldBeIn listOf(errorA, errorB) }, { fail("Int can never be the result") }) + }.fold( + { error -> + println(error) + error shouldBeIn listOf(errorA, errorB) + }, + { fail("Int can never be the result") } + ) } } diff --git a/arrow-libs/core/arrow-core/src/jvmTest/kotlin/examples/example-effect-guide-12.kt b/arrow-libs/core/arrow-core/src/jvmTest/kotlin/examples/example-effect-guide-12.kt index 4a5e8be6381..da40654688d 100644 --- a/arrow-libs/core/arrow-core/src/jvmTest/kotlin/examples/example-effect-guide-12.kt +++ b/arrow-libs/core/arrow-core/src/jvmTest/kotlin/examples/example-effect-guide-12.kt @@ -10,12 +10,11 @@ import kotlinx.coroutines.launch suspend fun main() { val errorA = "ErrorA" val errorB = "ErrorB" - val int = 45 effect { coroutineScope { launch { shift(errorA) } launch { shift(errorB) } - int + 45 } - }.fold({ fail("Shift can never finish") }, { it shouldBe int }) + }.fold({ fail("Shift can never finish") }, ::println) } diff --git a/arrow-libs/core/arrow-core/src/jvmTest/kotlin/examples/example-effect-guide-13.kt b/arrow-libs/core/arrow-core/src/jvmTest/kotlin/examples/example-effect-guide-13.kt index e2c8dcdcbea..a11ce8f3684 100644 --- a/arrow-libs/core/arrow-core/src/jvmTest/kotlin/examples/example-effect-guide-13.kt +++ b/arrow-libs/core/arrow-core/src/jvmTest/kotlin/examples/example-effect-guide-13.kt @@ -2,23 +2,10 @@ package arrow.core.examples.exampleEffectGuide13 import arrow.core.continuations.effect -import kotlinx.coroutines.Deferred -import kotlinx.coroutines.async -import kotlinx.coroutines.coroutineScope suspend fun main() { effect Unit> { suspend { shift("error") } }.fold({ }, { leakedShift -> leakedShift.invoke() }) - - val leakedAsync = coroutineScope Deferred> { - suspend { - async { - println("I am never going to run, until I get called invoked from outside") - } - } - } - - leakedAsync.invoke().await() } diff --git a/arrow-libs/core/arrow-core/src/jvmTest/kotlin/examples/example-effect-guide-14.kt b/arrow-libs/core/arrow-core/src/jvmTest/kotlin/examples/example-effect-guide-14.kt new file mode 100644 index 00000000000..6b5d9035872 --- /dev/null +++ b/arrow-libs/core/arrow-core/src/jvmTest/kotlin/examples/example-effect-guide-14.kt @@ -0,0 +1,22 @@ +// This file was automatically generated from Effect.kt by Knit tool. Do not edit. +package arrow.core.examples.exampleEffectGuide14 + +import kotlinx.coroutines.launch +import kotlinx.coroutines.delay +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.runBlocking +import arrow.core.continuations.effect +import kotlin.time.Duration.Companion.seconds + +fun main(): Unit = runBlocking { + + coroutineScope { + effect { + launch { + delay(3.seconds) + shift("error") + } + 1 + }.fold(::println, ::println) + } +} From 349bfaa3db846a6b45aedacd6affc4cd89185ab5 Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Thu, 5 Jan 2023 15:07:42 +0100 Subject: [PATCH 03/15] Improve docs & example --- .../kotlin/arrow/core/continuations/Effect.kt | 21 +++++++++---------- .../examples/example-effect-guide-14.kt | 16 +++++++------- 2 files changed, 17 insertions(+), 20 deletions(-) diff --git a/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/continuations/Effect.kt b/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/continuations/Effect.kt index 7fd964c4d81..a0df3d0c150 100644 --- a/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/continuations/Effect.kt +++ b/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/continuations/Effect.kt @@ -522,8 +522,9 @@ import kotlin.coroutines.resumeWithException * * #### launch * - * It's **not allowed** to call `shift` from within `launch`, this is because `launch` creates a separate process. - * Any calls to `shift` inside of `launch` will be ignored by `effect`, and result in an exception being thrown. + * It's **not allowed** to call `shift` from within `launch`, this is because `launch` creates a separate unrelated child Job/Continuation. + * Any calls to `shift` inside of `launch` will be ignored by `effect`, and result in an exception being thrown inside `launch`. + * Because KotlinX Coroutines ignores `CancellationException`, and thus swallows the `shift` call. * * * ```kotlin - * coroutineScope { - * effect { - * launch { - * delay(3.seconds) - * shift("error") - * } - * 1 - * }.fold(::println, ::println) - * } + * effect { + * launch { + * delay(3.seconds) + * shift("error") + * } + * 1 + * }.fold(::println, ::println) * ``` * ```text * 1 diff --git a/arrow-libs/core/arrow-core/src/jvmTest/kotlin/examples/example-effect-guide-14.kt b/arrow-libs/core/arrow-core/src/jvmTest/kotlin/examples/example-effect-guide-14.kt index 6b5d9035872..2417e0de106 100644 --- a/arrow-libs/core/arrow-core/src/jvmTest/kotlin/examples/example-effect-guide-14.kt +++ b/arrow-libs/core/arrow-core/src/jvmTest/kotlin/examples/example-effect-guide-14.kt @@ -10,13 +10,11 @@ import kotlin.time.Duration.Companion.seconds fun main(): Unit = runBlocking { - coroutineScope { - effect { - launch { - delay(3.seconds) - shift("error") - } - 1 - }.fold(::println, ::println) - } + effect { + launch { + delay(3.seconds) + shift("error") + } + 1 + }.fold(::println, ::println) } From f4d887355375aef37b1e2c79e23605cbf0568603 Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Thu, 5 Jan 2023 15:11:30 +0100 Subject: [PATCH 04/15] replace ... --- .../commonMain/kotlin/arrow/core/continuations/Effect.kt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/continuations/Effect.kt b/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/continuations/Effect.kt index a0df3d0c150..4091cb6e0d9 100644 --- a/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/continuations/Effect.kt +++ b/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/continuations/Effect.kt @@ -583,7 +583,7 @@ import kotlin.coroutines.resumeWithException * shift or bind was called outside of its DSL scope, and the DSL Scoped operator was leaked * This is kind of usage is incorrect, make sure all calls to shift or bind occur within the lifecycle of effect { }, either { } or similar builders. * - * See: ... for additional information. + * See: Effect KDoc for additional information. * at arrow.core.continuations.FoldContinuation.shift(Effect.kt:770) * at arrow.core.examples.exampleEffectGuide13.Example_effect_guide_13Kt$main$2$1.invokeSuspend(example-effect-guide-13.kt:9) * at arrow.core.examples.exampleEffectGuide13.Example_effect_guide_13Kt$main$2$1.invoke(example-effect-guide-13.kt) @@ -621,7 +621,7 @@ import kotlin.coroutines.resumeWithException * shift or bind was called outside of its DSL scope, and the DSL Scoped operator was leaked * This is kind of usage is incorrect, make sure all calls to shift or bind occur within the lifecycle of effect { }, either { } or similar builders. * - * See: ... for additional information. + * See: Effect KDoc for additional information. * at arrow.core.continuations.FoldContinuation.shift(Effect.kt:780) * at arrow.core.examples.exampleEffectGuide14.Example_effect_guide_14Kt$main$1$1$1$1.invokeSuspend(example-effect-guide-14.kt:17) * at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) <13 internal lines> @@ -913,6 +913,6 @@ public class ShiftLeakedException : IllegalStateException( shift or bind was called outside of its DSL scope, and the DSL Scoped operator was leaked This is kind of usage is incorrect, make sure all calls to shift or bind occur within the lifecycle of effect { }, either { } or similar builders. - See: ... for additional information. + See: Effect KDoc for additional information. """.trimIndent() ) From e9af81f9776858e5774c8a792b6a41e6e98535a3 Mon Sep 17 00:00:00 2001 From: nomisRev Date: Thu, 5 Jan 2023 14:18:02 +0000 Subject: [PATCH 05/15] Update API files --- arrow-libs/core/arrow-core/api/arrow-core.api | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/arrow-libs/core/arrow-core/api/arrow-core.api b/arrow-libs/core/arrow-core/api/arrow-core.api index bf59e2f9a5c..f4f6acb47fc 100644 --- a/arrow-libs/core/arrow-core/api/arrow-core.api +++ b/arrow-libs/core/arrow-core/api/arrow-core.api @@ -2749,6 +2749,7 @@ public final class arrow/core/continuations/FoldContinuation : arrow/core/contin public fun ensure (ZLkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun getContext ()Lkotlin/coroutines/CoroutineContext; public final fun getRecover ()Lkotlin/jvm/functions/Function2; + public final fun isActive ()Ljava/util/concurrent/atomic/AtomicReference; public fun resumeWith (Ljava/lang/Object;)V public final fun setRecover (Lkotlin/jvm/functions/Function2;)V public fun shift (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; @@ -3023,6 +3024,10 @@ public final class arrow/core/continuations/ResultKt { public abstract class arrow/core/continuations/ShiftCancellationException : arrow/core/continuations/CancellationExceptionNoTrace { } +public final class arrow/core/continuations/ShiftLeakedException : java/lang/IllegalStateException { + public fun ()V +} + public final class arrow/core/continuations/Suspend : arrow/core/continuations/ShiftCancellationException { public fun (Larrow/core/continuations/Token;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)V public final fun getRecover ()Lkotlin/jvm/functions/Function2; From 31a48f37564c720065b492285c764702e72bcbd9 Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Thu, 5 Jan 2023 15:46:03 +0100 Subject: [PATCH 06/15] Retrigger CI From 4e3f5830dd9e833b2961c44c30f615fcb5ef0120 Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Thu, 5 Jan 2023 18:47:21 +0100 Subject: [PATCH 07/15] Fix StructuredConcurrencySpec --- .../StructuredConcurrencySpec.kt | 354 +++++++++--------- 1 file changed, 176 insertions(+), 178 deletions(-) diff --git a/arrow-libs/core/arrow-core/src/commonTest/kotlin/arrow/core/continuations/StructuredConcurrencySpec.kt b/arrow-libs/core/arrow-core/src/commonTest/kotlin/arrow/core/continuations/StructuredConcurrencySpec.kt index 6d10921ee5d..dd424f6f359 100644 --- a/arrow-libs/core/arrow-core/src/commonTest/kotlin/arrow/core/continuations/StructuredConcurrencySpec.kt +++ b/arrow-libs/core/arrow-core/src/commonTest/kotlin/arrow/core/continuations/StructuredConcurrencySpec.kt @@ -4,6 +4,7 @@ import arrow.core.identity import arrow.fx.coroutines.ExitCase import arrow.fx.coroutines.guaranteeCase import io.kotest.assertions.fail +import io.kotest.assertions.throwables.shouldThrow import io.kotest.core.spec.style.StringSpec import io.kotest.matchers.collections.shouldBeIn import io.kotest.matchers.nulls.shouldNotBeNull @@ -26,216 +27,213 @@ import kotlinx.coroutines.launch import kotlinx.coroutines.suspendCancellableCoroutine import kotlinx.coroutines.withTimeout -class StructuredConcurrencySpec : - StringSpec({ - "async - suspendCancellableCoroutine.invokeOnCancellation is called with Shifted Continuation" { - val started = CompletableDeferred() - val cancelled = CompletableDeferred() - - effect { - coroutineScope { - val never = async { - suspendCancellableCoroutine { cont -> - cont.invokeOnCancellation { cause -> - require(cancelled.complete(cause)) { "cancelled latch was completed twice" } - } - require(started.complete(Unit)) - } +class StructuredConcurrencySpec : StringSpec({ + "async - suspendCancellableCoroutine.invokeOnCancellation is called with Shifted Continuation" { + val started = CompletableDeferred() + val cancelled = CompletableDeferred() + + effect { + coroutineScope { + val never = async { + suspendCancellableCoroutine { cont -> + cont.invokeOnCancellation { cause -> + require(cancelled.complete(cause)) { "cancelled latch was completed twice" } } - async { - started.await() - shift("hello") - } - .await() - never.await() + require(started.complete(Unit)) } } - .runCont() shouldBe "hello" - - withTimeout(2.seconds) { - cancelled.await().shouldNotBeNull().message shouldBe "Shifted Continuation" + async { + started.await() + shift("hello") + } + .await() + never.await() } } + .runCont() shouldBe "hello" - "Computation blocks run on parent context" { - val parentCtx = currentContext() - effect { currentContext() shouldBe parentCtx }.runCont() + withTimeout(2.seconds) { + cancelled.await().shouldNotBeNull().message shouldBe "Shifted Continuation" } + } - "Concurrent shift - async await" { - checkAll(Arb.int(), Arb.int()) { a, b -> - effect { - coroutineScope { - val fa = async { shift(a) } - val fb = async { shift(b) } - fa.await() + fb.await() - } - } - .runCont() shouldBeIn listOf(a, b) - } - } + "Computation blocks run on parent context" { + val parentCtx = currentContext() + effect { currentContext() shouldBe parentCtx }.runCont() + } - "Concurrent shift - async await exit results" { - checkAll(Arb.int()) { a -> - val scopeExit = CompletableDeferred() - val fbExit = CompletableDeferred() - val startLatches = (0..11).map { CompletableDeferred() } - val nestedExits = (0..10).map { CompletableDeferred() } - - fun CoroutineScope.asyncTask( - start: CompletableDeferred, - exit: CompletableDeferred - ): Deferred = async { - guaranteeCase({ - start.complete(Unit) - awaitCancellation() - }) { case -> require(exit.complete(case)) } + "Concurrent shift - async await" { + checkAll(Arb.int(), Arb.int()) { a, b -> + effect { + coroutineScope { + val fa = async { shift(a) } + val fb = async { shift(b) } + fa.await() + fb.await() } + } + .runCont() shouldBeIn listOf(a, b) + } + } + + "Concurrent shift - async await exit results" { + checkAll(Arb.int()) { a -> + val scopeExit = CompletableDeferred() + val fbExit = CompletableDeferred() + val startLatches = (0..11).map { CompletableDeferred() } + val nestedExits = (0..10).map { CompletableDeferred() } + + fun CoroutineScope.asyncTask( + start: CompletableDeferred, + exit: CompletableDeferred + ): Deferred = async { + guaranteeCase({ + start.complete(Unit) + awaitCancellation() + }) { case -> require(exit.complete(case)) } + } - effect { - guaranteeCase({ - coroutineScope { - val fa = - async { - startLatches.drop(1).zip(nestedExits) { start, promise -> - asyncTask(start, promise) - } - startLatches.awaitAll() - shift(a) - } - val fb = asyncTask(startLatches.first(), fbExit) - fa.await() - fb.await() + effect { + guaranteeCase({ + coroutineScope { + val fa = + async { + startLatches.drop(1).zip(nestedExits) { start, promise -> + asyncTask(start, promise) + } + startLatches.awaitAll() + shift(a) } - }) { case -> require(scopeExit.complete(case)) } - fail("Should never come here") + val fb = asyncTask(startLatches.first(), fbExit) + fa.await() + fb.await() } - .runCont() shouldBe a - withTimeout(2.seconds) { - scopeExit.await().shouldBeTypeOf() - fbExit.await().shouldBeTypeOf() - nestedExits.awaitAll().forEach { it.shouldBeTypeOf() } - } + }) { case -> require(scopeExit.complete(case)) } + fail("Should never come here") + } + .runCont() shouldBe a + withTimeout(2.seconds) { + scopeExit.await().shouldBeTypeOf() + fbExit.await().shouldBeTypeOf() + nestedExits.awaitAll().forEach { it.shouldBeTypeOf() } } } + } - "Concurrent shift - async" { - checkAll(Arb.int(), Arb.int()) { a, b -> - effect { - coroutineScope { - val fa = async { shift(a) } - val fb = async { shift(b) } - "I will be overwritten by shift - coroutineScope waits until all async are finished" - } - } - .fold({ fail("Async is never awaited, and thus ignored.") }, ::identity) shouldBe + "Concurrent shift - async" { + checkAll(Arb.int(), Arb.int()) { a, b -> + effect { + coroutineScope { + val fa = async { shift(a) } + val fb = async { shift(b) } "I will be overwritten by shift - coroutineScope waits until all async are finished" + } } + .fold({ fail("Async is never awaited, and thus ignored.") }, ::identity) shouldBe + "I will be overwritten by shift - coroutineScope waits until all async are finished" } + } + + "Concurrent shift - async exit results" { + checkAll(Arb.int(), Arb.string()) { a, str -> + val exitScope = CompletableDeferred() + val startLatches = (0..10).map { CompletableDeferred() } + val nestedExits = (0..10).map { CompletableDeferred() } + + fun CoroutineScope.asyncTask( + start: CompletableDeferred, + exit: CompletableDeferred + ): Deferred = async { + guaranteeCase({ + start.complete(Unit) + awaitCancellation() + }) { case -> require(exit.complete(case)) } + } - "Concurrent shift - async exit results" { - checkAll(Arb.int(), Arb.string()) { a, str -> - val exitScope = CompletableDeferred() - val startLatches = (0..10).map { CompletableDeferred() } - val nestedExits = (0..10).map { CompletableDeferred() } - - fun CoroutineScope.asyncTask( - start: CompletableDeferred, - exit: CompletableDeferred - ): Deferred = async { - guaranteeCase({ - start.complete(Unit) - awaitCancellation() - }) { case -> require(exit.complete(case)) } - } - - effect { - guaranteeCase({ - coroutineScope { - val fa = - async { - startLatches.zip(nestedExits) { start, promise -> asyncTask(start, promise) } - startLatches.awaitAll() - shift(a) - } - str + effect { + guaranteeCase({ + coroutineScope { + val fa = + async { + startLatches.zip(nestedExits) { start, promise -> asyncTask(start, promise) } + startLatches.awaitAll() + shift(a) } - }) { case -> require(exitScope.complete(case)) } + str } - .runCont() shouldBe str - - withTimeout(2.seconds) { - nestedExits.awaitAll().forEach { it.shouldBeTypeOf() } - } + }) { case -> require(exitScope.complete(case)) } } - } + .runCont() shouldBe str - "Concurrent shift - launch" { - checkAll(Arb.int(), Arb.int()) { a, b -> - effect { - coroutineScope { - launch { shift(a) } - launch { shift(b) } - "shift does not escape `launch`" - } - } - .runCont() shouldBe "shift does not escape `launch`" + withTimeout(2.seconds) { + nestedExits.awaitAll().forEach { it.shouldBeTypeOf() } } } + } - "Concurrent shift - launch exit results" { - checkAll(Arb.int(), Arb.string()) { a, str -> - val scopeExit = CompletableDeferred() - val startLatches = (0..10).map { CompletableDeferred() } - val nestedExits = (0..10).map { CompletableDeferred() } - - fun CoroutineScope.launchTask( - start: CompletableDeferred, - exit: CompletableDeferred - ): Job = launch { - guaranteeCase({ - start.complete(Unit) - awaitCancellation() - }) { case -> require(exit.complete(case)) } - } - - effect { - guaranteeCase({ - coroutineScope { - val fa = launch { - startLatches.zip(nestedExits) { start, promise -> launchTask(start, promise) } - startLatches.awaitAll() - shift(a) - } - str - } - }) { case -> require(scopeExit.complete(case)) } - } - .runCont() shouldBe str - withTimeout(2.seconds) { - scopeExit.await().shouldBeTypeOf() - nestedExits.awaitAll().forEach { it.shouldBeTypeOf() } + "Concurrent shift - launch" { + checkAll(Arb.int(), Arb.int()) { a, b -> + effect { + coroutineScope { + launch { shift(a) } + launch { shift(b) } + "shift does not escape `launch`" } } + .runCont() shouldBe "shift does not escape `launch`" } + } + + "Concurrent shift - launch exit results" { + checkAll(Arb.int(), Arb.string()) { a, str -> + val scopeExit = CompletableDeferred() + val startLatches = (0..10).map { CompletableDeferred() } + val nestedExits = (0..10).map { CompletableDeferred() } + + fun CoroutineScope.launchTask( + start: CompletableDeferred, + exit: CompletableDeferred + ): Job = launch { + guaranteeCase({ + start.complete(Unit) + awaitCancellation() + }) { case -> require(exit.complete(case)) } + } - // `shift` escapes `cont` block, and gets rethrown inside `coroutineScope`. - // Effectively awaiting/executing DSL code, outside of the DSL... - "async funky scenario #1 - Extract `shift` from `cont` through `async`" { - checkAll(Arb.int(), Arb.int()) { a, b -> - runCatching { - coroutineScope { - val shiftedAsync = - effect> { - val fa = async { shift(a) } - async { shift(b) } - } - .fold({ fail("shift was never awaited, so it never took effect") }, ::identity) - shiftedAsync.await() + effect { + guaranteeCase({ + coroutineScope { + val fa = launch { + startLatches.zip(nestedExits) { start, promise -> launchTask(start, promise) } + startLatches.awaitAll() + shift(a) } + str } - .exceptionOrNull() - ?.message shouldBe "Shifted Continuation" + }) { case -> require(scopeExit.complete(case)) } + } + .runCont() shouldBe str + withTimeout(2.seconds) { + scopeExit.await().shouldBeTypeOf() + nestedExits.awaitAll().forEach { it.shouldBeTypeOf() } + } + } + } + + // `shift` escapes `cont` block, and gets rethrown inside `coroutineScope`. + // Effectively awaiting/executing DSL code, outside of the DSL... + "async funky scenario #1 - Extract `shift` from `cont` through `async`" { + checkAll(Arb.int(), Arb.int()) { a, b -> + shouldThrow { + coroutineScope { + val shiftedAsync = + effect> { + val fa = async { shift(a) } + async { shift(b) } + } + .fold({ fail("shift was never awaited, so it never took effect") }, ::identity) + shiftedAsync.await() + } } } - }) + } +}) From 07b41690a00abf7651827ba4593085e0009c0f8e Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Mon, 9 Jan 2023 13:54:59 +0100 Subject: [PATCH 08/15] Update arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/continuations/Effect.kt Co-authored-by: Francisco Diaz --- .../src/commonMain/kotlin/arrow/core/continuations/Effect.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/continuations/Effect.kt b/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/continuations/Effect.kt index 4091cb6e0d9..cd23a165590 100644 --- a/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/continuations/Effect.kt +++ b/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/continuations/Effect.kt @@ -517,7 +517,7 @@ import kotlin.coroutines.resumeWithException * ErrorA * ``` * - * The example here will always print `ErrorA`, but never `ErrorB`. This is because `fa` is awaited first, and when it's `shifts` it will cancel `fb`. + * The example here will always print `ErrorA`, but never `ErrorB`. This is because `fa` is awaited first, and when it `shifts` it will cancel `fb`. * If instead we used `awaitAll`, then it would print `ErrorA` or `ErrorB` due to both `fa` and `fb` being awaited in parallel. * * #### launch From b2b88ccadfff7d5ea2202db7b7e434f0ebdb5f4d Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Mon, 9 Jan 2023 21:29:40 +0100 Subject: [PATCH 09/15] Update arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/continuations/Effect.kt Co-authored-by: Francisco Diaz --- .../src/commonMain/kotlin/arrow/core/continuations/Effect.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/continuations/Effect.kt b/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/continuations/Effect.kt index cd23a165590..d67e3a4d9b9 100644 --- a/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/continuations/Effect.kt +++ b/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/continuations/Effect.kt @@ -522,8 +522,8 @@ import kotlin.coroutines.resumeWithException * * #### launch * - * It's **not allowed** to call `shift` from within `launch`, this is because `launch` creates a separate unrelated child Job/Continuation. - * Any calls to `shift` inside of `launch` will be ignored by `effect`, and result in an exception being thrown inside `launch`. + * It's **not allowed** to call `shift` from within `launch`. This is because `launch` creates a separate unrelated child Job/Continuation. + * Any calls to `shift` inside of `launch` will be ignored by `effect` and result in an exception being thrown inside `launch`. * Because KotlinX Coroutines ignores `CancellationException`, and thus swallows the `shift` call. * * */ public inline fun Iterable.align(b: Iterable, fa: (Ior) -> C): List = - this.align(b).map(fa) + buildList(maxOf(this.collectionSizeOrDefault(10), b.collectionSizeOrDefault(10))) { + val first = this@align.iterator() + val second = b.iterator() + while (first.hasNext() || second.hasNext()) { + val element: Ior = when { + first.hasNext() && second.hasNext() -> Ior.Both(first.next(), second.next()) + first.hasNext() -> first.next().leftIor() + second.hasNext() -> second.next().rightIor() + else -> throw IllegalStateException("this should never happen") + } + add(fa(element)) + } + } /** * Combines two structures by taking the union of their shapes and using Ior to hold the elements. @@ -651,18 +663,7 @@ public inline fun Iterable.align(b: Iterable, fa: (Ior) -> * */ public fun Iterable.align(b: Iterable): List> = - alignRec(this, b) - -@Suppress("NAME_SHADOWING") -private fun alignRec(ls: Iterable, rs: Iterable): List> { - val ls = if (ls is List) ls else ls.toList() - val rs = if (rs is List) rs else rs.toList() - return when { - ls.isEmpty() -> rs.map { it.rightIor() } - rs.isEmpty() -> ls.map { it.leftIor() } - else -> listOf(Ior.Both(ls.first(), rs.first())) + alignRec(ls.drop(1), rs.drop(1)) - } -} + this.align(b, ::identity) /** * aligns two structures and combine them with the given [Semigroup.combine] From be89b3ebc4047c45c5289ef9c676204267091baa Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Tue, 10 Jan 2023 12:43:58 +0100 Subject: [PATCH 14/15] Hide atomic behind complete method --- arrow-libs/core/arrow-core/api/arrow-core.api | 1 - .../kotlin/arrow/core/continuations/Effect.kt | 43 ++++++++++--------- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/arrow-libs/core/arrow-core/api/arrow-core.api b/arrow-libs/core/arrow-core/api/arrow-core.api index f4f6acb47fc..5e69f590c55 100644 --- a/arrow-libs/core/arrow-core/api/arrow-core.api +++ b/arrow-libs/core/arrow-core/api/arrow-core.api @@ -2749,7 +2749,6 @@ public final class arrow/core/continuations/FoldContinuation : arrow/core/contin public fun ensure (ZLkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun getContext ()Lkotlin/coroutines/CoroutineContext; public final fun getRecover ()Lkotlin/jvm/functions/Function2; - public final fun isActive ()Ljava/util/concurrent/atomic/AtomicReference; public fun resumeWith (Ljava/lang/Object;)V public final fun setRecover (Lkotlin/jvm/functions/Function2;)V public fun shift (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; diff --git a/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/continuations/Effect.kt b/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/continuations/Effect.kt index d67e3a4d9b9..56aa6386e13 100644 --- a/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/continuations/Effect.kt +++ b/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/continuations/Effect.kt @@ -26,19 +26,19 @@ import kotlin.coroutines.resumeWithException * * - * [Writing a program with Effect](#writing-a-program-with-effect) - * [Handling errors](#handling-errors) - * [Structured Concurrency](#structured-concurrency) - * [Arrow Fx Coroutines](#arrow-fx-coroutines) - * [parZip](#parzip) - * [parTraverse](#partraverse) - * [raceN](#racen) - * [bracketCase / Resource](#bracketcase--resource) - * [KotlinX](#kotlinx) - * [withContext](#withcontext) - * [async](#async) - * [launch](#launch) - * [Leaking `shift`](#leaking-shift) + * [Writing a program with Effect](#writing-a-program-with-effect) + * [Handling errors](#handling-errors) + * [Structured Concurrency](#structured-concurrency) + * [Arrow Fx Coroutines](#arrow-fx-coroutines) + * [parZip](#parzip) + * [parTraverse](#partraverse) + * [raceN](#racen) + * [bracketCase / Resource](#bracketcase--resource) + * [KotlinX](#kotlinx) + * [withContext](#withcontext) + * [async](#async) + * [launch](#launch) + * [Leaking `shift`](#leaking-shift) * * @@ -788,8 +788,9 @@ internal class FoldContinuation( lateinit var recover: suspend (R) -> Any? - // Add AtomicBoolean to arrow-atomic - val isActive: AtomicRef = AtomicRef(true) + private val isActive: AtomicRef = AtomicRef(true) + + internal fun complete(): Boolean = isActive.getAndSet(false) // Shift away from this Continuation by intercepting it, and completing it with // ShiftCancellationException @@ -803,7 +804,7 @@ internal class FoldContinuation( // CancellationException and thus effectively recovering from the cancellation/shift. // This means try/catch is also capable of recovering from monadic errors. // See: EffectSpec - try/catch tests - if (isActive.get()) throw Suspend(this, r, recover as suspend (Any?) -> Any?) + if (complete()) throw Suspend(this, r, recover as suspend (Any?) -> Any?) else throw ShiftLeakedException() // In contrast to `createCoroutineUnintercepted this doesn't create a new ContinuationImpl @@ -823,13 +824,13 @@ internal class FoldContinuation( result.fold(parent::resume) { throwable -> when { throwable is Suspend && this === throwable.token -> { - isActive.set(false) + complete() suspend { throwable.recover(throwable.shifted) as B }.startCoroutineUnintercepted() } throwable is Suspend -> parent.resumeWith(result) else -> { - isActive.set(false) + complete() suspend { error(throwable.nonFatalOrThrow()) }.startCoroutineUnintercepted() } } @@ -887,18 +888,18 @@ private class DefaultEffect(val f: suspend EffectScope.() -> A) : Effec shift.recover = recover try { val fold: suspend EffectScope.() -> B = { - val res = f(this).also { shift.isActive.set(false) } + val res = f(this).also { shift.complete() } transform(res) } fold.startCoroutineUninterceptedOrReturn(shift, shift) } catch (e: Suspend) { if (shift === e.token) { - shift.isActive.set(false) + shift.complete() val f: suspend () -> B = { e.recover(e.shifted) as B } f.startCoroutineUninterceptedOrReturn(cont) } else throw e } catch (e: Throwable) { - shift.isActive.set(false) + shift.complete() val f: suspend () -> B = { error(e.nonFatalOrThrow()) } f.startCoroutineUninterceptedOrReturn(cont) } From 2088deca065614ccc625a26de30d0c3c5ecb5513 Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Tue, 10 Jan 2023 14:40:57 +0100 Subject: [PATCH 15/15] Fix shift --- .../src/commonMain/kotlin/arrow/core/continuations/Effect.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/continuations/Effect.kt b/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/continuations/Effect.kt index 56aa6386e13..e0e3e67f45d 100644 --- a/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/continuations/Effect.kt +++ b/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/continuations/Effect.kt @@ -804,7 +804,7 @@ internal class FoldContinuation( // CancellationException and thus effectively recovering from the cancellation/shift. // This means try/catch is also capable of recovering from monadic errors. // See: EffectSpec - try/catch tests - if (complete()) throw Suspend(this, r, recover as suspend (Any?) -> Any?) + if (isActive.get()) throw Suspend(this, r, recover as suspend (Any?) -> Any?) else throw ShiftLeakedException() // In contrast to `createCoroutineUnintercepted this doesn't create a new ContinuationImpl