Skip to content

Commit

Permalink
feat: Add isCompleted method to BoundedSourceQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Jun 18, 2024
1 parent ce9f5a0 commit e1ffb3c
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug
}

queue.complete()
assert(queue.isCompleted)

val subIt = Iterator.continually(sub.requestNext())
subIt.zip(elements.iterator).foreach {
Expand Down Expand Up @@ -81,6 +82,7 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug
val queue =
Source.queue[Int](1).toMat(Sink.fromSubscriber(sub))(Keep.left).run()
queue.complete()
assert(queue.isCompleted)
assertThrows[IllegalStateException](queue.complete())
}

Expand All @@ -89,6 +91,7 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug
val queue =
Source.queue[Int](1).toMat(Sink.fromSubscriber(sub))(Keep.left).run()
queue.fail(ex)
assert(queue.isCompleted)
assertThrows[IllegalStateException](queue.fail(ex))
}

Expand All @@ -98,6 +101,7 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug
Source.queue[Int](10).toMat(Sink.fromSubscriber(sub))(Keep.left).run()

queue.complete()
assert(queue.isCompleted)
queue.offer(1) should be(QueueOfferResult.QueueClosed)
sub.expectSubscriptionAndComplete()
}
Expand All @@ -108,6 +112,7 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug
Source.queue[Int](10).toMat(Sink.fromSubscriber(sub))(Keep.left).run()

queue.fail(ex)
assert(queue.isCompleted)
queue.offer(1) should be(QueueOfferResult.Failure(ex))
sub.request(1)
sub.expectError(ex)
Expand Down Expand Up @@ -180,6 +185,7 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug
// where enqueueing an element concurrently with Done reaching the stage can lead to Enqueued being returned
// but the element dropped (no guarantee of entering stream as documented in BoundedSourceQueue.offer
queue.complete()
assert(queue.isCompleted)

result.futureValue should be(counter.get())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ trait BoundedSourceQueue[T] {
*/
def complete(): Unit

/**
* Returns true if the stream has been completed, either normally or with failure.
*
* @since 1.1.0
*/
def isCompleted: Boolean

/**
* Completes the stream with a failure.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ import pekko.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, Ou
}

object Mat extends BoundedSourceQueue[T] {
override def offer(elem: T): QueueOfferResult = state.get() match {
final override def offer(elem: T): QueueOfferResult = state.get() match {
case Running | NeedsActivation =>
if (queue.add(elem)) {
// need to query state again because stage might have switched from Running -> NeedsActivation only after
Expand All @@ -130,21 +130,23 @@ import pekko.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, Ou
case Done(result) => result
}

override def complete(): Unit = {
final override def complete(): Unit = {
if (state.get().isInstanceOf[Done])
throw new IllegalStateException("The queue has already been completed.")
if (setDone(Done(QueueOfferResult.QueueClosed)))
Logic.callback.invoke(()) // if this thread won the completion race also schedule an async callback
}

override def fail(ex: Throwable): Unit = {
final override def isCompleted: Boolean = state.get().isInstanceOf[Done]

final override def fail(ex: Throwable): Unit = {
if (state.get().isInstanceOf[Done])
throw new IllegalStateException("The queue has already been completed.")
if (setDone(Done(QueueOfferResult.Failure(ex))))
Logic.callback.invoke(()) // if this thread won the completion race also schedule an async callback
}

override def size(): Int = queue.size()
final override def size(): Int = queue.size()
}

// some state transition helpers
Expand Down

0 comments on commit e1ffb3c

Please sign in to comment.