diff --git a/docs/src/main/paradox/release-notes/releases-1.1.md b/docs/src/main/paradox/release-notes/releases-1.1.md
index 4a0ae7611b9..9571bdda364 100644
--- a/docs/src/main/paradox/release-notes/releases-1.1.md
+++ b/docs/src/main/paradox/release-notes/releases-1.1.md
@@ -41,6 +41,7 @@ The Stream API has been updated to add some extra functions.
* add Sink.forall operator ([PR989](https://github.com/apache/pekko/pull/989))
* add Source.iterate operator ([PR1244](https://github.com/apache/pekko/pull/1244))
* added extra retry operators that allow users to provide a predicate to decide whether to retry based on the exception ([PR1269](https://github.com/apache/pekko/pull/1269))
+* add optionalVia/unsafeOptionalDataVia operators ([PR1422](https://github.com/apache/pekko/pull/1422))
The Stream Testkit Java DSL has some extra functions.
diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/optionalVia.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/optionalVia.md
new file mode 100644
index 00000000000..c48c3aed17e
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/optionalVia.md
@@ -0,0 +1,34 @@
+# optionalVia
+
+For a stream containing optional elements, transforms each element by applying the given `viaFlow` and passing the value downstream as an optional value.
+
+@ref[Simple operators](../index.md#simple-operators)
+
+## Signature
+
+@apidoc[Source.optionalVia](Source$) { scala="#optionalVia%5BSOut,FOut,SMat,FMat,Mat](source:org.apache.pekko.stream.scaladsl.Source%5BOption%5BSOut],SMat],viaFlow:org.apache.pekko.stream.scaladsl.Flow%5BSOut,FOut,FMat])(combine:(SMat,FMat)=%3EMat):org.apache.pekko.stream.scaladsl.Source%5BOption%5BFOut],Mat]" java="#optionalVia(org.apache.pekko.stream.javadsl.Source,org.apache.pekko.stream.javadsl.Flow,org.apache.pekko.japi.function.Function2)" }
+@apidoc[Flow.optionalVia](Flow$) { scala="#optionalVia%5BFIn,FOut,FViaOut,FMat,FViaMat,Mat](flow:org.apache.pekko.stream.scaladsl.Flow%5BFIn,Option%5BFOut],FMat],viaFlow:org.apache.pekko.stream.scaladsl.Flow%5BFOut,FViaOut,FViaMat])(combine:(FMat,FViaMat)=%3EMat):org.apache.pekko.stream.scaladsl.Flow%5BFIn,Option%5BFViaOut],Mat]" java="#optionalVia(org.apache.pekko.stream.javadsl.Flow,org.apache.pekko.stream.javadsl.Flow,org.apache.pekko.japi.function.Function2)" }
+
+## Description
+
+For a stream containing optio nal elements, transforms each element by applying
+the given `viaFlow` and passing the value downstream as an optional value.
+
+Scala
+: @@snip [OptionalVia.scala](/docs/src/test/scala/docs/stream/operators/sourceorflow/OptionalVia.scala) { #optionalVia }
+
+Java
+: @@snip [SourceOrFlow.java](/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #optionalVia }
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** while the provided viaFlow is runs with defined elements
+
+**backpressures** when the viaFlow runs for the defined elements and downstream backpressures
+
+**completes** when the upstream completes
+
+@@@
+
diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md
index cc6d698a5b6..3de82bd35fd 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -180,6 +180,7 @@ depending on being backpressured by downstream or not.
|Source/Flow|@ref[map](Source-or-Flow/map.md)|Transform each element in the stream by calling a mapping function with it and passing the returned value downstream.|
|Source/Flow|@ref[mapConcat](Source-or-Flow/mapConcat.md)|Transform each element into zero or more elements that are individually passed downstream.|
|Source/Flow|@ref[mapWithResource](Source-or-Flow/mapWithResource.md)|Map elements with the help of a resource that can be opened, transform each element (in a blocking way) and closed.|
+|Source/Flow|@ref[optionalVia](Source-or-Flow/optionalVia.md)|For a stream containing optional elements, transforms each element by applying the given `viaFlow` and passing the value downstream as an optional value.|
|Source/Flow|@ref[preMaterialize](Source-or-Flow/preMaterialize.md)|Materializes this Graph, immediately returning (1) its materialized value, and (2) a new pre-materialized Graph.|
|Source/Flow|@ref[reduce](Source-or-Flow/reduce.md)|Start with first element and then apply the current and next value to the given function, when upstream complete the current value is emitted downstream.|
|Source/Flow|@ref[scan](Source-or-Flow/scan.md)|Emit its current value, which starts at `zero`, and then apply the current and next value to the given function, emitting the next current value.|
@@ -558,6 +559,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [onErrorComplete](Source-or-Flow/onErrorComplete.md)
* [onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)
* [onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)
+* [optionalVia](Source-or-Flow/optionalVia.md)
* [orElse](Source-or-Flow/orElse.md)
* [Partition](Partition.md)
* [prefixAndTail](Source-or-Flow/prefixAndTail.md)
diff --git a/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java b/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java
index 4385dd2de1f..3bab8fb3de8 100644
--- a/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java
+++ b/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java
@@ -532,6 +532,23 @@ void foldAsyncExample() {
// #foldAsync
}
+ void optionalViaExample() {
+
+ // #optionalVia
+ Flow flow = Flow.fromFunction(Integer::parseInt);
+
+ Source, NotUsed> source =
+ Source.from(
+ Arrays.asList(Optional.of("1"), Optional.empty(), Optional.empty(), Optional.of("4")));
+
+ Source.optionalVia(source, flow, Keep.none()).runForeach(System.out::println, system);
+ // Optional[1]
+ // Optional.empty
+ // Optional.empty
+ // Optional[4]
+ // #optionalVia
+ }
+
void takeExample() {
// #take
Source.from(Arrays.asList(1, 2, 3, 4, 5)).take(3).runForeach(System.out::println, system);
diff --git a/docs/src/test/scala/docs/stream/operators/sourceorflow/OptionalVia.scala b/docs/src/test/scala/docs/stream/operators/sourceorflow/OptionalVia.scala
new file mode 100644
index 00000000000..64714df49ec
--- /dev/null
+++ b/docs/src/test/scala/docs/stream/operators/sourceorflow/OptionalVia.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package docs.stream.operators.sourceorflow
+
+object OptionalVia {
+ def optionalViaExample(): Unit = {
+ import org.apache.pekko.actor.ActorSystem
+ import org.apache.pekko.stream.scaladsl.{ Flow, Keep, Source }
+
+ implicit val system: ActorSystem = ActorSystem()
+
+ // #optionalVia
+ Source.optionalVia(
+ Source(List(Some("1"), None, None, Some("4"))),
+ Flow.fromFunction { (string: String) => string.toInt }
+ )(Keep.none).runForeach(println)
+ // Some(1)
+ // None
+ // None
+ // Some(4)
+ // #optionalVia
+ }
+
+}
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala
index 632f98a320e..3c918db1ecb 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala
@@ -548,6 +548,22 @@ class FlowSpec extends StreamSpec(ConfigFactory.parseString("pekko.actor.debug.r
"should be created from a function easily" in {
Source(0 to 9).via(Flow.fromFunction(_ + 1)).runWith(Sink.seq).futureValue should ===(1 to 10)
}
+
+ "Apply a viaFlow with optional elements using optionalVia" in {
+ val data = List(Some("1"), None, None, Some("4"))
+
+ val flow = Flow[Option[String]]
+
+ Source(data).via(
+ Flow.optionalVia(
+ flow,
+ Flow.fromFunction { (string: String) => string.toInt }
+ )(Keep.none)
+ ).runWith(TestSink.probe[Option[Int]])
+ .request(4)
+ .expectNext(Some(1), None, None, Some(4))
+ .expectComplete()
+ }
}
/**
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala
index 0036cf34a13..9ac07e2a39b 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala
@@ -93,5 +93,25 @@ class FlowWithContextSpec extends StreamSpec {
.expectNext((1, 1), (2, 2), (3, 3), (4, 4))
.expectComplete()
}
+
+ "Apply a viaFlow with optional elements using unsafeOptionalVia" in {
+ val data = List((Some("1"), 1), (None, 2), (None, 3), (Some("4"), 4))
+
+ val flow = Flow[(Option[String], Int)]
+ .asFlowWithContext[Option[String], Int, Int](collapseContext = Tuple2.apply)(extractContext = _._2)
+ .map(_._1)
+
+ SourceWithContext
+ .fromTuples(Source(data)).via(
+ FlowWithContext.unsafeOptionalDataVia(
+ flow,
+ Flow.fromFunction { (string: String) => string.toInt }
+ )(Keep.none)
+ )
+ .runWith(TestSink.probe[(Option[Int], Int)])
+ .request(4)
+ .expectNext((Some(1), 1), (None, 2), (None, 3), (Some(4), 4))
+ .expectComplete()
+ }
}
}
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
index 2444e724298..a54a17a4c7f 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
@@ -444,6 +444,19 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
import Attributes._
val s: Source[Int, NotUsed] = Source.single(42).async.addAttributes(none).named("")
}
+
+ "Apply a viaFlow with optional elements using optionalVia" in {
+ val data = List(Some("1"), None, None, Some("4"))
+
+ Source.optionalVia(
+ Source(data),
+ Flow.fromFunction { (string: String) => string.toInt }
+ )(Keep.none)
+ .runWith(TestSink.probe[Option[Int]])
+ .request(4)
+ .expectNext(Some(1), None, None, Some(4))
+ .expectComplete()
+ }
}
"A Source.run" must {
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala
index 137b83ee888..62dda8a96fa 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala
@@ -157,5 +157,20 @@ class SourceWithContextSpec extends StreamSpec {
.expectNext((1, 1), (2, 2), (3, 3), (4, 4))
.expectComplete()
}
+
+ "Apply a viaFlow with optional elements using unsafeOptionalVia" in {
+ val data = List((Some("1"), 1), (None, 2), (None, 3), (Some("4"), 4))
+
+ val source = SourceWithContext.fromTuples(Source(data))
+
+ SourceWithContext.unsafeOptionalDataVia(
+ source,
+ Flow.fromFunction { (string: String) => string.toInt }
+ )(Keep.none)
+ .runWith(TestSink.probe[(Option[Int], Int)])
+ .request(4)
+ .expectNext((Some(1), 1), (None, 2), (None, 3), (Some(4), 4))
+ .expectComplete()
+ }
}
}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index 1963a97f584..043b5fe50cb 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -69,6 +69,32 @@ object Flow {
def fromFunction[I, O](f: function.Function[I, O]): javadsl.Flow[I, O, NotUsed] =
Flow.create[I]().map(f)
+ /**
+ * Creates a Flow from an existing base Flow outputting an optional element and
+ * applying an additional viaFlow only if the element in the stream is defined.
+ *
+ * '''Emits when''' the provided viaFlow is runs with defined elements
+ *
+ * '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param flow The base flow that outputs an optional element
+ * @param viaFlow The flow that gets used if the optional element in is defined.
+ * @param combine How to combine the materialized values of flow and viaFlow
+ * @return a Flow with the viaFlow applied onto defined elements of the flow. The output value
+ * is contained within an Optional which indicates whether the original flow's element had viaFlow
+ * applied.
+ * @since 1.1.0
+ */
+ def optionalVia[FIn, FOut, FViaOut, FMat, FViaMat, Mat](flow: Flow[FIn, Optional[FOut], FMat],
+ viaFlow: Flow[FOut, FViaOut, FViaMat],
+ combine: function.Function2[FMat, FViaMat, Mat]
+ ): Flow[FIn, Optional[FViaOut], Mat] =
+ scaladsl.Flow.optionalVia(flow.map(_.toScala).asScala, viaFlow.asScala)(combinerToScala(combine)).map(_.toJava).asJava
+
/** Create a `Flow` which can process elements of type `T`. */
def of[T](@unused clazz: Class[T]): javadsl.Flow[T, T, NotUsed] = create[T]()
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala
index cb76749dbd9..2e058f8d11c 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala
@@ -13,6 +13,7 @@
package org.apache.pekko.stream.javadsl
+import java.util.Optional
import java.util.concurrent.CompletionStage
import scala.annotation.unchecked.uncheckedVariance
@@ -25,6 +26,7 @@ import pekko.stream._
import pekko.util.ConstantFun
import pekko.util.FutureConverters._
import pekko.util.JavaDurationConverters._
+import pekko.util.OptionConverters._
import pekko.util.ccompat.JavaConverters._
object FlowWithContext {
@@ -39,6 +41,38 @@ object FlowWithContext {
under: Flow[Pair[In, CtxIn], Pair[Out, CtxOut], Mat]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
new FlowWithContext(under)
+ /**
+ * Creates a FlowWithContext from an existing base FlowWithContext outputting an optional element
+ * and applying an additional viaFlow only if the element in the stream is defined.
+ *
+ * '''Emits when''' the provided viaFlow is runs with defined elements
+ *
+ * '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param flow The base flow that outputs an optional element
+ * @param viaFlow The flow that gets used if the optional element in is defined. This flow only works
+ * on the data portion of flow and ignores the context so this flow *must* not re-order,
+ * drop or emit multiple elements for one incoming element
+ * @param combine How to combine the materialized values of flow and viaFlow
+ * @return a FlowWithContext with the viaFlow applied onto defined elements of the flow. The output value
+ * is contained within an Optional which indicates whether the original flow's element had viaFlow
+ * applied.
+ * @since 1.1.0
+ */
+ @ApiMayChange
+ def unsafeOptionalDataVia[FIn, FOut, FViaOut, Ctx, FMat, FViaMat, Mat](
+ flow: FlowWithContext[FIn, Ctx, Optional[FOut], Ctx, FMat],
+ viaFlow: Flow[FOut, FViaOut, FViaMat],
+ combine: function.Function2[FMat, FViaMat, Mat]
+ ): FlowWithContext[FIn, Ctx, Optional[FViaOut], Ctx, Mat] =
+ scaladsl.FlowWithContext.unsafeOptionalDataVia(flow.map(_.toScala).asScala, viaFlow.asScala)(
+ combinerToScala(combine)).map(
+ _.toJava).asJava
+
}
/**
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index bf384639d4f..9affc6af150 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -137,6 +137,32 @@ object Source {
def cycle[O](f: function.Creator[java.util.Iterator[O]]): javadsl.Source[O, NotUsed] =
new Source(scaladsl.Source.cycle(() => f.create().asScala))
+ /**
+ * Creates a Source from an existing base Source outputting an optional element
+ * and applying an additional viaFlow only if the element in the stream is defined.
+ *
+ * '''Emits when''' the provided viaFlow is runs with defined elements
+ *
+ * '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param source The base source that outputs an optional element
+ * @param viaFlow The flow that gets used if the optional element in is defined.
+ * @param combine How to combine the materialized values of source and viaFlow
+ * @return a Source with the viaFlow applied onto defined elements of the flow. The output value
+ * is contained within an Optional which indicates whether the original source's element had viaFlow
+ * applied.
+ * @since 1.1.0
+ */
+ def optionalVia[SOut, FOut, SMat, FMat, Mat](source: Source[Optional[SOut], SMat],
+ viaFlow: Flow[SOut, FOut, FMat],
+ combine: function.Function2[SMat, FMat, Mat]
+ ): Source[Optional[FOut], Mat] =
+ scaladsl.Source.optionalVia(source.map(_.toScala).asScala, viaFlow.asScala)(combinerToScala(combine)).map(_.toJava).asJava
+
/**
* Helper to create [[Source]] from `Iterable`.
* Example usage:
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala
index 39e2c4e4d96..f41b7babfad 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala
@@ -13,6 +13,7 @@
package org.apache.pekko.stream.javadsl
+import java.util.Optional
import java.util.concurrent.CompletionStage
import scala.annotation.unchecked.uncheckedVariance
@@ -28,6 +29,7 @@ import pekko.stream._
import pekko.util.ConstantFun
import pekko.util.FutureConverters._
import pekko.util.JavaDurationConverters._
+import pekko.util.OptionConverters._
import pekko.util.ccompat.JavaConverters._
object SourceWithContext {
@@ -38,6 +40,38 @@ object SourceWithContext {
def fromPairs[Out, CtxOut, Mat](under: Source[Pair[Out, CtxOut], Mat]): SourceWithContext[Out, CtxOut, Mat] = {
new SourceWithContext(scaladsl.SourceWithContext.fromTuples(under.asScala.map(_.toScala)))
}
+
+ /**
+ * Creates a SourceWithContext from an existing base SourceWithContext outputting an optional element
+ * and applying an additional viaFlow only if the element in the stream is defined.
+ *
+ * '''Emits when''' the provided viaFlow is runs with defined elements
+ *
+ * '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param source The base source that outputs an optional element
+ * @param viaFlow The flow that gets used if the optional element in is defined. This flow only works
+ * on the data portion of flow and ignores the context so this flow *must* not re-order,
+ * drop or emit multiple elements for one incoming element
+ * @param combine How to combine the materialized values of source and viaFlow
+ * @return a SourceWithContext with the viaFlow applied onto defined elements of the flow. The output value
+ * is contained within an Optional which indicates whether the original source's element had viaFlow
+ * applied.
+ * @since 1.1.0
+ */
+ @ApiMayChange
+ def unsafeOptionalDataVia[SOut, FOut, Ctx, SMat, FMat, Mat](source: SourceWithContext[Optional[SOut], Ctx, SMat],
+ viaFlow: Flow[SOut, FOut, FMat],
+ combine: function.Function2[SMat, FMat, Mat]
+ ): SourceWithContext[Optional[FOut], Ctx, Mat] =
+ scaladsl.SourceWithContext.unsafeOptionalDataVia(source.map(_.toScala).asScala, viaFlow.asScala)(
+ combinerToScala(combine)).map(
+ _.toJava).asJava
+
}
/**
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index 07ba28c9bfe..0287e9b0582 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -428,6 +428,57 @@ object Flow {
*/
def fromFunction[A, B](f: A => B): Flow[A, B, NotUsed] = apply[A].map(f)
+ /**
+ * Creates a FlowW from an existing base Flow outputting an optional element and
+ * applying an additional viaFlow only if the element in the stream is defined.
+ *
+ * '''Emits when''' the provided viaFlow is runs with defined elements
+ *
+ * '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param flow The base flow that outputs an optional element
+ * @param viaFlow The flow that gets used if the optional element in is defined.
+ * @param combine How to combine the materialized values of flow and viaFlow
+ * @return a Flow with the viaFlow applied onto defined elements of the flow. The output value
+ * is contained within an Option which indicates whether the original flow's element had viaFlow
+ * applied.
+ * @since 1.1.0
+ */
+ def optionalVia[FIn, FOut, FViaOut, FMat, FViaMat, Mat](flow: Flow[FIn, Option[FOut], FMat],
+ viaFlow: Flow[FOut, FViaOut, FViaMat])(
+ combine: (FMat, FViaMat) => Mat
+ ): Flow[FIn, Option[FViaOut], Mat] =
+ Flow.fromGraph(GraphDSL.createGraph(flow, viaFlow)(combine) { implicit b => (s, viaF) =>
+ import GraphDSL.Implicits._
+ val broadcast = b.add(Broadcast[Option[FOut]](2))
+ val merge = b.add(Merge[Option[FViaOut]](2))
+
+ val filterAvailable = Flow[Option[FOut]].collect {
+ case Some(f) => f
+ }
+
+ val filterUnavailable = Flow[Option[FOut]].filter { opt =>
+ opt.isEmpty
+ }.map {
+ _ => Option.empty[FViaOut]
+ }
+
+ val mapIntoOption = Flow[FViaOut].map {
+ f => Some(f)
+ }
+
+ s ~> broadcast.in
+
+ broadcast.out(0) ~> filterAvailable ~> viaF ~> mapIntoOption ~> merge.in(0)
+ broadcast.out(1) ~> filterUnavailable ~> merge.in(1)
+
+ FlowShape(s.in, merge.out)
+ })
+
/**
* A graph with the shape of a flow logically is a flow, this method makes
* it so also in type.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala
index b2e9bced042..8e18a637385 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala
@@ -16,6 +16,7 @@ package org.apache.pekko.stream.scaladsl
import scala.annotation.unchecked.uncheckedVariance
import org.apache.pekko
+import pekko.annotation.ApiMayChange
import pekko.NotUsed
import pekko.japi.Pair
import pekko.stream._
@@ -36,6 +37,71 @@ object FlowWithContext {
def fromTuples[In, CtxIn, Out, CtxOut, Mat](
flow: Flow[(In, CtxIn), (Out, CtxOut), Mat]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
new FlowWithContext(flow)
+
+ /**
+ * Creates a FlowWithContext from an existing base FlowWithContext outputting an optional element
+ * and applying an additional viaFlow only if the element in the stream is defined.
+ *
+ * '''Emits when''' the provided viaFlow is runs with defined elements
+ *
+ * '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param flow The base flow that outputs an optional element
+ * @param viaFlow The flow that gets used if the optional element in is defined. This flow only works
+ * on the data portion of flow and ignores the context so this flow *must* not re-order,
+ * drop or emit multiple elements for one incoming element
+ * @param combine How to combine the materialized values of flow and viaFlow
+ * @return a FlowWithContext with the viaFlow applied onto defined elements of the flow. The output value
+ * is contained within an Option which indicates whether the original flow's element had viaFlow
+ * applied.
+ * @since 1.1.0
+ */
+ @ApiMayChange
+ def unsafeOptionalDataVia[FIn, FOut, FViaOut, Ctx, FMat, FViaMat, Mat](
+ flow: FlowWithContext[FIn, Ctx, Option[FOut], Ctx, FMat],
+ viaFlow: Flow[FOut, FViaOut, FViaMat])(
+ combine: (FMat, FViaMat) => Mat
+ ): FlowWithContext[FIn, Ctx, Option[FViaOut], Ctx, Mat] =
+ FlowWithContext.fromTuples(Flow.fromGraph(GraphDSL.createGraph(flow, viaFlow)(combine) {
+ implicit b => (f, viaF) =>
+ import GraphDSL.Implicits._
+ val broadcast = b.add(Broadcast[(Option[FOut], Ctx)](2))
+ val merge = b.add(Merge[(Option[FViaOut], Ctx)](2))
+
+ val unzip = b.add(Unzip[FOut, Ctx]())
+ val zipper = b.add(Zip[FViaOut, Ctx]())
+
+ val filterAvailable = Flow[(Option[FOut], Ctx)].collect {
+ case (Some(f), ctx) => (f, ctx)
+ }
+
+ val filterUnavailable = Flow[(Option[FOut], Ctx)].filter { case (opt, _) =>
+ opt.isEmpty
+ }.map {
+ case (_, ctx) => (Option.empty[FViaOut], ctx)
+ }
+
+ val mapIntoOption = Flow[(FViaOut, Ctx)].map {
+ case (f, ctx) => (Some(f), ctx)
+ }
+
+ f ~> broadcast.in
+
+ broadcast.out(0) ~> filterAvailable ~> unzip.in
+
+ unzip.out0 ~> viaF ~> zipper.in0
+ unzip.out1 ~> zipper.in1
+
+ zipper.out ~> mapIntoOption ~> merge.in(0)
+
+ broadcast.out(1) ~> filterUnavailable ~> merge.in(1)
+
+ FlowShape(f.in, merge.out)
+ }))
}
/**
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index a39a15df59b..04ec272f2e2 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -308,6 +308,57 @@ object Source {
fromIterator(() => iterator).withAttributes(DefaultAttributes.cycledSource)
}
+ /**
+ * Creates a Source from an existing base Source outputting an optional element
+ * and applying an additional viaFlow only if the element in the stream is defined.
+ *
+ * '''Emits when''' the provided viaFlow is runs with defined elements
+ *
+ * '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param source The base source that outputs an optional element
+ * @param viaFlow The flow that gets used if the optional element in is defined.
+ * @param combine How to combine the materialized values of source and viaFlow
+ * @return a Source with the viaFlow applied onto defined elements of the flow. The output value
+ * is contained within an Option which indicates whether the original source's element had viaFlow
+ * applied.
+ * @since 1.1.0
+ */
+ def optionalVia[SOut, FOut, SMat, FMat, Mat](source: Source[Option[SOut], SMat],
+ viaFlow: Flow[SOut, FOut, FMat])(
+ combine: (SMat, FMat) => Mat
+ ): Source[Option[FOut], Mat] =
+ Source.fromGraph(GraphDSL.createGraph(source, viaFlow)(combine) { implicit b => (s, viaF) =>
+ import GraphDSL.Implicits._
+ val broadcast = b.add(Broadcast[Option[SOut]](2))
+ val merge = b.add(Merge[Option[FOut]](2))
+
+ val filterAvailable = Flow[Option[SOut]].collect {
+ case Some(f) => f
+ }
+
+ val filterUnavailable = Flow[Option[SOut]].filter { opt =>
+ opt.isEmpty
+ }.map {
+ _ => Option.empty[FOut]
+ }
+
+ val mapIntoOption = Flow[FOut].map {
+ f => Some(f)
+ }
+
+ s ~> broadcast.in
+
+ broadcast.out(0) ~> filterAvailable ~> viaF ~> mapIntoOption ~> merge.in(0)
+ broadcast.out(1) ~> filterUnavailable ~> merge.in(1)
+
+ SourceShape(merge.out)
+ })
+
/**
* A graph with the shape of a source logically is a source, this method makes
* it so also in type.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala
index 55a6ce316c8..7fb7d01e724 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala
@@ -16,6 +16,7 @@ package org.apache.pekko.stream.scaladsl
import scala.annotation.unchecked.uncheckedVariance
import org.apache.pekko
+import pekko.annotation.ApiMayChange
import pekko.stream._
object SourceWithContext {
@@ -25,6 +26,70 @@ object SourceWithContext {
*/
def fromTuples[Out, CtxOut, Mat](source: Source[(Out, CtxOut), Mat]): SourceWithContext[Out, CtxOut, Mat] =
new SourceWithContext(source)
+
+ /**
+ * Creates a SourceWithContext from an existing base SourceWithContext outputting an optional element
+ * and applying an additional viaFlow only if the element in the stream is defined.
+ *
+ * '''Emits when''' the provided viaFlow is runs with defined elements
+ *
+ * '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param source The base source that outputs an optional element
+ * @param viaFlow The flow that gets used if the optional element in is defined. This flow only works
+ * on the data portion of flow and ignores the context so this flow *must* not re-order,
+ * drop or emit multiple elements for one incoming element
+ * @param combine How to combine the materialized values of source and viaFlow
+ * @return a SourceWithContext with the viaFlow applied onto defined elements of the flow. The output value
+ * is contained within an Option which indicates whether the original source's element had viaFlow
+ * applied.
+ * @since 1.1.0
+ */
+ @ApiMayChange
+ def unsafeOptionalDataVia[SOut, FOut, Ctx, SMat, FMat, Mat](source: SourceWithContext[Option[SOut], Ctx, SMat],
+ viaFlow: Flow[SOut, FOut, FMat])(
+ combine: (SMat, FMat) => Mat
+ ): SourceWithContext[Option[FOut], Ctx, Mat] =
+ SourceWithContext.fromTuples(Source.fromGraph(GraphDSL.createGraph(source, viaFlow)(combine) {
+ implicit b => (s, viaF) =>
+ import GraphDSL.Implicits._
+ val broadcast = b.add(Broadcast[(Option[SOut], Ctx)](2))
+ val merge = b.add(Merge[(Option[FOut], Ctx)](2))
+
+ val unzip = b.add(Unzip[SOut, Ctx]())
+ val zipper = b.add(Zip[FOut, Ctx]())
+
+ val filterAvailable = Flow[(Option[SOut], Ctx)].collect {
+ case (Some(f), ctx) => (f, ctx)
+ }
+
+ val filterUnavailable = Flow[(Option[SOut], Ctx)].filter { case (opt, _) =>
+ opt.isEmpty
+ }.map {
+ case (_, ctx) => (Option.empty[FOut], ctx)
+ }
+
+ val mapIntoOption = Flow[(FOut, Ctx)].map {
+ case (f, ctx) => (Some(f), ctx)
+ }
+
+ s ~> broadcast.in
+
+ broadcast.out(0) ~> filterAvailable ~> unzip.in
+
+ unzip.out0 ~> viaF ~> zipper.in0
+ unzip.out1 ~> zipper.in1
+
+ zipper.out ~> mapIntoOption ~> merge.in(0)
+
+ broadcast.out(1) ~> filterUnavailable ~> merge.in(1)
+
+ SourceShape(merge.out)
+ }))
}
/**