diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala index 06650a7b77..05f79cfa78 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala @@ -248,7 +248,7 @@ class SourceWithContextSpec extends StreamSpec { )(Keep.none) .runWith(TestSink.probe[(Option[Int], Int)]) .request(4) - .expectNext((Some(1), 1), (None, 2), (None, 3), (Some(4), 4)) + .expectNextN(List((Some(1), 1), (None, 2), (None, 3), (Some(4), 4))) .expectComplete() } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala index 48f6a286ac..a88886a97c 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala @@ -68,7 +68,7 @@ object SourceWithContext { } val filterUnavailable = Flow[(Option[SOut], Ctx)].collect { - case (None, ctx) => (Option.empty[FOut], ctx) + case x @ (None, _) => x.asInstanceOf[(Option[FOut], Ctx)] } val mapIntoOption = Flow[(FOut, Ctx)].map {