diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala index 9989c816456..01bac3691a4 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala @@ -45,6 +45,7 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug } queue.complete() + assert(queue.isCompleted) val subIt = Iterator.continually(sub.requestNext()) subIt.zip(elements.iterator).foreach { @@ -81,6 +82,7 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug val queue = Source.queue[Int](1).toMat(Sink.fromSubscriber(sub))(Keep.left).run() queue.complete() + assert(queue.isCompleted) assertThrows[IllegalStateException](queue.complete()) } @@ -89,6 +91,7 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug val queue = Source.queue[Int](1).toMat(Sink.fromSubscriber(sub))(Keep.left).run() queue.fail(ex) + assert(queue.isCompleted) assertThrows[IllegalStateException](queue.fail(ex)) } @@ -98,6 +101,7 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug Source.queue[Int](10).toMat(Sink.fromSubscriber(sub))(Keep.left).run() queue.complete() + assert(queue.isCompleted) queue.offer(1) should be(QueueOfferResult.QueueClosed) sub.expectSubscriptionAndComplete() } @@ -108,6 +112,7 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug Source.queue[Int](10).toMat(Sink.fromSubscriber(sub))(Keep.left).run() queue.fail(ex) + assert(queue.isCompleted) queue.offer(1) should be(QueueOfferResult.Failure(ex)) sub.request(1) sub.expectError(ex) @@ -180,6 +185,7 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug // where enqueueing an element concurrently with Done reaching the stage can lead to Enqueued being returned // but the element dropped (no guarantee of entering stream as documented in BoundedSourceQueue.offer queue.complete() + assert(queue.isCompleted) result.futureValue should be(counter.get()) } diff --git a/stream/src/main/mima-filters/1.0.x.backwards.excludes/pr-1374-boundedsourcequeue-iscompleted-classes.backwards.excludes b/stream/src/main/mima-filters/1.0.x.backwards.excludes/pr-1374-boundedsourcequeue-iscompleted-classes.backwards.excludes new file mode 100644 index 00000000000..a190b6293fd --- /dev/null +++ b/stream/src/main/mima-filters/1.0.x.backwards.excludes/pr-1374-boundedsourcequeue-iscompleted-classes.backwards.excludes @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Add BoundedSourceQueue.isCompleted method +ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.stream.BoundedSourceQueue.isCompleted") \ No newline at end of file diff --git a/stream/src/main/scala/org/apache/pekko/stream/BoundedSourceQueue.scala b/stream/src/main/scala/org/apache/pekko/stream/BoundedSourceQueue.scala index d5b2377e6d2..cf55dee581c 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/BoundedSourceQueue.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/BoundedSourceQueue.scala @@ -38,6 +38,13 @@ trait BoundedSourceQueue[T] { */ def complete(): Unit + /** + * Returns true if the stream has been completed, either normally or with failure. + * + * @since 1.1.0 + */ + def isCompleted: Boolean + /** * Completes the stream with a failure. */ diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/BoundedSourceQueue.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/BoundedSourceQueue.scala index a904145e8a3..5fd37481ea8 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/BoundedSourceQueue.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/BoundedSourceQueue.scala @@ -114,7 +114,7 @@ import pekko.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, Ou } object Mat extends BoundedSourceQueue[T] { - override def offer(elem: T): QueueOfferResult = state.get() match { + final override def offer(elem: T): QueueOfferResult = state.get() match { case Running | NeedsActivation => if (queue.add(elem)) { // need to query state again because stage might have switched from Running -> NeedsActivation only after @@ -130,21 +130,23 @@ import pekko.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, Ou case Done(result) => result } - override def complete(): Unit = { + final override def complete(): Unit = { if (state.get().isInstanceOf[Done]) throw new IllegalStateException("The queue has already been completed.") if (setDone(Done(QueueOfferResult.QueueClosed))) Logic.callback.invoke(()) // if this thread won the completion race also schedule an async callback } - override def fail(ex: Throwable): Unit = { + final override def isCompleted: Boolean = state.get().isInstanceOf[Done] + + final override def fail(ex: Throwable): Unit = { if (state.get().isInstanceOf[Done]) throw new IllegalStateException("The queue has already been completed.") if (setDone(Done(QueueOfferResult.Failure(ex)))) Logic.callback.invoke(()) // if this thread won the completion race also schedule an async callback } - override def size(): Int = queue.size() + final override def size(): Int = queue.size() } // some state transition helpers