diff --git a/docs/src/main/paradox/release-notes/releases-1.1.md b/docs/src/main/paradox/release-notes/releases-1.1.md index 4a0ae7611b9..42335ec4047 100644 --- a/docs/src/main/paradox/release-notes/releases-1.1.md +++ b/docs/src/main/paradox/release-notes/releases-1.1.md @@ -41,6 +41,7 @@ The Stream API has been updated to add some extra functions. * add Sink.forall operator ([PR989](https://github.com/apache/pekko/pull/989)) * add Source.iterate operator ([PR1244](https://github.com/apache/pekko/pull/1244)) * added extra retry operators that allow users to provide a predicate to decide whether to retry based on the exception ([PR1269](https://github.com/apache/pekko/pull/1269)) +* add optionalVia/unsafeOptionalVia operators ([PR1422](https://github.com/apache/pekko/pull/1422)) The Stream Testkit Java DSL has some extra functions. diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala index 632f98a320e..dfe689ba397 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala @@ -548,6 +548,22 @@ class FlowSpec extends StreamSpec(ConfigFactory.parseString("pekko.actor.debug.r "should be created from a function easily" in { Source(0 to 9).via(Flow.fromFunction(_ + 1)).runWith(Sink.seq).futureValue should ===(1 to 10) } + + "Apply a viaFlow with optional elements using optionalVia" in { + val data = List(Some("1"), None, None, Some("4")) + + val flow = Flow[Option[String]] + + Source(data).via( + Flow.optionalVia( + flow, + Flow[String].map(_.toInt) + )(Keep.none) + ).runWith(TestSink.probe[Option[Int]]) + .request(4) + .expectNext(Some(1), None, None, Some(4)) + .expectComplete() + } } /** diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala index 0036cf34a13..206e6c41760 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala @@ -93,5 +93,25 @@ class FlowWithContextSpec extends StreamSpec { .expectNext((1, 1), (2, 2), (3, 3), (4, 4)) .expectComplete() } + + "Apply a viaFlow with optional elements using unsafeOptionalVia" in { + val data = List((Some("1"), 1), (None, 2), (None, 3), (Some("4"), 4)) + + val flow = Flow[(Option[String], Int)] + .asFlowWithContext[Option[String], Int, Int](collapseContext = Tuple2.apply)(extractContext = _._2) + .map(_._1) + + SourceWithContext + .fromTuples(Source(data)).via( + FlowWithContext.unsafeOptionalVia( + flow, + Flow[String].map(_.toInt) + )(Keep.none) + ) + .runWith(TestSink.probe[(Option[Int], Int)]) + .request(4) + .expectNext((Some(1), 1), (None, 2), (None, 3), (Some(4), 4)) + .expectComplete() + } } } diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala index 2444e724298..ba0dc9fcedb 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala @@ -444,6 +444,19 @@ class SourceSpec extends StreamSpec with DefaultTimeout { import Attributes._ val s: Source[Int, NotUsed] = Source.single(42).async.addAttributes(none).named("") } + + "Apply a viaFlow with optional elements using optionalVia" in { + val data = List(Some("1"), None, None, Some("4")) + + Source.optionalVia( + Source(data), + Flow[String].map(_.toInt) + )(Keep.none) + .runWith(TestSink.probe[Option[Int]]) + .request(4) + .expectNext(Some(1), None, None, Some(4)) + .expectComplete() + } } "A Source.run" must { diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala index 137b83ee888..40af0421438 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala @@ -157,5 +157,20 @@ class SourceWithContextSpec extends StreamSpec { .expectNext((1, 1), (2, 2), (3, 3), (4, 4)) .expectComplete() } + + "Apply a viaFlow with optional elements using unsafeOptionalVia" in { + val data = List((Some("1"), 1), (None, 2), (None, 3), (Some("4"), 4)) + + val source = SourceWithContext.fromTuples(Source(data)) + + SourceWithContext.unsafeOptionalVia( + source, + Flow[String].map(_.toInt) + )(Keep.none) + .runWith(TestSink.probe[(Option[Int], Int)]) + .request(4) + .expectNext((Some(1), 1), (None, 2), (None, 3), (Some(4), 4)) + .expectComplete() + } } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 1963a97f584..5f6c063e517 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -69,6 +69,32 @@ object Flow { def fromFunction[I, O](f: function.Function[I, O]): javadsl.Flow[I, O, NotUsed] = Flow.create[I]().map(f) + /** + * Creates a Flow from an existing base Flow outputting an optional element and + * applying an additional viaFlow only if the element in the stream is defined. + * + * '''Emits when''' the provided viaFlow is runs with defined elements + * + * '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures + * + * '''Completes when''' upstream completes or the first element is emitted + * + * '''Cancels when''' downstream cancels + * + * @param flow The base flow that outputs an optional element + * @param viaFlow The flow that gets used if the optional element in is defined. + * @param combineMat How to combine the materialized values of flow and viaFlow + * @return a Flow with the viaFlow applied onto defined elements of the flow. The output value + * is contained within an Option which indicates whether the original flow's element had viaFlow + * applied. + */ + @ApiMayChange + def optionalVia[FIn, FOut, FViaOut, FMat, FViaMat, Mat](flow: Flow[FIn, Optional[FOut], FMat], + viaFlow: Flow[FOut, FViaOut, FViaMat], + combineMat: (FMat, FViaMat) => Mat + ): Flow[FIn, Optional[FViaOut], Mat] = + scaladsl.Flow.optionalVia(flow.map(_.toScala).asScala, viaFlow.asScala)(combineMat).map(_.toJava).asJava + /** Create a `Flow` which can process elements of type `T`. */ def of[T](@unused clazz: Class[T]): javadsl.Flow[T, T, NotUsed] = create[T]() diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala index cb76749dbd9..353bc508f7b 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala @@ -13,6 +13,7 @@ package org.apache.pekko.stream.javadsl +import java.util.Optional import java.util.concurrent.CompletionStage import scala.annotation.unchecked.uncheckedVariance @@ -25,6 +26,7 @@ import pekko.stream._ import pekko.util.ConstantFun import pekko.util.FutureConverters._ import pekko.util.JavaDurationConverters._ +import pekko.util.OptionConverters._ import pekko.util.ccompat.JavaConverters._ object FlowWithContext { @@ -39,6 +41,35 @@ object FlowWithContext { under: Flow[Pair[In, CtxIn], Pair[Out, CtxOut], Mat]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = new FlowWithContext(under) + /** + * Creates a FlowWithContext from an existing base FlowWithContext outputting an optional element + * and applying an additional viaFlow only if the element in the stream is defined. + * + * '''Emits when''' the provided viaFlow is runs with defined elements + * + * '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures + * + * '''Completes when''' upstream completes or the first element is emitted + * + * '''Cancels when''' downstream cancels + * + * @param flow The base flow that outputs an optional element + * @param viaFlow The flow that gets used if the optional element in is defined. This flow only works + * on the data portion of flow and ignores the context so this flow *must* not re-order, + * drop or emit multiple elements for one incoming element + * @param combineMat How to combine the materialized values of flow and viaFlow + * @return a FlowWithContext with the viaFlow applied onto defined elements of the flow. The output value + * is contained within an Option which indicates whether the original flow's element had viaFlow + * applied. + */ + @ApiMayChange + def unsafeOptionalVia[FIn, FOut, FViaOut, Ctx, FMat, FViaMat, Mat]( + flow: FlowWithContext[FIn, Ctx, Optional[FOut], Ctx, FMat], + viaFlow: Flow[FOut, FViaOut, FViaMat], + combineMat: (FMat, FViaMat) => Mat + ): FlowWithContext[FIn, Ctx, Optional[FViaOut], Ctx, Mat] = + scaladsl.FlowWithContext.unsafeOptionalVia(flow.map(_.toScala).asScala, viaFlow.asScala)(combineMat).map(_.toJava).asJava + } /** diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index bf384639d4f..4d6150380a0 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -137,6 +137,32 @@ object Source { def cycle[O](f: function.Creator[java.util.Iterator[O]]): javadsl.Source[O, NotUsed] = new Source(scaladsl.Source.cycle(() => f.create().asScala)) + /** + * Creates a Source from an existing base Source outputting an optional element + * and applying an additional viaFlow only if the element in the stream is defined. + * + * '''Emits when''' the provided viaFlow is runs with defined elements + * + * '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures + * + * '''Completes when''' upstream completes or the first element is emitted + * + * '''Cancels when''' downstream cancels + * + * @param source The base source that outputs an optional element + * @param viaFlow The flow that gets used if the optional element in is defined. + * @param combineMat How to combine the materialized values of source and viaFlow + * @return a Source with the viaFlow applied onto defined elements of the flow. The output value + * is contained within an Option which indicates whether the original source's element had viaFlow + * applied. + */ + @ApiMayChange + def optionalVia[SOut, FOut, SMat, FMat, Mat](source: Source[Optional[SOut], SMat], + viaFlow: Flow[SOut, FOut, FMat], + combineMat: (SMat, FMat) => Mat + ): Source[Optional[FOut], Mat] = + scaladsl.Source.optionalVia(source.map(_.toScala).asScala, viaFlow.asScala)(combineMat).map(_.toJava).asJava + /** * Helper to create [[Source]] from `Iterable`. * Example usage: diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala index 39e2c4e4d96..8e8387a762e 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala @@ -13,6 +13,7 @@ package org.apache.pekko.stream.javadsl +import java.util.Optional import java.util.concurrent.CompletionStage import scala.annotation.unchecked.uncheckedVariance @@ -28,6 +29,7 @@ import pekko.stream._ import pekko.util.ConstantFun import pekko.util.FutureConverters._ import pekko.util.JavaDurationConverters._ +import pekko.util.OptionConverters._ import pekko.util.ccompat.JavaConverters._ object SourceWithContext { @@ -38,6 +40,36 @@ object SourceWithContext { def fromPairs[Out, CtxOut, Mat](under: Source[Pair[Out, CtxOut], Mat]): SourceWithContext[Out, CtxOut, Mat] = { new SourceWithContext(scaladsl.SourceWithContext.fromTuples(under.asScala.map(_.toScala))) } + + /** + * Creates a SourceWithContext from an existing base SourceWithContext outputting an optional element + * and applying an additional viaFlow only if the element in the stream is defined. + * + * '''Emits when''' the provided viaFlow is runs with defined elements + * + * '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures + * + * '''Completes when''' upstream completes or the first element is emitted + * + * '''Cancels when''' downstream cancels + * + * @param source The base source that outputs an optional element + * @param viaFlow The flow that gets used if the optional element in is defined. This flow only works + * on the data portion of flow and ignores the context so this flow *must* not re-order, + * drop or emit multiple elements for one incoming element + * @param combineMat How to combine the materialized values of source and viaFlow + * @return a SourceWithContext with the viaFlow applied onto defined elements of the flow. The output value + * is contained within an Option which indicates whether the original source's element had viaFlow + * applied. + */ + @ApiMayChange + def unsafeOptionalVia[SOut, FOut, Ctx, SMat, FMat, Mat](source: SourceWithContext[Optional[SOut], Ctx, SMat], + viaFlow: Flow[SOut, FOut, FMat], + combineMat: (SMat, FMat) => Mat + ): SourceWithContext[Optional[FOut], Ctx, Mat] = + scaladsl.SourceWithContext.unsafeOptionalVia(source.map(_.toScala).asScala, viaFlow.asScala)(combineMat).map( + _.toJava).asJava + } /** diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 07ba28c9bfe..8f8ec1376e6 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -428,6 +428,57 @@ object Flow { */ def fromFunction[A, B](f: A => B): Flow[A, B, NotUsed] = apply[A].map(f) + /** + * Creates a FlowW from an existing base Flow outputting an optional element and + * applying an additional viaFlow only if the element in the stream is defined. + * + * '''Emits when''' the provided viaFlow is runs with defined elements + * + * '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures + * + * '''Completes when''' upstream completes or the first element is emitted + * + * '''Cancels when''' downstream cancels + * + * @param flow The base flow that outputs an optional element + * @param viaFlow The flow that gets used if the optional element in is defined. + * @param combineMat How to combine the materialized values of flow and viaFlow + * @return a Flow with the viaFlow applied onto defined elements of the flow. The output value + * is contained within an Option which indicates whether the original flow's element had viaFlow + * applied. + */ + @ApiMayChange + def optionalVia[FIn, FOut, FViaOut, FMat, FViaMat, Mat](flow: Flow[FIn, Option[FOut], FMat], + viaFlow: Flow[FOut, FViaOut, FViaMat])( + combineMat: (FMat, FViaMat) => Mat + ): Flow[FIn, Option[FViaOut], Mat] = + Flow.fromGraph(GraphDSL.createGraph(flow, viaFlow)(combineMat) { implicit b => (s, viaF) => + import GraphDSL.Implicits._ + val broadcast = b.add(Broadcast[Option[FOut]](2)) + val merge = b.add(Merge[Option[FViaOut]](2)) + + val filterAvailable = Flow[Option[FOut]].collect { + case Some(f) => f + } + + val filterUnavailable = Flow[Option[FOut]].filter { opt => + opt.isEmpty + }.map { + _ => Option.empty[FViaOut] + } + + val mapIntoOption = Flow[FViaOut].map { + f => Some(f) + } + + s ~> broadcast.in + + broadcast.out(0) ~> filterAvailable ~> viaF ~> mapIntoOption ~> merge.in(0) + broadcast.out(1) ~> filterUnavailable ~> merge.in(1) + + FlowShape(s.in, merge.out) + }) + /** * A graph with the shape of a flow logically is a flow, this method makes * it so also in type. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala index b2e9bced042..184a1d08ab2 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala @@ -16,6 +16,7 @@ package org.apache.pekko.stream.scaladsl import scala.annotation.unchecked.uncheckedVariance import org.apache.pekko +import pekko.annotation.ApiMayChange import pekko.NotUsed import pekko.japi.Pair import pekko.stream._ @@ -36,6 +37,70 @@ object FlowWithContext { def fromTuples[In, CtxIn, Out, CtxOut, Mat]( flow: Flow[(In, CtxIn), (Out, CtxOut), Mat]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = new FlowWithContext(flow) + + /** + * Creates a FlowWithContext from an existing base FlowWithContext outputting an optional element + * and applying an additional viaFlow only if the element in the stream is defined. + * + * '''Emits when''' the provided viaFlow is runs with defined elements + * + * '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures + * + * '''Completes when''' upstream completes or the first element is emitted + * + * '''Cancels when''' downstream cancels + * + * @param flow The base flow that outputs an optional element + * @param viaFlow The flow that gets used if the optional element in is defined. This flow only works + * on the data portion of flow and ignores the context so this flow *must* not re-order, + * drop or emit multiple elements for one incoming element + * @param combineMat How to combine the materialized values of flow and viaFlow + * @return a FlowWithContext with the viaFlow applied onto defined elements of the flow. The output value + * is contained within an Option which indicates whether the original flow's element had viaFlow + * applied. + */ + @ApiMayChange + def unsafeOptionalVia[FIn, FOut, FViaOut, Ctx, FMat, FViaMat, Mat]( + flow: FlowWithContext[FIn, Ctx, Option[FOut], Ctx, FMat], + viaFlow: Flow[FOut, FViaOut, FViaMat])( + combineMat: (FMat, FViaMat) => Mat + ): FlowWithContext[FIn, Ctx, Option[FViaOut], Ctx, Mat] = + FlowWithContext.fromTuples(Flow.fromGraph(GraphDSL.createGraph(flow, viaFlow)(combineMat) { + implicit b => (f, viaF) => + import GraphDSL.Implicits._ + val broadcast = b.add(Broadcast[(Option[FOut], Ctx)](2)) + val merge = b.add(Merge[(Option[FViaOut], Ctx)](2)) + + val unzip = b.add(Unzip[FOut, Ctx]()) + val zipper = b.add(Zip[FViaOut, Ctx]()) + + val filterAvailable = Flow[(Option[FOut], Ctx)].collect { + case (Some(f), ctx) => (f, ctx) + } + + val filterUnavailable = Flow[(Option[FOut], Ctx)].filter { case (opt, _) => + opt.isEmpty + }.map { + case (_, ctx) => (Option.empty[FViaOut], ctx) + } + + val mapIntoOption = Flow[(FViaOut, Ctx)].map { + case (f, ctx) => (Some(f), ctx) + } + + f ~> broadcast.in + + broadcast.out(0) ~> filterAvailable ~> unzip.in + + unzip.out0 ~> viaF ~> zipper.in0 + unzip.out1 ~> zipper.in1 + + zipper.out ~> mapIntoOption ~> merge.in(0) + + broadcast.out(1) ~> filterUnavailable ~> merge.in(1) + + FlowShape(f.in, merge.out) + })) } /** diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala index a39a15df59b..01fcac6168f 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala @@ -24,7 +24,7 @@ import scala.concurrent.duration.FiniteDuration import org.apache.pekko import pekko.{ Done, NotUsed } import pekko.actor.{ ActorRef, Cancellable } -import pekko.annotation.InternalApi +import pekko.annotation.{ ApiMayChange, InternalApi } import pekko.stream._ import pekko.stream.impl._ import pekko.stream.impl.Stages.DefaultAttributes @@ -308,6 +308,57 @@ object Source { fromIterator(() => iterator).withAttributes(DefaultAttributes.cycledSource) } + /** + * Creates a Source from an existing base Source outputting an optional element + * and applying an additional viaFlow only if the element in the stream is defined. + * + * '''Emits when''' the provided viaFlow is runs with defined elements + * + * '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures + * + * '''Completes when''' upstream completes or the first element is emitted + * + * '''Cancels when''' downstream cancels + * + * @param source The base source that outputs an optional element + * @param viaFlow The flow that gets used if the optional element in is defined. + * @param combineMat How to combine the materialized values of source and viaFlow + * @return a Source with the viaFlow applied onto defined elements of the flow. The output value + * is contained within an Option which indicates whether the original source's element had viaFlow + * applied. + */ + @ApiMayChange + def optionalVia[SOut, FOut, SMat, FMat, Mat](source: Source[Option[SOut], SMat], + viaFlow: Flow[SOut, FOut, FMat])( + combineMat: (SMat, FMat) => Mat + ): Source[Option[FOut], Mat] = + Source.fromGraph(GraphDSL.createGraph(source, viaFlow)(combineMat) { implicit b => (s, viaF) => + import GraphDSL.Implicits._ + val broadcast = b.add(Broadcast[Option[SOut]](2)) + val merge = b.add(Merge[Option[FOut]](2)) + + val filterAvailable = Flow[Option[SOut]].collect { + case Some(f) => f + } + + val filterUnavailable = Flow[Option[SOut]].filter { opt => + opt.isEmpty + }.map { + _ => Option.empty[FOut] + } + + val mapIntoOption = Flow[FOut].map { + f => Some(f) + } + + s ~> broadcast.in + + broadcast.out(0) ~> filterAvailable ~> viaF ~> mapIntoOption ~> merge.in(0) + broadcast.out(1) ~> filterUnavailable ~> merge.in(1) + + SourceShape(merge.out) + }) + /** * A graph with the shape of a source logically is a source, this method makes * it so also in type. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala index 55a6ce316c8..b26697969d4 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala @@ -16,6 +16,7 @@ package org.apache.pekko.stream.scaladsl import scala.annotation.unchecked.uncheckedVariance import org.apache.pekko +import pekko.annotation.ApiMayChange import pekko.stream._ object SourceWithContext { @@ -25,6 +26,69 @@ object SourceWithContext { */ def fromTuples[Out, CtxOut, Mat](source: Source[(Out, CtxOut), Mat]): SourceWithContext[Out, CtxOut, Mat] = new SourceWithContext(source) + + /** + * Creates a SourceWithContext from an existing base SourceWithContext outputting an optional element + * and applying an additional viaFlow only if the element in the stream is defined. + * + * '''Emits when''' the provided viaFlow is runs with defined elements + * + * '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures + * + * '''Completes when''' upstream completes or the first element is emitted + * + * '''Cancels when''' downstream cancels + * + * @param source The base source that outputs an optional element + * @param viaFlow The flow that gets used if the optional element in is defined. This flow only works + * on the data portion of flow and ignores the context so this flow *must* not re-order, + * drop or emit multiple elements for one incoming element + * @param combineMat How to combine the materialized values of source and viaFlow + * @return a SourceWithContext with the viaFlow applied onto defined elements of the flow. The output value + * is contained within an Option which indicates whether the original source's element had viaFlow + * applied. + */ + @ApiMayChange + def unsafeOptionalVia[SOut, FOut, Ctx, SMat, FMat, Mat](source: SourceWithContext[Option[SOut], Ctx, SMat], + viaFlow: Flow[SOut, FOut, FMat])( + combineMat: (SMat, FMat) => Mat + ): SourceWithContext[Option[FOut], Ctx, Mat] = + SourceWithContext.fromTuples(Source.fromGraph(GraphDSL.createGraph(source, viaFlow)(combineMat) { + implicit b => (s, viaF) => + import GraphDSL.Implicits._ + val broadcast = b.add(Broadcast[(Option[SOut], Ctx)](2)) + val merge = b.add(Merge[(Option[FOut], Ctx)](2)) + + val unzip = b.add(Unzip[SOut, Ctx]()) + val zipper = b.add(Zip[FOut, Ctx]()) + + val filterAvailable = Flow[(Option[SOut], Ctx)].collect { + case (Some(f), ctx) => (f, ctx) + } + + val filterUnavailable = Flow[(Option[SOut], Ctx)].filter { case (opt, _) => + opt.isEmpty + }.map { + case (_, ctx) => (Option.empty[FOut], ctx) + } + + val mapIntoOption = Flow[(FOut, Ctx)].map { + case (f, ctx) => (Some(f), ctx) + } + + s ~> broadcast.in + + broadcast.out(0) ~> filterAvailable ~> unzip.in + + unzip.out0 ~> viaF ~> zipper.in0 + unzip.out1 ~> zipper.in1 + + zipper.out ~> mapIntoOption ~> merge.in(0) + + broadcast.out(1) ~> filterUnavailable ~> merge.in(1) + + SourceShape(merge.out) + })) } /**