Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add alsoTo/alsoToContext to FlowWithContext/SourceWithContext
Browse files Browse the repository at this point in the history
mdedetrich committed Aug 22, 2024
1 parent 60c480a commit 1e091bb
Showing 4 changed files with 31 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/src/main/paradox/release-notes/releases-1.1.md
Original file line number Diff line number Diff line change
@@ -42,6 +42,7 @@ The Stream API has been updated to add some extra functions.
* add Source.iterate operator ([PR1244](https://github.com/apache/pekko/pull/1244))
* added extra retry operators that allow users to provide a predicate to decide whether to retry based on the exception ([PR1269](https://github.com/apache/pekko/pull/1269))
* add optionalVia/unsafeOptionalDataVia operators ([PR1422](https://github.com/apache/pekko/pull/1422))
* add alsoTo/alsoToContext operators to `SourceWithContext`/`FlowWithContext` ([PR-1443](https://github.com/apache/pekko/pull/1443))

The Stream Testkit Java DSL has some extra functions.

Original file line number Diff line number Diff line change
@@ -138,6 +138,14 @@ final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, +Mat](delegate: Flow[(In
combine: (Mat, Mat2) => Mat3): FlowWithContext[In, CtxIn, Out2, Ctx2, Mat3] =
new FlowWithContext(delegate.viaMat(flow)(combine))

override def alsoTo(that: Graph[SinkShape[Out], _]): Repr[Out, CtxOut] =
FlowWithContext.fromTuples(delegate.alsoTo(Flow.fromFunction { (in: (Out, CtxOut)) => in._1 }.toMat(that)(
Keep.right)))

override def alsoToContext(that: Graph[SinkShape[CtxOut], _]): Repr[Out, CtxOut] =
FlowWithContext.fromTuples(delegate.alsoTo(Flow.fromFunction { (in: (Out, CtxOut)) => in._2 }.toMat(that)(
Keep.right)))

/**
* Context-preserving variant of [[pekko.stream.scaladsl.Flow.withAttributes]].
*
Original file line number Diff line number Diff line change
@@ -88,6 +88,20 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] {
def viaMat[Out2, Ctx2, Mat2, Mat3](flow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2])(
combine: (Mat, Mat2) => Mat3): ReprMat[Out2, Ctx2, Mat3]

/**
* Data variant of [[pekko.stream.scaladsl.FlowOps.alsoTo]]
*
* @see [[pekko.stream.scaladsl.FlowOps.alsoTo]]
*/
def alsoTo(that: Graph[SinkShape[Out], _]): Repr[Out, Ctx]

/**
* Context variant of [[pekko.stream.scaladsl.FlowOps.alsoTo]]
*
* @see [[pekko.stream.scaladsl.FlowOps.alsoTo]]
*/
def alsoToContext(that: Graph[SinkShape[Ctx], _]): Repr[Out, Ctx]

/**
* Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.map]].
*
Original file line number Diff line number Diff line change
@@ -155,6 +155,14 @@ final class SourceWithContext[+Out, +Ctx, +Mat] private[stream] (delegate: Sourc
def toMat[Mat2, Mat3](sink: Graph[SinkShape[(Out, Ctx)], Mat2])(combine: (Mat, Mat2) => Mat3): RunnableGraph[Mat3] =
delegate.toMat(sink)(combine)

override def alsoTo(that: Graph[SinkShape[Out], _]): Repr[Out, Ctx] =
SourceWithContext.fromTuples(delegate.alsoTo(Flow.fromFunction { (in: (Out, Ctx)) => in._1 }.toMat(that)(
Keep.right)))

override def alsoToContext(that: Graph[SinkShape[Ctx], _]): Repr[Out, Ctx] =
SourceWithContext.fromTuples(delegate.alsoTo(Flow.fromFunction { (in: (Out, Ctx)) => in._2 }.toMat(that)(
Keep.right)))

/**
* Connect this [[pekko.stream.scaladsl.SourceWithContext]] to a [[pekko.stream.scaladsl.Sink]] and run it.
* The returned value is the materialized value of the `Sink`.

0 comments on commit 1e091bb

Please sign in to comment.