Skip to content

Commit

Permalink
feat: Add Flow/Sink#foldWhile operator. (#1012)
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin authored Jan 22, 2024
1 parent f99a9ea commit 2431eb4
Show file tree
Hide file tree
Showing 23 changed files with 551 additions and 16 deletions.
42 changes: 42 additions & 0 deletions docs/src/main/paradox/stream/operators/Sink/foldWhile.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Sink.foldWhile

Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.

@ref[Sink operators](../index.md#sink-operators)

## Signature

@apidoc[Sink.foldWhile](Sink$) { scala="#foldWhile%5BU%2C%20T%5D(zero%3A%20U)(p%3A%20U%20%3D%3E%20Boolean)(f%3A%20(U%2C%20T)%20%3D%3E%20U):org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[U]]" java="#foldWhile(java.lang.Object,org.apache.pekko.japi.function.Predicate,org.apache.pekko.japi.function.Function2)" }

## Description

A Sink that will invoke the given function for every received element, giving it its previous output (or the given zero value)
and the element as input.

Materializes into a @scala[`Future`] @java[`CompletionStage`] that will complete with the last state when the stream has completed,
predicate p returns false, or completed with Failure if there is a failure signaled in the stream.

This operator allows combining values into a result without a global mutable state by instead passing the state along
between invocations.

## Example

`foldWhile` is typically used to 'fold up' the incoming values into an aggregate with a predicate.
For example, you can use `foldWhile` to calculate the sum while some predicate is true.

Scala
: @@snip [FoldWhile.scala](/docs/src/test/scala/docs/stream/operators/sink/FoldWhile.scala) { #foldWhile }

Java
: @@snip [FoldWhile.java](/docs/src/test/java/jdocs/stream/operators/sink/FoldWhile.java) { #foldWhile }

## Reactive Streams semantics

@@@div { .callout }

**cancels** never

**backpressures** when the previous fold function invocation has not yet completed

@@@

48 changes: 48 additions & 0 deletions docs/src/main/paradox/stream/operators/Source-or-Flow/foldWhile.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# foldWhile

Start with current value `zero` and then apply the current and next value to the given function. When upstream completes or the predicate `p` returns `false`, the current value is emitted downstream.

@ref[Simple operators](../index.md#simple-operators)

## Signature

@apidoc[Source.foldWhile](Source) { scala="#foldWhile%5BT%5D(zero%3A%20T)(p%3A%20T%20%3D%3E%20Boolean)(f%3A%20(T%2C%20Out)%20%3D%3E%20T):FlowOps.this.Repr[T]" java="#foldWhile(java.lang.Object,org.apache.pekko.japi.function.Predicate,org.apache.pekko.japi.function.Function2)" }
@apidoc[Flow.foldWhile](Flow) { scala="#foldWhile%5BT%5D(zero%3A%20T)(p%3A%20T%20%3D%3E%20Boolean)(f%3A%20(T%2C%20Out)%20%3D%3E%20T):FlowOps.this.Repr[T]" java="#foldWhile(java.lang.Object,org.apache.pekko.japi.function.Predicate,org.apache.pekko.japi.function.Function2)" }

## Description

Start with current value `zero` and then apply the current and next value to the given function. When upstream
completes, the current value is emitted downstream.

@@@ warning

Note that the `zero` value must be immutable, because otherwise
the same mutable instance would be shared across different threads
when running the stream more than once.

@@@

## Example

`foldWhile` is typically used to 'fold up' the incoming values into an aggregate with a predicate.
For example, you can use `foldWhile` to calculate the sum while some predicate is true.


Scala
: @@snip [FoldWhile.scala](/docs/src/test/scala/docs/stream/operators/sourceorflow/FoldWhile.scala) { #imports #foldWhile }

Java
: @@snip [FoldWhile.java](/docs/src/test/java/jdocs/stream/operators/flow/FoldWhile.java) { #foldWhile }

## Reactive Streams semantics

@@@div { .callout }

**emits** when upstream completes

**backpressures** when downstream backpressures

**completes** when upstream completes

@@@

4 changes: 4 additions & 0 deletions docs/src/main/paradox/stream/operators/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl
|Sink|<a name="combine"></a>@ref[combine](Sink/combine.md)|Combine several sinks into one using a user specified strategy|
|Sink|<a name="completionstagesink"></a>@ref[completionStageSink](Sink/completionStageSink.md)|Streams the elements to the given future sink once it successfully completes. |
|Sink|<a name="fold"></a>@ref[fold](Sink/fold.md)|Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.|
|Sink|<a name="foldwhile"></a>@ref[foldWhile](Sink/foldWhile.md)|Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.|
|Sink|<a name="foreach"></a>@ref[foreach](Sink/foreach.md)|Invoke a given procedure for each element received.|
|Sink|<a name="foreachasync"></a>@ref[foreachAsync](Sink/foreachAsync.md)|Invoke a given procedure asynchronously for each element received.|
|Sink|<a name="foreachparallel"></a>@ref[foreachParallel](Sink/foreachParallel.md)|Like `foreach` but allows up to `parallellism` procedure calls to happen in parallel.|
Expand Down Expand Up @@ -158,6 +159,7 @@ depending on being backpressured by downstream or not.
|Flow|<a name="flattenoptional"></a>@ref[flattenOptional](Flow/flattenOptional.md)|Collect the value of `Optional` from all the elements passing through this flow , empty `Optional` is filtered out.|
|Source/Flow|<a name="fold"></a>@ref[fold](Source-or-Flow/fold.md)|Start with current value `zero` and then apply the current and next value to the given function. When upstream completes, the current value is emitted downstream.|
|Source/Flow|<a name="foldasync"></a>@ref[foldAsync](Source-or-Flow/foldAsync.md)|Just like `fold` but receives a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value.|
|Source/Flow|<a name="foldwhile"></a>@ref[foldWhile](Source-or-Flow/foldWhile.md)|Start with current value `zero` and then apply the current and next value to the given function. When upstream completes or the predicate `p` returns `false`, the current value is emitted downstream.|
|Source/Flow|<a name="frommaterializer"></a>@ref[fromMaterializer](Source-or-Flow/fromMaterializer.md)|Defer the creation of a `Source/Flow` until materialization and access `Materializer` and `Attributes`|
|Flow|<a name="futureflow"></a>@ref[futureFlow](Flow/futureFlow.md)|Streams the elements through the given future flow once it successfully completes.|
|Source/Flow|<a name="grouped"></a>@ref[grouped](Source-or-Flow/grouped.md)|Accumulate incoming events until the specified number of elements have been accumulated and then pass the collection of elements downstream.|
Expand Down Expand Up @@ -453,6 +455,8 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [fold](Source-or-Flow/fold.md)
* [fold](Sink/fold.md)
* [foldAsync](Source-or-Flow/foldAsync.md)
* [foldWhile](Source-or-Flow/foldWhile.md)
* [foldWhile](Sink/foldWhile.md)
* [foreach](Sink/foreach.md)
* [foreachAsync](Sink/foreachAsync.md)
* [foreachParallel](Sink/foreachParallel.md)
Expand Down
39 changes: 39 additions & 0 deletions docs/src/test/java/jdocs/stream/operators/flow/FoldWhile.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package jdocs.stream.operators.flow;

// #imports

import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.stream.javadsl.Source;

// #imports

public class FoldWhile {
private static final ActorSystem system = null;

private void foldWhileUsage() {
// #foldWhile
Source.range(1, 10)
.foldWhile(0, acc -> acc < 10, Integer::sum)
.runForeach(System.out::println, system);
// Expect prints:
// 100
// #foldWhile
}
}
39 changes: 39 additions & 0 deletions docs/src/test/java/jdocs/stream/operators/sink/FoldWhile.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package jdocs.stream.operators.sink;

import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;

public class FoldWhile {
private ActorSystem system = null;

public void foldWhileUsage() throws Exception {
// #foldWhile
final int result =
Source.range(1, 10)
.runWith(Sink.foldWhile(0, acc -> acc < 10, Integer::sum), system)
.toCompletableFuture()
.get();
System.out.println(result);
// Expect prints:
// 10
// #foldWhile
}
}
37 changes: 37 additions & 0 deletions docs/src/test/scala/docs/stream/operators/sink/FoldWhile.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package docs.stream.operators.sink

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Sink, Source }

import scala.concurrent.duration.DurationInt
import scala.concurrent.{ Await, ExecutionContextExecutor }

class FoldWhile {
implicit val system: ActorSystem = ???
implicit val ec: ExecutionContextExecutor = system.dispatcher

// #foldWhile
val r = Source(1 to 10)
.runWith(Sink.foldWhile(0L)(_ < 10)(_ + _))
println(Await.result(r, 3.seconds))
// Expect prints:
// 10
// #foldWhile
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,10 @@ object Fold extends App {

// Prints: Histogram(99,51)
// #fold

// #foldWhile
Source(1 to 100)
.foldWhile(0)(elem => elem < 100)(_ + _)
.runForeach(println)
// #foldWhile
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package docs.stream.operators.sourceorflow

//#imports
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.stream.scaladsl.Source

//#imports
object FoldWhile extends App {

implicit val sys: ActorSystem = ActorSystem()

// #foldWhile
Source(1 to 10)
.foldWhile(0)(_ < 10)(_ + _)
.runForeach(println)
// Expect prints:
// 10
// #foldWhile
}
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,18 @@ public void mustBeAbleToUseMapWithResource() {
Assert.assertFalse(gate.get());
}

@Test
public void mustBeAbleToUseFoldWhile() throws Exception {
final int result =
Source.range(1, 10)
.via(Flow.of(Integer.class).foldWhile(0, acc -> acc < 10, Integer::sum))
.toMat(Sink.head(), Keep.right())
.run(system)
.toCompletableFuture()
.get(1, TimeUnit.SECONDS);
Assert.assertEquals(10, result);
}

@Test
public void mustBeAbleToUseIntersperse() throws Exception {
final TestKit probe = new TestKit(system);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,31 @@

package org.apache.pekko.stream.javadsl;

import java.util.Arrays;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;

import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.japi.function.Function;
import org.apache.pekko.stream.*;
import org.apache.pekko.stream.Attributes;
import org.apache.pekko.stream.Graph;
import org.apache.pekko.stream.StreamTest;
import org.apache.pekko.stream.UniformFanOutShape;
import org.apache.pekko.stream.testkit.TestSubscriber;
import org.apache.pekko.stream.testkit.javadsl.TestSink;
import org.apache.pekko.testkit.PekkoJUnitActorSystemResource;
import org.apache.pekko.testkit.PekkoSpec;
import org.apache.pekko.testkit.javadsl.TestKit;
import org.junit.ClassRule;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.apache.pekko.testkit.PekkoSpec;
import org.apache.pekko.testkit.PekkoJUnitActorSystemResource;
import org.reactivestreams.Subscription;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;

import static org.junit.Assert.*;

public class SinkTest extends StreamTest {
Expand Down Expand Up @@ -70,6 +72,16 @@ public void mustBeAbleToUseFold() throws Exception {
Source.from(new ArrayList<Integer>()).runWith(foldSink, system);
}

@Test
public void mustBeAbleToUseFoldWhile() throws Exception {
final int result =
Source.range(1, 10)
.runWith(Sink.foldWhile(0, acc -> acc < 10, Integer::sum), system)
.toCompletableFuture()
.get(1, TimeUnit.SECONDS);
assertEquals(10, result);
}

@Test
public void mustBeAbleToUseActorRefSink() throws Exception {
final TestKit probe = new TestKit(system);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,18 @@ public void mustBeAbleToUseMapWithResource() {
Assert.assertFalse(gate.get());
}

@Test
public void mustBeAbleToUseFoldWhile() throws Exception {
final int result =
Source.range(1, 10)
.foldWhile(0, acc -> acc < 10, Integer::sum)
.toMat(Sink.head(), Keep.right())
.run(system)
.toCompletableFuture()
.get(1, TimeUnit.SECONDS);
Assert.assertEquals(10, result);
}

@Test
public void mustBeAbleToUseIntersperse() throws Exception {
final TestKit probe = new TestKit(system);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class DslFactoriesConsistencySpec extends AnyWordSpec with Matchers {
(classOf[scala.Function0[_]], classOf[pekko.japi.function.Creator[_]]) ::
(classOf[scala.Function0[_]], classOf[java.util.concurrent.Callable[_]]) ::
(classOf[scala.Function1[_, Unit]], classOf[pekko.japi.function.Procedure[_]]) ::
(classOf[scala.Function1[_, Boolean]], classOf[pekko.japi.function.Predicate[_]]) ::
(classOf[scala.Function1[_, _]], classOf[pekko.japi.function.Function[_, _]]) ::
(classOf[scala.Function2[_, _, _]], classOf[java.util.function.BiFunction[_, _, _]]) :: // setup
(classOf[scala.Function1[scala.Function1[_, _], _]], classOf[pekko.japi.function.Function2[_, _, _]]) ::
Expand Down
Loading

0 comments on commit 2431eb4

Please sign in to comment.