Skip to content

Commit

Permalink
=test Add BeenCalledTimesGate in tests to make sure some method sho…
Browse files Browse the repository at this point in the history
…uld only be called specified times.
  • Loading branch information
He-Pin committed Sep 11, 2023
1 parent b37b5cd commit e784cd7
Showing 1 changed file with 102 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import pekko.stream.testkit.TestSubscriber
import pekko.stream.testkit.scaladsl.TestSink
import pekko.stream.testkit.scaladsl.TestSource

import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.nowarn
import scala.concurrent.Await
import scala.concurrent.Promise
Expand All @@ -35,12 +36,37 @@ class FlowStatefulMapSpec extends StreamSpec {

val ex = new Exception("TEST") with NoStackTrace

object BeenCalledTimesGate {
def apply(): BeenCalledTimesGate = new BeenCalledTimesGate(1)

def apply(nTimes: Int): BeenCalledTimesGate = new BeenCalledTimesGate(nTimes)
}

class BeenCalledTimesGate(nTimes: Int) {
private val beenCalled = new AtomicInteger(0)

def mark(): Unit = beenCalled.updateAndGet(current => {
if (current == nTimes) {
throw new IllegalStateException(s"Has been called:[$nTimes] times, should not be called anymore.")
} else current + 1
})

def ensure(): Unit = if (beenCalled.get() != nTimes) {
throw new IllegalStateException(s"Expected to be called:[$nTimes], but only be called:[$beenCalled]")
}
}

"A StatefulMap" must {
"work in the happy case" in {
val gate = BeenCalledTimesGate()
val sinkProb = Source(List(1, 2, 3, 4, 5))
.statefulMap(() => 0)((agg, elem) => {
(agg + elem, (agg, elem))
}, _ => None)
},
_ => {
gate.mark()
None
})
.runWith(TestSink.probe[(Int, Int)])
sinkProb.expectSubscription().request(6)
sinkProb
Expand All @@ -50,9 +76,11 @@ class FlowStatefulMapSpec extends StreamSpec {
.expectNext((6, 4))
.expectNext((10, 5))
.expectComplete()
gate.ensure()
}

"can remember the state when complete" in {
val gate = BeenCalledTimesGate()
val sinkProb = Source(1 to 10)
.statefulMap(() => List.empty[Int])(
(state, elem) => {
Expand All @@ -63,64 +91,91 @@ class FlowStatefulMapSpec extends StreamSpec {
else
(newState, Nil)
},
state => Some(state.reverse))
state => {
gate.mark()
Some(state.reverse)
})
.mapConcat(identity)
.runWith(TestSink.probe[Int])
sinkProb.expectSubscription().request(10)
sinkProb.expectNextN(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).expectComplete()
gate.ensure()
}

"be able to resume" in {
val gate = BeenCalledTimesGate()
val testSink = Source(List(1, 2, 3, 4, 5))
.statefulMap(() => 0)((agg, elem) => {
if (elem % 2 == 0)
throw ex
else
(agg + elem, (agg, elem))
}, _ => None)
},
_ => {
gate.mark()
None
})
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runWith(TestSink.probe[(Int, Int)])

testSink.expectSubscription().request(5)
testSink.expectNext((0, 1)).expectNext((1, 3)).expectNext((4, 5)).expectComplete()
gate.ensure()
}

"be able to restart" in {
val gate = BeenCalledTimesGate(2)
val testSink = Source(List(1, 2, 3, 4, 5))
.statefulMap(() => 0)((agg, elem) => {
if (elem % 3 == 0)
throw ex
else
(agg + elem, (agg, elem))
}, _ => None)
},
_ => {
gate.mark()
None
})
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
.runWith(TestSink.probe[(Int, Int)])

testSink.expectSubscription().request(5)
testSink.expectNext((0, 1)).expectNext((1, 2)).expectNext((0, 4)).expectNext((4, 5)).expectComplete()
gate.ensure()
}

"be able to stop" in {
val gate = BeenCalledTimesGate()
val testSink = Source(List(1, 2, 3, 4, 5))
.statefulMap(() => 0)((agg, elem) => {
if (elem % 3 == 0)
throw ex
else
(agg + elem, (agg, elem))
}, _ => None)
},
_ => {
gate.mark()
None
})
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
.runWith(TestSink.probe[(Int, Int)])

testSink.expectSubscription().request(5)
testSink.expectNext((0, 1)).expectNext((1, 2)).expectError(ex)
gate.ensure()
}

"fail on upstream failure" in {
val gate = BeenCalledTimesGate()
val (testSource, testSink) = TestSource
.probe[Int]
.statefulMap(() => 0)((agg, elem) => {
(agg + elem, (agg, elem))
}, _ => None)
},
_ => {
gate.mark()
None
})
.toMat(TestSink.probe[(Int, Int)])(Keep.both)
.run()

Expand All @@ -135,12 +190,20 @@ class FlowStatefulMapSpec extends StreamSpec {
testSink.expectNext((6, 4))
testSource.sendError(ex)
testSink.expectError(ex)
gate.ensure()
}

