diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala index 9989c816456..01bac3691a4 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala @@ -45,6 +45,7 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug } queue.complete() + assert(queue.isCompleted) val subIt = Iterator.continually(sub.requestNext()) subIt.zip(elements.iterator).foreach { @@ -81,6 +82,7 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug val queue = Source.queue[Int](1).toMat(Sink.fromSubscriber(sub))(Keep.left).run() queue.complete() + assert(queue.isCompleted) assertThrows[IllegalStateException](queue.complete()) } @@ -89,6 +91,7 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug val queue = Source.queue[Int](1).toMat(Sink.fromSubscriber(sub))(Keep.left).run() queue.fail(ex) + assert(queue.isCompleted) assertThrows[IllegalStateException](queue.fail(ex)) } @@ -98,6 +101,7 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug Source.queue[Int](10).toMat(Sink.fromSubscriber(sub))(Keep.left).run() queue.complete() + assert(queue.isCompleted) queue.offer(1) should be(QueueOfferResult.QueueClosed) sub.expectSubscriptionAndComplete() } @@ -108,6 +112,7 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug Source.queue[Int](10).toMat(Sink.fromSubscriber(sub))(Keep.left).run() queue.fail(ex) + assert(queue.isCompleted) queue.offer(1) should be(QueueOfferResult.Failure(ex)) sub.request(1) sub.expectError(ex) @@ -180,6 +185,7 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug // where enqueueing an element concurrently with Done reaching the stage can lead to Enqueued being returned // but the element dropped (no guarantee of entering stream as documented in BoundedSourceQueue.offer queue.complete() + assert(queue.isCompleted) result.futureValue should be(counter.get()) } diff --git a/stream/src/main/scala/org/apache/pekko/stream/BoundedSourceQueue.scala b/stream/src/main/scala/org/apache/pekko/stream/BoundedSourceQueue.scala index d5b2377e6d2..cf55dee581c 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/BoundedSourceQueue.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/BoundedSourceQueue.scala @@ -38,6 +38,13 @@ trait BoundedSourceQueue[T] { */ def complete(): Unit + /** + * Returns true if the stream has been completed, either normally or with failure. + * + * @since 1.1.0 + */ + def isCompleted: Boolean + /** * Completes the stream with a failure. */ diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/BoundedSourceQueue.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/BoundedSourceQueue.scala index a904145e8a3..5fd37481ea8 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/BoundedSourceQueue.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/BoundedSourceQueue.scala @@ -114,7 +114,7 @@ import pekko.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, Ou } object Mat extends BoundedSourceQueue[T] { - override def offer(elem: T): QueueOfferResult = state.get() match { + final override def offer(elem: T): QueueOfferResult = state.get() match { case Running | NeedsActivation => if (queue.add(elem)) { // need to query state again because stage might have switched from Running -> NeedsActivation only after @@ -130,21 +130,23 @@ import pekko.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, Ou case Done(result) => result } - override def complete(): Unit = { + final override def complete(): Unit = { if (state.get().isInstanceOf[Done]) throw new IllegalStateException("The queue has already been completed.") if (setDone(Done(QueueOfferResult.QueueClosed))) Logic.callback.invoke(()) // if this thread won the completion race also schedule an async callback } - override def fail(ex: Throwable): Unit = { + final override def isCompleted: Boolean = state.get().isInstanceOf[Done] + + final override def fail(ex: Throwable): Unit = { if (state.get().isInstanceOf[Done]) throw new IllegalStateException("The queue has already been completed.") if (setDone(Done(QueueOfferResult.Failure(ex)))) Logic.callback.invoke(()) // if this thread won the completion race also schedule an async callback } - override def size(): Int = queue.size() + final override def size(): Int = queue.size() } // some state transition helpers