producer: alternative impl without async
domdorn committed Jul 16, 2024
1 parent bec944f commit 136aed2
Showing 1 changed file with 27 additions and 44 deletions.
zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala (27 additions & 44 deletions)
@@ -477,50 +477,33 @@ final private[producer] class ProducerLive(
  private def sendChunk(
    serializedRecords: Chunk[ByteRecord]
  ): ZIO[Any, Nothing, Chunk[Either[Throwable, RecordMetadata]]] =
-    ZIO
-      .async[Any, Nothing, Chunk[Either[Throwable, RecordMetadata]]] { callback =>
-        try {
-          val it: Iterator[(ByteRecord, Int)] = serializedRecords.iterator.zipWithIndex
-          val res: Array[Either[Throwable, RecordMetadata]] =
-            new Array[Either[Throwable, RecordMetadata]](serializedRecords.length)
-          val count: AtomicLong = new AtomicLong
-          val length = serializedRecords.length
-
-          while (it.hasNext) {
-            val (rec, idx): (ByteRecord, Int) = it.next()
-
-            // Since we might be sending to multiple partitions, the callbacks
-            // are _not_ necessarily called in order.
-            val _ = p.send(
-              rec,
-              (metadata: RecordMetadata, err: Exception) => {
-                res(idx) = Either.cond(err == null, metadata, err)
-
-                if (count.incrementAndGet == length) {
-                  callback(ZIO.succeed(Chunk.fromArray(res)))
-                }
-              }
-            )
-          }
-        } catch {
-          case NonFatal(e) =>
-            callback(ZIO.succeed(Chunk.fill(serializedRecords.size)(Left(e))))
-        }
-      }
-      .flatMap(r =>
-        r.headOption match {
-          case Some(value) =>
-            value match {
-              case Left(value) => ZIO.fail(value)
-              case _           => ZIO.succeed(r)
-            }
-          case None => ZIO.succeed(r)
-        })
-      .retry(Schedule.recurWhile[Throwable] {
-        case _: AuthorizationException | _: AuthenticationException => settings.retryOnAuthFailures
-        case _                                                      => false
-      } && settings.authErrorRetrySchedule)
-      .catchAll(e => ZIO.succeed(Chunk.fill(serializedRecords.size)(Left(e))))
+    for {
+      fibers <-
+        ZIO
+          .foreach(serializedRecords.zipWithIndex)(record =>
+            ZIO
+              .fromFutureJava(p.send(record._1))
+              .retry(Schedule.recurWhileZIO[Any, Throwable] {
+                case _: AuthorizationException | _: AuthenticationException =>
+                  ZIO
+                    .logDebug("retrying Kafka schedule due to AuthorizationException or AuthenticationException")
+                    .when(settings.retryOnAuthFailures)
+                    .as(settings.retryOnAuthFailures)
+                case _ => ZIO.succeed(false)
+              } && settings.authErrorRetrySchedule)
+              .mapError(e => Chunk.fill(serializedRecords.length)(Left(e)): Chunk[Either[Throwable, RecordMetadata]])
+              .fork
+              .map(fiber => (fiber, record._2))
+          )
+      result <- ZIO
+                  .foreach(fibers)(f => f._1.join.map(v => (v, f._2)))
+                  .map(_.sortBy(_._2).reverse)
+                  .map(_.map(_._1))
+                  .map(v => v.map(Right(_)): Chunk[Either[Throwable, RecordMetadata]])
+                  .fold(identity, identity)
+    } yield result
+
+
  private def serialize[R, K, V](
    r: ProducerRecord[K, V],
    keySerializer: Serializer[R, K],
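For context on what the commit does: the old body completes a single ZIO.async callback once an AtomicLong counter confirms that every per-record Kafka callback has fired, while the new body wraps each p.send future in ZIO.fromFutureJava, forks one fiber per record, and joins the fibers afterwards. The sketch below distills that fork/join pattern into a self-contained form; the object and method names, the raw KafkaProducer parameter, and the use of .either in place of the commit's mapError/fold plumbing are illustrative assumptions, and the commit's auth-failure retry schedule is omitted.

import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord, RecordMetadata }
import zio._

object FiberSendSketch {
  type ByteRecord = ProducerRecord[Array[Byte], Array[Byte]]

  // Send every record, collecting per-record results. `producer` is an
  // assumed, already-configured raw Kafka producer.
  def sendAll(
    producer: KafkaProducer[Array[Byte], Array[Byte]],
    records: Chunk[ByteRecord]
  ): UIO[Chunk[Either[Throwable, RecordMetadata]]] =
    for {
      // producer.send is asynchronous: it enqueues the record and returns a
      // java.util.concurrent.Future immediately, so all records are in
      // flight before any fiber is joined.
      fibers <- ZIO.foreach(records) { rec =>
                  ZIO.fromFutureJava(producer.send(rec)).either.fork
                }
      // ZIO.foreach preserves order, so joining the fibers in sequence
      // yields results aligned with the input records.
      results <- ZIO.foreach(fibers)(_.join)
    } yield results
}

Compared with the removed ZIO.async version, this style trades a manually managed callback array for ZIO's own fiber bookkeeping: error propagation and result collection come from foreach/join rather than from an AtomicLong and shared mutable state.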
