diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index eb28ceee9..dd144fd10 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -22,6 +22,7 @@ import scala.jdk.CollectionConverters._ import scala.util.control.NonFatal private[consumer] final class Runloop( + runtime: Runtime[Any], hasGroupId: Boolean, consumer: ConsumerAccess, pollFrequency: Duration, @@ -145,17 +146,13 @@ private[consumer] final class Runloop( case err => cont(Exit.fail(err)) <* diagnostics.emitIfEnabled(DiagnosticEvent.Commit.Failure(offsets, err)) } + val callback = makeOffsetCommitCallback(onSuccess, onFailure) - ZIO - .runtime[Any] - .map(makeOffsetCommitCallback(onSuccess, onFailure)) - .flatMap { callback => - consumer.withConsumerM { c => - // We don't wait for the completion of the commit here, because it - // will only complete once we poll again. - ZIO.attempt(c.commitAsync(offsets.asJava, callback)) - } - } + consumer.withConsumerM { c => + // We don't wait for the completion of the commit here, because it + // will only complete once we poll again. + ZIO.attempt(c.commitAsync(offsets.asJava, callback)) + } .catchAll(onFailure) } @@ -175,14 +172,16 @@ private[consumer] final class Runloop( offsets.toMap } - private def makeOffsetCommitCallback(onSuccess: Task[Unit], onFailure: Exception => Task[Unit])( - runtime: Runtime[Any] - ): OffsetCommitCallback = new OffsetCommitCallback { - override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = - Unsafe.unsafe { implicit u => - runtime.unsafe.run(if (exception eq null) onSuccess else onFailure(exception)).getOrThrowFiberFailure() - } - } + private def makeOffsetCommitCallback( + onSuccess: Task[Unit], + onFailure: Exception => Task[Unit] + ): OffsetCommitCallback = + new OffsetCommitCallback { + override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = + Unsafe.unsafe { implicit u => + runtime.unsafe.run(if (exception eq null) onSuccess else onFailure(exception)).getOrThrowFiberFailure() + } + } /** * Does all needed to end revoked partitions: @@ -649,7 +648,9 @@ private[consumer] object Runloop { shutdownRef <- Ref.make(false) currentStateRef <- Ref.make(State.initial) subscribedRef <- Ref.make(false) + runtime <- ZIO.runtime[Any] runloop = new Runloop( + runtime, hasGroupId, consumer, pollFrequency,