From bed3d29acc39083151da11802a5dc212acb60bb8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ivan=20=E2=80=9CCLOVIS=E2=80=9D=20Canet?= Date: Mon, 14 Oct 2024 23:22:50 +0200 Subject: [PATCH] Introduce Flow.any, Flow.all, Flow.none Fixes #4212 --- .../api/kotlinx-coroutines-core.api | 3 + .../api/kotlinx-coroutines-core.klib.api | 3 + .../common/src/flow/terminal/Logic.kt | 107 ++++++++++++++++++ .../flow/operators/BooleanTerminationTest.kt | 106 +++++++++++++++++ 4 files changed, 219 insertions(+) create mode 100644 kotlinx-coroutines-core/common/src/flow/terminal/Logic.kt create mode 100644 kotlinx-coroutines-core/common/test/flow/operators/BooleanTerminationTest.kt diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 189a0f3544..6d75746e6b 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -980,6 +980,8 @@ public abstract interface class kotlinx/coroutines/flow/FlowCollector { public final class kotlinx/coroutines/flow/FlowKt { public static final field DEFAULT_CONCURRENCY_PROPERTY_NAME Ljava/lang/String; + public static final fun all (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun any (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun asFlow (Ljava/lang/Iterable;)Lkotlinx/coroutines/flow/Flow; public static final fun asFlow (Ljava/util/Iterator;)Lkotlinx/coroutines/flow/Flow; public static final fun asFlow (Lkotlin/jvm/functions/Function0;)Lkotlinx/coroutines/flow/Flow; @@ -1075,6 +1077,7 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun merge (Ljava/lang/Iterable;)Lkotlinx/coroutines/flow/Flow; public static final fun merge (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; public static final fun merge ([Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; + public static final fun none (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun observeOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow; public static final fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; public static final fun onEach (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api index f419598f9b..a27e3154af 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api @@ -997,6 +997,8 @@ final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel< final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/toMutableList(): kotlin.collections/MutableList<#A> // kotlinx.coroutines.channels/toMutableList|toMutableList@kotlinx.coroutines.channels.ReceiveChannel<0:0>(){0§}[0] final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/toMutableSet(): kotlin.collections/MutableSet<#A> // kotlinx.coroutines.channels/toMutableSet|toMutableSet@kotlinx.coroutines.channels.ReceiveChannel<0:0>(){0§}[0] final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/toSet(): kotlin.collections/Set<#A> // kotlinx.coroutines.channels/toSet|toSet@kotlinx.coroutines.channels.ReceiveChannel<0:0>(){0§}[0] +final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/all(kotlin.coroutines/SuspendFunction1<#A, kotlin/Boolean>): kotlin/Boolean // kotlinx.coroutines.flow/all|all@kotlinx.coroutines.flow.Flow<0:0>(kotlin.coroutines.SuspendFunction1<0:0,kotlin.Boolean>){0§}[0] +final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/any(kotlin.coroutines/SuspendFunction1<#A, kotlin/Boolean>): kotlin/Boolean // kotlinx.coroutines.flow/any|any@kotlinx.coroutines.flow.Flow<0:0>(kotlin.coroutines.SuspendFunction1<0:0,kotlin.Boolean>){0§}[0] final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/collectLatest(kotlin.coroutines/SuspendFunction1<#A, kotlin/Unit>) // kotlinx.coroutines.flow/collectLatest|collectLatest@kotlinx.coroutines.flow.Flow<0:0>(kotlin.coroutines.SuspendFunction1<0:0,kotlin.Unit>){0§}[0] final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/count(): kotlin/Int // kotlinx.coroutines.flow/count|count@kotlinx.coroutines.flow.Flow<0:0>(){0§}[0] final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/count(kotlin.coroutines/SuspendFunction1<#A, kotlin/Boolean>): kotlin/Int // kotlinx.coroutines.flow/count|count@kotlinx.coroutines.flow.Flow<0:0>(kotlin.coroutines.SuspendFunction1<0:0,kotlin.Boolean>){0§}[0] @@ -1006,6 +1008,7 @@ final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.c final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/firstOrNull(kotlin.coroutines/SuspendFunction1<#A, kotlin/Boolean>): #A? // kotlinx.coroutines.flow/firstOrNull|firstOrNull@kotlinx.coroutines.flow.Flow<0:0>(kotlin.coroutines.SuspendFunction1<0:0,kotlin.Boolean>){0§}[0] final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/last(): #A // kotlinx.coroutines.flow/last|last@kotlinx.coroutines.flow.Flow<0:0>(){0§}[0] final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/lastOrNull(): #A? // kotlinx.coroutines.flow/lastOrNull|lastOrNull@kotlinx.coroutines.flow.Flow<0:0>(){0§}[0] +final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/none(kotlin.coroutines/SuspendFunction1<#A, kotlin/Boolean>): kotlin/Boolean // kotlinx.coroutines.flow/none|none@kotlinx.coroutines.flow.Flow<0:0>(kotlin.coroutines.SuspendFunction1<0:0,kotlin.Boolean>){0§}[0] final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/single(): #A // kotlinx.coroutines.flow/single|single@kotlinx.coroutines.flow.Flow<0:0>(){0§}[0] final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/singleOrNull(): #A? // kotlinx.coroutines.flow/singleOrNull|singleOrNull@kotlinx.coroutines.flow.Flow<0:0>(){0§}[0] final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/stateIn(kotlinx.coroutines/CoroutineScope): kotlinx.coroutines.flow/StateFlow<#A> // kotlinx.coroutines.flow/stateIn|stateIn@kotlinx.coroutines.flow.Flow<0:0>(kotlinx.coroutines.CoroutineScope){0§}[0] diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Logic.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Logic.kt new file mode 100644 index 0000000000..6d1cd6fee9 --- /dev/null +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Logic.kt @@ -0,0 +1,107 @@ +@file:JvmMultifileClass +@file:JvmName("FlowKt") + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlin.jvm.* + + +/** + * A terminal operator that returns `true` and immediately cancels the flow + * if at least one element matches the given [predicate]. + * + * If the flow does not emit any elements or no element matches the predicate, the function returns `false`. + * + * Equivalent to `!all { !predicate(it) }` (see [Flow.all]) and `!none { predicate(it) }` (see [Flow.none]). + * + * Example: + * + * ``` + * val myFlow = flow { + * repeat(10) { + * emit(it) + * } + * throw RuntimeException("You still didn't find the required number? I gave you ten!") + * } + * println(myFlow.any { it > 5 }) // true + * println(flowOf(1, 2, 3).any { it > 5 }) // false + * ``` + * + * @see Iterable.any + * @see Sequence.any + */ +public suspend fun Flow.any(predicate: suspend (T) -> Boolean): Boolean { + var found = false + collectWhile { + val satisfies = predicate(it) + if (satisfies) found = true + !satisfies + } + return found +} + +/** + * A terminal operator that returns `true` if all elements match the given [predicate], + * or returns `false` and cancels the flow as soon as the first element not matching the predicate is encountered. + * + * If the flow terminates without emitting any elements, the function returns `true` because there + * are no elements in it that *do not* match the predicate. + * See a more detailed explanation of this logic concept in the + * ["Vacuous truth"](https://en.wikipedia.org/wiki/Vacuous_truth) article. + * + * Equivalent to `!any { !predicate(it) }` (see [Flow.any]) and `none { !predicate(it) }` (see [Flow.none]). + * + * Example: + * + * ``` + * val myFlow = flow { + * repeat(10) { + * emit(it) + * } + * throw RuntimeException("You still didn't find the required number? I gave you ten!") + * } + * println(myFlow.all { it <= 5 }) // false + * println(flowOf(1, 2, 3).all { it <= 5 }) // true + * ``` + * + * @see Iterable.all + * @see Sequence.all + */ +public suspend fun Flow.all(predicate: suspend (T) -> Boolean): Boolean { + var foundCounterExample = false + collectWhile { + val satisfies = predicate(it) + if (!satisfies) foundCounterExample = true + satisfies + } + return !foundCounterExample +} + +/** + * A terminal operator that returns `true` if no elements match the given [predicate], + * or returns `false` and cancels the flow as soon as the first element matching the predicate is encountered. + * + * If the flow terminates without emitting any elements, the function returns `true` because there + * are no elements in it that match the predicate. + * See a more detailed explanation of this logic concept in the + * ["Vacuous truth"](https://en.wikipedia.org/wiki/Vacuous_truth) article. + * + * Equivalent to `!any(predicate)` (see [Flow.any]) and `all { !predicate(it) }` (see [Flow.all]). + * + * Example: + * ``` + * val myFlow = flow { + * repeat(10) { + * emit(it) + * } + * throw RuntimeException("You still didn't find the required number? I gave you ten!") + * } + * println(myFlow.none { it > 5 }) // false + * println(flowOf(1, 2, 3).none { it > 5 }) // true + * ``` + * + * @see Iterable.none + * @see Sequence.none + */ +public suspend fun Flow.none(predicate: suspend (T) -> Boolean): Boolean = !any(predicate) diff --git a/kotlinx-coroutines-core/common/test/flow/operators/BooleanTerminationTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/BooleanTerminationTest.kt new file mode 100644 index 0000000000..3087c78f67 --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/operators/BooleanTerminationTest.kt @@ -0,0 +1,106 @@ +package kotlinx.coroutines.flow + +import kotlinx.coroutines.testing.* +import kotlin.test.* + +class BooleanTerminationTest : TestBase() { + @Test + fun testAnyNominal() = runTest { + val flow = flow { + emit(1) + emit(2) + } + + assertTrue(flow.any { it > 0 }) + assertTrue(flow.any { it % 2 == 0 }) + assertFalse(flow.any { it > 5 }) + } + + @Test + fun testAnyEmpty() = runTest { + assertFalse(emptyFlow().any { it > 0 }) + } + + @Test + fun testAnyInfinite() = runTest { + assertTrue(flow { while (true) { emit(5) } }.any { it == 5 }) + } + + @Test + fun testAnyShortCircuit() = runTest { + assertTrue(flow { + emit(1) + emit(2) + expectUnreached() + }.any { + it == 2 + }) + } + + @Test + fun testAllNominal() = runTest { + val flow = flow { + emit(1) + emit(2) + } + + assertTrue(flow.all { it > 0 }) + assertFalse(flow.all { it % 2 == 0 }) + assertFalse(flow.all { it > 5 }) + } + + @Test + fun testAllEmpty() = runTest { + assertTrue(emptyFlow().all { it > 0 }) + } + + @Test + fun testAllInfinite() = runTest { + assertFalse(flow { while (true) { emit(5) } }.all { it == 0 }) + } + + @Test + fun testAllShortCircuit() = runTest { + assertFalse(flow { + emit(1) + emit(2) + expectUnreached() + }.all { + it <= 1 + }) + } + + @Test + fun testNoneNominal() = runTest { + val flow = flow { + emit(1) + emit(2) + } + + assertFalse(flow.none { it > 0 }) + assertFalse(flow.none { it % 2 == 0 }) + assertTrue(flow.none { it > 5 }) + } + + @Test + fun testNoneEmpty() = runTest { + assertTrue(emptyFlow().none { it > 0 }) + } + + @Test + fun testNoneInfinite() = runTest { + assertFalse(flow { while (true) { emit(5) } }.none { it == 5 }) + } + + @Test + fun testNoneShortCircuit() = runTest { + assertFalse(flow { + emit(1) + emit(2) + expectUnreached() + }.none { + it == 2 + }) + } + +}