diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala index 831c1e3d078..022ac54bc02 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala @@ -62,6 +62,20 @@ class FlowRecoverWithSpec extends StreamSpec { .expectComplete() } + "recover with single source" in { + Source(1 to 4) + .map { a => + if (a == 3) throw ex else a + } + .recoverWith { case _: Throwable => Source.single(3) } + .runWith(TestSink[Int]()) + .request(2) + .expectNextN(1 to 2) + .request(1) + .expectNext(3) + .expectComplete() + } + "cancel substream if parent is terminated when there is a handler" in { Source(1 to 4) .map { a => diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala index 8acc1ca7c31..cd8e26de59e 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala @@ -2174,8 +2174,13 @@ private[pekko] object TakeWithin { case source: Graph[SourceShape[T] @unchecked, M @unchecked] if TraversalBuilder.isEmptySource(source) => completeStage() case other: Graph[SourceShape[T] @unchecked, M @unchecked] => - switchTo(other) - attempt += 1 + TraversalBuilder.getSingleSource(other) match { + case OptionVal.Some(singleSource) => + emit(out, singleSource.elem.asInstanceOf[T], () => completeStage()) + case _ => + switchTo(other) + attempt += 1 + } case _ => throw new IllegalStateException() // won't happen, compiler exhaustiveness check pleaser } } else