Skip to content

Commit

Permalink
Fix uncurried Pekko Stream ops in javadsl
Browse files Browse the repository at this point in the history
  • Loading branch information
mdedetrich committed Jul 21, 2024
1 parent cd55767 commit 0cfec6f
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 43 deletions.
24 changes: 12 additions & 12 deletions stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1254,7 +1254,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* '''Cancels when''' downstream cancels
*/
def groupedWeighted(minWeight: Long)(
def groupedWeighted(minWeight: Long,
costFn: java.util.function.Function[Out, java.lang.Long]): javadsl.Flow[In, java.util.List[Out], Mat] =
new Flow(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) // TODO optimize to one step

Expand Down Expand Up @@ -1311,7 +1311,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
*/
def limitWeighted(n: Long)(costFn: function.Function[Out, java.lang.Long]): javadsl.Flow[In, Out, Mat] = {
def limitWeighted(n: Long, costFn: function.Function[Out, java.lang.Long]): javadsl.Flow[In, Out, Mat] = {
new Flow(delegate.limitWeighted(n)(costFn.apply))
}

Expand Down Expand Up @@ -1355,7 +1355,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* '''Cancels when''' downstream cancels
*/
def scan[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] =
def scan[T](zero: T, f: function.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.scan(zero)(f.apply))

/**
Expand Down Expand Up @@ -1386,7 +1386,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* See also [[#scan]]
*/
def scanAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Flow[In, T, Mat] =
def scanAsync[T](zero: T, f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.scanAsync(zero) { (out, in) =>
f(out, in).asScala
})
Expand All @@ -1412,7 +1412,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* '''Cancels when''' downstream cancels
*/
def fold[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] =
def fold[T](zero: T, f: function.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.fold(zero)(f.apply))

/**
Expand Down Expand Up @@ -1464,7 +1464,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* '''Cancels when''' downstream cancels
*/
def foldAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Flow[In, T, Mat] =
def foldAsync[T](zero: T, f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.foldAsync(zero) { (out, in) =>
f(out, in).asScala
})
Expand Down Expand Up @@ -2636,7 +2636,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
@deprecated(
"Use .withAttributes(ActorAttributes.supervisionStrategy(equivalentDecider)) rather than a SubstreamCancelStrategy",
since = "1.1.0")
def splitWhen(substreamCancelStrategy: SubstreamCancelStrategy)(p: function.Predicate[Out]): SubFlow[In, Out, Mat] =
def splitWhen(substreamCancelStrategy: SubstreamCancelStrategy, p: function.Predicate[Out]): SubFlow[In, Out, Mat] =
new SubFlow(delegate.splitWhen(substreamCancelStrategy)(p.test))

/**
Expand Down Expand Up @@ -2697,7 +2697,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
@deprecated(
"Use .withAttributes(ActorAttributes.supervisionStrategy(equivalentDecider)) rather than a SubstreamCancelStrategy",
since = "1.1.0")
def splitAfter(substreamCancelStrategy: SubstreamCancelStrategy)(p: function.Predicate[Out]): SubFlow[In, Out, Mat] =
def splitAfter(substreamCancelStrategy: SubstreamCancelStrategy, p: function.Predicate[Out]): SubFlow[In, Out, Mat] =
new SubFlow(delegate.splitAfter(substreamCancelStrategy)(p.test))

/**
Expand Down Expand Up @@ -3497,7 +3497,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* '''Cancels when''' downstream cancels
*/
def zipAllMat[U, Mat2, Mat3, A >: Out](that: Graph[SourceShape[U], Mat2], thisElem: A, thatElem: U)(
def zipAllMat[U, Mat2, Mat3, A >: Out](that: Graph[SourceShape[U], Mat2], thisElem: A, thatElem: U,
matF: (Mat, Mat2) => Mat3): Flow[In, Pair[A, U], Mat3] =
new Flow(delegate.zipAllMat(that, thisElem, thatElem)(matF).map { case (a, u) => Pair.create(a, u) })

Expand Down Expand Up @@ -4169,7 +4169,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* from downstream. It fails with the same error when received error message from
* downstream.
*/
def watchTermination[M]()(matF: function.Function2[Mat, CompletionStage[Done], M]): javadsl.Flow[In, Out, M] =
def watchTermination[M](matF: function.Function2[Mat, CompletionStage[Done], M]): javadsl.Flow[In, Out, M] =
new Flow(delegate.watchTermination()((left, right) => matF(left, right.asJava)))

/**
Expand All @@ -4180,7 +4180,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* The `combine` function is used to combine the `FlowMonitor` with this flow's materialized value.
*/
@deprecated("Use monitor() or monitorMat(combine) instead", "Akka 2.5.17")
def monitor[M]()(combine: function.Function2[Mat, FlowMonitor[Out], M]): javadsl.Flow[In, Out, M] =
def monitor[M](combine: function.Function2[Mat, FlowMonitor[Out], M]): javadsl.Flow[In, Out, M] =
new Flow(delegate.monitorMat(combinerToScala(combine)))

/**
Expand Down Expand Up @@ -4504,7 +4504,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
*/
@ApiMayChange
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg])(
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
harvest: function.Function[Agg, Emit],
emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.Flow[In, Emit, Mat] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,8 @@ object Sink {
* normal end of the stream, or completed with `Failure` if there is a failure signaled in
* the stream.
*/
def foreachAsync[T](parallelism: Int)(
f: function.Function[T, CompletionStage[Void]]): Sink[T, CompletionStage[Done]] =
def foreachAsync[T](
parallelism: Int, f: function.Function[T, CompletionStage[Void]]): Sink[T, CompletionStage[Done]] =
new Sink(
scaladsl.Sink
.foreachAsync(parallelism)((x: T) => f(x).asScala.map(_ => ())(ExecutionContexts.parasitic))
Expand All @@ -225,7 +225,7 @@ object Sink {
@deprecated(
"Use `foreachAsync` instead, it allows you to choose how to run the procedure, by calling some other API returning a CompletionStage or using CompletableFuture.supplyAsync.",
since = "Akka 2.5.17")
def foreachParallel[T](parallel: Int)(f: function.Procedure[T])(
def foreachParallel[T](parallel: Int, f: function.Procedure[T],
ec: ExecutionContext): Sink[T, CompletionStage[Done]] =
new Sink(scaladsl.Sink.foreachParallel(parallel)(f.apply)(ec).toCompletionStage())

Expand Down
25 changes: 12 additions & 13 deletions stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1987,7 +1987,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels
*/
def zipAllMat[U, Mat2, Mat3, A >: Out](that: Graph[SourceShape[U], Mat2], thisElem: A, thatElem: U)(
def zipAllMat[U, Mat2, Mat3, A >: Out](that: Graph[SourceShape[U], Mat2], thisElem: A, thatElem: U,
matF: (Mat, Mat2) => Mat3): Source[Pair[A, U], Mat3] =
new Source(delegate.zipAllMat(that, thisElem, thatElem)(matF).map { case (a, u) => Pair.create(a, u) })

Expand Down Expand Up @@ -3002,7 +3002,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels
*/
def groupedWeighted(minWeight: Long)(costFn: java.util.function.Function[Out, java.lang.Long])
def groupedWeighted(minWeight: Long, costFn: java.util.function.Function[Out, java.lang.Long])
: javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] =
new Source(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava))

Expand Down Expand Up @@ -3055,9 +3055,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
*/
def limitWeighted(n: Long)(costFn: function.Function[Out, java.lang.Long]): javadsl.Source[Out, Mat] = {
def limitWeighted(n: Long, costFn: function.Function[Out, java.lang.Long]): javadsl.Source[Out, Mat] =
new Source(delegate.limitWeighted(n)(costFn.apply))
}

/**
* Apply a sliding window over the stream and return the windows as groups of elements, with the last group
Expand Down Expand Up @@ -3099,7 +3098,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels
*/
def scan[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Source[T, Mat] =
def scan[T](zero: T, f: function.Function2[T, Out, T]): javadsl.Source[T, Mat] =
new Source(delegate.scan(zero)(f.apply))

/**
Expand Down Expand Up @@ -3130,7 +3129,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* See also [[FlowOps#scan]]
*/
def scanAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Source[T, Mat] =
def scanAsync[T](zero: T, f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Source[T, Mat] =
new Source(delegate.scanAsync(zero) { (out, in) =>
f(out, in).asScala
})
Expand All @@ -3156,7 +3155,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels
*/
def fold[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Source[T, Mat] =
def fold[T](zero: T, f: function.Function2[T, Out, T]): javadsl.Source[T, Mat] =
new Source(delegate.fold(zero)(f.apply))

/**
Expand Down Expand Up @@ -3206,7 +3205,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels
*/
def foldAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Source[T, Mat] =
def foldAsync[T](zero: T, f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Source[T, Mat] =
new Source(delegate.foldAsync(zero) { (out, in) =>
f(out, in).asScala
})
Expand Down Expand Up @@ -4142,7 +4141,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
@deprecated(
"Use .withAttributes(ActorAttributes.supervisionStrategy(equivalentDecider)) rather than a SubstreamCancelStrategy",
since = "1.1.0")
def splitWhen(substreamCancelStrategy: SubstreamCancelStrategy)(p: function.Predicate[Out]): SubSource[Out, Mat] =
def splitWhen(substreamCancelStrategy: SubstreamCancelStrategy, p: function.Predicate[Out]): SubSource[Out, Mat] =
new SubSource(delegate.splitWhen(substreamCancelStrategy)(p.test))

/**
Expand Down Expand Up @@ -4202,7 +4201,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
@deprecated(
"Use .withAttributes(ActorAttributes.supervisionStrategy(equivalentDecider)) rather than a SubstreamCancelStrategy",
since = "1.1.0")
def splitAfter(substreamCancelStrategy: SubstreamCancelStrategy)(p: function.Predicate[Out]): SubSource[Out, Mat] =
def splitAfter(substreamCancelStrategy: SubstreamCancelStrategy, p: function.Predicate[Out]): SubSource[Out, Mat] =
new SubSource(delegate.splitAfter(substreamCancelStrategy)(p.test))

/**
Expand Down Expand Up @@ -4738,7 +4737,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* from downstream. It fails with the same error when received error message from
* downstream.
*/
def watchTermination[M]()(matF: function.Function2[Mat, CompletionStage[Done], M]): javadsl.Source[Out, M] =
def watchTermination[M](matF: function.Function2[Mat, CompletionStage[Done], M]): javadsl.Source[Out, M] =
new Source(delegate.watchTermination()((left, right) => matF(left, right.asJava)))

/**
Expand All @@ -4748,7 +4747,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* The `combine` function is used to combine the `FlowMonitor` with this flow's materialized value.
*/
@deprecated("Use monitor() or monitorMat(combine) instead", "Akka 2.5.17")
def monitor[M]()(combine: function.Function2[Mat, FlowMonitor[Out], M]): javadsl.Source[Out, M] =
def monitor[M](combine: function.Function2[Mat, FlowMonitor[Out], M]): javadsl.Source[Out, M] =
new Source(delegate.monitorMat(combinerToScala(combine)))

/**
Expand Down Expand Up @@ -5052,7 +5051,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
*/
@ApiMayChange
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg])(
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
harvest: function.Function[Agg, Emit],
emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.Source[Emit, Mat] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ object StreamConverters {
* Note that a flow can be materialized multiple times, so the function producing the ``Collector`` must be able
* to handle multiple invocations.
*/
def javaCollectorParallelUnordered[T, R](parallelism: Int)(
def javaCollectorParallelUnordered[T, R](parallelism: Int,
collector: function.Creator[Collector[T, _ <: Any, R]]): Sink[T, CompletionStage[R]] =
new Sink(
scaladsl.StreamConverters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ class SubFlow[In, Out, Mat](
*
* '''Cancels when''' downstream cancels
*/
def groupedWeighted(minWeight: Long)(
def groupedWeighted(minWeight: Long,
costFn: function.Function[Out, java.lang.Long]): SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] =
new SubFlow(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) // TODO optimize to one step

Expand Down Expand Up @@ -691,7 +691,7 @@ class SubFlow[In, Out, Mat](
*
* See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
*/
def limitWeighted(n: Long)(costFn: function.Function[Out, java.lang.Long]): javadsl.SubFlow[In, Out, Mat] = {
def limitWeighted(n: Long, costFn: function.Function[Out, java.lang.Long]): javadsl.SubFlow[In, Out, Mat] = {
new SubFlow(delegate.limitWeighted(n)(costFn.apply))
}

Expand Down Expand Up @@ -735,7 +735,7 @@ class SubFlow[In, Out, Mat](
*
* '''Cancels when''' downstream cancels
*/
def scan[T](zero: T)(f: function.Function2[T, Out, T]): SubFlow[In, T, Mat] =
def scan[T](zero: T, f: function.Function2[T, Out, T]): SubFlow[In, T, Mat] =
new SubFlow(delegate.scan(zero)(f.apply))

/**
Expand Down Expand Up @@ -766,7 +766,7 @@ class SubFlow[In, Out, Mat](
*
* See also [[#scan]]
*/
def scanAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): SubFlow[In, T, Mat] =
def scanAsync[T](zero: T, f: function.Function2[T, Out, CompletionStage[T]]): SubFlow[In, T, Mat] =
new SubFlow(delegate.scanAsync(zero) { (out, in) =>
f(out, in).asScala
})
Expand All @@ -792,7 +792,7 @@ class SubFlow[In, Out, Mat](
*
* '''Cancels when''' downstream cancels
*/
def fold[T](zero: T)(f: function.Function2[T, Out, T]): SubFlow[In, T, Mat] =
def fold[T](zero: T, f: function.Function2[T, Out, T]): SubFlow[In, T, Mat] =
new SubFlow(delegate.fold(zero)(f.apply))

/**
Expand Down Expand Up @@ -3031,7 +3031,7 @@ class SubFlow[In, Out, Mat](
* @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
*/
@ApiMayChange
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg])(
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
harvest: function.Function[Agg, Emit],
emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.SubFlow[In, Emit, Mat] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -629,8 +629,8 @@ class SubSource[Out, Mat](
*
* '''Cancels when''' downstream cancels
*/
def groupedWeighted(minWeight: Long)(
costFn: function.Function[Out, java.lang.Long]): SubSource[java.util.List[Out @uncheckedVariance], Mat] =
def groupedWeighted(minWeight: Long, costFn: function.Function[Out, java.lang.Long])
: SubSource[java.util.List[Out @uncheckedVariance], Mat] =
new SubSource(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) // TODO optimize to one step

/**
Expand Down Expand Up @@ -697,7 +697,7 @@ class SubSource[Out, Mat](
*
* See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
*/
def limitWeighted(n: Long)(costFn: function.Function[Out, java.lang.Long]): javadsl.SubSource[Out, Mat] = {
def limitWeighted(n: Long, costFn: function.Function[Out, java.lang.Long]): javadsl.SubSource[Out, Mat] = {
new SubSource(delegate.limitWeighted(n)(costFn.apply))
}

Expand Down Expand Up @@ -726,7 +726,7 @@ class SubSource[Out, Mat](
*
* '''Cancels when''' downstream cancels
*/
def scan[T](zero: T)(f: function.Function2[T, Out, T]): SubSource[T, Mat] =
def scan[T](zero: T, f: function.Function2[T, Out, T]): SubSource[T, Mat] =
new SubSource(delegate.scan(zero)(f.apply))

/**
Expand Down Expand Up @@ -757,7 +757,7 @@ class SubSource[Out, Mat](
*
* See also [[#scan]]
*/
def scanAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): SubSource[T, Mat] =
def scanAsync[T](zero: T, f: function.Function2[T, Out, CompletionStage[T]]): SubSource[T, Mat] =
new SubSource(delegate.scanAsync(zero) { (out, in) =>
f(out, in).asScala
})
Expand All @@ -783,7 +783,7 @@ class SubSource[Out, Mat](
*
* '''Cancels when''' downstream cancels
*/
def fold[T](zero: T)(f: function.Function2[T, Out, T]): SubSource[T, Mat] =
def fold[T](zero: T, f: function.Function2[T, Out, T]): SubSource[T, Mat] =
new SubSource(delegate.fold(zero)(f.apply))

/**
Expand Down Expand Up @@ -831,7 +831,7 @@ class SubSource[Out, Mat](
*
* '''Cancels when''' downstream cancels
*/
def foldAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): SubSource[T, Mat] =
def foldAsync[T](zero: T, f: function.Function2[T, Out, CompletionStage[T]]): SubSource[T, Mat] =
new SubSource(delegate.foldAsync(zero) { (out, in) =>
f(out, in).asScala
})
Expand Down Expand Up @@ -3002,7 +3002,7 @@ class SubSource[Out, Mat](
* @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
*/
@ApiMayChange
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg])(
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
harvest: function.Function[Agg, Emit],
emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.SubSource[Emit, Mat] =
Expand Down

0 comments on commit 0cfec6f

Please sign in to comment.