From b56fb35a9614d521e0e8f044ec359b584c369db4 Mon Sep 17 00:00:00 2001 From: jules Ivanic Date: Sat, 21 Jan 2023 18:04:43 +0800 Subject: [PATCH] Ease code reading --- .../zio/kafka/consumer/internal/Runloop.scala | 38 +++++++++++-------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 6438d3c94..bd65c8465 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -257,7 +257,7 @@ private[consumer] final class Runloop( reqRecs.toArray[ByteArrayConsumerRecord](Array.ofDim[ByteArrayConsumerRecord](reqRecs.size)) ) - fulfillAction = fulfillAction *> req.continue.succeed(concatenatedChunk.map { record => + fulfillAction = fulfillAction *> req.continue(concatenatedChunk.map { record => CommittableRecord( record = record, commitHandle = commit, @@ -354,14 +354,15 @@ private[consumer] final class Runloop( // Check shutdown again after polling (which takes up to the poll timeout) ZIO.ifZIO(isShutdown)( - pauseAllPartitions(c).as( + onTrue = pauseAllPartitions(c).as( Runloop.PollResult( Set(), state.pendingRequests, BufferedRecords.empty, Map[TopicPartition, PartitionStreamControl]() ) - ), { + ), + onFalse = { val tpsInResponse = records.partitions.asScala.toSet val currentAssigned = c.assignment().asScala.toSet @@ -475,8 +476,8 @@ private[consumer] final class Runloop( }) newPendingCommits <- ZIO.ifZIO(isRebalancing)( - ZIO.succeed(state.pendingCommits), - doCommit(state.pendingCommits).when(state.pendingCommits.nonEmpty).as(Chunk.empty) + onTrue = ZIO.succeed(state.pendingCommits), + onFalse = doCommit(state.pendingCommits).when(state.pendingCommits.nonEmpty).as(Chunk.empty) ) } yield State( pollResult.unfulfilledRequests, @@ -487,19 +488,19 @@ private[consumer] final class Runloop( private def handleRequests(state: State, reqs: Chunk[Runloop.Request]): UIO[State] = ZIO.ifZIO(isRebalancing)( - if (restartStreamsOnRebalancing) { - ZIO.foreachDiscard(reqs)(_.continue.fail(None)).as(state) + onTrue = if (restartStreamsOnRebalancing) { + ZIO.foreachDiscard(reqs)(_.end).as(state) } else { ZIO.succeed(state.addRequests(reqs)) }, - consumer + onFalse = consumer .withConsumer(_.assignment.asScala) .flatMap { assignment => ZIO.foldLeft(reqs)(state) { (state, req) => if (assignment.contains(req.tp)) ZIO.succeed(state.addRequest(req)) else - req.continue.fail(None).as(state) + req.end.as(state) } } .orElseSucceed(state.addRequests(reqs)) @@ -507,8 +508,8 @@ private[consumer] final class Runloop( private def handleCommit(state: State, cmd: Command.Commit): UIO[State] = ZIO.ifZIO(isRebalancing)( - ZIO.succeed(state.addCommit(cmd)), - doCommit(Chunk(cmd)).as(state) + onTrue = ZIO.succeed(state.addCommit(cmd)), + onFalse = doCommit(Chunk(cmd)).as(state) ) /** @@ -521,10 +522,10 @@ private[consumer] final class Runloop( cmd match { case Command.Poll() => // End all pending requests - ZIO.foreachDiscard(state.pendingRequests)(_.continue.fail(None)) *> + ZIO.foreachDiscard(state.pendingRequests)(_.end) *> handlePoll(state.copy(pendingRequests = Chunk.empty, bufferedRecords = BufferedRecords.empty)) case Command.Requests(reqs) => - ZIO.foreachDiscard(reqs)(_.continue.fail(None)).as(state) + ZIO.foreachDiscard(reqs)(_.end).as(state) case cmd @ Command.Commit(_, _) => handleCommit(state, cmd) } @@ -533,7 +534,7 @@ private[consumer] final class Runloop( cmd match { case Command.Poll() => // The consumer will throw an IllegalStateException if no call to subscribe - ZIO.ifZIO(subscribedRef.get)(handlePoll(state), ZIO.succeed(state)) + ZIO.ifZIO(subscribedRef.get)(onTrue = handlePoll(state), onFalse = ZIO.succeed(state)) case Command.Requests(reqs) => handleRequests(state, reqs).flatMap { state => // Optimization: eagerly poll if we have pending requests instead of waiting @@ -564,7 +565,14 @@ private[consumer] object Runloop { type ByteArrayCommittableRecord = CommittableRecord[Array[Byte], Array[Byte]] type ByteArrayConsumerRecord = ConsumerRecord[Array[Byte], Array[Byte]] - final case class Request(tp: TopicPartition, continue: Promise[Option[Throwable], Chunk[ByteArrayCommittableRecord]]) + final case class Request( + tp: TopicPartition, + private val cont: Promise[Option[Throwable], Chunk[ByteArrayCommittableRecord]] + ) { + @inline def continue(data: Chunk[ByteArrayCommittableRecord]): UIO[Boolean] = cont.succeed(data) + @inline def end: UIO[Boolean] = cont.fail(None) + @inline def fail(throwable: Throwable): UIO[Boolean] = cont.fail(Some(throwable)) + } final case class PollResult( newlyAssigned: Set[TopicPartition], unfulfilledRequests: Chunk[Runloop.Request],