From 2431eb4083ff42076fef643846042fc6c11c352b Mon Sep 17 00:00:00 2001 From: "He-Pin(kerr)" Date: Mon, 22 Jan 2024 19:14:34 +0800 Subject: [PATCH] feat: Add Flow/Sink#foldWhile operator. (#1012) --- .../stream/operators/Sink/foldWhile.md | 42 ++++++++++++++++ .../operators/Source-or-Flow/foldWhile.md | 48 +++++++++++++++++++ .../main/paradox/stream/operators/index.md | 4 ++ .../stream/operators/flow/FoldWhile.java | 39 +++++++++++++++ .../stream/operators/sink/FoldWhile.java | 39 +++++++++++++++ .../stream/operators/sink/FoldWhile.scala | 37 ++++++++++++++ .../stream/operators/sourceorflow/Fold.scala | 6 +++ .../operators/sourceorflow/FoldWhile.scala | 37 ++++++++++++++ .../apache/pekko/stream/javadsl/FlowTest.java | 12 +++++ .../apache/pekko/stream/javadsl/SinkTest.java | 34 ++++++++----- .../pekko/stream/javadsl/SourceTest.java | 12 +++++ .../stream/DslFactoriesConsistencySpec.scala | 1 + .../stream/scaladsl/FlowFoldWhileSpec.scala | 46 ++++++++++++++++++ .../pekko/stream/scaladsl/SinkFoldSpec.scala | 34 +++++++++++++ .../org/apache/pekko/stream/impl/Stages.scala | 1 + .../apache/pekko/stream/impl/fusing/Ops.scala | 22 +++++++-- .../apache/pekko/stream/javadsl/Flow.scala | 26 ++++++++++ .../apache/pekko/stream/javadsl/Sink.scala | 11 +++++ .../apache/pekko/stream/javadsl/Source.scala | 26 ++++++++++ .../apache/pekko/stream/javadsl/SubFlow.scala | 26 ++++++++++ .../pekko/stream/javadsl/SubSource.scala | 26 ++++++++++ .../apache/pekko/stream/scaladsl/Flow.scala | 26 ++++++++++ .../apache/pekko/stream/scaladsl/Sink.scala | 12 +++++ 23 files changed, 551 insertions(+), 16 deletions(-) create mode 100644 docs/src/main/paradox/stream/operators/Sink/foldWhile.md create mode 100644 docs/src/main/paradox/stream/operators/Source-or-Flow/foldWhile.md create mode 100644 docs/src/test/java/jdocs/stream/operators/flow/FoldWhile.java create mode 100644 docs/src/test/java/jdocs/stream/operators/sink/FoldWhile.java create mode 100644 docs/src/test/scala/docs/stream/operators/sink/FoldWhile.scala create mode 100644 docs/src/test/scala/docs/stream/operators/sourceorflow/FoldWhile.scala create mode 100644 stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFoldWhileSpec.scala create mode 100644 stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkFoldSpec.scala diff --git a/docs/src/main/paradox/stream/operators/Sink/foldWhile.md b/docs/src/main/paradox/stream/operators/Sink/foldWhile.md new file mode 100644 index 00000000000..3c49b2a9595 --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Sink/foldWhile.md @@ -0,0 +1,42 @@ +# Sink.foldWhile + +Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation. + +@ref[Sink operators](../index.md#sink-operators) + +## Signature + +@apidoc[Sink.foldWhile](Sink$) { scala="#foldWhile%5BU%2C%20T%5D(zero%3A%20U)(p%3A%20U%20%3D%3E%20Boolean)(f%3A%20(U%2C%20T)%20%3D%3E%20U):org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[U]]" java="#foldWhile(java.lang.Object,org.apache.pekko.japi.function.Predicate,org.apache.pekko.japi.function.Function2)" } + +## Description + +A Sink that will invoke the given function for every received element, giving it its previous output (or the given zero value) +and the element as input. + +Materializes into a @scala[`Future`] @java[`CompletionStage`] that will complete with the last state when the stream has completed, +predicate p returns false, or completed with Failure if there is a failure signaled in the stream. + +This operator allows combining values into a result without a global mutable state by instead passing the state along +between invocations. + +## Example + +`foldWhile` is typically used to 'fold up' the incoming values into an aggregate with a predicate. +For example, you can use `foldWhile` to calculate the sum while some predicate is true. + +Scala +: @@snip [FoldWhile.scala](/docs/src/test/scala/docs/stream/operators/sink/FoldWhile.scala) { #foldWhile } + +Java +: @@snip [FoldWhile.java](/docs/src/test/java/jdocs/stream/operators/sink/FoldWhile.java) { #foldWhile } + +## Reactive Streams semantics + +@@@div { .callout } + +**cancels** never + +**backpressures** when the previous fold function invocation has not yet completed + +@@@ + diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/foldWhile.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/foldWhile.md new file mode 100644 index 00000000000..1de0907726d --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/foldWhile.md @@ -0,0 +1,48 @@ +# foldWhile + +Start with current value `zero` and then apply the current and next value to the given function. When upstream completes or the predicate `p` returns `false`, the current value is emitted downstream. + +@ref[Simple operators](../index.md#simple-operators) + +## Signature + +@apidoc[Source.foldWhile](Source) { scala="#foldWhile%5BT%5D(zero%3A%20T)(p%3A%20T%20%3D%3E%20Boolean)(f%3A%20(T%2C%20Out)%20%3D%3E%20T):FlowOps.this.Repr[T]" java="#foldWhile(java.lang.Object,org.apache.pekko.japi.function.Predicate,org.apache.pekko.japi.function.Function2)" } +@apidoc[Flow.foldWhile](Flow) { scala="#foldWhile%5BT%5D(zero%3A%20T)(p%3A%20T%20%3D%3E%20Boolean)(f%3A%20(T%2C%20Out)%20%3D%3E%20T):FlowOps.this.Repr[T]" java="#foldWhile(java.lang.Object,org.apache.pekko.japi.function.Predicate,org.apache.pekko.japi.function.Function2)" } + +## Description + +Start with current value `zero` and then apply the current and next value to the given function. When upstream +completes, the current value is emitted downstream. + +@@@ warning + +Note that the `zero` value must be immutable, because otherwise +the same mutable instance would be shared across different threads +when running the stream more than once. + +@@@ + +## Example + +`foldWhile` is typically used to 'fold up' the incoming values into an aggregate with a predicate. +For example, you can use `foldWhile` to calculate the sum while some predicate is true. + + +Scala +: @@snip [FoldWhile.scala](/docs/src/test/scala/docs/stream/operators/sourceorflow/FoldWhile.scala) { #imports #foldWhile } + +Java +: @@snip [FoldWhile.java](/docs/src/test/java/jdocs/stream/operators/flow/FoldWhile.java) { #foldWhile } + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when upstream completes + +**backpressures** when downstream backpressures + +**completes** when upstream completes + +@@@ + diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index 6fd8ad43231..e79c880f21f 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -61,6 +61,7 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl |Sink|@ref[combine](Sink/combine.md)|Combine several sinks into one using a user specified strategy| |Sink|@ref[completionStageSink](Sink/completionStageSink.md)|Streams the elements to the given future sink once it successfully completes. | |Sink|@ref[fold](Sink/fold.md)|Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.| +|Sink|@ref[foldWhile](Sink/foldWhile.md)|Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.| |Sink|@ref[foreach](Sink/foreach.md)|Invoke a given procedure for each element received.| |Sink|@ref[foreachAsync](Sink/foreachAsync.md)|Invoke a given procedure asynchronously for each element received.| |Sink|@ref[foreachParallel](Sink/foreachParallel.md)|Like `foreach` but allows up to `parallellism` procedure calls to happen in parallel.| @@ -158,6 +159,7 @@ depending on being backpressured by downstream or not. |Flow|@ref[flattenOptional](Flow/flattenOptional.md)|Collect the value of `Optional` from all the elements passing through this flow , empty `Optional` is filtered out.| |Source/Flow|@ref[fold](Source-or-Flow/fold.md)|Start with current value `zero` and then apply the current and next value to the given function. When upstream completes, the current value is emitted downstream.| |Source/Flow|@ref[foldAsync](Source-or-Flow/foldAsync.md)|Just like `fold` but receives a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value.| +|Source/Flow|@ref[foldWhile](Source-or-Flow/foldWhile.md)|Start with current value `zero` and then apply the current and next value to the given function. When upstream completes or the predicate `p` returns `false`, the current value is emitted downstream.| |Source/Flow|@ref[fromMaterializer](Source-or-Flow/fromMaterializer.md)|Defer the creation of a `Source/Flow` until materialization and access `Materializer` and `Attributes`| |Flow|@ref[futureFlow](Flow/futureFlow.md)|Streams the elements through the given future flow once it successfully completes.| |Source/Flow|@ref[grouped](Source-or-Flow/grouped.md)|Accumulate incoming events until the specified number of elements have been accumulated and then pass the collection of elements downstream.| @@ -453,6 +455,8 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [fold](Source-or-Flow/fold.md) * [fold](Sink/fold.md) * [foldAsync](Source-or-Flow/foldAsync.md) +* [foldWhile](Source-or-Flow/foldWhile.md) +* [foldWhile](Sink/foldWhile.md) * [foreach](Sink/foreach.md) * [foreachAsync](Sink/foreachAsync.md) * [foreachParallel](Sink/foreachParallel.md) diff --git a/docs/src/test/java/jdocs/stream/operators/flow/FoldWhile.java b/docs/src/test/java/jdocs/stream/operators/flow/FoldWhile.java new file mode 100644 index 00000000000..2130835dc07 --- /dev/null +++ b/docs/src/test/java/jdocs/stream/operators/flow/FoldWhile.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package jdocs.stream.operators.flow; + +// #imports + +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.stream.javadsl.Source; + +// #imports + +public class FoldWhile { + private static final ActorSystem system = null; + + private void foldWhileUsage() { + // #foldWhile + Source.range(1, 10) + .foldWhile(0, acc -> acc < 10, Integer::sum) + .runForeach(System.out::println, system); + // Expect prints: + // 100 + // #foldWhile + } +} diff --git a/docs/src/test/java/jdocs/stream/operators/sink/FoldWhile.java b/docs/src/test/java/jdocs/stream/operators/sink/FoldWhile.java new file mode 100644 index 00000000000..f821c6fe3b7 --- /dev/null +++ b/docs/src/test/java/jdocs/stream/operators/sink/FoldWhile.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package jdocs.stream.operators.sink; + +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.stream.javadsl.Sink; +import org.apache.pekko.stream.javadsl.Source; + +public class FoldWhile { + private ActorSystem system = null; + + public void foldWhileUsage() throws Exception { + // #foldWhile + final int result = + Source.range(1, 10) + .runWith(Sink.foldWhile(0, acc -> acc < 10, Integer::sum), system) + .toCompletableFuture() + .get(); + System.out.println(result); + // Expect prints: + // 10 + // #foldWhile + } +} diff --git a/docs/src/test/scala/docs/stream/operators/sink/FoldWhile.scala b/docs/src/test/scala/docs/stream/operators/sink/FoldWhile.scala new file mode 100644 index 00000000000..22b20d4c1f6 --- /dev/null +++ b/docs/src/test/scala/docs/stream/operators/sink/FoldWhile.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package docs.stream.operators.sink + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.stream.scaladsl.{ Sink, Source } + +import scala.concurrent.duration.DurationInt +import scala.concurrent.{ Await, ExecutionContextExecutor } + +class FoldWhile { + implicit val system: ActorSystem = ??? + implicit val ec: ExecutionContextExecutor = system.dispatcher + + // #foldWhile + val r = Source(1 to 10) + .runWith(Sink.foldWhile(0L)(_ < 10)(_ + _)) + println(Await.result(r, 3.seconds)) + // Expect prints: + // 10 + // #foldWhile +} diff --git a/docs/src/test/scala/docs/stream/operators/sourceorflow/Fold.scala b/docs/src/test/scala/docs/stream/operators/sourceorflow/Fold.scala index 65541c6bd43..d3d3c7a0a66 100644 --- a/docs/src/test/scala/docs/stream/operators/sourceorflow/Fold.scala +++ b/docs/src/test/scala/docs/stream/operators/sourceorflow/Fold.scala @@ -34,4 +34,10 @@ object Fold extends App { // Prints: Histogram(99,51) // #fold + + // #foldWhile + Source(1 to 100) + .foldWhile(0)(elem => elem < 100)(_ + _) + .runForeach(println) + // #foldWhile } diff --git a/docs/src/test/scala/docs/stream/operators/sourceorflow/FoldWhile.scala b/docs/src/test/scala/docs/stream/operators/sourceorflow/FoldWhile.scala new file mode 100644 index 00000000000..ce041e39ad0 --- /dev/null +++ b/docs/src/test/scala/docs/stream/operators/sourceorflow/FoldWhile.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package docs.stream.operators.sourceorflow + +//#imports +import org.apache.pekko +import pekko.actor.ActorSystem +import pekko.stream.scaladsl.Source + +//#imports +object FoldWhile extends App { + + implicit val sys: ActorSystem = ActorSystem() + + // #foldWhile + Source(1 to 10) + .foldWhile(0)(_ < 10)(_ + _) + .runForeach(println) + // Expect prints: + // 10 + // #foldWhile +} diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java index d95e1eab5af..bdfcf3a6d5a 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java @@ -237,6 +237,18 @@ public void mustBeAbleToUseMapWithResource() { Assert.assertFalse(gate.get()); } + @Test + public void mustBeAbleToUseFoldWhile() throws Exception { + final int result = + Source.range(1, 10) + .via(Flow.of(Integer.class).foldWhile(0, acc -> acc < 10, Integer::sum)) + .toMat(Sink.head(), Keep.right()) + .run(system) + .toCompletableFuture() + .get(1, TimeUnit.SECONDS); + Assert.assertEquals(10, result); + } + @Test public void mustBeAbleToUseIntersperse() throws Exception { final TestKit probe = new TestKit(system); diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java index cdcfd588bd2..1e48272dbb0 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java @@ -13,29 +13,31 @@ package org.apache.pekko.stream.javadsl; -import java.util.Arrays; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.*; -import java.util.stream.Collectors; - import org.apache.pekko.Done; import org.apache.pekko.NotUsed; import org.apache.pekko.japi.Pair; import org.apache.pekko.japi.function.Function; -import org.apache.pekko.stream.*; +import org.apache.pekko.stream.Attributes; +import org.apache.pekko.stream.Graph; +import org.apache.pekko.stream.StreamTest; +import org.apache.pekko.stream.UniformFanOutShape; import org.apache.pekko.stream.testkit.TestSubscriber; import org.apache.pekko.stream.testkit.javadsl.TestSink; +import org.apache.pekko.testkit.PekkoJUnitActorSystemResource; +import org.apache.pekko.testkit.PekkoSpec; import org.apache.pekko.testkit.javadsl.TestKit; import org.junit.ClassRule; import org.junit.Test; import org.reactivestreams.Publisher; -import org.apache.pekko.testkit.PekkoSpec; -import org.apache.pekko.testkit.PekkoJUnitActorSystemResource; import org.reactivestreams.Subscription; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.*; +import java.util.stream.Collectors; + import static org.junit.Assert.*; public class SinkTest extends StreamTest { @@ -70,6 +72,16 @@ public void mustBeAbleToUseFold() throws Exception { Source.from(new ArrayList()).runWith(foldSink, system); } + @Test + public void mustBeAbleToUseFoldWhile() throws Exception { + final int result = + Source.range(1, 10) + .runWith(Sink.foldWhile(0, acc -> acc < 10, Integer::sum), system) + .toCompletableFuture() + .get(1, TimeUnit.SECONDS); + assertEquals(10, result); + } + @Test public void mustBeAbleToUseActorRefSink() throws Exception { final TestKit probe = new TestKit(system); diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java index 68345bc7398..b1ddb49111d 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java @@ -815,6 +815,18 @@ public void mustBeAbleToUseMapWithResource() { Assert.assertFalse(gate.get()); } + @Test + public void mustBeAbleToUseFoldWhile() throws Exception { + final int result = + Source.range(1, 10) + .foldWhile(0, acc -> acc < 10, Integer::sum) + .toMat(Sink.head(), Keep.right()) + .run(system) + .toCompletableFuture() + .get(1, TimeUnit.SECONDS); + Assert.assertEquals(10, result); + } + @Test public void mustBeAbleToUseIntersperse() throws Exception { final TestKit probe = new TestKit(system); diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala index df715bbf13f..35f2eb70142 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala @@ -67,6 +67,7 @@ class DslFactoriesConsistencySpec extends AnyWordSpec with Matchers { (classOf[scala.Function0[_]], classOf[pekko.japi.function.Creator[_]]) :: (classOf[scala.Function0[_]], classOf[java.util.concurrent.Callable[_]]) :: (classOf[scala.Function1[_, Unit]], classOf[pekko.japi.function.Procedure[_]]) :: + (classOf[scala.Function1[_, Boolean]], classOf[pekko.japi.function.Predicate[_]]) :: (classOf[scala.Function1[_, _]], classOf[pekko.japi.function.Function[_, _]]) :: (classOf[scala.Function2[_, _, _]], classOf[java.util.function.BiFunction[_, _, _]]) :: // setup (classOf[scala.Function1[scala.Function1[_, _], _]], classOf[pekko.japi.function.Function2[_, _, _]]) :: diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFoldWhileSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFoldWhileSpec.scala new file mode 100644 index 00000000000..93eab05ac28 --- /dev/null +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFoldWhileSpec.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +/* + * Copyright (C) 2014-2022 Lightbend Inc. + */ + +package org.apache.pekko.stream.scaladsl + +import org.apache.pekko +import pekko.stream.testkit.StreamSpec + +class FlowFoldWhileSpec extends StreamSpec { + + "A Fold While" must { + "can be used in the happy path" in { + Source(1 to 10) + .foldWhile(0)(_ < 10)(_ + _) + .runWith(Sink.head) + .futureValue shouldBe 10 + } + + "can be used to implement forAll" in { + val f: Int => Boolean = any => any < 5 + Source(1 to 10) + .foldWhile(true)(identity)(_ && f(_)) + .runWith(Sink.head) + .futureValue shouldBe false + } + + "can be used to implement exists" in { + val f: Int => Boolean = any => any > 5 + Source(1 to 10) + .foldWhile(false)(!_)(_ || f(_)) + .runWith(Sink.head) + .futureValue shouldBe true + } + } + +} diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkFoldSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkFoldSpec.scala new file mode 100644 index 00000000000..8b0f035028a --- /dev/null +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkFoldSpec.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.scaladsl + +import org.apache.pekko.stream.testkit.StreamSpec + +class SinkFoldSpec extends StreamSpec(""" + pekko.stream.materializer.initial-input-buffer-size = 2 + """) { + + "sink.foldWhile" must { + "works in happy path" in { + Source(1 to 10) + .runWith(Sink.foldWhile(0L)(_ < 10)(_ + _)) + .futureValue shouldBe 10 + } + } + +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala index a488bd6a47b..6bb286bd5a5 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala @@ -62,6 +62,7 @@ import pekko.stream.Attributes._ val scan = name("scan") val scanAsync = name("scanAsync") val fold = name("fold") + val foldWhile = name("foldWhile") val foldAsync = name("foldAsync") val reduce = name("reduce") val intersperse = name("intersperse") diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala index 4f2a8031a2e..03d5baff776 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala @@ -14,7 +14,6 @@ package org.apache.pekko.stream.impl.fusing import java.util.concurrent.TimeUnit.NANOSECONDS - import scala.annotation.nowarn import scala.annotation.tailrec import scala.collection.immutable @@ -41,7 +40,7 @@ import pekko.stream.impl.Stages.DefaultAttributes import pekko.stream.impl.fusing.GraphStages.SimpleLinearGraphStage import pekko.stream.scaladsl.{ DelayStrategy, Source } import pekko.stream.stage._ -import pekko.util.{ unused, OptionVal } +import pekko.util.{ unused, ConstantFun, OptionVal } import pekko.util.ccompat._ /** @@ -601,15 +600,22 @@ private[stream] object Collect { /** * INTERNAL API */ -@InternalApi private[pekko] final case class Fold[In, Out](zero: Out, f: (Out, In) => Out) +@InternalApi private[pekko] object Fold { + def apply[In, Out](zero: Out, f: (Out, In) => Out): Fold[In, Out] = new Fold(zero, ConstantFun.anyToTrue, f) +} + +/** + * INTERNAL API + */ +@InternalApi private[pekko] final case class Fold[In, Out](zero: Out, + predicate: Out => Boolean, + f: (Out, In) => Out) extends GraphStage[FlowShape[In, Out]] { val in = Inlet[In]("Fold.in") val out = Outlet[Out]("Fold.out") override val shape: FlowShape[In, Out] = FlowShape(in, out) - override def toString: String = "Fold" - override val initialAttributes = DefaultAttributes.fold and SourceLocation.forLambda(f) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = @@ -623,6 +629,10 @@ private[stream] object Collect { val elem = grab(in) try { aggregator = f(aggregator, elem) + if (!predicate(aggregator)) { + push(out, aggregator) + completeStage() + } } catch { case NonFatal(ex) => decider(ex) match { @@ -653,6 +663,8 @@ private[stream] object Collect { setHandlers(in, out, this) } + + override def toString: String = "Fold" } /** diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 258fdc1d7fa..92accbddd0b 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -1379,6 +1379,32 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr def fold[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] = new Flow(delegate.fold(zero)(f.apply)) + /** + * Similar to `scan` but only emits its result when the upstream completes or the predicate `p` returns `false`. + * after which it also completes. Applies the given function towards its current and next value, + * yielding the next current value. + * + * If the function `f` throws an exception and the supervision decision is + * [[pekko.stream.Supervision.Restart]] current value starts at `zero` again + * the stream will continue. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * Note that the `zero` value must be immutable. + * + * '''Emits when''' upstream completes or the predicate `p` returns `false` + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * See also [[FlowOps.fold]] + */ + def foldWhile[T](zero: T, p: function.Predicate[T], f: function.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] = + new Flow(delegate.foldWhile(zero)(p.test)(f.apply)) + /** * Similar to `fold` but with an asynchronous function. * Applies the given function towards its current and next value, diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala index c71f4338471..2fc19d7681e 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala @@ -57,6 +57,17 @@ object Sink { def fold[U, In](zero: U, f: function.Function2[U, In, U]): javadsl.Sink[In, CompletionStage[U]] = new Sink(scaladsl.Sink.fold[U, In](zero)(f.apply).toCompletionStage()) + /** + * A `Sink` that will invoke the given function for every received element, giving it its previous + * output (or the given `zero` value) and the element as input. + * The returned [[java.util.concurrent.CompletionStage]] will be completed with value of the final + * function evaluation when the input stream ends, predicate `p` returns false, or completed with `Failure` + * if there is a failure is signaled in the stream. + */ + def foldWhile[U, In]( + zero: U, p: function.Predicate[U], f: function.Function2[U, In, U]): javadsl.Sink[In, CompletionStage[U]] = + new Sink(scaladsl.Sink.foldWhile[U, In](zero)(p.test)(f.apply).toCompletionStage()) + /** * A `Sink` that will invoke the given asynchronous function for every received element, giving it its previous * output (or the given `zero` value) and the element as input. diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index 9227e9e0283..a2cdc6def38 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -3089,6 +3089,32 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ def fold[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Source[T, Mat] = new Source(delegate.fold(zero)(f.apply)) + /** + * Similar to `scan` but only emits its result when the upstream completes or the predicate `p` returns `false`. + * after which it also completes. Applies the given function towards its current and next value, + * yielding the next current value. + * + * If the function `f` throws an exception and the supervision decision is + * [[pekko.stream.Supervision.Restart]] current value starts at `zero` again + * the stream will continue. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * Note that the `zero` value must be immutable. + * + * '''Emits when''' upstream completes or the predicate `p` returns `false` + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * See also [[FlowOps.fold]] + */ + def foldWhile[T](zero: T, p: function.Predicate[T], f: function.Function2[T, Out, T]): javadsl.Source[T, Mat] = + new Source(delegate.foldWhile(zero)(p.test)(f.apply)) + /** * Similar to `fold` but with an asynchronous function. * Applies the given function towards its current and next value, diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index 681b40ea04f..1afa33569b3 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -757,6 +757,32 @@ class SubFlow[In, Out, Mat]( def fold[T](zero: T)(f: function.Function2[T, Out, T]): SubFlow[In, T, Mat] = new SubFlow(delegate.fold(zero)(f.apply)) + /** + * Similar to `scan` but only emits its result when the upstream completes or the predicate `p` returns `false`. + * after which it also completes. Applies the given function towards its current and next value, + * yielding the next current value. + * + * If the function `f` throws an exception and the supervision decision is + * [[pekko.stream.Supervision.Restart]] current value starts at `zero` again + * the stream will continue. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * Note that the `zero` value must be immutable. + * + * '''Emits when''' upstream completes or the predicate `p` returns `false` + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * See also [[FlowOps.fold]] + */ + def foldWhile[T](zero: T, p: function.Predicate[T], f: function.Function2[T, Out, T]): SubFlow[In, T, Mat] = + new SubFlow(delegate.foldWhile(zero)(p.test)(f.apply)) + /** * Similar to `fold` but with an asynchronous function. * Applies the given function towards its current and next value, diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index 4c00a25b2e5..0faa7e0d5e5 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -748,6 +748,32 @@ class SubSource[Out, Mat]( def fold[T](zero: T)(f: function.Function2[T, Out, T]): SubSource[T, Mat] = new SubSource(delegate.fold(zero)(f.apply)) + /** + * Similar to `scan` but only emits its result when the upstream completes or the predicate `p` returns `false`. + * after which it also completes. Applies the given function towards its current and next value, + * yielding the next current value. + * + * If the function `f` throws an exception and the supervision decision is + * [[pekko.stream.Supervision.Restart]] current value starts at `zero` again + * the stream will continue. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * Note that the `zero` value must be immutable. + * + * '''Emits when''' upstream completes or the predicate `p` returns `false` + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * See also [[FlowOps.fold]] + */ + def foldWhile[T](zero: T, p: function.Predicate[T], f: function.Function2[T, Out, T]): SubSource[T, Mat] = + new SubSource(delegate.foldWhile(zero)(p.test)(f.apply)) + /** * Similar to `fold` but with an asynchronous function. * Applies the given function towards its current and next value, diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index b86a766fd14..250af8cd246 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -1788,6 +1788,32 @@ trait FlowOps[+Out, +Mat] { */ def fold[T](zero: T)(f: (T, Out) => T): Repr[T] = via(Fold(zero, f)) + /** + * Similar to `scan` but only emits its result when the upstream completes or the predicate `p` returns `false`. + * after which it also completes. Applies the given function towards its current and next value, + * yielding the next current value. + * + * If the function `f` throws an exception and the supervision decision is + * [[pekko.stream.Supervision.Restart]] current value starts at `zero` again + * the stream will continue. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * Note that the `zero` value must be immutable. + * + * '''Emits when''' upstream completes or the predicate `p` returns `false` + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * See also [[FlowOps.fold]] + */ + def foldWhile[T](zero: T)(p: T => Boolean)(f: (T, Out) => T): Repr[T] = via( + Fold[Out, T](zero, p, f).withAttributes(DefaultAttributes.foldWhile)) + /** * Similar to `fold` but with an asynchronous function. * Applies the given function towards its current and next value, diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala index 935796a4789..84f1ed05082 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala @@ -423,6 +423,18 @@ object Sink { def fold[U, T](zero: U)(f: (U, T) => U): Sink[T, Future[U]] = Flow[T].fold(zero)(f).toMat(Sink.head)(Keep.right).named("foldSink") + /** + * A `Sink` that will invoke the given function for every received element, giving it its previous + * output (or the given `zero` value) and the element as input. + * The returned [[scala.concurrent.Future]] will be completed with value of the final + * function evaluation when the input stream ends, predicate `p` returns false, or completed with `Failure` + * if there is a failure signaled in the stream. + * + * @see [[#fold]] + */ + def foldWhile[U, T](zero: U)(p: U => Boolean)(f: (U, T) => U): Sink[T, Future[U]] = + Flow[T].foldWhile(zero)(p)(f).toMat(Sink.head)(Keep.right).named("foldWhileSink") + /** * A `Sink` that will invoke the given asynchronous function for every received element, giving it its previous * output (or the given `zero` value) and the element as input.