diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 8ffb5e97f..5bdb867f9 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -256,7 +256,7 @@ private[consumer] final class Runloop( reqRecs.toArray[ByteArrayConsumerRecord](Array.ofDim[ByteArrayConsumerRecord](reqRecs.size)) ) - fulfillAction = fulfillAction *> req.cont.succeed(concatenatedChunk.map { record => + fulfillAction = fulfillAction *> req.succeed(concatenatedChunk.map { record => CommittableRecord( record = record, commitHandle = commit, @@ -353,14 +353,15 @@ private[consumer] final class Runloop( // Check shutdown again after polling (which takes up to the poll timeout) ZIO.ifZIO(isShutdown)( - pauseAllPartitions(c).as( + onTrue = pauseAllPartitions(c).as( Runloop.PollResult( Set(), state.pendingRequests, BufferedRecords.empty, Map[TopicPartition, PartitionStreamControl]() ) - ), { + ), + onFalse = { val tpsInResponse = records.partitions.asScala.toSet val currentAssigned = c.assignment().asScala.toSet @@ -474,8 +475,8 @@ private[consumer] final class Runloop( }) newPendingCommits <- ZIO.ifZIO(isRebalancing)( - ZIO.succeed(state.pendingCommits), - doCommit(state.pendingCommits).when(state.pendingCommits.nonEmpty).as(Chunk.empty) + onTrue = ZIO.succeed(state.pendingCommits), + onFalse = doCommit(state.pendingCommits).when(state.pendingCommits.nonEmpty).as(Chunk.empty) ) } yield State( pollResult.unfulfilledRequests, @@ -486,19 +487,19 @@ private[consumer] final class Runloop( private def handleRequests(state: State, reqs: Chunk[Runloop.Request]): UIO[State] = ZIO.ifZIO(isRebalancing)( - if (restartStreamsOnRebalancing) { - ZIO.foreachDiscard(reqs)(_.cont.fail(None)).as(state) + onTrue = if (restartStreamsOnRebalancing) { + ZIO.foreachDiscard(reqs)(_.end).as(state) } else { ZIO.succeed(state.addRequests(reqs)) }, - consumer + onFalse = consumer .withConsumer(_.assignment.asScala) .flatMap { assignment => ZIO.foldLeft(reqs)(state) { (state, req) => if (assignment.contains(req.tp)) ZIO.succeed(state.addRequest(req)) else - req.cont.fail(None).as(state) + req.end.as(state) } } .orElseSucceed(state.addRequests(reqs)) @@ -506,8 +507,8 @@ private[consumer] final class Runloop( private def handleCommit(state: State, cmd: Command.Commit): UIO[State] = ZIO.ifZIO(isRebalancing)( - ZIO.succeed(state.addCommit(cmd)), - doCommit(Chunk(cmd)).as(state) + onTrue = ZIO.succeed(state.addCommit(cmd)), + onFalse = doCommit(Chunk(cmd)).as(state) ) /** @@ -520,10 +521,10 @@ private[consumer] final class Runloop( cmd match { case Command.Poll => // End all pending requests - ZIO.foreachDiscard(state.pendingRequests)(_.cont.fail(None)) *> + ZIO.foreachDiscard(state.pendingRequests)(_.end) *> handlePoll(state.copy(pendingRequests = Chunk.empty, bufferedRecords = BufferedRecords.empty)) case Command.Requests(reqs) => - ZIO.foreachDiscard(reqs)(_.cont.fail(None)).as(state) + ZIO.foreachDiscard(reqs)(_.end).as(state) case cmd @ Command.Commit(_, _) => handleCommit(state, cmd) } @@ -532,7 +533,7 @@ private[consumer] final class Runloop( cmd match { case Command.Poll => // The consumer will throw an IllegalStateException if no call to subscribe - ZIO.ifZIO(subscribedRef.get)(handlePoll(state), ZIO.succeed(state)) + ZIO.ifZIO(subscribedRef.get)(onTrue = handlePoll(state), onFalse = ZIO.succeed(state)) case Command.Requests(reqs) => handleRequests(state, reqs).flatMap { state => // Optimization: eagerly poll if we have pending requests instead of waiting @@ -552,7 +553,7 @@ private[consumer] final class Runloop( ZStream.fromQueue(commitQueue) ) .runFoldZIO(State.initial) { (state, cmd) => - ZIO.ifZIO(isShutdown)(handleShutdown(state, cmd), handleOperational(state, cmd)) + ZIO.ifZIO(isShutdown)(onTrue = handleShutdown(state, cmd), onFalse = handleOperational(state, cmd)) } .onError(cause => partitions.offer(Take.failCause(cause))) .unit @@ -563,7 +564,14 @@ private[consumer] object Runloop { type ByteArrayCommittableRecord = CommittableRecord[Array[Byte], Array[Byte]] type ByteArrayConsumerRecord = ConsumerRecord[Array[Byte], Array[Byte]] - final case class Request(tp: TopicPartition, cont: Promise[Option[Throwable], Chunk[ByteArrayCommittableRecord]]) + final case class Request( + tp: TopicPartition, + private val cont: Promise[Option[Throwable], Chunk[ByteArrayCommittableRecord]] + ) { + @inline def succeed(data: Chunk[ByteArrayCommittableRecord]): UIO[Boolean] = cont.succeed(data) + @inline def end: UIO[Boolean] = cont.fail(None) + @inline def fail(throwable: Throwable): UIO[Boolean] = cont.fail(Some(throwable)) + } final case class PollResult( newlyAssigned: Set[TopicPartition], unfulfilledRequests: Chunk[Runloop.Request],