Skip to content

Commit

Permalink
chore: Add tests for not invoking onComplete twice for statefulMap …
Browse files Browse the repository at this point in the history
…operator.
  • Loading branch information
He-Pin committed Dec 25, 2023
1 parent 413e79f commit c78e2d7
Showing 1 changed file with 70 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,15 @@

package org.apache.pekko.stream.scaladsl

import java.util.concurrent.atomic.AtomicInteger

import scala.annotation.nowarn
import scala.concurrent.Await
import scala.concurrent.Promise
import scala.concurrent.duration.DurationInt
import scala.util.Success
import scala.util.control.NoStackTrace

import org.apache.pekko
import pekko.Done
import pekko.stream.AbruptStageTerminationException
Expand All @@ -21,16 +30,10 @@ import pekko.stream.ActorMaterializer
import pekko.stream.Supervision
import pekko.stream.testkit.StreamSpec
import pekko.stream.testkit.TestSubscriber
import pekko.stream.testkit.Utils.TE
import pekko.stream.testkit.scaladsl.TestSink
import pekko.stream.testkit.scaladsl.TestSource

import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.nowarn
import scala.concurrent.Await
import scala.concurrent.Promise
import scala.concurrent.duration.DurationInt
import scala.util.Success
import scala.util.control.NoStackTrace
import pekko.testkit.EventFilter

class FlowStatefulMapSpec extends StreamSpec {

Expand Down Expand Up @@ -371,5 +374,64 @@ class FlowStatefulMapSpec extends StreamSpec {
.expectComplete()
gate.ensure()
}

"will not call `onComplete` twice if `f` fail" in {
val closedCounter = new AtomicInteger(0)
val probe = Source
.repeat(1)
.statefulMap(() => "opening resource")(
(_, _) => throw TE("failing read"),
_ => {
closedCounter.incrementAndGet()
None
})
.runWith(TestSink.probe[String])

probe.request(1)
probe.expectError(TE("failing read"))
closedCounter.get() should ===(1)
}

"will not call `onComplete` twice if both `f` and `onComplete` fail" in {
val closedCounter = new AtomicInteger(0)
val probe = Source
.repeat(1)
.statefulMap(() => "opening resource")((_, _) => throw TE("failing read"),
_ => {
if (closedCounter.incrementAndGet() == 1) {
throw TE("boom")
}
None
})
.runWith(TestSink.probe[Int])

EventFilter[TE](occurrences = 1).intercept {
probe.request(1)
probe.expectError(TE("boom"))
}
closedCounter.get() should ===(1)
}

"will not call `onComplete` twice if `onComplete` fail on upstream complete" in {
val closedCounter = new AtomicInteger(0)
val (pub, sub) = TestSource[Int]()
.statefulMap(() => "opening resource")((state, value) => (state, value),
_ => {
closedCounter.incrementAndGet()
throw TE("boom")
})
.toMat(TestSink.probe[Int])(Keep.both)
.run()

EventFilter[TE](occurrences = 1).intercept {
sub.request(1)
pub.sendNext(1)
sub.expectNext(1)
sub.request(1)
pub.sendComplete()
sub.expectError(TE("boom"))
}
closedCounter.get() shouldBe 1
}
}
}

0 comments on commit c78e2d7

Please sign in to comment.