From f9ad446a58dae77ccdec52719c5235e02ce8c770 Mon Sep 17 00:00:00 2001 From: Matthew de Detrich Date: Sun, 21 Jul 2024 08:53:13 +0200 Subject: [PATCH] Fix uncurried stream ops in javadsl --- .../apache/pekko/stream/javadsl/Flow.scala | 24 +++++++++--------- .../apache/pekko/stream/javadsl/Graph.scala | 2 +- .../apache/pekko/stream/javadsl/Sink.scala | 4 +-- .../apache/pekko/stream/javadsl/Source.scala | 25 +++++++++---------- .../stream/javadsl/StreamConverters.scala | 2 +- .../apache/pekko/stream/javadsl/SubFlow.scala | 14 +++++------ .../pekko/stream/javadsl/SubSource.scala | 14 +++++------ 7 files changed, 42 insertions(+), 43 deletions(-) diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 356c7e44c5f..1963a97f584 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -1254,7 +1254,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels */ - def groupedWeighted(minWeight: Long)( + def groupedWeighted(minWeight: Long, costFn: java.util.function.Function[Out, java.lang.Long]): javadsl.Flow[In, java.util.List[Out], Mat] = new Flow(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) // TODO optimize to one step @@ -1311,7 +1311,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]] */ - def limitWeighted(n: Long)(costFn: function.Function[Out, java.lang.Long]): javadsl.Flow[In, Out, Mat] = { + def limitWeighted(n: Long, costFn: function.Function[Out, java.lang.Long]): javadsl.Flow[In, Out, Mat] = { new Flow(delegate.limitWeighted(n)(costFn.apply)) } @@ -1355,7 +1355,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels */ - def scan[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] = + def scan[T](zero: T, f: function.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] = new Flow(delegate.scan(zero)(f.apply)) /** @@ -1386,7 +1386,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * See also [[#scan]] */ - def scanAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Flow[In, T, Mat] = + def scanAsync[T](zero: T, f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Flow[In, T, Mat] = new Flow(delegate.scanAsync(zero) { (out, in) => f(out, in).asScala }) @@ -1412,7 +1412,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels */ - def fold[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] = + def fold[T](zero: T, f: function.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] = new Flow(delegate.fold(zero)(f.apply)) /** @@ -1464,7 +1464,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels */ - def foldAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Flow[In, T, Mat] = + def foldAsync[T](zero: T, f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Flow[In, T, Mat] = new Flow(delegate.foldAsync(zero) { (out, in) => f(out, in).asScala }) @@ -2636,7 +2636,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr @deprecated( "Use .withAttributes(ActorAttributes.supervisionStrategy(equivalentDecider)) rather than a SubstreamCancelStrategy", since = "1.1.0") - def splitWhen(substreamCancelStrategy: SubstreamCancelStrategy)(p: function.Predicate[Out]): SubFlow[In, Out, Mat] = + def splitWhen(substreamCancelStrategy: SubstreamCancelStrategy, p: function.Predicate[Out]): SubFlow[In, Out, Mat] = new SubFlow(delegate.splitWhen(substreamCancelStrategy)(p.test)) /** @@ -2697,7 +2697,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr @deprecated( "Use .withAttributes(ActorAttributes.supervisionStrategy(equivalentDecider)) rather than a SubstreamCancelStrategy", since = "1.1.0") - def splitAfter(substreamCancelStrategy: SubstreamCancelStrategy)(p: function.Predicate[Out]): SubFlow[In, Out, Mat] = + def splitAfter(substreamCancelStrategy: SubstreamCancelStrategy, p: function.Predicate[Out]): SubFlow[In, Out, Mat] = new SubFlow(delegate.splitAfter(substreamCancelStrategy)(p.test)) /** @@ -3497,7 +3497,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels */ - def zipAllMat[U, Mat2, Mat3, A >: Out](that: Graph[SourceShape[U], Mat2], thisElem: A, thatElem: U)( + def zipAllMat[U, Mat2, Mat3, A >: Out](that: Graph[SourceShape[U], Mat2], thisElem: A, thatElem: U, matF: (Mat, Mat2) => Mat3): Flow[In, Pair[A, U], Mat3] = new Flow(delegate.zipAllMat(that, thisElem, thatElem)(matF).map { case (a, u) => Pair.create(a, u) }) @@ -4169,7 +4169,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * from downstream. It fails with the same error when received error message from * downstream. */ - def watchTermination[M]()(matF: function.Function2[Mat, CompletionStage[Done], M]): javadsl.Flow[In, Out, M] = + def watchTermination[M](matF: function.Function2[Mat, CompletionStage[Done], M]): javadsl.Flow[In, Out, M] = new Flow(delegate.watchTermination()((left, right) => matF(left, right.asJava))) /** @@ -4180,7 +4180,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * The `combine` function is used to combine the `FlowMonitor` with this flow's materialized value. */ @deprecated("Use monitor() or monitorMat(combine) instead", "Akka 2.5.17") - def monitor[M]()(combine: function.Function2[Mat, FlowMonitor[Out], M]): javadsl.Flow[In, Out, M] = + def monitor[M](combine: function.Function2[Mat, FlowMonitor[Out], M]): javadsl.Flow[In, Out, M] = new Flow(delegate.monitorMat(combinerToScala(combine))) /** @@ -4504,7 +4504,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval */ @ApiMayChange - def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg])( + def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg], aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]], harvest: function.Function[Agg, Emit], emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.Flow[In, Emit, Mat] = diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Graph.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Graph.scala index 6cec4cc7fb9..b486112c416 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Graph.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Graph.scala @@ -742,7 +742,7 @@ object GraphDSL extends GraphCreate { new GenericGraph(s, gbuilder.delegate.result(s)) } - final class Builder[+Mat]()(private[stream] implicit val delegate: scaladsl.GraphDSL.Builder[Mat]) { self => + final class Builder[+Mat](private[stream] implicit val delegate: scaladsl.GraphDSL.Builder[Mat]) { self => import pekko.stream.scaladsl.GraphDSL.Implicits._ /** diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala index b3b8a585279..4bd056d358d 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala @@ -204,7 +204,7 @@ object Sink { * normal end of the stream, or completed with `Failure` if there is a failure signaled in * the stream. */ - def foreachAsync[T](parallelism: Int)( + def foreachAsync[T](parallelism: Int, f: function.Function[T, CompletionStage[Void]]): Sink[T, CompletionStage[Done]] = new Sink( scaladsl.Sink @@ -225,7 +225,7 @@ object Sink { @deprecated( "Use `foreachAsync` instead, it allows you to choose how to run the procedure, by calling some other API returning a CompletionStage or using CompletableFuture.supplyAsync.", since = "Akka 2.5.17") - def foreachParallel[T](parallel: Int)(f: function.Procedure[T])( + def foreachParallel[T](parallel: Int, f: function.Procedure[T], ec: ExecutionContext): Sink[T, CompletionStage[Done]] = new Sink(scaladsl.Sink.foreachParallel(parallel)(f.apply)(ec).toCompletionStage()) diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index d3fbdfdfb63..bf384639d4f 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -1987,7 +1987,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels */ - def zipAllMat[U, Mat2, Mat3, A >: Out](that: Graph[SourceShape[U], Mat2], thisElem: A, thatElem: U)( + def zipAllMat[U, Mat2, Mat3, A >: Out](that: Graph[SourceShape[U], Mat2], thisElem: A, thatElem: U, matF: (Mat, Mat2) => Mat3): Source[Pair[A, U], Mat3] = new Source(delegate.zipAllMat(that, thisElem, thatElem)(matF).map { case (a, u) => Pair.create(a, u) }) @@ -3002,7 +3002,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels */ - def groupedWeighted(minWeight: Long)(costFn: java.util.function.Function[Out, java.lang.Long]) + def groupedWeighted(minWeight: Long, costFn: java.util.function.Function[Out, java.lang.Long]) : javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] = new Source(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) @@ -3055,9 +3055,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]] */ - def limitWeighted(n: Long)(costFn: function.Function[Out, java.lang.Long]): javadsl.Source[Out, Mat] = { + def limitWeighted(n: Long, costFn: function.Function[Out, java.lang.Long]): javadsl.Source[Out, Mat] = new Source(delegate.limitWeighted(n)(costFn.apply)) - } /** * Apply a sliding window over the stream and return the windows as groups of elements, with the last group @@ -3099,7 +3098,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels */ - def scan[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Source[T, Mat] = + def scan[T](zero: T, f: function.Function2[T, Out, T]): javadsl.Source[T, Mat] = new Source(delegate.scan(zero)(f.apply)) /** @@ -3130,7 +3129,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * See also [[FlowOps#scan]] */ - def scanAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Source[T, Mat] = + def scanAsync[T](zero: T, f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Source[T, Mat] = new Source(delegate.scanAsync(zero) { (out, in) => f(out, in).asScala }) @@ -3156,7 +3155,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels */ - def fold[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Source[T, Mat] = + def fold[T](zero: T, f: function.Function2[T, Out, T]): javadsl.Source[T, Mat] = new Source(delegate.fold(zero)(f.apply)) /** @@ -3206,7 +3205,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels */ - def foldAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Source[T, Mat] = + def foldAsync[T](zero: T, f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Source[T, Mat] = new Source(delegate.foldAsync(zero) { (out, in) => f(out, in).asScala }) @@ -4142,7 +4141,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ @deprecated( "Use .withAttributes(ActorAttributes.supervisionStrategy(equivalentDecider)) rather than a SubstreamCancelStrategy", since = "1.1.0") - def splitWhen(substreamCancelStrategy: SubstreamCancelStrategy)(p: function.Predicate[Out]): SubSource[Out, Mat] = + def splitWhen(substreamCancelStrategy: SubstreamCancelStrategy, p: function.Predicate[Out]): SubSource[Out, Mat] = new SubSource(delegate.splitWhen(substreamCancelStrategy)(p.test)) /** @@ -4202,7 +4201,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ @deprecated( "Use .withAttributes(ActorAttributes.supervisionStrategy(equivalentDecider)) rather than a SubstreamCancelStrategy", since = "1.1.0") - def splitAfter(substreamCancelStrategy: SubstreamCancelStrategy)(p: function.Predicate[Out]): SubSource[Out, Mat] = + def splitAfter(substreamCancelStrategy: SubstreamCancelStrategy, p: function.Predicate[Out]): SubSource[Out, Mat] = new SubSource(delegate.splitAfter(substreamCancelStrategy)(p.test)) /** @@ -4738,7 +4737,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * from downstream. It fails with the same error when received error message from * downstream. */ - def watchTermination[M]()(matF: function.Function2[Mat, CompletionStage[Done], M]): javadsl.Source[Out, M] = + def watchTermination[M](matF: function.Function2[Mat, CompletionStage[Done], M]): javadsl.Source[Out, M] = new Source(delegate.watchTermination()((left, right) => matF(left, right.asJava))) /** @@ -4748,7 +4747,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * The `combine` function is used to combine the `FlowMonitor` with this flow's materialized value. */ @deprecated("Use monitor() or monitorMat(combine) instead", "Akka 2.5.17") - def monitor[M]()(combine: function.Function2[Mat, FlowMonitor[Out], M]): javadsl.Source[Out, M] = + def monitor[M](combine: function.Function2[Mat, FlowMonitor[Out], M]): javadsl.Source[Out, M] = new Source(delegate.monitorMat(combinerToScala(combine))) /** @@ -5052,7 +5051,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval */ @ApiMayChange - def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg])( + def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg], aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]], harvest: function.Function[Agg, Emit], emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.Source[Emit, Mat] = diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/StreamConverters.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/StreamConverters.scala index c400ca844d7..c3638d5b596 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/StreamConverters.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/StreamConverters.scala @@ -267,7 +267,7 @@ object StreamConverters { * Note that a flow can be materialized multiple times, so the function producing the ``Collector`` must be able * to handle multiple invocations. */ - def javaCollectorParallelUnordered[T, R](parallelism: Int)( + def javaCollectorParallelUnordered[T, R](parallelism: Int, collector: function.Creator[Collector[T, _ <: Any, R]]): Sink[T, CompletionStage[R]] = new Sink( scaladsl.StreamConverters diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index 5e3eea36e58..addcc8b07b4 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -638,7 +638,7 @@ class SubFlow[In, Out, Mat]( * * '''Cancels when''' downstream cancels */ - def groupedWeighted(minWeight: Long)( + def groupedWeighted(minWeight: Long, costFn: function.Function[Out, java.lang.Long]): SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] = new SubFlow(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) // TODO optimize to one step @@ -691,7 +691,7 @@ class SubFlow[In, Out, Mat]( * * See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]] */ - def limitWeighted(n: Long)(costFn: function.Function[Out, java.lang.Long]): javadsl.SubFlow[In, Out, Mat] = { + def limitWeighted(n: Long, costFn: function.Function[Out, java.lang.Long]): javadsl.SubFlow[In, Out, Mat] = { new SubFlow(delegate.limitWeighted(n)(costFn.apply)) } @@ -735,7 +735,7 @@ class SubFlow[In, Out, Mat]( * * '''Cancels when''' downstream cancels */ - def scan[T](zero: T)(f: function.Function2[T, Out, T]): SubFlow[In, T, Mat] = + def scan[T](zero: T, f: function.Function2[T, Out, T]): SubFlow[In, T, Mat] = new SubFlow(delegate.scan(zero)(f.apply)) /** @@ -766,7 +766,7 @@ class SubFlow[In, Out, Mat]( * * See also [[#scan]] */ - def scanAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): SubFlow[In, T, Mat] = + def scanAsync[T](zero: T, f: function.Function2[T, Out, CompletionStage[T]]): SubFlow[In, T, Mat] = new SubFlow(delegate.scanAsync(zero) { (out, in) => f(out, in).asScala }) @@ -792,7 +792,7 @@ class SubFlow[In, Out, Mat]( * * '''Cancels when''' downstream cancels */ - def fold[T](zero: T)(f: function.Function2[T, Out, T]): SubFlow[In, T, Mat] = + def fold[T](zero: T, f: function.Function2[T, Out, T]): SubFlow[In, T, Mat] = new SubFlow(delegate.fold(zero)(f.apply)) /** @@ -844,7 +844,7 @@ class SubFlow[In, Out, Mat]( * * '''Cancels when''' downstream cancels */ - def foldAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): SubFlow[In, T, Mat] = + def foldAsync[T](zero: T, f: function.Function2[T, Out, CompletionStage[T]]): SubFlow[In, T, Mat] = new SubFlow(delegate.foldAsync(zero) { (out, in) => f(out, in).asScala }) @@ -3031,7 +3031,7 @@ class SubFlow[In, Out, Mat]( * @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval */ @ApiMayChange - def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg])( + def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg], aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]], harvest: function.Function[Agg, Emit], emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.SubFlow[In, Emit, Mat] = diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index ee63fbfb93c..ad68e3579e6 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -629,7 +629,7 @@ class SubSource[Out, Mat]( * * '''Cancels when''' downstream cancels */ - def groupedWeighted(minWeight: Long)( + def groupedWeighted(minWeight: Long, costFn: function.Function[Out, java.lang.Long]): SubSource[java.util.List[Out @uncheckedVariance], Mat] = new SubSource(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) // TODO optimize to one step @@ -697,7 +697,7 @@ class SubSource[Out, Mat]( * * See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]] */ - def limitWeighted(n: Long)(costFn: function.Function[Out, java.lang.Long]): javadsl.SubSource[Out, Mat] = { + def limitWeighted(n: Long, costFn: function.Function[Out, java.lang.Long]): javadsl.SubSource[Out, Mat] = { new SubSource(delegate.limitWeighted(n)(costFn.apply)) } @@ -726,7 +726,7 @@ class SubSource[Out, Mat]( * * '''Cancels when''' downstream cancels */ - def scan[T](zero: T)(f: function.Function2[T, Out, T]): SubSource[T, Mat] = + def scan[T](zero: T, f: function.Function2[T, Out, T]): SubSource[T, Mat] = new SubSource(delegate.scan(zero)(f.apply)) /** @@ -757,7 +757,7 @@ class SubSource[Out, Mat]( * * See also [[#scan]] */ - def scanAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): SubSource[T, Mat] = + def scanAsync[T](zero: T, f: function.Function2[T, Out, CompletionStage[T]]): SubSource[T, Mat] = new SubSource(delegate.scanAsync(zero) { (out, in) => f(out, in).asScala }) @@ -783,7 +783,7 @@ class SubSource[Out, Mat]( * * '''Cancels when''' downstream cancels */ - def fold[T](zero: T)(f: function.Function2[T, Out, T]): SubSource[T, Mat] = + def fold[T](zero: T, f: function.Function2[T, Out, T]): SubSource[T, Mat] = new SubSource(delegate.fold(zero)(f.apply)) /** @@ -831,7 +831,7 @@ class SubSource[Out, Mat]( * * '''Cancels when''' downstream cancels */ - def foldAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): SubSource[T, Mat] = + def foldAsync[T](zero: T, f: function.Function2[T, Out, CompletionStage[T]]): SubSource[T, Mat] = new SubSource(delegate.foldAsync(zero) { (out, in) => f(out, in).asScala }) @@ -3002,7 +3002,7 @@ class SubSource[Out, Mat]( * @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval */ @ApiMayChange - def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg])( + def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg], aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]], harvest: function.Function[Agg, Emit], emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.SubSource[Emit, Mat] =