zipWithIndex is broken for org.apache.pekko.stream.scaladsl.GraphDSL.Implicits.PortOps #1525

Closed
remyhaemmerle-da opened this issue Oct 9, 2024 · 6 comments

remyhaemmerle-da commented Oct 9, 2024

On the one hand, org.apache.pekko.stream.scaladsl.GraphDSL.Implicits.PortOps inherits from org.apache.pekko.stream.scaladsl.FlowOps.

On the other hand, as of #591, the implementation of FlowOps#zipWithIndex calls FlowOps#withAttributes:

  def zipWithIndex: Repr[(Out, Long)] =
    statefulMap(() => 0L)((index, out) => (index + 1L, (out, index)), _ => None)
      .withAttributes(DefaultAttributes.zipWithIndex)

However, PortOpsImpl, the implementation of PortOps, does not support withAttributes:

  override def withAttributes(attr: Attributes): Repr[Out] = throw settingAttrNotSupported
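
The interaction described above can be reproduced without Pekko. The following is a minimal sketch in plain Scala; `Ops`, `FlowLike`, `PortLike`, and `named` are hypothetical stand-ins invented for illustration, not Pekko's real classes. It only shows the shape of the problem: an operator defined once in a shared trait calls `withAttributes`, so it fails on any implementation that rejects attributes.

```scala
object AttributesModel {
  // Hypothetical stand-in for FlowOps: operators are defined once here
  // and inherited by every implementation.
  trait Ops {
    def named(stage: String): Ops          // stand-in for statefulMap and similar operators
    def withAttributes(name: String): Ops  // stand-in for FlowOps#withAttributes

    // Same shape as the zipWithIndex quoted above: build the stage, then attach attributes.
    def zipWithIndex: Ops = named("statefulMap").withAttributes("zipWithIndex")
  }

  // Hypothetical stand-in for Flow: attributes are supported.
  final case class FlowLike(stages: List[String], attributes: List[String]) extends Ops {
    def named(stage: String): FlowLike = copy(stages = stages :+ stage)
    def withAttributes(name: String): FlowLike = copy(attributes = attributes :+ name)
  }

  // Hypothetical stand-in for PortOpsImpl: withAttributes always throws.
  final case class PortLike(stages: List[String]) extends Ops {
    def named(stage: String): PortLike = copy(stages = stages :+ stage)
    def withAttributes(name: String): PortLike =
      throw new UnsupportedOperationException(
        "Cannot set attributes on chained ops from a junction output port")
  }
}
```

In this model, `AttributesModel.FlowLike(Nil, Nil).zipWithIndex` succeeds, while `AttributesModel.PortLike(Nil).zipWithIndex` throws as soon as the inherited method reaches `withAttributes`.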

Here is a small program that works with Pekko 1.0.2 but crashes with 1.1.1:

// adapted from https://doc.akka.io/docs/akka/current/stream/stream-graphs.html
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.stream._
import pekko.stream.scaladsl._

object Example extends App {
  import GraphDSL.Implicits._

  implicit val system = ActorSystem("GraphAndShapeExampleSystem")
  implicit val materializer = ActorMaterializer()

  val pickMaxOfThree = GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val zip1 = b.add(ZipWith[Int, Int, Int](math.max _))
    val zip2 = b.add(ZipWith[Int, Int, Int](math.max _))
    zip1.out ~> zip2.in0

    UniformFanInShape(zip2.out, zip1.in0, zip1.in1, zip2.in1)
  }

  val resultSink = Sink.foreach(println)

  val g = RunnableGraph.fromGraph(GraphDSL.createGraph(resultSink) { implicit b => sink =>
    import GraphDSL.Implicits._

    // importing the partial graph will return its shape (inlets & outlets)
    val pm3 = b.add(pickMaxOfThree)

    Source.single(1) ~> pm3.in(0)
    Source.single(2) ~> pm3.in(1)
    Source.single(3) ~> pm3.in(2)
    pm3.out.zipWithIndex ~> sink.in
    ClosedShape
  })

  g.run()
}

@mdedetrich (Contributor)

Assigning to @He-Pin since he made the change.

mdedetrich added the bug label on Oct 9, 2024.

@pjfanning (Contributor)

@remyhaemmerle-da Can you include the compile error or stack trace? It saves the volunteers a lot of time not to have to go and reproduce the issue.

remyhaemmerle-da commented Oct 9, 2024

Here is the stack trace with Pekko 1.1.1:

[error] java.lang.UnsupportedOperationException: Cannot set attributes on chained ops from a junction output port
[error]         at org.apache.pekko.stream.scaladsl.GraphDSL$Implicits$PortOpsImpl.withAttributes(Graph.scala:1854)
[error]         at org.apache.pekko.stream.scaladsl.GraphDSL$Implicits$PortOpsImpl.withAttributes(Graph.scala:1845)
[error]         at org.apache.pekko.stream.scaladsl.FlowOps.zipWithIndex(Flow.scala:3310)
[error]         at org.apache.pekko.stream.scaladsl.FlowOps.zipWithIndex$(Flow.scala:3307)
[error]         at org.apache.pekko.stream.scaladsl.GraphDSL$Implicits$PortOpsImpl.zipWithIndex(Graph.scala:1845)
[error]         at Example$.$anonfun$g$2(Example.scala:33)
[error]         at org.apache.pekko.stream.scaladsl.GraphApply.createGraph(GraphApply.scala:53)
[error]         at org.apache.pekko.stream.scaladsl.GraphApply.createGraph$(GraphApply.scala:50)
[error]         at org.apache.pekko.stream.scaladsl.GraphDSL$.createGraph(Graph.scala:1589)
[error]         at Example$.delayedEndpoint$Example$1(Example.scala:24)
[error]         at Example$delayedInit$body.apply(Example.scala:6)
[error]         at scala.Function0.apply$mcV$sp(Function0.scala:42)
[error]         at scala.Function0.apply$mcV$sp$(Function0.scala:42)
[error]         at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
[error]         at scala.App.$anonfun$main$1(App.scala:98)
[error]         at scala.App.$anonfun$main$1$adapted(App.scala:98)
[error]         at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:619)
[error]         at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:617)
[error]         at scala.collection.AbstractIterable.foreach(Iterable.scala:935)
[error]         at scala.App.main(App.scala:98)
[error]         at scala.App.main$(App.scala:96)
[error]         at Example$.main(Example.scala:6)
[error]         at Example.main(Example.scala)
[error]         at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
[error]         at java.base/java.lang.reflect.Method.invoke(Method.java:580)
[error] stack trace is suppressed; run last Compile / run for the full output
[error] (Compile / run) java.lang.UnsupportedOperationException: Cannot set attributes on chained ops from a junction output port
[error] Total time: 7 s, completed Oct 9, 2024, 6:41:49 PM
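
A possible workaround on an affected version is to apply the operator on a standalone `Flow`, which does support `withAttributes`, and wire that flow into the graph. The following is a sketch, assuming `pekko-stream` is on the classpath; the object name `ZipWithIndexWorkaround`, the method `run`, and the sample elements are made up for illustration and are not part of the original program.

```scala
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.stream._
import pekko.stream.scaladsl._

import scala.concurrent.Future

// Hypothetical helper object, for illustration only.
object ZipWithIndexWorkaround {
  def run()(implicit system: ActorSystem): Future[Seq[(Int, Long)]] = {
    // zipWithIndex is called on a Flow, so the withAttributes call inside it is allowed.
    val indexed = Flow[Int].zipWithIndex

    val g = RunnableGraph.fromGraph(GraphDSL.createGraph(Sink.seq[(Int, Long)]) { implicit b => sink =>
      import GraphDSL.Implicits._

      val src = b.add(Source(List(10, 20, 30)))

      // Route the output port through the prebuilt flow instead of
      // calling zipWithIndex on the port itself.
      src.out ~> indexed ~> sink.in
      ClosedShape
    })

    g.run()
  }
}
```

Applied to the program above, this would mean replacing `pm3.out.zipWithIndex ~> sink.in` with `pm3.out ~> Flow[Int].zipWithIndex ~> sink.in`.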

@pjfanning (Contributor)

I plan to revert #591. It didn't lead to any major perf gain and it breaks this use case.

We can always revisit.

#591 was one of a few changes of this style made for v1.1.0 and I think we're going to have to check all those changes for similar issues.

@pjfanning (Contributor)

I still favour reverting #591 first, but as a follow-up we could look at making PortOps support attributes instead of throwing UnsupportedOperationException:

new UnsupportedOperationException("Cannot set attributes on chained ops from a junction output port")

@pjfanning (Contributor)

#1526 was merged
