Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Sink.none operator #1614

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions docs/src/main/paradox/stream/operators/Sink/none.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Sink.none

A `Sink` that will test the given predicate `p` for every received element and completes with the result.

@ref[Sink operators](../index.md#sink-operators)

## Signature

@apidoc[Sink.none](Sink$) { scala="#none[T](p:T=%3EBoolean):org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[Boolean]]" java="#none(org.apache.pekko.japi.function.Predicate)" }

## Description
none operator applies a predicate function to assert each element received, it returns false if any element satisfy the assertion, otherwise it returns true.

It materializes into a `Future` (in Scala) or a `CompletionStage` (in Java) that completes with the last state when the stream has finished.

Notes that if source is empty, it will return true

A `Sink` that will test the given predicate `p` for every received element and

- completes and returns @scala[`Future`] @java[`CompletionStage`] of `true` if the predicate is false for all elements;
- completes and returns @scala[`Future`] @java[`CompletionStage`] of `true` if the stream is empty (i.e. completes before signalling any elements);
- completes and returns @scala[`Future`] @java[`CompletionStage`] of `false` if the predicate is true for any element.

The materialized value @scala[`Future`] @java[`CompletionStage`] will be completed with the value `true` or `false`
when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream.

## Example

This example tests all elements in the stream is `<=` 100.

Scala
: @@snip [NoneMatch.scala](/docs/src/test/scala/docs/stream/operators/sink/NoneMatch.scala) { #none }

Java
: @@snip [NoneMatch.java](/docs/src/test/java/jdocs/stream/operators/sink/NoneMatch.java) { #none }

## Reactive Streams Semantics

@@@div { .callout }

***Completes*** when upstream completes or the predicate `p` returns `true`

**cancels** when predicate `p` returns `true`

**backpressures** when the invocation of predicate `p` has not yet completed

@@@
2 changes: 2 additions & 0 deletions docs/src/main/paradox/stream/operators/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl
|Sink|<a name="lazyinitasync"></a>@ref[lazyInitAsync](Sink/lazyInitAsync.md)|Deprecated by @ref[`Sink.lazyFutureSink`](Sink/lazyFutureSink.md).|
|Sink|<a name="lazysink"></a>@ref[lazySink](Sink/lazySink.md)|Defers creation and materialization of a `Sink` until there is a first element.|
|Sink|<a name="never"></a>@ref[never](Sink/never.md)|Always backpressure never cancel and never consume any elements from the stream.|
|Sink|<a name="none"></a>@ref[none](Sink/none.md)|A `Sink` that will test the given predicate `p` for every received element and completes with the result.|
|Sink|<a name="oncomplete"></a>@ref[onComplete](Sink/onComplete.md)|Invoke a callback when the stream has completed or failed.|
|Sink|<a name="prematerialize"></a>@ref[preMaterialize](Sink/preMaterialize.md)|Materializes this Sink, immediately returning (1) its materialized value, and (2) a new Sink that can be consume elements 'into' the pre-materialized one.|
|Sink|<a name="queue"></a>@ref[queue](Sink/queue.md)|Materialize a `SinkQueue` that can be pulled to trigger demand through the sink.|
Expand Down Expand Up @@ -555,6 +556,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [monitor](Source-or-Flow/monitor.md)
* [never](Source/never.md)
* [never](Sink/never.md)
* [none](Sink/none.md)
* [onComplete](Sink/onComplete.md)
* [onErrorComplete](Source-or-Flow/onErrorComplete.md)
* [onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)
Expand Down
41 changes: 41 additions & 0 deletions docs/src/test/java/jdocs/stream/operators/sink/NoneMatch.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package jdocs.stream.operators.sink;

import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;

import java.util.concurrent.TimeUnit;

public class NoneMatch {
private ActorSystem system = null;

public void noneUsage() throws Exception {
// #none
final boolean noneMatch =
Source.range(1, 100)
.runWith(Sink.none(elem -> elem > 100), system)
.toCompletableFuture()
.get(3, TimeUnit.SECONDS);
System.out.println(noneMatch);
// Expect prints:
// true
// #none
}
}
40 changes: 40 additions & 0 deletions docs/src/test/scala/docs/stream/operators/sink/NoneMatch.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package docs.stream.operators.sink

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Sink, Source }

import scala.concurrent.duration.DurationInt
import scala.concurrent.{ Await, ExecutionContextExecutor, Future }

object NoneMatch {
implicit val system: ActorSystem = ???
implicit val ec: ExecutionContextExecutor = system.dispatcher
def noneExample(): Unit = {
// #none
val result: Future[Boolean] =
Source(1 to 100)
.runWith(Sink.none(_ > 100))
val noneMatch = Await.result(result, 3.seconds)
println(noneMatch)
// Expect prints:
// true
// #none
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,15 @@ public void sinkMustBeAbleToUseForall()
assertTrue(allMatch);
}

@Test
public void sinkMustBeAbleToUseNoneMatch()
throws InterruptedException, ExecutionException, TimeoutException {
CompletionStage<Boolean> cs =
Source.from(Arrays.asList(1, 2, 3, 4)).runWith(Sink.none(param -> param < 0), system);
boolean noneMatch = cs.toCompletableFuture().get(100, TimeUnit.MILLISECONDS);
assertTrue(noneMatch);
}

@Test
public void sinkMustBeAbleToUseForExists()
throws InterruptedException, ExecutionException, TimeoutException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,46 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {

}

"The none sink" must {

"completes with `ture` when all elements not match" in {
Source(1 to 4)
.runWith(Sink.none(_ < 0))
.futureValue shouldBe true
}

"completes with `false` when any element match" in {
Source(1 to 4)
.runWith(Sink.none(_ > 2))
.futureValue shouldBe false
}

"completes with `true` if the stream is empty" in {
Source.empty[Int]
.runWith(Sink.none(_ > 2))
.futureValue shouldBe true
}

"completes with `Failure` if the stream failed" in {
Source.failed[Int](new RuntimeException("Oops"))
.runWith(Sink.none(_ > 2))
.failed.futureValue shouldBe a[RuntimeException]
}

"completes with `false` with restart strategy" in {
val sink = Sink.none[Int](elem => {
if (elem == 2) {
throw new RuntimeException("Oops")
}
elem > 1
}).withAttributes(supervisionStrategy(Supervision.restartingDecider))

Source(1 to 3)
.runWith(sink)
.futureValue shouldBe false
}
}

"The exists sink" must {

"completes with `false` when none element match" in {
Expand Down
25 changes: 25 additions & 0 deletions stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,31 @@ object Sink {
.mapMaterializedValue(_.map(Boolean.box)(ExecutionContexts.parasitic).asJava))
}

/**
* A `Sink` that will test the given predicate `p` for every received element and
* 1. completes and returns [[java.util.concurrent.CompletionStage]] of `true` if the predicate is false for all elements;
* 2. completes and returns [[java.util.concurrent.CompletionStage]] of `true` if the stream is empty (i.e. completes before signalling any elements);
* 3. completes and returns [[java.util.concurrent.CompletionStage]] of `false` if the predicate is true for any element.
*
* The materialized value [[java.util.concurrent.CompletionStage]] will be completed with the value `true` or `false`
* when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Completes when''' upstream completes or the predicate `p` returns `true`
*
* '''Backpressures when''' the invocation of predicate `p` has not yet completed
*
* '''Cancels when''' predicate `p` returns `true`
*
* @since 1.1.3
*/
def none[In](p: function.Predicate[In]): javadsl.Sink[In, CompletionStage[java.lang.Boolean]] = {
import pekko.util.FutureConverters._
new Sink(scaladsl.Sink.none[In](p.test)
.mapMaterializedValue(_.map(Boolean.box)(ExecutionContexts.parasitic).asJava))
}

/**
* A `Sink` that will test the given predicate `p` for every received element and
* 1. completes and returns [[java.util.concurrent.CompletionStage]] of `true` if the predicate is true for any element;
Expand Down
24 changes: 24 additions & 0 deletions stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,30 @@ object Sink {
.toMat(Sink.head)(Keep.right)
.named("forallSink")

/**
* A `Sink` that will test the given predicate `p` for every received element and
* 1. completes and returns [[scala.concurrent.Future]] of `true` if the predicate is false for all elements;
* 2. completes and returns [[scala.concurrent.Future]] of `true` if the stream is empty (i.e. completes before signalling any elements);
* 3. completes and returns [[scala.concurrent.Future]] of `false` if the predicate is true for any element.
*
* The materialized value [[scala.concurrent.Future]] will be completed with the value `true` or `false`
* when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Completes when''' upstream completes or the predicate `p` returns `true`
*
* '''Backpressures when''' the invocation of predicate `p` has not yet completed
*
* '''Cancels when''' predicate `p` returns `true`
*
* @since 1.1.3
*/
def none[T](p: T => Boolean): Sink[T, Future[Boolean]] =
Flow[T].foldWhile(true)(util.ConstantFun.scalaIdentityFunction)(_ && !p(_))
.toMat(Sink.head)(Keep.right)
.named("noneSink")

/**
* A `Sink` that will test the given predicate `p` for every received element and
* 1. completes and returns [[scala.concurrent.Future]] of `true` if the predicate is true for any element;
Expand Down
Loading