diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala index cf3a9710e8..0c8c8ef25d 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala @@ -605,10 +605,7 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I else if (head != finalOffset) { // If our final consumer goes away, we roll forward the buffer so a subsequent consumer does not // see the already consumed elements. This feature is quite handy. - while (head != finalOffset) { - queue(head & Mask) = null - head += 1 - } + cleanQueueInRange(head, finalOffset) head = finalOffset tryPull() } @@ -617,6 +614,20 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I } } + private def cleanQueueInRange(headOffset: Int, upToOffset: Int): Unit = { + // We need to clean the queue from headOffset to upToOffset + if (headOffset != upToOffset) { + val startIdx = headOffset & Mask + val endIdx = upToOffset & Mask + if (startIdx <= endIdx) { + java.util.Arrays.fill(queue, startIdx, endIdx, null) + } else { + java.util.Arrays.fill(queue, startIdx, queue.length, null) + java.util.Arrays.fill(queue, 0, endIdx, null) + } + } + } + // Producer API // We are full if the distance between the slowest (known) consumer and the fastest (known) consumer is // the buffer size. We must wait until the slowest either advances, or cancels. @@ -677,11 +688,15 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I if (offsetOfConsumerRemoved == head) { // Try to advance along the wheel. We can skip any wheel slots which have no waiting Consumers, until // we either find a nonempty one, or we reached the end of the buffer. - while (consumerWheel(head & WheelMask).isEmpty && head != tail) { - queue(head & Mask) = null - head += 1 + var upToOffset = head + while (consumerWheel(upToOffset & WheelMask).isEmpty) { + upToOffset += 1 unblocked = true } + if (upToOffset != head) { + cleanQueueInRange(head, upToOffset) + head = upToOffset + } } unblocked }