Skip to content

Commit

Permalink
Buffer records for unrequested partitions when polling (#40)
Browse files Browse the repository at this point in the history
The consumer might assign partitions and return records for them in
the same call to poll despite those partitions being paused in the
rebalance listener.

We would previously die on these records, but now we buffer them.
  • Loading branch information
iravid authored Oct 1, 2019
1 parent 453c41e commit 80d4b9a
Showing 1 changed file with 121 additions and 47 deletions.
168 changes: 121 additions & 47 deletions src/main/scala/zio/kafka/client/Runloop.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import zio.stream._
import zio.blocking.Blocking
import zio.clock.Clock

import scala.collection.mutable
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.ConsumerRecords

Expand Down Expand Up @@ -104,18 +105,35 @@ object Runloop {

case class State[K, V](
pendingRequests: List[Command.Request[K, V]],
pendingCommits: List[Command.Commit[K, V]]
pendingCommits: List[Command.Commit[K, V]],
bufferedRecords: Map[TopicPartition, Chunk[ConsumerRecord[K, V]]]
) {
def addCommit(c: Command.Commit[K, V]) = copy(pendingCommits = c :: pendingCommits)
def setCommits(reqs: List[Command.Commit[K, V]]) = copy(pendingCommits = reqs)
def addRequest(c: Command.Request[K, V]) = copy(pendingRequests = c :: pendingRequests)
def setRequests(reqs: List[Command.Request[K, V]]) = copy(pendingRequests = reqs)
def clearCommits = copy(pendingCommits = Nil)
def clearRequests = copy(pendingRequests = Nil)
def addBufferedRecords(recs: Map[TopicPartition, Chunk[ConsumerRecord[K, V]]]) =
copy(
bufferedRecords = recs.foldLeft(bufferedRecords) {
case (acc, (tp, recs)) =>
acc.get(tp) match {
case Some(existingRecs) => acc + (tp -> (existingRecs ++ recs))
case None => acc + (tp -> recs)
}
}
)

def removeBufferedRecordsFor(tp: TopicPartition) =
copy(bufferedRecords = bufferedRecords - tp)

def setBufferedRecords(recs: Map[TopicPartition, Chunk[ConsumerRecord[K, V]]]) =
copy(bufferedRecords = recs)
}

object State {
def initial[K, V]: State[K, V] = State(Nil, Nil)
def initial[K, V]: State[K, V] = State(Nil, Nil, Map())
}

def apply[K, V](deps: Deps[K, V]): ZManaged[Clock with Blocking, Throwable, Runloop[K, V]] = {
Expand Down Expand Up @@ -144,7 +162,7 @@ object Runloop {
for {
runtime <- ZIO.runtime[Any]
offsets = {
val offsets = scala.collection.mutable.Map[TopicPartition, OffsetAndMetadata]()
val offsets = mutable.Map[TopicPartition, OffsetAndMetadata]()

cmds.foreach { commit =>
commit.offsets.foreach {
Expand Down Expand Up @@ -178,32 +196,84 @@ object Runloop {
def handlePoll(state: State[K, V]): BlockingTask[State[K, V]] =
for {
pollResult <- deps.consumer.withConsumerM { c =>
def endRevoked(reqs: List[Command.Request[K, V]], revoked: TopicPartition => Boolean) =
UIO.foldLeft(reqs)(List[Command.Request[K, V]]()) { (acc, req) =>
if (revoked(req.tp)) req.cont.fail(None).as(acc)
else ZIO.succeed(req :: acc)
def endRevoked(
reqs: List[Command.Request[K, V]],
bufferedRecords: Map[TopicPartition, Chunk[ConsumerRecord[K, V]]],
revoked: TopicPartition => Boolean
): UIO[(List[Command.Request[K, V]], Map[TopicPartition, Chunk[ConsumerRecord[K, V]]])] = {
var acc = List[Command.Request[K, V]]()
val buf = mutable.Map[TopicPartition, Chunk[ConsumerRecord[K, V]]]()
buf ++= bufferedRecords

var revokeAction: UIO[_] = UIO.unit

val reqsIt = reqs.iterator
while (reqsIt.hasNext) {
val req = reqsIt.next
if (revoked(req.tp)) {
revokeAction = revokeAction *> req.cont.fail(None)
buf -= req.tp
} else acc ::= req
}

revokeAction.as((acc.reverse, buf.toMap))
}

def fulfillRequests(
pendingRequests: List[Command.Request[K, V]],
bufferedRecords: Map[TopicPartition, Chunk[ConsumerRecord[K, V]]],
records: ConsumerRecords[K, V]
) =
UIO
.foldLeft(pendingRequests)(List[Command.Request[K, V]]()) { (acc, req) =>
val reqRecs = records.records(req.tp)

if (reqRecs.size == 0) ZIO.succeed(req :: acc)
else
req.cont
.succeed(
Chunk
.fromArray(
reqRecs.toArray(Array.ofDim[ConsumerRecord[K, V]](reqRecs.size))
)
.map(CommittableRecord(_, commit(_)))
)
.as(acc)
): UIO[(List[Command.Request[K, V]], Map[TopicPartition, Chunk[ConsumerRecord[K, V]]])] = {
var acc = List[Command.Request[K, V]]()
val buf = mutable.Map[TopicPartition, Chunk[ConsumerRecord[K, V]]]()
buf ++= bufferedRecords

var fulfillAction: UIO[_] = UIO.unit

val reqsIt = pendingRequests.iterator
while (reqsIt.hasNext) {
val req = reqsIt.next
val bufferedChunk = buf.getOrElse(req.tp, Chunk.empty)
val reqRecs = records.records(req.tp)

if ((bufferedChunk.length + reqRecs.size) == 0)
acc ::= req
else {
val concatenatedChunk = bufferedChunk ++
Chunk.fromArray(
reqRecs.toArray(Array.ofDim[ConsumerRecord[K, V]](reqRecs.size))
)

fulfillAction = fulfillAction *> req.cont.succeed(
concatenatedChunk.map(CommittableRecord(_, commit(_)))
)
buf -= req.tp
}
}

fulfillAction.as((acc, buf.toMap))
}

def bufferUnrequestedPartitions(
records: ConsumerRecords[K, V],
unrequestedTps: Iterable[TopicPartition]
): Map[TopicPartition, Chunk[ConsumerRecord[K, V]]] = {
val builder = Map.newBuilder[TopicPartition, Chunk[ConsumerRecord[K, V]]]
builder.sizeHint(unrequestedTps.size)

val tpsIt = unrequestedTps.iterator
while (tpsIt.hasNext) {
val tp = tpsIt.next
val recs = records.records(tp)

if (recs.size > 0)
builder += (tp -> Chunk.fromArray(
recs.toArray(Array.ofDim[ConsumerRecord[K, V]](recs.size))
))
}

builder.result()
}

def actualPoll = Task.effectSuspend {
val prevAssigned = c.assignment().asScala
Expand All @@ -214,37 +284,41 @@ object Runloop {
val pollTimeout =
if (requestedPartitions.nonEmpty) deps.pollTimeout.asJava
else 0.millis.asJava
val records = c.poll(pollTimeout)
val tpsInResponse = records.partitions.asScala
val unexpectedTps = tpsInResponse -- requestedPartitions

if (unexpectedTps.nonEmpty)
ZIO.dieMessage(s"Received unexpected records from Kafka for partitions: $unexpectedTps")
else {
val currentAssigned = c.assignment().asScala
val newlyAssigned = currentAssigned -- prevAssigned
val revoked = prevAssigned -- currentAssigned

endRevoked(state.pendingRequests, revoked(_))
.flatMap(fulfillRequests(_, records))
.map((newlyAssigned.toSet, _))
}
val records = c.poll(pollTimeout)

val tpsInResponse = records.partitions.asScala
val currentAssigned = c.assignment().asScala
val newlyAssigned = currentAssigned -- prevAssigned
val revoked = prevAssigned -- currentAssigned
val unrequestedRecords =
bufferUnrequestedPartitions(records, tpsInResponse -- requestedPartitions)

endRevoked(
state.pendingRequests,
state.addBufferedRecords(unrequestedRecords).bufferedRecords,
revoked(_)
).flatMap {
case (pendingRequests, bufferedRecords) =>
fulfillRequests(pendingRequests, bufferedRecords, records)
}.map((newlyAssigned.toSet, _))
}

ZIO(!c.subscription().isEmpty())
.flatMap(
if (_) actualPoll
else ZIO.succeed((Set(), state.pendingRequests))
else
ZIO.succeed(
(Set(), (state.pendingRequests, Map[TopicPartition, Chunk[ConsumerRecord[K, V]]]()))
)
)
}
(newlyAssigned, pendingRequests) = pollResult
_ <- ZIO.traverse_(newlyAssigned)(tp => deps.newPartition(tp, partition(tp)))
stillRebalancing <- deps.isRebalancing
newState <- if (!stillRebalancing && state.pendingCommits.nonEmpty)
doCommit(state.pendingCommits)
.as(state.setRequests(pendingRequests).clearCommits)
else ZIO.succeed(state.setRequests(pendingRequests))
} yield newState
(newlyAssigned, (pendingRequests, bufferedRecords)) = pollResult
_ <- ZIO.traverse_(newlyAssigned)(tp => deps.newPartition(tp, partition(tp)))
stillRebalancing <- deps.isRebalancing
newPendingCommits <- if (!stillRebalancing && state.pendingCommits.nonEmpty)
doCommit(state.pendingCommits).as(Nil)
else ZIO.succeed(state.pendingCommits)
} yield State(pendingRequests, newPendingCommits, bufferedRecords)

def handleRequest(state: State[K, V], req: Command.Request[K, V]): URIO[Blocking, State[K, V]] =
deps.consumer
Expand Down

0 comments on commit 80d4b9a

Please sign in to comment.