"defer upstream failure and remember state" in {
val gate = BeenCalledTimesGate()
val (testSource, testSink) = TestSource
.probe[Int]
.statefulMap(() => 0)((agg, elem) => { (agg + elem, (agg, elem)) }, (state: Int) => Some((state, -1)))
.statefulMap(() => 0)((agg, elem) => {
(agg + elem, (agg, elem))
},
(state: Int) => {
gate.mark()
Some((state, -1))
})
.toMat(TestSink.probe[(Int, Int)])(Keep.both)
.run()

Expand All @@ -156,26 +219,31 @@ class FlowStatefulMapSpec extends StreamSpec {
testSource.sendError(ex)
testSink.expectNext((10, -1))
testSink.expectError(ex)
gate.ensure()
}

"cancel upstream when downstream cancel" in {
val gate = BeenCalledTimesGate()
val promise = Promise[Done]()
val testSource = TestSource
.probe[Int]
.statefulMap(() => 100)((agg, elem) => {
(agg + elem, (agg, elem))
},
(state: Int) => {
gate.mark()
promise.complete(Success(Done))
Some((state, -1))
})
.toMat(Sink.cancelled)(Keep.left)
.run()
testSource.expectSubscription().expectCancellation()
Await.result(promise.future, 3.seconds) shouldBe Done
gate.ensure()
}

"cancel upstream when downstream fail" in {
val gate = BeenCalledTimesGate()
val promise = Promise[Done]()
val testProb = TestSubscriber.probe[(Int, Int)]()
val testSource = TestSource
Expand All @@ -184,6 +252,7 @@ class FlowStatefulMapSpec extends StreamSpec {
(agg + elem, (agg, elem))
},
(state: Int) => {
gate.mark()
promise.complete(Success(Done))
Some((state, -1))
})
Expand All @@ -192,9 +261,11 @@ class FlowStatefulMapSpec extends StreamSpec {
testProb.cancel(ex)
testSource.expectCancellationWithCause(ex)
Await.result(promise.future, 3.seconds) shouldBe Done
gate.ensure()
}

"call its onComplete callback on abrupt materializer termination" in {
val gate = BeenCalledTimesGate()
@nowarn("msg=deprecated")
val mat = ActorMaterializer()
val promise = Promise[Done]()
Expand All @@ -203,16 +274,19 @@ class FlowStatefulMapSpec extends StreamSpec {
.single(1)
.statefulMap(() => -1)((_, elem) => (elem, elem),
_ => {
gate.mark()
promise.complete(Success(Done))
None
})
.runWith(Sink.never)(mat)
mat.shutdown()
matVal.failed.futureValue shouldBe a[AbruptStageTerminationException]
Await.result(promise.future, 3.seconds) shouldBe Done
gate.ensure()
}

"call its onComplete callback when stop" in {
val gate = BeenCalledTimesGate()
val promise = Promise[Done]()
Source
.single(1)
Expand All @@ -221,26 +295,35 @@ class FlowStatefulMapSpec extends StreamSpec {
(elem, elem)
},
_ => {
gate.mark()
promise.complete(Success(Done))
None
})
.runWith(Sink.ignore)
Await.result(promise.future, 3.seconds) shouldBe Done
gate.ensure()
}

"be able to be used as zipWithIndex" in {
val gate = BeenCalledTimesGate()
Source(List("A", "B", "C", "D"))
.statefulMap(() => 0L)((index, elem) => (index + 1, (elem, index)), _ => None)
.statefulMap(() => 0L)((index, elem) => (index + 1, (elem, index)),
_ => {
gate.mark()
None
})
.runWith(TestSink.probe[(String, Long)])
.request(4)
.expectNext(("A", 0L))
.expectNext(("B", 1L))
.expectNext(("C", 2L))
.expectNext(("D", 3L))
.expectComplete()
gate.ensure()
}

"be able to be used as bufferUntilChanged" in {
val gate = BeenCalledTimesGate()
val sink = TestSink.probe[List[String]]
Source("A" :: "B" :: "B" :: "C" :: "C" :: "C" :: "D" :: Nil)
.statefulMap(() => List.empty[String])(
Expand All @@ -249,7 +332,10 @@ class FlowStatefulMapSpec extends StreamSpec {
case head :: _ if head != elem => (elem :: Nil, buffer)
case _ => (elem :: buffer, Nil)
},
buffer => Some(buffer))
buffer => {
gate.mark()
Some(buffer)
})
.filter(_.nonEmpty)
.alsoTo(Sink.foreach(println))
.runWith(sink)
Expand All @@ -259,17 +345,22 @@ class FlowStatefulMapSpec extends StreamSpec {
.expectNext(List("C", "C", "C"))
.expectNext(List("D"))
.expectComplete()
gate.ensure()
}

"be able to be used as distinctUntilChanged" in {
val gate = BeenCalledTimesGate()
Source("A" :: "B" :: "B" :: "C" :: "C" :: "C" :: "D" :: Nil)
.statefulMap(() => Option.empty[String])(
(lastElement, elem) =>
lastElement match {
case Some(head) if head == elem => (Some(elem), None)
case _ => (Some(elem), Some(elem))
},
_ => None)
_ => {
gate.mark()
None
})
.collect { case Some(elem) => elem }
.runWith(TestSink.probe[String])
.request(4)
Expand All @@ -278,6 +369,7 @@ class FlowStatefulMapSpec extends StreamSpec {
.expectNext("C")
.expectNext("D")
.expectComplete()
gate.ensure()
}
}
}

0 comments on commit e784cd7

Please sign in to comment.