Skip to content

Commit

Permalink
feat: Optimize recoverWith stream operator for single source.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Jan 9, 2024
1 parent 468aab0 commit ccce5c0
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,20 @@ class FlowRecoverWithSpec extends StreamSpec {
.expectComplete()
}

"recover with single source" in {
Source(1 to 4)
.map { a =>
if (a == 3) throw ex else a
}
.recoverWith { case _: Throwable => Source.single(3) }
.runWith(TestSink[Int]())
.request(2)
.expectNextN(1 to 2)
.request(1)
.expectNext(3)
.expectComplete()
}

"cancel substream if parent is terminated when there is a handler" in {
Source(1 to 4)
.map { a =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2174,8 +2174,13 @@ private[pekko] object TakeWithin {
case source: Graph[SourceShape[T] @unchecked, M @unchecked] if TraversalBuilder.isEmptySource(source) =>
completeStage()
case other: Graph[SourceShape[T] @unchecked, M @unchecked] =>
switchTo(other)
attempt += 1
TraversalBuilder.getSingleSource(other) match {
case OptionVal.Some(singleSource) =>
emit(out, singleSource.elem.asInstanceOf[T], () => completeStage())
case _ =>
switchTo(other)
attempt += 1
}
case _ => throw new IllegalStateException() // won't happen, compiler exhaustiveness check pleaser
}
} else
Expand Down

0 comments on commit ccce5c0

Please sign in to comment.