Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add optionalVia and unsafeOptionalVia
Browse files Browse the repository at this point in the history
mdedetrich committed Aug 2, 2024

Verified

This commit was signed with the committer’s verified signature.
mdedetrich Matthew de Detrich
1 parent 5bf60da commit 2c1d6a1
Showing 11 changed files with 345 additions and 1 deletion.
1 change: 1 addition & 0 deletions docs/src/main/paradox/release-notes/releases-1.1.md
Original file line number Diff line number Diff line change
@@ -41,6 +41,7 @@ The Stream API has been updated to add some extra functions.
* add Sink.forall operator ([PR989](https://github.com/apache/pekko/pull/989))
* add Source.iterate operator ([PR1244](https://github.com/apache/pekko/pull/1244))
* added extra retry operators that allow users to provide a predicate to decide whether to retry based on the exception ([PR1269](https://github.com/apache/pekko/pull/1269))
* add optionalVia/unsafeOptionalVia operators ([PR1422](https://github.com/apache/pekko/pull/1422))

The Stream Testkit Java DSL has some extra functions.

Original file line number Diff line number Diff line change
@@ -548,6 +548,22 @@ class FlowSpec extends StreamSpec(ConfigFactory.parseString("pekko.actor.debug.r
"should be created from a function easily" in {
Source(0 to 9).via(Flow.fromFunction(_ + 1)).runWith(Sink.seq).futureValue should ===(1 to 10)
}

"Apply a viaFlow with optional elements using optionalVia" in {
val data = List(Some("1"), None, None, Some("4"))

val flow = Flow[Option[String]]

Source(data).via(
Flow.optionalVia(
flow,
Flow[String].map(_.toInt)
)(Keep.none)
).runWith(TestSink.probe[Option[Int]])
.request(4)
.expectNext(Some(1), None, None, Some(4))
.expectComplete()
}
}

/**
Original file line number Diff line number Diff line change
@@ -93,5 +93,25 @@ class FlowWithContextSpec extends StreamSpec {
.expectNext((1, 1), (2, 2), (3, 3), (4, 4))
.expectComplete()
}

"Apply a viaFlow with optional elements using unsafeOptionalVia" in {
val data = List((Some("1"), 1), (None, 2), (None, 3), (Some("4"), 4))

val flow = Flow[(Option[String], Int)]
.asFlowWithContext[Option[String], Int, Int](collapseContext = Tuple2.apply)(extractContext = _._2)
.map(_._1)

SourceWithContext
.fromTuples(Source(data)).via(
FlowWithContext.unsafeOptionalVia(
flow,
Flow[String].map(_.toInt)
)(Keep.none)
)
.runWith(TestSink.probe[(Option[Int], Int)])
.request(4)
.expectNext((Some(1), 1), (None, 2), (None, 3), (Some(4), 4))
.expectComplete()
}
}
}
Original file line number Diff line number Diff line change
@@ -444,6 +444,19 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
import Attributes._
val s: Source[Int, NotUsed] = Source.single(42).async.addAttributes(none).named("")
}

"Apply a viaFlow with optional elements using optionalVia" in {
val data = List(Some("1"), None, None, Some("4"))

Source.optionalVia(
Source(data),
Flow[String].map(_.toInt)
)(Keep.none)
.runWith(TestSink.probe[Option[Int]])
.request(4)
.expectNext(Some(1), None, None, Some(4))
.expectComplete()
}
}

"A Source.run" must {
Original file line number Diff line number Diff line change
@@ -157,5 +157,20 @@ class SourceWithContextSpec extends StreamSpec {
.expectNext((1, 1), (2, 2), (3, 3), (4, 4))
.expectComplete()
}

"Apply a viaFlow with optional elements using unsafeOptionalVia" in {
val data = List((Some("1"), 1), (None, 2), (None, 3), (Some("4"), 4))

val source = SourceWithContext.fromTuples(Source(data))

SourceWithContext.unsafeOptionalVia(
source,
Flow[String].map(_.toInt)
)(Keep.none)
.runWith(TestSink.probe[(Option[Int], Int)])
.request(4)
.expectNext((Some(1), 1), (None, 2), (None, 3), (Some(4), 4))
.expectComplete()
}
}
}
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@

package org.apache.pekko.stream.javadsl

import java.util.Optional
import java.util.concurrent.CompletionStage

import scala.annotation.unchecked.uncheckedVariance
@@ -39,6 +40,29 @@ object FlowWithContext {
under: Flow[Pair[In, CtxIn], Pair[Out, CtxOut], Mat]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
new FlowWithContext(under)

/**
* Creates a FlowWithContext from an existing base FlowWithContext outputting an optional element
* and applying an additional viaFlow only if the element in the stream is defined.
*
* @param flow The base flow that outputs an optional element
* @param viaFlow The flow that gets used if the optional element in is defined. This flow only works
* on the data portion of flow and ignores the context so this flow *must* not re-order,
* drop or emit multiple elements for one incoming element
* @param combineMat How to combine the materialized values of flow and viaFlow
* @return a FlowWithContext with the viaFlow applied onto defined elements of the flow. The output value
* is contained within an Option which indicates whether the original flow's element had viaFlow
* applied.
*/
@ApiMayChange
def unsafeOptionalVia[FIn, FOut, FViaOut, Ctx, FMat, FViaMat, Mat](
flow: FlowWithContext[FIn, Ctx, Optional[FOut], Ctx, FMat],
viaFlow: Flow[FOut, FViaOut, FViaMat],
combineMat: (FMat, FViaMat) => Mat
): FlowWithContext[FIn, Ctx, Optional[FViaOut], Ctx, Mat] = {
import pekko.util.OptionConverters._
scaladsl.FlowWithContext.unsafeOptionalVia(flow.map(_.toScala).asScala, viaFlow.asScala)(combineMat).map(_.toJava).asJava
}

}

/**
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@

package org.apache.pekko.stream.javadsl

import java.util.Optional
import java.util.concurrent.CompletionStage

import scala.annotation.unchecked.uncheckedVariance
@@ -38,6 +39,37 @@ object SourceWithContext {
def fromPairs[Out, CtxOut, Mat](under: Source[Pair[Out, CtxOut], Mat]): SourceWithContext[Out, CtxOut, Mat] = {
new SourceWithContext(scaladsl.SourceWithContext.fromTuples(under.asScala.map(_.toScala)))
}

/**
* Creates a SourceWithContext from an existing base SourceWithContext outputting an optional element
* and applying an additional viaFlow only if the element in the stream is defined.
*
* '''Emits when''' the provided viaFlow is runs with defined elements
*
* '''Backpressures when''' the viaFlow runs for defined elements and downstream backpressures
*
* '''Completes when''' upstream completes or the first element is emitted
*
* '''Cancels when''' downstream cancels
*
* @param source The base source that outputs an optional element
* @param viaFlow The flow that gets used if the optional element in is defined. This flow only works
* on the data portion of flow and ignores the context so this flow *must* not re-order,
* drop or emit multiple elements for one incoming element
* @param combineMat How to combine the materialized values of source and viaFlow
* @return a SourceWithContext with the viaFlow applied onto defined elements of the flow. The output value
* is contained within an Option which indicates whether the original source's element had viaFlow
* applied.
*/
@ApiMayChange
def unsafeOptionalVia[SOut, FOut, Ctx, SMat, FMat, Mat](source: SourceWithContext[Optional[SOut], Ctx, SMat],
viaFlow: Flow[SOut, FOut, FMat],
combineMat: (SMat, FMat) => Mat
): SourceWithContext[Optional[FOut], Ctx, Mat] = {
import pekko.util.OptionConverters._
scaladsl.SourceWithContext.unsafeOptionalVia(source.map(_.toScala).asScala, viaFlow.asScala)(combineMat).map(
_.toJava).asJava
}
}

/**
51 changes: 51 additions & 0 deletions stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
Original file line number Diff line number Diff line change
@@ -428,6 +428,57 @@ object Flow {
*/
def fromFunction[A, B](f: A => B): Flow[A, B, NotUsed] = apply[A].map(f)

/**
* Creates a FlowW from an existing base Flow outputting an optional element and
* applying an additional viaFlow only if the element in the stream is defined.
*
* '''Emits when''' the provided viaFlow is runs with defined elements
*
* '''Backpressures when''' the viaFlow runs for defined elements and downstream backpressures
*
* '''Completes when''' upstream completes or the first element is emitted
*
* '''Cancels when''' downstream cancels
*
* @param flow The base flow that outputs an optional element
* @param viaFlow The flow that gets used if the optional element in is defined.
* @param combineMat How to combine the materialized values of flow and viaFlow
* @return a Flow with the viaFlow applied onto defined elements of the flow. The output value
* is contained within an Option which indicates whether the original flow's element had viaFlow
* applied.
*/
@ApiMayChange
def optionalVia[FIn, FOut, FViaOut, FMat, FViaMat, Mat](flow: Flow[FIn, Option[FOut], FMat],
viaFlow: Flow[FOut, FViaOut, FViaMat])(
combineMat: (FMat, FViaMat) => Mat
): Flow[FIn, Option[FViaOut], Mat] =
Flow.fromGraph(GraphDSL.createGraph(flow, viaFlow)(combineMat) { implicit b => (s, viaF) =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[Option[FOut]](2))
val merge = b.add(Merge[Option[FViaOut]](2))

val filterAvailable = Flow[Option[FOut]].collect {
case Some(f) => f
}

val filterUnavailable = Flow[Option[FOut]].filter { opt =>
opt.isEmpty
}.map {
_ => Option.empty[FViaOut]
}

val mapIntoOption = Flow[FViaOut].map {
f => Some(f)
}

s ~> broadcast.in

broadcast.out(0) ~> filterAvailable ~> viaF ~> mapIntoOption ~> merge.in(0)
broadcast.out(1) ~> filterUnavailable ~> merge.in(1)

FlowShape(s.in, merge.out)
})

/**
* A graph with the shape of a flow logically is a flow, this method makes
* it so also in type.
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@ package org.apache.pekko.stream.scaladsl
import scala.annotation.unchecked.uncheckedVariance

import org.apache.pekko
import pekko.annotation.ApiMayChange
import pekko.NotUsed
import pekko.japi.Pair
import pekko.stream._
@@ -36,6 +37,70 @@ object FlowWithContext {
def fromTuples[In, CtxIn, Out, CtxOut, Mat](
flow: Flow[(In, CtxIn), (Out, CtxOut), Mat]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
new FlowWithContext(flow)

/**
* Creates a FlowWithContext from an existing base FlowWithContext outputting an optional element
* and applying an additional viaFlow only if the element in the stream is defined.
*
* '''Emits when''' the provided viaFlow is runs with defined elements
*
* '''Backpressures when''' the viaFlow runs for defined elements and downstream backpressures
*
* '''Completes when''' upstream completes or the first element is emitted
*
* '''Cancels when''' downstream cancels
*
* @param flow The base flow that outputs an optional element
* @param viaFlow The flow that gets used if the optional element in is defined. This flow only works
* on the data portion of flow and ignores the context so this flow *must* not re-order,
* drop or emit multiple elements for one incoming element
* @param combineMat How to combine the materialized values of flow and viaFlow
* @return a FlowWithContext with the viaFlow applied onto defined elements of the flow. The output value
* is contained within an Option which indicates whether the original flow's element had viaFlow
* applied.
*/
@ApiMayChange
def unsafeOptionalVia[FIn, FOut, FViaOut, Ctx, FMat, FViaMat, Mat](
flow: FlowWithContext[FIn, Ctx, Option[FOut], Ctx, FMat],
viaFlow: Flow[FOut, FViaOut, FViaMat])(
combineMat: (FMat, FViaMat) => Mat
): FlowWithContext[FIn, Ctx, Option[FViaOut], Ctx, Mat] =
FlowWithContext.fromTuples(Flow.fromGraph(GraphDSL.createGraph(flow, viaFlow)(combineMat) {
implicit b => (f, viaF) =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[(Option[FOut], Ctx)](2))
val merge = b.add(Merge[(Option[FViaOut], Ctx)](2))

val unzip = b.add(Unzip[FOut, Ctx]())
val zipper = b.add(Zip[FViaOut, Ctx]())

val filterAvailable = Flow[(Option[FOut], Ctx)].collect {
case (Some(f), ctx) => (f, ctx)
}

val filterUnavailable = Flow[(Option[FOut], Ctx)].filter { case (opt, _) =>
opt.isEmpty
}.map {
case (_, ctx) => (Option.empty[FViaOut], ctx)
}

val mapIntoOption = Flow[(FViaOut, Ctx)].map {
case (f, ctx) => (Some(f), ctx)
}

f ~> broadcast.in

broadcast.out(0) ~> filterAvailable ~> unzip.in

unzip.out0 ~> viaF ~> zipper.in0
unzip.out1 ~> zipper.in1

zipper.out ~> mapIntoOption ~> merge.in(0)

broadcast.out(1) ~> filterUnavailable ~> merge.in(1)

FlowShape(f.in, merge.out)
}))
}

/**
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@ import scala.concurrent.duration.FiniteDuration
import org.apache.pekko
import pekko.{ Done, NotUsed }
import pekko.actor.{ ActorRef, Cancellable }
import pekko.annotation.InternalApi
import pekko.annotation.{ ApiMayChange, InternalApi }
import pekko.stream._
import pekko.stream.impl._
import pekko.stream.impl.Stages.DefaultAttributes
@@ -308,6 +308,57 @@ object Source {
fromIterator(() => iterator).withAttributes(DefaultAttributes.cycledSource)
}

/**
* Creates a Source from an existing base Source outputting an optional element
* and applying an additional viaFlow only if the element in the stream is defined.
*
* '''Emits when''' the provided viaFlow is runs with defined elements
*
* '''Backpressures when''' the viaFlow runs for defined elements and downstream backpressures
*
* '''Completes when''' upstream completes or the first element is emitted
*
* '''Cancels when''' downstream cancels
*
* @param source The base source that outputs an optional element
* @param viaFlow The flow that gets used if the optional element in is defined.
* @param combineMat How to combine the materialized values of source and viaFlow
* @return a Source with the viaFlow applied onto defined elements of the flow. The output value
* is contained within an Option which indicates whether the original source's element had viaFlow
* applied.
*/
@ApiMayChange
def optionalVia[SOut, FOut, SMat, FMat, Mat](source: Source[Option[SOut], SMat],
viaFlow: Flow[SOut, FOut, FMat])(
combineMat: (SMat, FMat) => Mat
): Source[Option[FOut], Mat] =
Source.fromGraph(GraphDSL.createGraph(source, viaFlow)(combineMat) { implicit b => (s, viaF) =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[Option[SOut]](2))
val merge = b.add(Merge[Option[FOut]](2))

val filterAvailable = Flow[Option[SOut]].collect {
case Some(f) => f
}

val filterUnavailable = Flow[Option[SOut]].filter { opt =>
opt.isEmpty
}.map {
_ => Option.empty[FOut]
}

val mapIntoOption = Flow[FOut].map {
f => Some(f)
}

s ~> broadcast.in

broadcast.out(0) ~> filterAvailable ~> viaF ~> mapIntoOption ~> merge.in(0)
broadcast.out(1) ~> filterUnavailable ~> merge.in(1)

SourceShape(merge.out)
})

/**
* A graph with the shape of a source logically is a source, this method makes
* it so also in type.
Loading

0 comments on commit 2c1d6a1

Please sign in to comment.