diff --git a/docs/src/main/paradox/stream/operators/Sink/foldWhile.md b/docs/src/main/paradox/stream/operators/Sink/foldWhile.md
new file mode 100644
index 00000000000..3c49b2a9595
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Sink/foldWhile.md
@@ -0,0 +1,42 @@
+# Sink.foldWhile
+
+Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.
+
+@ref[Sink operators](../index.md#sink-operators)
+
+## Signature
+
+@apidoc[Sink.foldWhile](Sink$) { scala="#foldWhile%5BU%2C%20T%5D(zero%3A%20U)(p%3A%20U%20%3D%3E%20Boolean)(f%3A%20(U%2C%20T)%20%3D%3E%20U):org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[U]]" java="#foldWhile(java.lang.Object,org.apache.pekko.japi.function.Predicate,org.apache.pekko.japi.function.Function2)" }
+
+## Description
+
+A Sink that will invoke the given function for every received element, giving it its previous output (or the given zero value)
+and the element as input.
+
+Materializes into a @scala[`Future`] @java[`CompletionStage`] that will complete with the last state when the stream has completed,
+predicate p returns false, or completed with Failure if there is a failure signaled in the stream.
+
+This operator allows combining values into a result without a global mutable state by instead passing the state along
+between invocations.
+
+## Example
+
+`foldWhile` is typically used to 'fold up' the incoming values into an aggregate with a predicate.
+For example, you can use `foldWhile` to calculate the sum while some predicate is true.
+
+Scala
+: @@snip [FoldWhile.scala](/docs/src/test/scala/docs/stream/operators/sink/FoldWhile.scala) { #foldWhile }
+
+Java
+: @@snip [FoldWhile.java](/docs/src/test/java/jdocs/stream/operators/sink/FoldWhile.java) { #foldWhile }
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**cancels** never
+
+**backpressures** when the previous fold function invocation has not yet completed
+
+@@@
+
diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/foldWhile.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/foldWhile.md
new file mode 100644
index 00000000000..1de0907726d
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/foldWhile.md
@@ -0,0 +1,48 @@
+# foldWhile
+
+Start with current value `zero` and then apply the current and next value to the given function. When upstream completes or the predicate `p` returns `false`, the current value is emitted downstream.
+
+@ref[Simple operators](../index.md#simple-operators)
+
+## Signature
+
+@apidoc[Source.foldWhile](Source) { scala="#foldWhile%5BT%5D(zero%3A%20T)(p%3A%20T%20%3D%3E%20Boolean)(f%3A%20(T%2C%20Out)%20%3D%3E%20T):FlowOps.this.Repr[T]" java="#foldWhile(java.lang.Object,org.apache.pekko.japi.function.Predicate,org.apache.pekko.japi.function.Function2)" }
+@apidoc[Flow.foldWhile](Flow) { scala="#foldWhile%5BT%5D(zero%3A%20T)(p%3A%20T%20%3D%3E%20Boolean)(f%3A%20(T%2C%20Out)%20%3D%3E%20T):FlowOps.this.Repr[T]" java="#foldWhile(java.lang.Object,org.apache.pekko.japi.function.Predicate,org.apache.pekko.japi.function.Function2)" }
+
+## Description
+
+Start with current value `zero` and then apply the current and next value to the given function. When upstream
+completes, the current value is emitted downstream.
+
+@@@ warning
+
+Note that the `zero` value must be immutable, because otherwise
+the same mutable instance would be shared across different threads
+when running the stream more than once.
+
+@@@
+
+## Example
+
+`foldWhile` is typically used to 'fold up' the incoming values into an aggregate with a predicate.
+For example, you can use `foldWhile` to calculate the sum while some predicate is true.
+
+
+Scala
+: @@snip [FoldWhile.scala](/docs/src/test/scala/docs/stream/operators/sourceorflow/FoldWhile.scala) { #imports #foldWhile }
+
+Java
+: @@snip [FoldWhile.java](/docs/src/test/java/jdocs/stream/operators/flow/FoldWhile.java) { #foldWhile }
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** when upstream completes
+
+**backpressures** when downstream backpressures
+
+**completes** when upstream completes
+
+@@@
+
diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md
index 4fbb26ebcb0..4e2311aec16 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -61,6 +61,7 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl
|Sink|@ref[combine](Sink/combine.md)|Combine several sinks into one using a user specified strategy|
|Sink|@ref[completionStageSink](Sink/completionStageSink.md)|Streams the elements to the given future sink once it successfully completes. |
|Sink|@ref[fold](Sink/fold.md)|Fold over emitted element with a function, where each invocation will get the new element and the result from the previous fold invocation.|
+|Sink|@ref[foldWhile](Sink/foldWhile.md)|Fold over emitted element with a function, where each invocation will get the new element and the result from the previous fold invocation.|
|Sink|@ref[foreach](Sink/foreach.md)|Invoke a given procedure for each element received.|
|Sink|@ref[foreachAsync](Sink/foreachAsync.md)|Invoke a given procedure asynchronously for each element received.|
|Sink|@ref[foreachParallel](Sink/foreachParallel.md)|Like `foreach` but allows up to `parallellism` procedure calls to happen in parallel.|
@@ -158,6 +159,7 @@ depending on being backpressured by downstream or not.
|Flow|@ref[flattenOptional](Flow/flattenOptional.md)|Collect the value of `Optional` from all the elements passing through this flow , empty `Optional` is filtered out.|
|Source/Flow|@ref[fold](Source-or-Flow/fold.md)|Start with current value `zero` and then apply the current and next value to the given function. When upstream completes, the current value is emitted downstream.|
|Source/Flow|@ref[foldAsync](Source-or-Flow/foldAsync.md)|Just like `fold` but receives a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value.|
+|Source/Flow|@ref[foldWhile](Source-or-Flow/foldWhile.md)|Start with current value `zero` and then apply the current and next value to the given function. When upstream completes or the predicate `p` returns `false`, the current value is emitted downstream.|
|Source/Flow|@ref[fromMaterializer](Source-or-Flow/fromMaterializer.md)|Defer the creation of a `Source/Flow` until materialization and access `Materializer` and `Attributes`|
|Flow|@ref[futureFlow](Flow/futureFlow.md)|Streams the elements through the given future flow once it successfully completes.|
|Source/Flow|@ref[grouped](Source-or-Flow/grouped.md)|Accumulate incoming events until the specified number of elements have been accumulated and then pass the collection of elements downstream.|
@@ -453,6 +455,8 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [fold](Source-or-Flow/fold.md)
* [fold](Sink/fold.md)
* [foldAsync](Source-or-Flow/foldAsync.md)
+* [foldWhile](Source-or-Flow/foldWhile.md)
+* [foldWhile](Sink/foldWhile.md)
* [foreach](Sink/foreach.md)
* [foreachAsync](Sink/foreachAsync.md)
* [foreachParallel](Sink/foreachParallel.md)
diff --git a/docs/src/test/java/jdocs/stream/operators/flow/FoldWhile.java b/docs/src/test/java/jdocs/stream/operators/flow/FoldWhile.java
new file mode 100644
index 00000000000..2130835dc07
--- /dev/null
+++ b/docs/src/test/java/jdocs/stream/operators/flow/FoldWhile.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package jdocs.stream.operators.flow;
+
+// #imports
+
+import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.stream.javadsl.Source;
+
+// #imports
+
+public class FoldWhile {
+ private static final ActorSystem system = null;
+
+ private void foldWhileUsage() {
+ // #foldWhile
+ Source.range(1, 10)
+ .foldWhile(0, acc -> acc < 10, Integer::sum)
+ .runForeach(System.out::println, system);
+ // Expect prints:
+ // 100
+ // #foldWhile
+ }
+}
diff --git a/docs/src/test/java/jdocs/stream/operators/sink/FoldWhile.java b/docs/src/test/java/jdocs/stream/operators/sink/FoldWhile.java
new file mode 100644
index 00000000000..f821c6fe3b7
--- /dev/null
+++ b/docs/src/test/java/jdocs/stream/operators/sink/FoldWhile.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package jdocs.stream.operators.sink;
+
+import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.stream.javadsl.Sink;
+import org.apache.pekko.stream.javadsl.Source;
+
+public class FoldWhile {
+ private ActorSystem system = null;
+
+ public void foldWhileUsage() throws Exception {
+ // #foldWhile
+ final int result =
+ Source.range(1, 10)
+ .runWith(Sink.foldWhile(0, acc -> acc < 10, Integer::sum), system)
+ .toCompletableFuture()
+ .get();
+ System.out.println(result);
+ // Expect prints:
+ // 10
+ // #foldWhile
+ }
+}
diff --git a/docs/src/test/scala/docs/stream/operators/sink/FoldWhile.scala b/docs/src/test/scala/docs/stream/operators/sink/FoldWhile.scala
new file mode 100644
index 00000000000..22b20d4c1f6
--- /dev/null
+++ b/docs/src/test/scala/docs/stream/operators/sink/FoldWhile.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package docs.stream.operators.sink
+
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.stream.scaladsl.{ Sink, Source }
+
+import scala.concurrent.duration.DurationInt
+import scala.concurrent.{ Await, ExecutionContextExecutor }
+
+class FoldWhile {
+ implicit val system: ActorSystem = ???
+ implicit val ec: ExecutionContextExecutor = system.dispatcher
+
+ // #foldWhile
+ val r = Source(1 to 10)
+ .runWith(Sink.foldWhile(0L)(_ < 10)(_ + _))
+ println(Await.result(r, 3.seconds))
+ // Expect prints:
+ // 10
+ // #foldWhile
+}
diff --git a/docs/src/test/scala/docs/stream/operators/sourceorflow/Fold.scala b/docs/src/test/scala/docs/stream/operators/sourceorflow/Fold.scala
index 65541c6bd43..d3d3c7a0a66 100644
--- a/docs/src/test/scala/docs/stream/operators/sourceorflow/Fold.scala
+++ b/docs/src/test/scala/docs/stream/operators/sourceorflow/Fold.scala
@@ -34,4 +34,10 @@ object Fold extends App {
// Prints: Histogram(99,51)
// #fold
+
+ // #foldWhile
+ Source(1 to 100)
+ .foldWhile(0)(elem => elem < 100)(_ + _)
+ .runForeach(println)
+ // #foldWhile
}
diff --git a/docs/src/test/scala/docs/stream/operators/sourceorflow/FoldWhile.scala b/docs/src/test/scala/docs/stream/operators/sourceorflow/FoldWhile.scala
new file mode 100644
index 00000000000..ce041e39ad0
--- /dev/null
+++ b/docs/src/test/scala/docs/stream/operators/sourceorflow/FoldWhile.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package docs.stream.operators.sourceorflow
+
+//#imports
+import org.apache.pekko
+import pekko.actor.ActorSystem
+import pekko.stream.scaladsl.Source
+
+//#imports
+object FoldWhile extends App {
+
+ implicit val sys: ActorSystem = ActorSystem()
+
+ // #foldWhile
+ Source(1 to 10)
+ .foldWhile(0)(_ < 10)(_ + _)
+ .runForeach(println)
+ // Expect prints:
+ // 10
+ // #foldWhile
+}
diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
index d95e1eab5af..bdfcf3a6d5a 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
@@ -237,6 +237,18 @@ public void mustBeAbleToUseMapWithResource() {
Assert.assertFalse(gate.get());
}
+ @Test
+ public void mustBeAbleToUseFoldWhile() throws Exception {
+ final int result =
+ Source.range(1, 10)
+ .via(Flow.of(Integer.class).foldWhile(0, acc -> acc < 10, Integer::sum))
+ .toMat(Sink.head(), Keep.right())
+ .run(system)
+ .toCompletableFuture()
+ .get(1, TimeUnit.SECONDS);
+ Assert.assertEquals(10, result);
+ }
+
@Test
public void mustBeAbleToUseIntersperse() throws Exception {
final TestKit probe = new TestKit(system);
diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java
index cdcfd588bd2..1e48272dbb0 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java
@@ -13,29 +13,31 @@
package org.apache.pekko.stream.javadsl;
-import java.util.Arrays;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.*;
-import java.util.stream.Collectors;
-
import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.japi.function.Function;
-import org.apache.pekko.stream.*;
+import org.apache.pekko.stream.Attributes;
+import org.apache.pekko.stream.Graph;
+import org.apache.pekko.stream.StreamTest;
+import org.apache.pekko.stream.UniformFanOutShape;
import org.apache.pekko.stream.testkit.TestSubscriber;
import org.apache.pekko.stream.testkit.javadsl.TestSink;
+import org.apache.pekko.testkit.PekkoJUnitActorSystemResource;
+import org.apache.pekko.testkit.PekkoSpec;
import org.apache.pekko.testkit.javadsl.TestKit;
import org.junit.ClassRule;
import org.junit.Test;
import org.reactivestreams.Publisher;
-import org.apache.pekko.testkit.PekkoSpec;
-import org.apache.pekko.testkit.PekkoJUnitActorSystemResource;
import org.reactivestreams.Subscription;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.stream.Collectors;
+
import static org.junit.Assert.*;
public class SinkTest extends StreamTest {
@@ -70,6 +72,16 @@ public void mustBeAbleToUseFold() throws Exception {
Source.from(new ArrayList()).runWith(foldSink, system);
}
+ @Test
+ public void mustBeAbleToUseFoldWhile() throws Exception {
+ final int result =
+ Source.range(1, 10)
+ .runWith(Sink.foldWhile(0, acc -> acc < 10, Integer::sum), system)
+ .toCompletableFuture()
+ .get(1, TimeUnit.SECONDS);
+ assertEquals(10, result);
+ }
+
@Test
public void mustBeAbleToUseActorRefSink() throws Exception {
final TestKit probe = new TestKit(system);
diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index 68345bc7398..b1ddb49111d 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -815,6 +815,18 @@ public void mustBeAbleToUseMapWithResource() {
Assert.assertFalse(gate.get());
}
+ @Test
+ public void mustBeAbleToUseFoldWhile() throws Exception {
+ final int result =
+ Source.range(1, 10)
+ .foldWhile(0, acc -> acc < 10, Integer::sum)
+ .toMat(Sink.head(), Keep.right())
+ .run(system)
+ .toCompletableFuture()
+ .get(1, TimeUnit.SECONDS);
+ Assert.assertEquals(10, result);
+ }
+
@Test
public void mustBeAbleToUseIntersperse() throws Exception {
final TestKit probe = new TestKit(system);
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
index df715bbf13f..35f2eb70142 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
@@ -67,6 +67,7 @@ class DslFactoriesConsistencySpec extends AnyWordSpec with Matchers {
(classOf[scala.Function0[_]], classOf[pekko.japi.function.Creator[_]]) ::
(classOf[scala.Function0[_]], classOf[java.util.concurrent.Callable[_]]) ::
(classOf[scala.Function1[_, Unit]], classOf[pekko.japi.function.Procedure[_]]) ::
+ (classOf[scala.Function1[_, Boolean]], classOf[pekko.japi.function.Predicate[_]]) ::
(classOf[scala.Function1[_, _]], classOf[pekko.japi.function.Function[_, _]]) ::
(classOf[scala.Function2[_, _, _]], classOf[java.util.function.BiFunction[_, _, _]]) :: // setup
(classOf[scala.Function1[scala.Function1[_, _], _]], classOf[pekko.japi.function.Function2[_, _, _]]) ::
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFoldWhileSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFoldWhileSpec.scala
new file mode 100644
index 00000000000..93eab05ac28
--- /dev/null
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFoldWhileSpec.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2014-2022 Lightbend Inc.
+ */
+
+package org.apache.pekko.stream.scaladsl
+
+import org.apache.pekko
+import pekko.stream.testkit.StreamSpec
+
+class FlowFoldWhileSpec extends StreamSpec {
+
+ "A Fold While" must {
+ "can be used in the happy path" in {
+ Source(1 to 10)
+ .foldWhile(0)(_ < 10)(_ + _)
+ .runWith(Sink.head)
+ .futureValue shouldBe 10
+ }
+
+ "can be used to implement forAll" in {
+ val f: Int => Boolean = any => any < 5
+ Source(1 to 10)
+ .foldWhile(true)(identity)(_ && f(_))
+ .runWith(Sink.head)
+ .futureValue shouldBe false
+ }
+
+ "can be used to implement exists" in {
+ val f: Int => Boolean = any => any > 5
+ Source(1 to 10)
+ .foldWhile(false)(!_)(_ || f(_))
+ .runWith(Sink.head)
+ .futureValue shouldBe true
+ }
+ }
+
+}
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkFoldSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkFoldSpec.scala
new file mode 100644
index 00000000000..8b0f035028a
--- /dev/null
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkFoldSpec.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.scaladsl
+
+import org.apache.pekko.stream.testkit.StreamSpec
+
+class SinkFoldSpec extends StreamSpec("""
+ pekko.stream.materializer.initial-input-buffer-size = 2
+ """) {
+
+ "sink.foldWhile" must {
+ "works in happy path" in {
+ Source(1 to 10)
+ .runWith(Sink.foldWhile(0L)(_ < 10)(_ + _))
+ .futureValue shouldBe 10
+ }
+ }
+
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
index a488bd6a47b..6bb286bd5a5 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
@@ -62,6 +62,7 @@ import pekko.stream.Attributes._
val scan = name("scan")
val scanAsync = name("scanAsync")
val fold = name("fold")
+ val foldWhile = name("foldWhile")
val foldAsync = name("foldAsync")
val reduce = name("reduce")
val intersperse = name("intersperse")
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
index 4f2a8031a2e..03d5baff776 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
@@ -14,7 +14,6 @@
package org.apache.pekko.stream.impl.fusing
import java.util.concurrent.TimeUnit.NANOSECONDS
-
import scala.annotation.nowarn
import scala.annotation.tailrec
import scala.collection.immutable
@@ -41,7 +40,7 @@ import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import pekko.stream.scaladsl.{ DelayStrategy, Source }
import pekko.stream.stage._
-import pekko.util.{ unused, OptionVal }
+import pekko.util.{ unused, ConstantFun, OptionVal }
import pekko.util.ccompat._
/**
@@ -601,15 +600,22 @@ private[stream] object Collect {
/**
* INTERNAL API
*/
-@InternalApi private[pekko] final case class Fold[In, Out](zero: Out, f: (Out, In) => Out)
+@InternalApi private[pekko] object Fold {
+ def apply[In, Out](zero: Out, f: (Out, In) => Out): Fold[In, Out] = new Fold(zero, ConstantFun.anyToTrue, f)
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[pekko] final case class Fold[In, Out](zero: Out,
+ predicate: Out => Boolean,
+ f: (Out, In) => Out)
extends GraphStage[FlowShape[In, Out]] {
val in = Inlet[In]("Fold.in")
val out = Outlet[Out]("Fold.out")
override val shape: FlowShape[In, Out] = FlowShape(in, out)
- override def toString: String = "Fold"
-
override val initialAttributes = DefaultAttributes.fold and SourceLocation.forLambda(f)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
@@ -623,6 +629,10 @@ private[stream] object Collect {
val elem = grab(in)
try {
aggregator = f(aggregator, elem)
+ if (!predicate(aggregator)) {
+ push(out, aggregator)
+ completeStage()
+ }
} catch {
case NonFatal(ex) =>
decider(ex) match {
@@ -653,6 +663,8 @@ private[stream] object Collect {
setHandlers(in, out, this)
}
+
+ override def toString: String = "Fold"
}
/**
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index 258fdc1d7fa..92accbddd0b 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -1379,6 +1379,32 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
def fold[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.fold(zero)(f.apply))
+ /**
+ * Similar to `scan` but only emits its result when the upstream completes or the predicate `p` returns `false`.
+ * after which it also completes. Applies the given function towards its current and next value,
+ * yielding the next current value.
+ *
+ * If the function `f` throws an exception and the supervision decision is
+ * [[pekko.stream.Supervision.Restart]] current value starts at `zero` again
+ * the stream will continue.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * Note that the `zero` value must be immutable.
+ *
+ * '''Emits when''' upstream completes or the predicate `p` returns `false`
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * See also [[FlowOps.fold]]
+ */
+ def foldWhile[T](zero: T, p: function.Predicate[T], f: function.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] =
+ new Flow(delegate.foldWhile(zero)(p.test)(f.apply))
+
/**
* Similar to `fold` but with an asynchronous function.
* Applies the given function towards its current and next value,
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
index c71f4338471..2fc19d7681e 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
@@ -57,6 +57,17 @@ object Sink {
def fold[U, In](zero: U, f: function.Function2[U, In, U]): javadsl.Sink[In, CompletionStage[U]] =
new Sink(scaladsl.Sink.fold[U, In](zero)(f.apply).toCompletionStage())
+ /**
+ * A `Sink` that will invoke the given function for every received element, giving it its previous
+ * output (or the given `zero` value) and the element as input.
+ * The returned [[java.util.concurrent.CompletionStage]] will be completed with value of the final
+ * function evaluation when the input stream ends, predicate `p` returns false, or completed with `Failure`
+ * if there is a failure is signaled in the stream.
+ */
+ def foldWhile[U, In](
+ zero: U, p: function.Predicate[U], f: function.Function2[U, In, U]): javadsl.Sink[In, CompletionStage[U]] =
+ new Sink(scaladsl.Sink.foldWhile[U, In](zero)(p.test)(f.apply).toCompletionStage())
+
/**
* A `Sink` that will invoke the given asynchronous function for every received element, giving it its previous
* output (or the given `zero` value) and the element as input.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 9227e9e0283..a2cdc6def38 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -3089,6 +3089,32 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
def fold[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Source[T, Mat] =
new Source(delegate.fold(zero)(f.apply))
+ /**
+ * Similar to `scan` but only emits its result when the upstream completes or the predicate `p` returns `false`.
+ * after which it also completes. Applies the given function towards its current and next value,
+ * yielding the next current value.
+ *
+ * If the function `f` throws an exception and the supervision decision is
+ * [[pekko.stream.Supervision.Restart]] current value starts at `zero` again
+ * the stream will continue.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * Note that the `zero` value must be immutable.
+ *
+ * '''Emits when''' upstream completes or the predicate `p` returns `false`
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * See also [[FlowOps.fold]]
+ */
+ def foldWhile[T](zero: T, p: function.Predicate[T], f: function.Function2[T, Out, T]): javadsl.Source[T, Mat] =
+ new Source(delegate.foldWhile(zero)(p.test)(f.apply))
+
/**
* Similar to `fold` but with an asynchronous function.
* Applies the given function towards its current and next value,
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index 681b40ea04f..1afa33569b3 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -757,6 +757,32 @@ class SubFlow[In, Out, Mat](
def fold[T](zero: T)(f: function.Function2[T, Out, T]): SubFlow[In, T, Mat] =
new SubFlow(delegate.fold(zero)(f.apply))
+ /**
+ * Similar to `scan` but only emits its result when the upstream completes or the predicate `p` returns `false`.
+ * after which it also completes. Applies the given function towards its current and next value,
+ * yielding the next current value.
+ *
+ * If the function `f` throws an exception and the supervision decision is
+ * [[pekko.stream.Supervision.Restart]] current value starts at `zero` again
+ * the stream will continue.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * Note that the `zero` value must be immutable.
+ *
+ * '''Emits when''' upstream completes or the predicate `p` returns `false`
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * See also [[FlowOps.fold]]
+ */
+ def foldWhile[T](zero: T, p: function.Predicate[T], f: function.Function2[T, Out, T]): SubFlow[In, T, Mat] =
+ new SubFlow(delegate.foldWhile(zero)(p.test)(f.apply))
+
/**
* Similar to `fold` but with an asynchronous function.
* Applies the given function towards its current and next value,
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index 4c00a25b2e5..0faa7e0d5e5 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -748,6 +748,32 @@ class SubSource[Out, Mat](
def fold[T](zero: T)(f: function.Function2[T, Out, T]): SubSource[T, Mat] =
new SubSource(delegate.fold(zero)(f.apply))
+ /**
+ * Similar to `scan` but only emits its result when the upstream completes or the predicate `p` returns `false`.
+ * after which it also completes. Applies the given function towards its current and next value,
+ * yielding the next current value.
+ *
+ * If the function `f` throws an exception and the supervision decision is
+ * [[pekko.stream.Supervision.Restart]] current value starts at `zero` again
+ * the stream will continue.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * Note that the `zero` value must be immutable.
+ *
+ * '''Emits when''' upstream completes or the predicate `p` returns `false`
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * See also [[FlowOps.fold]]
+ */
+ def foldWhile[T](zero: T, p: function.Predicate[T], f: function.Function2[T, Out, T]): SubSource[T, Mat] =
+ new SubSource(delegate.foldWhile(zero)(p.test)(f.apply))
+
/**
* Similar to `fold` but with an asynchronous function.
* Applies the given function towards its current and next value,
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index b86a766fd14..250af8cd246 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -1788,6 +1788,32 @@ trait FlowOps[+Out, +Mat] {
*/
def fold[T](zero: T)(f: (T, Out) => T): Repr[T] = via(Fold(zero, f))
+ /**
+ * Similar to `scan` but only emits its result when the upstream completes or the predicate `p` returns `false`.
+ * after which it also completes. Applies the given function towards its current and next value,
+ * yielding the next current value.
+ *
+ * If the function `f` throws an exception and the supervision decision is
+ * [[pekko.stream.Supervision.Restart]] current value starts at `zero` again
+ * the stream will continue.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * Note that the `zero` value must be immutable.
+ *
+ * '''Emits when''' upstream completes or the predicate `p` returns `false`
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * See also [[FlowOps.fold]]
+ */
+ def foldWhile[T](zero: T)(p: T => Boolean)(f: (T, Out) => T): Repr[T] = via(
+ Fold[Out, T](zero, p, f).withAttributes(DefaultAttributes.foldWhile))
+
/**
* Similar to `fold` but with an asynchronous function.
* Applies the given function towards its current and next value,
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
index 935796a4789..84f1ed05082 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
@@ -423,6 +423,18 @@ object Sink {
def fold[U, T](zero: U)(f: (U, T) => U): Sink[T, Future[U]] =
Flow[T].fold(zero)(f).toMat(Sink.head)(Keep.right).named("foldSink")
+ /**
+ * A `Sink` that will invoke the given function for every received element, giving it its previous
+ * output (or the given `zero` value) and the element as input.
+ * The returned [[scala.concurrent.Future]] will be completed with value of the final
+ * function evaluation when the input stream ends, predicate `p` returns false, or completed with `Failure`
+ * if there is a failure signaled in the stream.
+ *
+ * @see [[#fold]]
+ */
+ def foldWhile[U, T](zero: U)(p: U => Boolean)(f: (U, T) => U): Sink[T, Future[U]] =
+ Flow[T].foldWhile(zero)(p)(f).toMat(Sink.head)(Keep.right).named("foldWhileSink")
+
/**
* A `Sink` that will invoke the given asynchronous function for every received element, giving it its previous
* output (or the given `zero` value) and the element as input.