Skip to content

Commit

Permalink
Ease code reading
Browse files Browse the repository at this point in the history
  • Loading branch information
guizmaii committed Jan 21, 2023
1 parent 5ed8b2d commit b56fb35
Showing 1 changed file with 23 additions and 15 deletions.
38 changes: 23 additions & 15 deletions zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ private[consumer] final class Runloop(
reqRecs.toArray[ByteArrayConsumerRecord](Array.ofDim[ByteArrayConsumerRecord](reqRecs.size))
)

fulfillAction = fulfillAction *> req.continue.succeed(concatenatedChunk.map { record =>
fulfillAction = fulfillAction *> req.continue(concatenatedChunk.map { record =>
CommittableRecord(
record = record,
commitHandle = commit,
Expand Down Expand Up @@ -354,14 +354,15 @@ private[consumer] final class Runloop(

// Check shutdown again after polling (which takes up to the poll timeout)
ZIO.ifZIO(isShutdown)(
pauseAllPartitions(c).as(
onTrue = pauseAllPartitions(c).as(
Runloop.PollResult(
Set(),
state.pendingRequests,
BufferedRecords.empty,
Map[TopicPartition, PartitionStreamControl]()
)
), {
),
onFalse = {
val tpsInResponse = records.partitions.asScala.toSet
val currentAssigned = c.assignment().asScala.toSet

Expand Down Expand Up @@ -475,8 +476,8 @@ private[consumer] final class Runloop(
})
newPendingCommits <-
ZIO.ifZIO(isRebalancing)(
ZIO.succeed(state.pendingCommits),
doCommit(state.pendingCommits).when(state.pendingCommits.nonEmpty).as(Chunk.empty)
onTrue = ZIO.succeed(state.pendingCommits),
onFalse = doCommit(state.pendingCommits).when(state.pendingCommits.nonEmpty).as(Chunk.empty)
)
} yield State(
pollResult.unfulfilledRequests,
Expand All @@ -487,28 +488,28 @@ private[consumer] final class Runloop(

private def handleRequests(state: State, reqs: Chunk[Runloop.Request]): UIO[State] =
ZIO.ifZIO(isRebalancing)(
if (restartStreamsOnRebalancing) {
ZIO.foreachDiscard(reqs)(_.continue.fail(None)).as(state)
onTrue = if (restartStreamsOnRebalancing) {
ZIO.foreachDiscard(reqs)(_.end).as(state)
} else {
ZIO.succeed(state.addRequests(reqs))
},
consumer
onFalse = consumer
.withConsumer(_.assignment.asScala)
.flatMap { assignment =>
ZIO.foldLeft(reqs)(state) { (state, req) =>
if (assignment.contains(req.tp))
ZIO.succeed(state.addRequest(req))
else
req.continue.fail(None).as(state)
req.end.as(state)
}
}
.orElseSucceed(state.addRequests(reqs))
)

private def handleCommit(state: State, cmd: Command.Commit): UIO[State] =
ZIO.ifZIO(isRebalancing)(
ZIO.succeed(state.addCommit(cmd)),
doCommit(Chunk(cmd)).as(state)
onTrue = ZIO.succeed(state.addCommit(cmd)),
onFalse = doCommit(Chunk(cmd)).as(state)
)

/**
Expand All @@ -521,10 +522,10 @@ private[consumer] final class Runloop(
cmd match {
case Command.Poll() =>
// End all pending requests
ZIO.foreachDiscard(state.pendingRequests)(_.continue.fail(None)) *>
ZIO.foreachDiscard(state.pendingRequests)(_.end) *>
handlePoll(state.copy(pendingRequests = Chunk.empty, bufferedRecords = BufferedRecords.empty))
case Command.Requests(reqs) =>
ZIO.foreachDiscard(reqs)(_.continue.fail(None)).as(state)
ZIO.foreachDiscard(reqs)(_.end).as(state)
case cmd @ Command.Commit(_, _) =>
handleCommit(state, cmd)
}
Expand All @@ -533,7 +534,7 @@ private[consumer] final class Runloop(
cmd match {
case Command.Poll() =>
// The consumer will throw an IllegalStateException if no call to subscribe
ZIO.ifZIO(subscribedRef.get)(handlePoll(state), ZIO.succeed(state))
ZIO.ifZIO(subscribedRef.get)(onTrue = handlePoll(state), onFalse = ZIO.succeed(state))
case Command.Requests(reqs) =>
handleRequests(state, reqs).flatMap { state =>
// Optimization: eagerly poll if we have pending requests instead of waiting
Expand Down Expand Up @@ -564,7 +565,14 @@ private[consumer] object Runloop {
type ByteArrayCommittableRecord = CommittableRecord[Array[Byte], Array[Byte]]
type ByteArrayConsumerRecord = ConsumerRecord[Array[Byte], Array[Byte]]

final case class Request(tp: TopicPartition, continue: Promise[Option[Throwable], Chunk[ByteArrayCommittableRecord]])
final case class Request(
tp: TopicPartition,
private val cont: Promise[Option[Throwable], Chunk[ByteArrayCommittableRecord]]
) {
@inline def continue(data: Chunk[ByteArrayCommittableRecord]): UIO[Boolean] = cont.succeed(data)
@inline def end: UIO[Boolean] = cont.fail(None)
@inline def fail(throwable: Throwable): UIO[Boolean] = cont.fail(Some(throwable))
}
final case class PollResult(
newlyAssigned: Set[TopicPartition],
unfulfilledRequests: Chunk[Runloop.Request],
Expand Down

0 comments on commit b56fb35

Please sign in to comment.