Skip to content

Commit

Permalink
=str Fold InHandler and outHandler for UniqueBidiKillSwitchStage.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Dec 18, 2023
1 parent a159aee commit fecd517
Showing 1 changed file with 7 additions and 13 deletions.
20 changes: 7 additions & 13 deletions stream/src/main/scala/org/apache/pekko/stream/KillSwitch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -118,29 +118,23 @@ object KillSwitches {

val logic = new KillableGraphStageLogic(promise.future, shape) {

setHandler(shape.in1,
new InHandler {
setHandlers(shape.in1, shape.out1,
new InHandler with OutHandler {
override def onPush(): Unit = push(shape.out1, grab(shape.in1))
override def onUpstreamFinish(): Unit = complete(shape.out1)
override def onUpstreamFailure(ex: Throwable): Unit = fail(shape.out1, ex)
override def onPull(): Unit = pull(shape.in1)
override def onDownstreamFinish(cause: Throwable): Unit = cancel(shape.in1, cause)
})
setHandler(shape.in2,
new InHandler {

setHandlers(shape.in2, shape.out2,
new InHandler with OutHandler {
override def onPush(): Unit = push(shape.out2, grab(shape.in2))
override def onUpstreamFinish(): Unit = complete(shape.out2)
override def onUpstreamFailure(ex: Throwable): Unit = fail(shape.out2, ex)
})
setHandler(shape.out1,
new OutHandler {
override def onPull(): Unit = pull(shape.in1)
override def onDownstreamFinish(cause: Throwable): Unit = cancel(shape.in1, cause)
})
setHandler(shape.out2,
new OutHandler {
override def onPull(): Unit = pull(shape.in2)
override def onDownstreamFinish(cause: Throwable): Unit = cancel(shape.in2, cause)
})

}

(logic, switch)
Expand Down

0 comments on commit fecd517

Please sign in to comment.