Skip to content

Commit

Permalink
Pass the ZIO Runtime in the RunLoop (#607)
Browse files Browse the repository at this point in the history
  • Loading branch information
guizmaii authored Jan 21, 2023
1 parent d2b8fbf commit eda4656
Showing 1 changed file with 19 additions and 18 deletions.
37 changes: 19 additions & 18 deletions zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal

private[consumer] final class Runloop(
runtime: Runtime[Any],
hasGroupId: Boolean,
consumer: ConsumerAccess,
pollFrequency: Duration,
Expand Down Expand Up @@ -145,17 +146,13 @@ private[consumer] final class Runloop(
case err =>
cont(Exit.fail(err)) <* diagnostics.emitIfEnabled(DiagnosticEvent.Commit.Failure(offsets, err))
}
val callback = makeOffsetCommitCallback(onSuccess, onFailure)

ZIO
.runtime[Any]
.map(makeOffsetCommitCallback(onSuccess, onFailure))
.flatMap { callback =>
consumer.withConsumerM { c =>
// We don't wait for the completion of the commit here, because it
// will only complete once we poll again.
ZIO.attempt(c.commitAsync(offsets.asJava, callback))
}
}
consumer.withConsumerM { c =>
// We don't wait for the completion of the commit here, because it
// will only complete once we poll again.
ZIO.attempt(c.commitAsync(offsets.asJava, callback))
}
.catchAll(onFailure)
}

Expand All @@ -175,14 +172,16 @@ private[consumer] final class Runloop(
offsets.toMap
}

private def makeOffsetCommitCallback(onSuccess: Task[Unit], onFailure: Exception => Task[Unit])(
runtime: Runtime[Any]
): OffsetCommitCallback = new OffsetCommitCallback {
override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit =
Unsafe.unsafe { implicit u =>
runtime.unsafe.run(if (exception eq null) onSuccess else onFailure(exception)).getOrThrowFiberFailure()
}
}
private def makeOffsetCommitCallback(
onSuccess: Task[Unit],
onFailure: Exception => Task[Unit]
): OffsetCommitCallback =
new OffsetCommitCallback {
override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit =
Unsafe.unsafe { implicit u =>
runtime.unsafe.run(if (exception eq null) onSuccess else onFailure(exception)).getOrThrowFiberFailure()
}
}

/**
* Does all needed to end revoked partitions:
Expand Down Expand Up @@ -649,7 +648,9 @@ private[consumer] object Runloop {
shutdownRef <- Ref.make(false)
currentStateRef <- Ref.make(State.initial)
subscribedRef <- Ref.make(false)
runtime <- ZIO.runtime[Any]
runloop = new Runloop(
runtime,
hasGroupId,
consumer,
pollFrequency,
Expand Down

0 comments on commit eda4656

Please sign in to comment.