diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapSpec.scala index f9d09bdbdb8..28f22d642af 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapSpec.scala @@ -24,6 +24,7 @@ import pekko.stream.testkit.TestSubscriber import pekko.stream.testkit.scaladsl.TestSink import pekko.stream.testkit.scaladsl.TestSource +import java.util.concurrent.atomic.AtomicInteger import scala.annotation.nowarn import scala.concurrent.Await import scala.concurrent.Promise @@ -35,12 +36,37 @@ class FlowStatefulMapSpec extends StreamSpec { val ex = new Exception("TEST") with NoStackTrace + object BeenCalledTimesGate { + def apply(): BeenCalledTimesGate = new BeenCalledTimesGate(1) + + def apply(nTimes: Int): BeenCalledTimesGate = new BeenCalledTimesGate(nTimes) + } + + class BeenCalledTimesGate(nTimes: Int) { + private val beenCalled = new AtomicInteger(0) + + def mark(): Unit = beenCalled.updateAndGet(current => { + if (current == nTimes) { + throw new IllegalStateException(s"Has been called:[$nTimes] times, should not be called anymore.") + } else current + 1 + }) + + def ensure(): Unit = if (beenCalled.get() != nTimes) { + throw new IllegalStateException(s"Expected to be called:[$nTimes], but only be called:[$beenCalled]") + } + } + "A StatefulMap" must { "work in the happy case" in { + val gate = BeenCalledTimesGate() val sinkProb = Source(List(1, 2, 3, 4, 5)) .statefulMap(() => 0)((agg, elem) => { (agg + elem, (agg, elem)) - }, _ => None) + }, + _ => { + gate.mark() + None + }) .runWith(TestSink.probe[(Int, Int)]) sinkProb.expectSubscription().request(6) sinkProb @@ -50,9 +76,11 @@ class FlowStatefulMapSpec extends StreamSpec { .expectNext((6, 4)) .expectNext((10, 5)) .expectComplete() + gate.ensure() } "can remember the state when complete" in { + val gate = BeenCalledTimesGate() val sinkProb = Source(1 to 10) .statefulMap(() => List.empty[Int])( (state, elem) => { @@ -63,64 +91,91 @@ class FlowStatefulMapSpec extends StreamSpec { else (newState, Nil) }, - state => Some(state.reverse)) + state => { + gate.mark() + Some(state.reverse) + }) .mapConcat(identity) .runWith(TestSink.probe[Int]) sinkProb.expectSubscription().request(10) sinkProb.expectNextN(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).expectComplete() + gate.ensure() } "be able to resume" in { + val gate = BeenCalledTimesGate() val testSink = Source(List(1, 2, 3, 4, 5)) .statefulMap(() => 0)((agg, elem) => { if (elem % 2 == 0) throw ex else (agg + elem, (agg, elem)) - }, _ => None) + }, + _ => { + gate.mark() + None + }) .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) .runWith(TestSink.probe[(Int, Int)]) testSink.expectSubscription().request(5) testSink.expectNext((0, 1)).expectNext((1, 3)).expectNext((4, 5)).expectComplete() + gate.ensure() } "be able to restart" in { + val gate = BeenCalledTimesGate(2) val testSink = Source(List(1, 2, 3, 4, 5)) .statefulMap(() => 0)((agg, elem) => { if (elem % 3 == 0) throw ex else (agg + elem, (agg, elem)) - }, _ => None) + }, + _ => { + gate.mark() + None + }) .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) .runWith(TestSink.probe[(Int, Int)]) testSink.expectSubscription().request(5) testSink.expectNext((0, 1)).expectNext((1, 2)).expectNext((0, 4)).expectNext((4, 5)).expectComplete() + gate.ensure() } "be able to stop" in { + val gate = BeenCalledTimesGate() val testSink = Source(List(1, 2, 3, 4, 5)) .statefulMap(() => 0)((agg, elem) => { if (elem % 3 == 0) throw ex else (agg + elem, (agg, elem)) - }, _ => None) + }, + _ => { + gate.mark() + None + }) .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider)) .runWith(TestSink.probe[(Int, Int)]) testSink.expectSubscription().request(5) testSink.expectNext((0, 1)).expectNext((1, 2)).expectError(ex) + gate.ensure() } "fail on upstream failure" in { + val gate = BeenCalledTimesGate() val (testSource, testSink) = TestSource .probe[Int] .statefulMap(() => 0)((agg, elem) => { (agg + elem, (agg, elem)) - }, _ => None) + }, + _ => { + gate.mark() + None + }) .toMat(TestSink.probe[(Int, Int)])(Keep.both) .run() @@ -135,12 +190,20 @@ class FlowStatefulMapSpec extends StreamSpec { testSink.expectNext((6, 4)) testSource.sendError(ex) testSink.expectError(ex) + gate.ensure() } "defer upstream failure and remember state" in { + val gate = BeenCalledTimesGate() val (testSource, testSink) = TestSource .probe[Int] - .statefulMap(() => 0)((agg, elem) => { (agg + elem, (agg, elem)) }, (state: Int) => Some((state, -1))) + .statefulMap(() => 0)((agg, elem) => { + (agg + elem, (agg, elem)) + }, + (state: Int) => { + gate.mark() + Some((state, -1)) + }) .toMat(TestSink.probe[(Int, Int)])(Keep.both) .run() @@ -156,9 +219,11 @@ class FlowStatefulMapSpec extends StreamSpec { testSource.sendError(ex) testSink.expectNext((10, -1)) testSink.expectError(ex) + gate.ensure() } "cancel upstream when downstream cancel" in { + val gate = BeenCalledTimesGate() val promise = Promise[Done]() val testSource = TestSource .probe[Int] @@ -166,6 +231,7 @@ class FlowStatefulMapSpec extends StreamSpec { (agg + elem, (agg, elem)) }, (state: Int) => { + gate.mark() promise.complete(Success(Done)) Some((state, -1)) }) @@ -173,9 +239,11 @@ class FlowStatefulMapSpec extends StreamSpec { .run() testSource.expectSubscription().expectCancellation() Await.result(promise.future, 3.seconds) shouldBe Done + gate.ensure() } "cancel upstream when downstream fail" in { + val gate = BeenCalledTimesGate() val promise = Promise[Done]() val testProb = TestSubscriber.probe[(Int, Int)]() val testSource = TestSource @@ -184,6 +252,7 @@ class FlowStatefulMapSpec extends StreamSpec { (agg + elem, (agg, elem)) }, (state: Int) => { + gate.mark() promise.complete(Success(Done)) Some((state, -1)) }) @@ -192,9 +261,11 @@ class FlowStatefulMapSpec extends StreamSpec { testProb.cancel(ex) testSource.expectCancellationWithCause(ex) Await.result(promise.future, 3.seconds) shouldBe Done + gate.ensure() } "call its onComplete callback on abrupt materializer termination" in { + val gate = BeenCalledTimesGate() @nowarn("msg=deprecated") val mat = ActorMaterializer() val promise = Promise[Done]() @@ -203,6 +274,7 @@ class FlowStatefulMapSpec extends StreamSpec { .single(1) .statefulMap(() => -1)((_, elem) => (elem, elem), _ => { + gate.mark() promise.complete(Success(Done)) None }) @@ -210,9 +282,11 @@ class FlowStatefulMapSpec extends StreamSpec { mat.shutdown() matVal.failed.futureValue shouldBe a[AbruptStageTerminationException] Await.result(promise.future, 3.seconds) shouldBe Done + gate.ensure() } "call its onComplete callback when stop" in { + val gate = BeenCalledTimesGate() val promise = Promise[Done]() Source .single(1) @@ -221,16 +295,23 @@ class FlowStatefulMapSpec extends StreamSpec { (elem, elem) }, _ => { + gate.mark() promise.complete(Success(Done)) None }) .runWith(Sink.ignore) Await.result(promise.future, 3.seconds) shouldBe Done + gate.ensure() } "be able to be used as zipWithIndex" in { + val gate = BeenCalledTimesGate() Source(List("A", "B", "C", "D")) - .statefulMap(() => 0L)((index, elem) => (index + 1, (elem, index)), _ => None) + .statefulMap(() => 0L)((index, elem) => (index + 1, (elem, index)), + _ => { + gate.mark() + None + }) .runWith(TestSink.probe[(String, Long)]) .request(4) .expectNext(("A", 0L)) @@ -238,9 +319,11 @@ class FlowStatefulMapSpec extends StreamSpec { .expectNext(("C", 2L)) .expectNext(("D", 3L)) .expectComplete() + gate.ensure() } "be able to be used as bufferUntilChanged" in { + val gate = BeenCalledTimesGate() val sink = TestSink.probe[List[String]] Source("A" :: "B" :: "B" :: "C" :: "C" :: "C" :: "D" :: Nil) .statefulMap(() => List.empty[String])( @@ -249,7 +332,10 @@ class FlowStatefulMapSpec extends StreamSpec { case head :: _ if head != elem => (elem :: Nil, buffer) case _ => (elem :: buffer, Nil) }, - buffer => Some(buffer)) + buffer => { + gate.mark() + Some(buffer) + }) .filter(_.nonEmpty) .alsoTo(Sink.foreach(println)) .runWith(sink) @@ -259,9 +345,11 @@ class FlowStatefulMapSpec extends StreamSpec { .expectNext(List("C", "C", "C")) .expectNext(List("D")) .expectComplete() + gate.ensure() } "be able to be used as distinctUntilChanged" in { + val gate = BeenCalledTimesGate() Source("A" :: "B" :: "B" :: "C" :: "C" :: "C" :: "D" :: Nil) .statefulMap(() => Option.empty[String])( (lastElement, elem) => @@ -269,7 +357,10 @@ class FlowStatefulMapSpec extends StreamSpec { case Some(head) if head == elem => (Some(elem), None) case _ => (Some(elem), Some(elem)) }, - _ => None) + _ => { + gate.mark() + None + }) .collect { case Some(elem) => elem } .runWith(TestSink.probe[String]) .request(4) @@ -278,6 +369,7 @@ class FlowStatefulMapSpec extends StreamSpec { .expectNext("C") .expectNext("D") .expectComplete() + gate.ensure() } } }