Skip to content

Commit

Permalink
perf: Reduce loops in when clean queue in BroadcastHub
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Dec 28, 2024
1 parent eb22d8d commit b6dc72f
Showing 1 changed file with 22 additions and 7 deletions.
29 changes: 22 additions & 7 deletions stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
Original file line number Diff line number Diff line change
Expand Up @@ -605,10 +605,7 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
else if (head != finalOffset) {
// If our final consumer goes away, we roll forward the buffer so a subsequent consumer does not
// see the already consumed elements. This feature is quite handy.
while (head != finalOffset) {
queue(head & Mask) = null
head += 1
}
cleanQueueInRange(head, finalOffset)
head = finalOffset
tryPull()
}
Expand All @@ -617,6 +614,20 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
}
}

private def cleanQueueInRange(headOffset: Int, upToOffset: Int): Unit = {
// We need to clean the queue from headOffset to upToOffset
if (headOffset != upToOffset) {
val startIdx = headOffset & Mask
val endIdx = upToOffset & Mask
if (startIdx <= endIdx) {
java.util.Arrays.fill(queue, startIdx, endIdx, null)
} else {
java.util.Arrays.fill(queue, startIdx, queue.length, null)
java.util.Arrays.fill(queue, 0, endIdx, null)
}
}
}

// Producer API
// We are full if the distance between the slowest (known) consumer and the fastest (known) consumer is
// the buffer size. We must wait until the slowest either advances, or cancels.
Expand Down Expand Up @@ -677,11 +688,15 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
if (offsetOfConsumerRemoved == head) {
// Try to advance along the wheel. We can skip any wheel slots which have no waiting Consumers, until
// we either find a nonempty one, or we reached the end of the buffer.
while (consumerWheel(head & WheelMask).isEmpty && head != tail) {
queue(head & Mask) = null
head += 1
var upToOffset = head
while (consumerWheel(upToOffset & WheelMask).isEmpty) {
upToOffset += 1
unblocked = true
}
if (upToOffset != head) {
cleanQueueInRange(head, upToOffset)
head = upToOffset
}
}
unblocked
}
Expand Down

0 comments on commit b6dc72f

Please sign in to comment.