Protect against user diagnostics, better rebalance events (#1102)
Diagnostics is a feature of zio-kafka that allows users to listen to key events. Since zio-kafka calls out to the user's implementation of the Diagnostics trait, there are no guarantees about how well that implementation behaves.

This is even more important inside the rebalance listener, where we (soon, with #1098) run on the same-thread runtime and cannot afford to be switched to another thread by ZIO operations that are normally safe to use.

To protect against these issues, the user's diagnostics implementation is run on a separate fiber that feeds from a queue of events.
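
For illustration, a user implementation of the Diagnostics trait might be as simple as the following sketch (a hypothetical LoggingDiagnostics, not part of this commit; the trait is shown in the Diagnostics.scala diff below):

import zio._
import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }

// Hypothetical user code: zio-kafka has no control over how well an
// implementation like this behaves, hence the wrapper added here.
object LoggingDiagnostics extends Diagnostics {
  override def emit(event: => DiagnosticEvent): UIO[Unit] =
    ZIO.logInfo(s"diagnostic event: $event")
}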

In addition, the per-phase rebalance events are replaced by a single event that is emitted from outside the rebalance listener. The new event gives the full picture of a rebalance, including which streams were ended. Previously, it was not clear which rebalance events belonged to the same rebalance.

**Breaking change**

Since the rebalance events are changed, this is a breaking change.
erikvanoosten authored Nov 14, 2023
1 parent c9a498e · commit d1c3029
Showing 4 changed files with 54 additions and 17 deletions.
17 changes: 13 additions & 4 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
@@ -10,6 +10,7 @@ import org.apache.kafka.common._
import zio._
import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization
import zio.kafka.consumer.diagnostics.Diagnostics
+import zio.kafka.consumer.diagnostics.Diagnostics.ConcurrentDiagnostics
import zio.kafka.consumer.internal.{ ConsumerAccess, RunloopAccess }
import zio.kafka.serde.{ Deserializer, Serde }
import zio.kafka.utils.SslHelper
@@ -175,15 +176,23 @@ object Consumer {
    } yield consumer
  }

+  /**
+   * A new consumer.
+   *
+   * @param diagnostics
+   *   an optional callback for key events in the consumer life-cycle. The callbacks are executed on a separate
+   *   fiber. Since the events are buffered in an unbounded queue, failure to handle them in a timely manner can
+   *   lead to out-of-memory errors.
+   */
  def make(
    settings: ConsumerSettings,
    diagnostics: Diagnostics = Diagnostics.NoOp
  ): ZIO[Scope, Throwable, Consumer] =
    for {
-      _ <- ZIO.addFinalizer(diagnostics.emit(Finalization.ConsumerFinalized))
-      _ <- SslHelper.validateEndpoint(settings.driverSettings)
-      consumerAccess <- ConsumerAccess.make(settings)
-      runloopAccess <- RunloopAccess.make(settings, consumerAccess, diagnostics)
+      wrappedDiagnostics <- ConcurrentDiagnostics.make(diagnostics)
+      _ <- ZIO.addFinalizer(wrappedDiagnostics.emit(Finalization.ConsumerFinalized))
+      _ <- SslHelper.validateEndpoint(settings.driverSettings)
+      consumerAccess <- ConsumerAccess.make(settings)
+      runloopAccess <- RunloopAccess.make(settings, consumerAccess, wrappedDiagnostics)
    } yield new ConsumerLive(consumerAccess, runloopAccess)

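A usage sketch of the new make: the broker address and group id are placeholders, and LoggingDiagnostics is the hypothetical implementation sketched above. The passed-in diagnostics is wrapped by ConcurrentDiagnostics, so its callbacks run on their own fiber rather than on the consumer's poll thread:

import zio._
import zio.kafka.consumer.{ Consumer, ConsumerSettings }

// Sketch: the user-supplied Diagnostics is wrapped automatically by make.
val makeConsumer: ZIO[Scope, Throwable, Consumer] =
  Consumer.make(
    ConsumerSettings(List("localhost:9092")).withGroupId("example-group"),
    diagnostics = LoggingDiagnostics
  )
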
12 changes: 6 additions & 6 deletions zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/DiagnosticEvent.scala
@@ -20,12 +20,12 @@ object DiagnosticEvent {
    final case class Failure(offsets: Map[TopicPartition, OffsetAndMetadata], cause: Throwable) extends Commit
  }

-  sealed trait Rebalance extends DiagnosticEvent
-  object Rebalance {
-    final case class Revoked(partitions: Set[TopicPartition]) extends Rebalance
-    final case class Assigned(partitions: Set[TopicPartition]) extends Rebalance
-    final case class Lost(partitions: Set[TopicPartition]) extends Rebalance
-  }
+  final case class Rebalance(
+    revoked: Set[TopicPartition],
+    assigned: Set[TopicPartition],
+    lost: Set[TopicPartition],
+    ended: Set[TopicPartition]
+  ) extends DiagnosticEvent

  sealed trait Finalization extends DiagnosticEvent
  object Finalization {
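
For event consumers, migrating across this breaking change amounts to collapsing the three old cases (Revoked, Assigned, Lost) into one. A sketch of a hypothetical handler, with field names as defined above:

import zio._
import zio.kafka.consumer.diagnostics.DiagnosticEvent

// Sketch: one event now describes the whole rebalance, including which
// partition streams were ended (`ended` was not available before).
def onRebalanceEvent(event: DiagnosticEvent): UIO[Unit] =
  event match {
    case DiagnosticEvent.Rebalance(revoked, assigned, lost, ended) =>
      ZIO.logInfo(s"rebalance: revoked=$revoked assigned=$assigned lost=$lost ended=$ended")
    case _ =>
      ZIO.unit
  }
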
27 changes: 25 additions & 2 deletions zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/Diagnostics.scala
@@ -1,6 +1,8 @@
package zio.kafka.consumer.diagnostics

-import zio.{ Queue, Scope, UIO, ZIO }
+import zio.stream.ZStream
+import zio._
+import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization.ConsumerFinalized

trait Diagnostics {
  def emit(event: => DiagnosticEvent): UIO[Unit]
@@ -10,11 +12,32 @@ object Diagnostics {
    override def emit(event: => DiagnosticEvent): UIO[Unit] = ZIO.unit
  }

-  final case class SlidingQueue private (queue: Queue[DiagnosticEvent]) extends Diagnostics {
+  final case class SlidingQueue private[Diagnostics] (queue: Queue[DiagnosticEvent]) extends Diagnostics {
    override def emit(event: => DiagnosticEvent): UIO[Unit] = queue.offer(event).unit
  }

  object SlidingQueue {
    def make(queueSize: Int = 16): ZIO[Scope, Nothing, SlidingQueue] =
      ZIO.acquireRelease(Queue.sliding[DiagnosticEvent](queueSize))(_.shutdown).map(SlidingQueue(_))
  }
+
+  object ConcurrentDiagnostics {
+
+    /**
+     * @return
+     *   a `Diagnostics` that runs the wrapped `Diagnostics` concurrently in a separate fiber. Events are emitted to
+     *   the fiber via an unbounded queue.
+     */
+    def make(wrapped: Diagnostics): ZIO[Scope, Nothing, Diagnostics] =
+      if (wrapped == Diagnostics.NoOp) ZIO.succeed(Diagnostics.NoOp)
+      else {
+        for {
+          queue <- ZIO.acquireRelease(Queue.unbounded[DiagnosticEvent])(_.shutdown)
+          fib   <- ZStream.fromQueue(queue).tap(wrapped.emit(_)).takeUntil(_ == ConsumerFinalized).runDrain.forkScoped
+          _     <- ZIO.addFinalizer(queue.offer(ConsumerFinalized) *> fib.await)
+        } yield new Diagnostics {
+          override def emit(event: => DiagnosticEvent): UIO[Unit] = queue.offer(event).unit
+        }
+      }
+  }
}
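
Note the shutdown handshake in ConcurrentDiagnostics: the finalizer enqueues ConsumerFinalized and awaits the fiber, and the stream only stops after consuming that sentinel, so every event queued before finalization is still handled. A standalone sketch of the same pattern in plain ZIO, with a made-up Stop sentinel standing in for ConsumerFinalized:

import zio._
import zio.stream.ZStream

sealed trait Event
case object Stop                extends Event
final case class Msg(s: String) extends Event

// Sketch of the drain-on-shutdown pattern used by ConcurrentDiagnostics.
val drainDemo: ZIO[Scope, Nothing, Unit] =
  for {
    queue <- ZIO.acquireRelease(Queue.unbounded[Event])(_.shutdown)
    fib   <- ZStream
               .fromQueue(queue)
               .tap(e => ZIO.debug(s"handled: $e"))
               .takeUntil(_ == Stop)
               .runDrain
               .forkScoped
    _     <- ZIO.addFinalizer(queue.offer(Stop) *> fib.await)
    _     <- queue.offer(Msg("hello")) // still handled before the scope closes
  } yield ()
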
15 changes: 10 additions & 5 deletions zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
@@ -6,7 +6,7 @@ import org.apache.kafka.common.errors.RebalanceInProgressException
import zio._
import zio.kafka.consumer.Consumer.{ CommitTimeout, OffsetRetrieval }
import zio.kafka.consumer._
-import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization
+import zio.kafka.consumer.diagnostics.DiagnosticEvent.{ Finalization, Rebalance }
import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }
import zio.kafka.consumer.fetch.FetchStrategy
import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer
@@ -91,7 +91,6 @@ private[consumer] final class Runloop private (
      onAssigned = (assignedTps, _) =>
        for {
          _ <- ZIO.logDebug(s"${assignedTps.size} partitions are assigned")
-         _ <- diagnostics.emit(DiagnosticEvent.Rebalance.Assigned(assignedTps))
          rebalanceEvent <- lastRebalanceEvent.get
          state <- currentStateRef.get
          streamsToEnd = if (restartStreamsOnRebalancing && !rebalanceEvent.wasInvoked) state.assignedStreams
@@ -103,7 +102,6 @@
      onRevoked = (revokedTps, _) =>
        for {
          _ <- ZIO.logDebug(s"${revokedTps.size} partitions are revoked")
-         _ <- diagnostics.emit(DiagnosticEvent.Rebalance.Revoked(revokedTps))
          rebalanceEvent <- lastRebalanceEvent.get
          state <- currentStateRef.get
          streamsToEnd = if (restartStreamsOnRebalancing && !rebalanceEvent.wasInvoked) state.assignedStreams
@@ -115,7 +113,6 @@
      onLost = (lostTps, _) =>
        for {
          _ <- ZIO.logDebug(s"${lostTps.size} partitions are lost")
-         _ <- diagnostics.emit(DiagnosticEvent.Rebalance.Lost(lostTps))
          rebalanceEvent <- lastRebalanceEvent.get
          state <- currentStateRef.get
          lostStreams = state.assignedStreams.filter(control => lostTps.contains(control.tp))
@@ -307,7 +304,7 @@ private[consumer] final class Runloop private (
        _ <- ZIO.logDebug(
               s"Starting poll with ${state.pendingRequests.size} pending requests and" +
                 s" ${state.pendingCommits.size} pending commits," +
-                s" resuming ${partitionsToFetch} partitions"
+                s" resuming $partitionsToFetch partitions"
             )
        _ <- currentStateRef.set(state)
        pollResult <-
@@ -387,6 +384,14 @@ private[consumer] final class Runloop private (
          _ <-
            committedOffsetsRef.update(_.keepPartitions(updatedAssignedStreams.map(_.tp).toSet)): Task[Unit]

+         _ <- diagnostics.emit(
+                Rebalance(
+                  revoked = revokedTps,
+                  assigned = assignedTps,
+                  lost = lostTps,
+                  ended = endedStreams.map(_.tp).toSet
+                )
+              )
        } yield Runloop.PollResult(
          records = polledRecords,
          ignoreRecordsForTps = ignoreRecordsForTps,
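
In tests, the SlidingQueue diagnostics shown earlier can observe this behavior: after a rebalance, exactly one Rebalance event should arrive. A sketch in which actually running a consumer is elided:

import zio._
import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }

// Sketch: collect diagnostic events during a test run; `sliding` would be
// passed as the diagnostics argument of Consumer.make.
val collectedEvents: ZIO[Scope, Nothing, Chunk[DiagnosticEvent]] =
  for {
    sliding <- Diagnostics.SlidingQueue.make(queueSize = 128)
    // ... run a consumer with `sliding` as its diagnostics here ...
    events  <- sliding.queue.takeAll
  } yield events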
