From fcccbc96a40a7411bb81a4766659b7bec501f155 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Sat, 23 Sep 2023 14:51:31 +0800 Subject: [PATCH] =str Tweak the stream mapAsyncPartitioned operator --- .../stream/MapAsyncPartitionedSpec.scala | 27 ++- .../pekko/stream/MapAsyncPartitioned.scala | 157 +++++------------- .../org/apache/pekko/stream/impl/Stages.scala | 2 + .../apache/pekko/stream/javadsl/Flow.scala | 48 +++++- .../stream/javadsl/FlowWithContext.scala | 29 ++-- .../apache/pekko/stream/javadsl/Source.scala | 56 +++++-- .../stream/javadsl/SourceWithContext.scala | 36 ++-- .../apache/pekko/stream/javadsl/SubFlow.scala | 66 ++++++++ .../pekko/stream/javadsl/SubSource.scala | 66 ++++++++ .../apache/pekko/stream/scaladsl/Flow.scala | 106 ++++++++---- .../stream/scaladsl/FlowWithContext.scala | 31 ---- .../stream/scaladsl/FlowWithContextOps.scala | 28 ++++ .../apache/pekko/stream/scaladsl/Source.scala | 28 ---- .../stream/scaladsl/SourceWithContext.scala | 29 ---- 14 files changed, 408 insertions(+), 301 deletions(-) diff --git a/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala b/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala index d70e7fabce5..ee8a7e50652 100644 --- a/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala +++ b/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala @@ -63,8 +63,7 @@ private object MapAsyncPartitionedSpec { value = i.toString) } - def extractPartition(e: TestKeyValue): Int = - e.key + val partitioner: TestKeyValue => Int = kv => kv.key type Operation = TestKeyValue => Future[(Int, String)] @@ -125,7 +124,7 @@ class MapAsyncPartitionedSpec val result = Source(elements) - .mapAsyncPartitionedUnordered(parallelism = 2)(extractPartition)(blockingOperation) + .mapAsyncPartitionedUnordered(parallelism = 2)(partitioner)(blockingOperation) .runWith(Sink.seq) .futureValue .map(_._2) @@ -137,7 +136,7 @@ class MapAsyncPartitionedSpec forAll(minSuccessful(1000)) { (parallelism: Parallelism, elements: Seq[TestKeyValue]) => val result = Source(elements.toIndexedSeq) - .mapAsyncPartitionedUnordered(parallelism.value)(extractPartition)(asyncOperation) + .mapAsyncPartitionedUnordered(parallelism.value)(partitioner)(asyncOperation) .runWith(Sink.seq) .futureValue @@ -153,7 +152,7 @@ class MapAsyncPartitionedSpec val result = Source .fromIterator(() => elements.iterator) - .mapAsyncPartitionedUnordered(parallelism = 1)(extractPartition)(asyncOperation) + .mapAsyncPartitionedUnordered(parallelism = 1)(partitioner)(asyncOperation) .runWith(Sink.seq) .futureValue @@ -169,7 +168,7 @@ class MapAsyncPartitionedSpec val result = Source .fromIterator(() => elements.iterator) - .mapAsyncPartitionedUnordered(parallelism.value)(extractPartition)(blockingOperation) + .mapAsyncPartitionedUnordered(parallelism.value)(partitioner)(blockingOperation) .runWith(Sink.seq) .futureValue @@ -232,7 +231,7 @@ class MapAsyncPartitionedSpec val result = Source(elements) - .mapAsyncPartitionedUnordered(parallelism = 2)(extractPartition)(fun) + .mapAsyncPartitionedUnordered(parallelism = 2)(partitioner)(fun) .runWith(Sink.seq) .futureValue @@ -244,7 +243,7 @@ class MapAsyncPartitionedSpec an[IllegalArgumentException] shouldBe thrownBy { Source(infiniteStream()) .mapAsyncPartitionedUnordered( - parallelism = zeroOrNegativeParallelism)(extractPartition = identity)(f = (_, _) => Future.unit) + parallelism = zeroOrNegativeParallelism)(partitioner = identity)(f = (_, _) => Future.unit) .runWith(Sink.ignore) .futureValue } @@ -272,7 +271,7 @@ class MapAsyncPartitionedSpec val result = Source(elements) - .mapAsyncPartitioned(parallelism = 2)(extractPartition)(processElement) + .mapAsyncPartitioned(parallelism = 2)(partitioner)(processElement) .runWith(Sink.seq) .futureValue .map(_._2) @@ -289,7 +288,7 @@ class MapAsyncPartitionedSpec forAll(minSuccessful(1000)) { (parallelism: Parallelism, elements: Seq[TestKeyValue]) => val result = Source(elements.toIndexedSeq) - .mapAsyncPartitioned(parallelism.value)(extractPartition)(asyncOperation) + .mapAsyncPartitioned(parallelism.value)(partitioner)(asyncOperation) .runWith(Sink.seq) .futureValue @@ -305,7 +304,7 @@ class MapAsyncPartitionedSpec val result = Source .fromIterator(() => elements.iterator) - .mapAsyncPartitioned(parallelism = 1)(extractPartition)(asyncOperation) + .mapAsyncPartitioned(parallelism = 1)(partitioner)(asyncOperation) .runWith(Sink.seq) .futureValue @@ -321,7 +320,7 @@ class MapAsyncPartitionedSpec val result = Source .fromIterator(() => elements.iterator) - .mapAsyncPartitioned(parallelism.value)(extractPartition)(blockingOperation) + .mapAsyncPartitioned(parallelism.value)(partitioner)(blockingOperation) .runWith(Sink.seq) .futureValue @@ -384,7 +383,7 @@ class MapAsyncPartitionedSpec val result = Source(elements) - .mapAsyncPartitioned(parallelism = 2)(extractPartition)(fun) + .mapAsyncPartitioned(parallelism = 2)(partitioner)(fun) .runWith(Sink.seq) .futureValue @@ -396,7 +395,7 @@ class MapAsyncPartitionedSpec an[IllegalArgumentException] shouldBe thrownBy { Source(infiniteStream()) .mapAsyncPartitioned( - parallelism = zeroOrNegativeParallelism)(extractPartition = identity)(f = (_, _) => Future.unit) + parallelism = zeroOrNegativeParallelism)(partitioner = identity)(f = (_, _) => Future.unit) .runWith(Sink.ignore) .futureValue } diff --git a/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala b/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala index f328d66fe0f..5484cdfb56a 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala @@ -21,134 +21,66 @@ import scala.collection.mutable import scala.concurrent.Future import scala.util.control.{ NoStackTrace, NonFatal } import scala.util.{ Failure, Success, Try } - import org.apache.pekko +import org.apache.pekko.annotation.InternalApi import pekko.dispatch.ExecutionContexts import pekko.stream.ActorAttributes.SupervisionStrategy -import pekko.stream.Attributes.{ Name, SourceLocation } -import pekko.stream.MapAsyncPartitioned._ -import pekko.stream.scaladsl.{ Flow, FlowWithContext, Source, SourceWithContext } import pekko.stream.stage._ +import org.apache.pekko.util.OptionVal +/** + * Internal API + */ +@InternalApi private[stream] object MapAsyncPartitioned { + private val NotYetThere = Failure(new Exception with NoStackTrace) - private def extractPartitionWithCtx[In, Ctx, Partition](extract: In => Partition)(tuple: (In, Ctx)): Partition = - extract(tuple._1) - - private def fWithCtx[In, Out, Ctx, Partition](f: (In, Partition) => Future[Out])(tuple: (In, Ctx), - partition: Partition): Future[(Out, Ctx)] = - f(tuple._1, partition).map(_ -> tuple._2)(ExecutionContexts.parasitic) - - def mapSourceOrdered[In, Out, Partition, Mat](source: Source[In, Mat], parallelism: Int)( - extractPartition: In => Partition)( - f: (In, Partition) => Future[Out]): Source[Out, Mat] = - source.via(new MapAsyncPartitioned[In, Out, Partition](orderedOutput = true, parallelism, extractPartition, f)) - - def mapSourceUnordered[In, Out, Partition, Mat](source: Source[In, Mat], parallelism: Int)( - extractPartition: In => Partition)( - f: (In, Partition) => Future[Out]): Source[Out, Mat] = - source.via(new MapAsyncPartitioned[In, Out, Partition](orderedOutput = false, parallelism, extractPartition, f)) - - def mapSourceWithContextOrdered[In, Ctx, T, Partition, Mat](flow: SourceWithContext[In, Ctx, Mat], parallelism: Int)( - extractPartition: In => Partition)( - f: (In, Partition) => Future[T]): SourceWithContext[T, Ctx, Mat] = - flow.via( - new MapAsyncPartitioned[(In, Ctx), (T, Ctx), Partition]( - orderedOutput = true, - parallelism, - extractPartitionWithCtx(extractPartition), - fWithCtx[In, T, Ctx, Partition](f))) - - def mapSourceWithContextUnordered[In, Ctx, T, Partition, Mat](flow: SourceWithContext[In, Ctx, Mat], - parallelism: Int)(extractPartition: In => Partition)( - f: (In, Partition) => Future[T]): SourceWithContext[T, Ctx, Mat] = - flow.via( - new MapAsyncPartitioned[(In, Ctx), (T, Ctx), Partition]( - orderedOutput = false, - parallelism, - extractPartitionWithCtx(extractPartition), - fWithCtx[In, T, Ctx, Partition](f))) - - def mapFlowOrdered[In, Out, T, Partition, Mat](flow: Flow[In, Out, Mat], parallelism: Int)( - extractPartition: Out => Partition)( - f: (Out, Partition) => Future[T]): Flow[In, T, Mat] = - flow.via(new MapAsyncPartitioned[Out, T, Partition](orderedOutput = true, parallelism, extractPartition, - f)) - - def mapFlowUnordered[In, Out, T, Partition, Mat](flow: Flow[In, Out, Mat], parallelism: Int)( - extractPartition: Out => Partition)( - f: (Out, Partition) => Future[T]): Flow[In, T, Mat] = - flow.via(new MapAsyncPartitioned[Out, T, Partition](orderedOutput = false, parallelism, - extractPartition, f)) - - def mapFlowWithContextOrdered[In, Out, CtxIn, CtxOut, T, Partition, Mat]( - flow: FlowWithContext[In, CtxIn, Out, CtxOut, Mat], parallelism: Int)( - extractPartition: Out => Partition)( - f: (Out, Partition) => Future[T]): FlowWithContext[In, CtxIn, T, CtxOut, Mat] = - flow.via( - new MapAsyncPartitioned[(Out, CtxOut), (T, CtxOut), Partition]( - orderedOutput = true, - parallelism, - extractPartitionWithCtx(extractPartition), - fWithCtx[Out, T, CtxOut, Partition](f))) - - def mapFlowWithContextUnordered[In, Out, CtxIn, CtxOut, T, Partition, Mat]( - flow: FlowWithContext[In, CtxIn, Out, CtxOut, Mat], parallelism: Int)(extractPartition: Out => Partition)( - f: (Out, Partition) => Future[T]): FlowWithContext[In, CtxIn, T, CtxOut, Mat] = - flow.via( - new MapAsyncPartitioned[(Out, CtxOut), (T, CtxOut), Partition]( - orderedOutput = false, - parallelism, - extractPartitionWithCtx(extractPartition), - fWithCtx[Out, T, CtxOut, Partition](f))) - - private[stream] val NotYetThere: Failure[Nothing] = Failure(new Exception with NoStackTrace) - - private[stream] final class Holder[In, Out]( - val in: In, - var out: Try[Out], - callback: AsyncCallback[Holder[In, Out]]) extends (Try[Out] => Unit) { - - // To support both fail-fast when the supervision directive is Stop - // and not calling the decider multiple times (#23888) we need to cache the decider result and re-use that - private var cachedSupervisionDirective: Option[Supervision.Directive] = None + private final class Holder[In, Out](val in: In, var out: Try[Out], val cb: AsyncCallback[Holder[In, Out]]) extends ( + Try[Out] => Unit) { + private var cachedSupervisionDirective: OptionVal[Supervision.Directive] = OptionVal.None def supervisionDirectiveFor(decider: Supervision.Decider, ex: Throwable): Supervision.Directive = { cachedSupervisionDirective match { - case Some(d) => d + case OptionVal.Some(d) => d case _ => val d = decider(ex) - cachedSupervisionDirective = Some(d) + cachedSupervisionDirective = OptionVal.Some(d) d } } - def setOut(t: Try[Out]): Unit = + def setOut(t: Try[Out]): Unit = { out = t + } override def apply(t: Try[Out]): Unit = { setOut(t) - callback.invoke(this) + cb.invoke(this) } + + override def toString = s"Holder($in, $out)" } } -private[stream] class MapAsyncPartitioned[In, Out, Partition]( - orderedOutput: Boolean, +/** + * Internal API + */ +@InternalApi +private[stream] final class MapAsyncPartitioned[In, Out, Partition]( parallelism: Int, - extractPartition: In => Partition, + orderedOutput: Boolean, + partitioner: In => Partition, f: (In, Partition) => Future[Out]) extends GraphStage[FlowShape[In, Out]] { + require(parallelism >= 1, "parallelism must be at least 1") + require(partitioner != null, "partitioner function should not be null") + require(f != null, "f function should not be null.") + import MapAsyncPartitioned._ - if (parallelism < 1) throw new IllegalArgumentException("parallelism must be at least 1") - - private val in = Inlet[In]("MapAsyncPartitionOrdered.in") - private val out = Outlet[Out]("MapAsyncPartitionOrdered.out") + private val in = Inlet[In]("MapAsyncPartitioned.in") + private val out = Outlet[Out]("MapAsyncPartitioned.out") override val shape: FlowShape[In, Out] = FlowShape(in, out) - override def initialAttributes: Attributes = - Attributes(Name("MapAsyncPartitionOrdered")) and SourceLocation.forLambda(f) - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { private val contextPropagation = pekko.stream.impl.ContextPropagation() @@ -191,13 +123,12 @@ private[stream] class MapAsyncPartitioned[In, Out, Partition]( buffer = mutable.Queue() } - override def onPull(): Unit = - pushNextIfPossible() + override def onPull(): Unit = pushNextIfPossible() override def onPush(): Unit = { try { val element = grab(in) - val partition = extractPartition(element) + val partition = partitioner(element) val wrappedInput = new Contextual( contextPropagation.currentContext(), @@ -217,8 +148,7 @@ private[stream] class MapAsyncPartitioned[In, Out, Partition]( pullIfNeeded() } - override def onUpstreamFinish(): Unit = - if (idle()) completeStage() + override def onUpstreamFinish(): Unit = if (idle()) completeStage() private def processElement(partition: Partition, wrappedInput: Contextual[Holder[In, Out]]): Unit = { import wrappedInput.{ element => holder } @@ -289,7 +219,7 @@ private[stream] class MapAsyncPartitioned[In, Out, Partition]( buffer = buffer.filter { case (partition, wrappedInput) => import wrappedInput.{ element => holder } - if ((holder.out eq MapAsyncPartitioned.NotYetThere) || !isAvailable(out)) { + if ((holder.out eq NotYetThere) || !isAvailable(out)) { true } else { partitionsInProgress -= partition @@ -321,12 +251,14 @@ private[stream] class MapAsyncPartitioned[In, Out, Partition]( } private def drainQueue(): Unit = { - buffer.foreach { - case (partition, wrappedInput) => - if (canStartNextElement(partition)) { - wrappedInput.resume() - processElement(partition, wrappedInput) - } + if (buffer.nonEmpty) { + buffer.foreach { + case (partition, wrappedInput) => + if (canStartNextElement(partition)) { + wrappedInput.resume() + processElement(partition, wrappedInput) + } + } } } @@ -335,11 +267,10 @@ private[stream] class MapAsyncPartitioned[In, Out, Partition]( else if (buffer.size < parallelism && !hasBeenPulled(in)) tryPull(in) // else already pulled and waiting for next element - private def idle(): Boolean = - buffer.isEmpty + private def idle(): Boolean = buffer.isEmpty private def canStartNextElement(partition: Partition): Boolean = - !partitionsInProgress(partition) && partitionsInProgress.size < parallelism + !partitionsInProgress.contains(partition) && partitionsInProgress.size < parallelism setHandlers(in, out, this) } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala index 4a36c8aa145..6b56a08e13d 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala @@ -40,6 +40,8 @@ import pekko.stream.Attributes._ val mapError = name("mapError") val mapAsync = name("mapAsync") val mapAsyncUnordered = name("mapAsyncUnordered") + val mapAsyncPartition = name("mapAsyncPartition") + val mapAsyncPartitionUnordered = name("mapAsyncPartitionUnordered") val ask = name("ask") val grouped = name("grouped") val groupedWithin = name("groupedWithin") diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 1d1c8f78543..ec04d0b859f 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -842,30 +842,66 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr /** * Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional * partition step before the transform step. The transform function receives the an individual - * stream entry and the calculated partition value for that entry. + * stream entry and the calculated partition value for that entry. The max parallelism of per partition is 1. + * + * The function `partitioner` is always invoked on the elements in the order they arrive. + * The function `f` is always invoked on the elements which in the same partition in the order they arrive. + * + * If the function `partitioner` or `f` throws an exception or if the [[CompletionStage]] is completed + * with failure and the supervision decision is [[pekko.stream.Supervision.Stop]] + * the stream will be completed with failure, otherwise the stream continues and the current element is dropped. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the Future returned by the provided function finishes for the next element in sequence + * + * '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream + * backpressures + * + * '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted + * + * '''Cancels when''' downstream cancels * * @since 1.1.0 * @see [[#mapAsync]] * @see [[#mapAsyncPartitionedUnordered]] */ def mapAsyncPartitioned[T, P](parallelism: Int, - extractPartition: function.Function[Out, P], + partitioner: function.Function[Out, P], f: function.Function2[Out, P, CompletionStage[T]]): Flow[In, T, Mat] = - MapAsyncPartitioned.mapFlowOrdered(delegate, parallelism)(extractPartition(_))(f(_, _).asScala).asJava + new Flow(delegate.mapAsyncPartitioned(parallelism)(partitioner(_))(f(_, _).asScala)) /** * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional * partition step before the transform step. The transform function receives the an individual - * stream entry and the calculated partition value for that entry. + * stream entry and the calculated partition value for that entry.The max parallelism of per partition is 1. + * + * The function `partitioner` is always invoked on the elements in the order they arrive. + * The function `f` is always invoked on the elements which in the same partition in the order they arrive. + * + * If the function `partitioner` or `f` throws an exception or if the [[CompletionStage]] is completed + * with failure and the supervision decision is [[pekko.stream.Supervision.Stop]] + * the stream will be completed with failure, otherwise the stream continues and the current element is dropped. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the Future returned by the provided function finishes and downstream available. + * + * '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream + * backpressures + * + * '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted + * + * '''Cancels when''' downstream cancels * * @since 1.1.0 * @see [[#mapAsyncUnordered]] * @see [[#mapAsyncPartitioned]] */ def mapAsyncPartitionedUnordered[T, P](parallelism: Int, - extractPartition: function.Function[Out, P], + partitioner: function.Function[Out, P], f: function.Function2[Out, P, CompletionStage[T]]): Flow[In, T, Mat] = - MapAsyncPartitioned.mapFlowUnordered(delegate, parallelism)(extractPartition(_))(f(_, _).asScala).asJava + new Flow(delegate.mapAsyncPartitionedUnordered(parallelism)(partitioner(_))(f(_, _).asScala)) /** * Transform this stream by applying the given function to each of the elements diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala index 84736777dbf..4ea73f18ecc 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala @@ -173,39 +173,36 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat]( def map[Out2](f: function.Function[Out, Out2]): FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] = viaScala(_.map(f.apply)) + /** + * Context-preserving variant of [[pekko.stream.javadsl.Flow.mapAsync]]. + * + * @see [[pekko.stream.javadsl.Flow.mapAsync]] + */ def mapAsync[Out2]( parallelism: Int, f: function.Function[Out, CompletionStage[Out2]]): FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] = viaScala(_.mapAsync[Out2](parallelism)(o => f.apply(o).asScala)) /** - * Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional - * partition step before the transform step. The transform function receives the an individual - * stream entry and the calculated partition value for that entry. + * Context-preserving variant of [[pekko.stream.javadsl.Flow.mapAsyncPartitioned]]. * - * @since 1.1.0 - * @see [[#mapAsync]] - * @see [[#mapAsyncPartitionedUnordered]] + * @see [[pekko.stream.javadsl.Flow.mapAsyncPartitioned]] */ def mapAsyncPartitioned[Out2, P](parallelism: Int, - extractPartition: function.Function[Out, P], + partitioner: function.Function[Out, P], f: function.Function2[Out, P, CompletionStage[Out2]]): FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] = { - viaScala(_.mapAsyncPartitioned(parallelism)(extractPartition(_))(f(_, _).asScala)) + viaScala(_.mapAsyncPartitioned(parallelism)(partitioner(_))(f(_, _).asScala)) } /** - * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional - * partition step before the transform step. The transform function receives the an individual - * stream entry and the calculated partition value for that entry. + * Context-preserving variant of [[pekko.stream.javadsl.Flow.mapAsyncPartitionedUnordered]]. * - * @since 1.1.0 - * @see [[#mapAsyncUnordered]] - * @see [[#mapAsyncPartitioned]] + * @see [[pekko.stream.javadsl.Flow.mapAsyncPartitionedUnordered]] */ def mapAsyncPartitionedUnordered[Out2, P](parallelism: Int, - extractPartition: function.Function[Out, P], + partitioner: function.Function[Out, P], f: function.Function2[Out, P, CompletionStage[Out2]]): FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] = { - viaScala(_.mapAsyncPartitionedUnordered(parallelism)(extractPartition(_))(f(_, _).asScala)) + viaScala(_.mapAsyncPartitionedUnordered(parallelism)(partitioner(_))(f(_, _).asScala)) } /** diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index 416652e5b71..a38b59659f1 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -2496,32 +2496,68 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ /** * Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional * partition step before the transform step. The transform function receives the an individual - * stream entry and the calculated partition value for that entry. + * stream entry and the calculated partition value for that entry. The max parallelism of per partition is 1. + * + * The function `partitioner` is always invoked on the elements in the order they arrive. + * The function `f` is always invoked on the elements which in the same partition in the order they arrive. + * + * If the function `partitioner` or `f` throws an exception or if the [[CompletionStage]] is completed + * with failure and the supervision decision is [[pekko.stream.Supervision.Stop]] + * the stream will be completed with failure, otherwise the stream continues and the current element is dropped. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the Future returned by the provided function finishes for the next element in sequence + * + * '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream + * backpressures + * + * '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted + * + * '''Cancels when''' downstream cancels * * @since 1.1.0 * @see [[#mapAsync]] * @see [[#mapAsyncPartitionedUnordered]] */ - def mapAsyncPartitioned[T, P](parallelism: Int, - extractPartition: function.Function[Out, P], + def mapAsyncPartitioned[T, P]( + parallelism: Int, + partitioner: function.Function[Out, P], f: function.Function2[Out, P, CompletionStage[T]]): javadsl.Source[T, Mat] = - MapAsyncPartitioned.mapSourceOrdered(delegate, parallelism)(extractPartition(_))(f(_, - _).asScala).asJava + new Source(delegate.mapAsyncPartitioned(parallelism)(partitioner(_))(f(_, _).asScala)) /** * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional * partition step before the transform step. The transform function receives the an individual - * stream entry and the calculated partition value for that entry. + * stream entry and the calculated partition value for that entry.The max parallelism of per partition is 1. + * + * The function `partitioner` is always invoked on the elements in the order they arrive. + * The function `f` is always invoked on the elements which in the same partition in the order they arrive. + * + * If the function `partitioner` or `f` throws an exception or if the [[CompletionStage]] is completed + * with failure and the supervision decision is [[pekko.stream.Supervision.Stop]] + * the stream will be completed with failure, otherwise the stream continues and the current element is dropped. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the Future returned by the provided function finishes and downstream available. + * + * '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream + * backpressures + * + * '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted + * + * '''Cancels when''' downstream cancels * * @since 1.1.0 * @see [[#mapAsyncUnordered]] * @see [[#mapAsyncPartitioned]] */ - def mapAsyncPartitionedUnordered[T, P](parallelism: Int, - extractPartition: function.Function[Out, P], + def mapAsyncPartitionedUnordered[T, P]( + parallelism: Int, + partitioner: function.Function[Out, P], f: function.Function2[Out, P, CompletionStage[T]]): javadsl.Source[T, Mat] = - MapAsyncPartitioned.mapSourceUnordered(delegate, parallelism)(extractPartition(_))(f(_, - _).asScala).asJava + new Source(delegate.mapAsyncPartitionedUnordered(parallelism)(partitioner(_))(f(_, _).asScala)) /** * Transform this stream by applying the given function to each of the elements diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala index 687c01fc05f..b160745aac7 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala @@ -169,44 +169,36 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon def map[Out2](f: function.Function[Out, Out2]): SourceWithContext[Out2, Ctx, Mat] = viaScala(_.map(f.apply)) + /** + * Context-preserving variant of [[pekko.stream.javadsl.Source.mapAsync]]. + * + * @see [[pekko.stream.javadsl.Source.mapAsync]] + */ def mapAsync[Out2]( parallelism: Int, f: function.Function[Out, CompletionStage[Out2]]): SourceWithContext[Out2, Ctx, Mat] = viaScala(_.mapAsync[Out2](parallelism)(o => f.apply(o).asScala)) /** - * Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional - * partition step before the transform step. The transform function receives the an individual - * stream entry and the calculated partition value for that entry. + * Context-preserving variant of [[pekko.stream.javadsl.Source.mapAsyncPartitioned]]. * - * @since 1.1.0 - * @see [[#mapAsync]] - * @see [[#mapAsyncPartitionedUnordered]] + * @see [[pekko.stream.javadsl.Source.mapAsyncPartitioned]] */ def mapAsyncPartitioned[Out2, P](parallelism: Int, - extractPartition: function.Function[Out, P], + partitioner: function.Function[Out, P], f: function.Function2[Out, P, CompletionStage[Out2]]): SourceWithContext[Out2, Ctx, Mat] = { - MapAsyncPartitioned.mapSourceWithContextOrdered(delegate, parallelism)(extractPartition(_))(f(_, - _).asScala) - .asJava + viaScala(_.mapAsyncPartitioned(parallelism)(partitioner(_))(f(_, _).asScala)) } /** - * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional - * partition step before the transform step. The transform function receives the an individual - * stream entry and the calculated partition value for that entry. + * Context-preserving variant of [[pekko.stream.javadsl.Source.mapAsyncPartitionedUnordered]]. * - * @since 1.1.0 - * @see [[#mapAsyncUnordered]] - * @see [[#mapAsyncPartitioned]] + * @see [[pekko.stream.javadsl.Source.mapAsyncPartitionedUnordered]] */ def mapAsyncPartitionedUnordered[Out2, P](parallelism: Int, - extractPartition: function.Function[Out, P], - f: function.Function2[Out, P, CompletionStage[Out2]]): SourceWithContext[Out2, Ctx, Mat] = { - MapAsyncPartitioned.mapSourceWithContextUnordered(delegate, parallelism)(extractPartition(_))(f(_, - _).asScala) - .asJava - } + partitioner: function.Function[Out, P], + f: function.Function2[Out, P, CompletionStage[Out2]]): SourceWithContext[Out2, Ctx, Mat] = + viaScala(_.mapAsyncPartitionedUnordered(parallelism)(partitioner(_))(f(_, _).asScala)) /** * Context-preserving variant of [[pekko.stream.javadsl.Source.mapConcat]]. diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index 1b6309ff83d..a4b2042ec0b 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -348,6 +348,72 @@ class SubFlow[In, Out, Mat]( def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): SubFlow[In, T, Mat] = new SubFlow(delegate.mapAsyncUnordered(parallelism)(x => f(x).asScala)) + /** + * Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional + * partition step before the transform step. The transform function receives the an individual + * stream entry and the calculated partition value for that entry. The max parallelism of per partition is 1. + * + * The function `partitioner` is always invoked on the elements in the order they arrive. + * The function `f` is always invoked on the elements which in the same partition in the order they arrive. + * + * If the function `partitioner` or `f` throws an exception or if the [[CompletionStage]] is completed + * with failure and the supervision decision is [[pekko.stream.Supervision.Stop]] + * the stream will be completed with failure, otherwise the stream continues and the current element is dropped. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the Future returned by the provided function finishes for the next element in sequence + * + * '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream + * backpressures + * + * '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted + * + * '''Cancels when''' downstream cancels + * + * @since 1.1.0 + * @see [[#mapAsync]] + * @see [[#mapAsyncPartitionedUnordered]] + */ + def mapAsyncPartitioned[T, P]( + parallelism: Int, + partitioner: function.Function[Out, P], + f: function.Function2[Out, P, CompletionStage[T]]): SubFlow[In, T, Mat] = + new SubFlow(delegate.mapAsyncPartitioned(parallelism)(partitioner(_))(f(_, _).asScala)) + + /** + * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional + * partition step before the transform step. The transform function receives the an individual + * stream entry and the calculated partition value for that entry.The max parallelism of per partition is 1. + * + * The function `partitioner` is always invoked on the elements in the order they arrive. + * The function `f` is always invoked on the elements which in the same partition in the order they arrive. + * + * If the function `partitioner` or `f` throws an exception or if the [[CompletionStage]] is completed + * with failure and the supervision decision is [[pekko.stream.Supervision.Stop]] + * the stream will be completed with failure, otherwise the stream continues and the current element is dropped. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the Future returned by the provided function finishes and downstream available. + * + * '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream + * backpressures + * + * '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted + * + * '''Cancels when''' downstream cancels + * + * @since 1.1.0 + * @see [[#mapAsyncUnordered]] + * @see [[#mapAsyncPartitioned]] + */ + def mapAsyncPartitionedUnordered[T, P]( + parallelism: Int, + partitioner: function.Function[Out, P], + f: function.Function2[Out, P, CompletionStage[T]]): SubFlow[In, T, Mat] = + new SubFlow(delegate.mapAsyncPartitionedUnordered(parallelism)(partitioner(_))(f(_, _).asScala)) + /** * Only pass on those elements that satisfy the given predicate. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index 6104422b66b..b27675682a9 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -339,6 +339,72 @@ class SubSource[Out, Mat]( def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): SubSource[T, Mat] = new SubSource(delegate.mapAsyncUnordered(parallelism)(x => f(x).asScala)) + /** + * Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional + * partition step before the transform step. The transform function receives the an individual + * stream entry and the calculated partition value for that entry. The max parallelism of per partition is 1. + * + * The function `partitioner` is always invoked on the elements in the order they arrive. + * The function `f` is always invoked on the elements which in the same partition in the order they arrive. + * + * If the function `partitioner` or `f` throws an exception or if the [[CompletionStage]] is completed + * with failure and the supervision decision is [[pekko.stream.Supervision.Stop]] + * the stream will be completed with failure, otherwise the stream continues and the current element is dropped. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the Future returned by the provided function finishes for the next element in sequence + * + * '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream + * backpressures + * + * '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted + * + * '''Cancels when''' downstream cancels + * + * @since 1.1.0 + * @see [[#mapAsync]] + * @see [[#mapAsyncPartitionedUnordered]] + */ + def mapAsyncPartitioned[T, P]( + parallelism: Int, + partitioner: function.Function[Out, P], + f: function.Function2[Out, P, CompletionStage[T]]): SubSource[T, Mat] = + new SubSource(delegate.mapAsyncPartitioned(parallelism)(partitioner(_))(f(_, _).asScala)) + + /** + * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional + * partition step before the transform step. The transform function receives the an individual + * stream entry and the calculated partition value for that entry.The max parallelism of per partition is 1. + * + * The function `partitioner` is always invoked on the elements in the order they arrive. + * The function `f` is always invoked on the elements which in the same partition in the order they arrive. + * + * If the function `partitioner` or `f` throws an exception or if the [[CompletionStage]] is completed + * with failure and the supervision decision is [[pekko.stream.Supervision.Stop]] + * the stream will be completed with failure, otherwise the stream continues and the current element is dropped. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the Future returned by the provided function finishes and downstream available. + * + * '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream + * backpressures + * + * '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted + * + * '''Cancels when''' downstream cancels + * + * @since 1.1.0 + * @see [[#mapAsyncUnordered]] + * @see [[#mapAsyncPartitioned]] + */ + def mapAsyncPartitionedUnordered[T, P]( + parallelism: Int, + partitioner: function.Function[Out, P], + f: function.Function2[Out, P, CompletionStage[T]]): SubSource[T, Mat] = + new SubSource(delegate.mapAsyncPartitionedUnordered(parallelism)(partitioner(_))(f(_, _).asScala)) + /** * Only pass on those elements that satisfy the given predicate. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 42b29030d77..59089b49677 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -20,12 +20,10 @@ import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag - import org.reactivestreams.Processor import org.reactivestreams.Publisher import org.reactivestreams.Subscriber import org.reactivestreams.Subscription - import org.apache.pekko import pekko.Done import pekko.NotUsed @@ -163,36 +161,6 @@ final class Flow[-In, +Out, +Mat]( override def mapMaterializedValue[Mat2](f: Mat => Mat2): ReprMat[Out, Mat2] = new Flow(traversalBuilder.transformMat(f), shape) - /** - * Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional - * partition step before the transform step. The transform function receives the an individual - * stream entry and the calculated partition value for that entry. - * - * @since 1.1.0 - * @see [[#mapAsync]] - * @see [[#mapAsyncPartitionedUnordered]] - */ - def mapAsyncPartitioned[T, P](parallelism: Int)( - extractPartition: Out => P)( - f: (Out, P) => Future[T]): Flow[In, T, Mat] = { - MapAsyncPartitioned.mapFlowOrdered(this, parallelism)(extractPartition)(f) - } - - /** - * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional - * partition step before the transform step. The transform function receives the an individual - * stream entry and the calculated partition value for that entry. - * - * @since 1.1.0 - * @see [[#mapAsyncUnordered]] - * @see [[#mapAsyncPartitioned]] - */ - def mapAsyncPartitionedUnordered[T, P](parallelism: Int)( - extractPartition: Out => P)( - f: (Out, P) => Future[T]): Flow[In, T, Mat] = { - MapAsyncPartitioned.mapFlowUnordered(this, parallelism)(extractPartition)(f) - } - /** * Materializes this [[Flow]], immediately returning (1) its materialized value, and (2) a newly materialized [[Flow]]. * The returned flow is partial materialized and do not support multiple times materialization. @@ -1173,6 +1141,80 @@ trait FlowOps[+Out, +Mat] { */ def mapAsyncUnordered[T](parallelism: Int)(f: Out => Future[T]): Repr[T] = via(MapAsyncUnordered(parallelism, f)) + /** + * Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional + * partition step before the transform step. The transform function receives the an individual + * stream entry and the calculated partition value for that entry. The max parallelism of per partition is 1. + * + * The function `partitioner` is always invoked on the elements in the order they arrive. + * The function `f` is always invoked on the elements which in the same partition in the order they arrive. + * + * If the function `partitioner` or `f` throws an exception or if the [[Future]] is completed + * with failure and the supervision decision is [[pekko.stream.Supervision.Stop]] + * the stream will be completed with failure, otherwise the stream continues and the current element is dropped. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the Future returned by the provided function finishes for the next element in sequence + * + * '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream + * backpressures + * + * '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted + * + * '''Cancels when''' downstream cancels + * + * @since 1.1.0 + * @see [[#mapAsync]] + * @see [[#mapAsyncPartitionedUnordered]] + */ + def mapAsyncPartitioned[T, P](parallelism: Int)( + partitioner: Out => P)( + f: (Out, P) => Future[T]): Repr[T] = { + (if (parallelism == 1) { + via(MapAsyncUnordered(1, elem => f(elem, partitioner(elem)))) + } else { + via(new MapAsyncPartitioned(parallelism, orderedOutput = true, partitioner, f)) + }) + .withAttributes(DefaultAttributes.mapAsyncPartition and SourceLocation.forLambda(f)) + } + + /** + * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional + * partition step before the transform step. The transform function receives the an individual + * stream entry and the calculated partition value for that entry.The max parallelism of per partition is 1. + * + * The function `partitioner` is always invoked on the elements in the order they arrive. + * The function `f` is always invoked on the elements which in the same partition in the order they arrive. + * + * If the function `partitioner` or `f` throws an exception or if the [[Future]] is completed + * with failure and the supervision decision is [[pekko.stream.Supervision.Stop]] + * the stream will be completed with failure, otherwise the stream continues and the current element is dropped. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the Future returned by the provided function finishes and downstream available. + * + * '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream + * backpressures + * + * '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted + * + * '''Cancels when''' downstream cancels + * + * @since 1.1.0 + * @see [[#mapAsyncUnordered]] + * @see [[#mapAsyncPartitioned]] + */ + def mapAsyncPartitionedUnordered[T, P](parallelism: Int)(partitioner: Out => P)( + f: (Out, P) => Future[T]): Repr[T] = { + (if (parallelism == 1) { + via(MapAsyncUnordered(1, elem => f(elem, partitioner(elem)))) + } else { + via(new MapAsyncPartitioned(parallelism, orderedOutput = false, partitioner, f)) + }).withAttributes(DefaultAttributes.mapAsyncPartitionUnordered and SourceLocation.forLambda(f)) + } + /** * Use the `ask` pattern to send a request-reply message to the target `ref` actor. * If any of the asks times out it will fail the stream with a [[pekko.pattern.AskTimeoutException]]. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala index 1def589f37c..8925afc47a5 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala @@ -14,7 +14,6 @@ package org.apache.pekko.stream.scaladsl import scala.annotation.unchecked.uncheckedVariance -import scala.concurrent.Future import org.apache.pekko import pekko.NotUsed import pekko.japi.Pair @@ -90,36 +89,6 @@ final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, +Mat](delegate: Flow[(In def mapMaterializedValue[Mat2](f: Mat => Mat2): FlowWithContext[In, CtxIn, Out, CtxOut, Mat2] = new FlowWithContext(delegate.mapMaterializedValue(f)) - /** - * Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional - * partition step before the transform step. The transform function receives the an individual - * stream entry and the calculated partition value for that entry. - * - * @since 1.1.0 - * @see [[#mapAsync]] - * @see [[#mapAsyncPartitionedUnordered]] - */ - def mapAsyncPartitioned[T, P](parallelism: Int)( - extractPartition: Out => P)( - f: (Out, P) => Future[T]): FlowWithContext[In, CtxIn, T, CtxOut, Mat] = { - MapAsyncPartitioned.mapFlowWithContextOrdered(this, parallelism)(extractPartition)(f) - } - - /** - * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional - * partition step before the transform step. The transform function receives the an individual - * stream entry and the calculated partition value for that entry. - * - * @since 1.1.0 - * @see [[#mapAsyncUnordered]] - * @see [[#mapAsyncPartitioned]] - */ - def mapAsyncPartitionedUnordered[T, P](parallelism: Int)( - extractPartition: Out => P)( - f: (Out, P) => Future[T]): FlowWithContext[In, CtxIn, T, CtxOut, Mat] = { - MapAsyncPartitioned.mapFlowWithContextUnordered(this, parallelism)(extractPartition)(f) - } - def asFlow: Flow[(In, CtxIn), (Out, CtxOut), Mat] = delegate def asJava[JIn <: In, JCtxIn <: CtxIn, JOut >: Out, JCtxOut >: CtxOut, JMat >: Mat] diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala index 370f49e520c..2f973691d4c 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala @@ -113,6 +113,34 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { case (e, ctx) => f(e).map(o => (o, ctx))(ExecutionContexts.parasitic) }) + /** + * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.mapAsyncPartitioned]]. + * + * @see [[pekko.stream.scaladsl.FlowOps.mapAsyncPartitioned]] + */ + def mapAsyncPartitioned[Out2, P](parallelism: Int)( + partitioner: Out => P)( + f: (Out, P) => Future[Out2]): Repr[Out2, Ctx] = { + via(flow[Out, Ctx].mapAsyncPartitioned(parallelism)(pair => partitioner(pair._1)) { + (pair, partition) => + f(pair._1, partition).map((_, pair._2))(ExecutionContexts.parasitic) + }) + } + + /** + * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.mapAsyncPartitionedUnordered]]. + * + * @see [[pekko.stream.scaladsl.FlowOps.mapAsyncPartitionedUnordered]] + */ + def mapAsyncPartitionedUnordered[Out2, P](parallelism: Int)( + partitioner: Out => P)( + f: (Out, P) => Future[Out2]): Repr[Out2, Ctx] = { + via(flow[Out, Ctx].mapAsyncPartitionedUnordered(parallelism)(pair => partitioner(pair._1)) { + (pair, partition) => + f(pair._1, partition).map((_, pair._2))(ExecutionContexts.parasitic) + }) + } + /** * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.collect]]. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala index 0c55691475a..cd33cfb799e 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala @@ -99,34 +99,6 @@ final class Source[+Out, +Mat]( override def mapMaterializedValue[Mat2](f: Mat => Mat2): ReprMat[Out, Mat2] = new Source[Out, Mat2](traversalBuilder.transformMat(f.asInstanceOf[Any => Any]), shape) - /** - * Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional - * partition step before the transform step. The transform function receives the an individual - * stream entry and the calculated partition value for that entry. - * - * @since 1.1.0 - * @see [[#mapAsync]] - * @see [[#mapAsyncPartitionedUnordered]] - */ - def mapAsyncPartitioned[T, P](parallelism: Int)( - extractPartition: Out => P)(f: (Out, P) => Future[T]): Source[T, Mat] = { - MapAsyncPartitioned.mapSourceOrdered(this, parallelism)(extractPartition)(f) - } - - /** - * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional - * partition step before the transform step. The transform function receives the an individual - * stream entry and the calculated partition value for that entry. - * - * @since 1.1.0 - * @see [[#mapAsyncUnordered]] - * @see [[#mapAsyncPartitioned]] - */ - def mapAsyncPartitionedUnordered[T, P](parallelism: Int)( - extractPartition: Out => P)(f: (Out, P) => Future[T]): Source[T, Mat] = { - MapAsyncPartitioned.mapSourceUnordered(this, parallelism)(extractPartition)(f) - } - /** * Materializes this Source, immediately returning (1) its materialized value, and (2) a new Source * that can be used to consume elements from the newly materialized Source. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala index 47d2c14bed1..5ce8bcd752e 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala @@ -14,7 +14,6 @@ package org.apache.pekko.stream.scaladsl import scala.annotation.unchecked.uncheckedVariance -import scala.concurrent.Future import org.apache.pekko import pekko.stream._ @@ -78,34 +77,6 @@ final class SourceWithContext[+Out, +Ctx, +Mat] private[stream] (delegate: Sourc def mapMaterializedValue[Mat2](f: Mat => Mat2): SourceWithContext[Out, Ctx, Mat2] = new SourceWithContext(delegate.mapMaterializedValue(f)) - /** - * Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional - * partition step before the transform step. The transform function receives the an individual - * stream entry and the calculated partition value for that entry. - * - * @since 1.1.0 - * @see [[#mapAsync]] - * @see [[#mapAsyncPartitionedUnordered]] - */ - def mapAsyncPartitioned[T, P](parallelism: Int)( - extractPartition: Out => P)(f: (Out, P) => Future[T]): SourceWithContext[T, Ctx, Mat] = { - MapAsyncPartitioned.mapSourceWithContextOrdered(this, parallelism)(extractPartition)(f) - } - - /** - * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional - * partition step before the transform step. The transform function receives the an individual - * stream entry and the calculated partition value for that entry. - * - * @since 1.1.0 - * @see [[#mapAsyncUnordered]] - * @see [[#mapAsyncPartitioned]] - */ - def mapAsyncPartitionedUnordered[T, P](parallelism: Int)( - extractPartition: Out => P)(f: (Out, P) => Future[T]): SourceWithContext[T, Ctx, Mat] = { - MapAsyncPartitioned.mapSourceWithContextUnordered(this, parallelism)(extractPartition)(f) - } - /** * Connect this [[pekko.stream.scaladsl.SourceWithContext]] to a [[pekko.stream.scaladsl.Sink]], * concatenating the processing steps of both.