Alternative producer implementation (#1285)
Refactoring of the producer so that it handles errors per record.
erikvanoosten authored Jul 18, 2024
1 parent 7452812 commit fd40816
Showing 1 changed file with 50 additions and 42 deletions.
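The behavioral core of the change shows up in the new `sendChunk` and `sendRecord` below: a failed send no longer fails the whole chunk; every record instead yields its own `Either[Throwable, RecordMetadata]`. A minimal sketch of consuming such per-record results, assuming only that a caller obtains them per produced chunk (the `handleResults` helper is the editor's, not part of this commit):

    import org.apache.kafka.clients.producer.RecordMetadata
    import zio.Chunk

    // Illustrative only: `results` stands for the per-record outcomes of one chunk.
    def handleResults(results: Chunk[Either[Throwable, RecordMetadata]]): Unit = {
      val failures  = results.collect { case Left(err) => err } // sends that failed
      val successes = results.collect { case Right(md) => md }  // acknowledged records
      println(s"${successes.size} delivered, ${failures.size} failed")
    }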
92 changes: 50 additions & 42 deletions zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
@@ -8,9 +8,7 @@ import zio.kafka.serde.Serializer
 import zio.kafka.utils.SslHelper
 import zio.stream.{ ZPipeline, ZStream }
 
-import java.util.concurrent.atomic.AtomicLong
 import scala.jdk.CollectionConverters._
-import scala.util.control.NonFatal
 
 trait Producer {
 
@@ -457,53 +455,63 @@ private[producer] final class ProducerLive(
   override def metrics: Task[Map[MetricName, Metric]] = ZIO.attemptBlocking(p.metrics().asScala.toMap)
 
   /**
-   * Calls to send may block when updating metadata or when communication with the broker is (temporarily) lost,
-   * therefore this stream is run on the blocking thread pool.
+   * Currently, sending has the following characteristics:
+   *   - You can submit many chunks; they get buffered in the send queue.
+   *   - A chunk is only sent after the previous chunk completes (that is, after the callback for each of its
+   *     records has been invoked).
+   *   - The records in a chunk are sent in one go, in order. Records for the same partition are likely to land
+   *     in the same batch (which is good for compression).
+   *   - Record ordering is retained and guaranteed between chunks.
+   *   - Record ordering is retained and guaranteed within a chunk (per partition) unless `retries` has been
+   *     enabled (see https://kafka.apache.org/documentation/#producerconfigs_retries).
   */
   val sendFromQueue: ZIO[Any, Nothing, Any] =
-    ZIO.blocking {
-      ZStream
-        .fromQueueWithShutdown(sendQueue)
-        .mapZIO { case (serializedRecords, done) =>
-          sendChunk(serializedRecords)
-            .flatMap(done.succeed(_))
-        }
-        .runDrain
+    ZIO.runtime[Any].flatMap { runtime =>
+      // Calls to 'send' may block when updating metadata or when communication with the broker is
+      // (temporarily) lost, therefore this stream is run on the blocking thread pool.
+      ZIO.blocking {
+        ZStream
+          .fromQueueWithShutdown(sendQueue)
+          .mapZIO { case (serializedRecords, done) =>
+            sendChunk(runtime, serializedRecords)
+              .flatMap(done.succeed(_))
+          }
+          .runDrain
+      }
     }

   private def sendChunk(
+    runtime: Runtime[Any],
     serializedRecords: Chunk[ByteRecord]
   ): ZIO[Any, Nothing, Chunk[Either[Throwable, RecordMetadata]]] =
-    ZIO
-      .async[Any, Nothing, Chunk[Either[Throwable, RecordMetadata]]] { callback =>
-        try {
-          val it: Iterator[(ByteRecord, Int)] = serializedRecords.iterator.zipWithIndex
-          val res: Array[Either[Throwable, RecordMetadata]] =
-            new Array[Either[Throwable, RecordMetadata]](serializedRecords.length)
-          val count: AtomicLong = new AtomicLong
-          val length = serializedRecords.length
-
-          while (it.hasNext) {
-            val (rec, idx): (ByteRecord, Int) = it.next()
-
-            // Since we might be sending to multiple partitions, the callbacks
-            // are _not_ necessarily called in order.
-            val _ = p.send(
-              rec,
-              (metadata: RecordMetadata, err: Exception) => {
-                res(idx) = Either.cond(err == null, metadata, err)
-
-                if (count.incrementAndGet == length) {
-                  callback(ZIO.succeed(Chunk.fromArray(res)))
-                }
-              }
-            )
-          }
-        } catch {
-          case NonFatal(e) =>
-            callback(ZIO.succeed(Chunk.fill(serializedRecords.size)(Left(e))))
-        }
-      }
+    for {
+      promises <- ZIO.foreach(serializedRecords)(sendRecord(runtime))
+      results  <- ZIO.foreach(promises)(_.await.either)
+    } yield results

+  private def sendRecord(
+    runtime: Runtime[Any]
+  )(record: ByteRecord): ZIO[Any, Nothing, Promise[Throwable, RecordMetadata]] = {
+    def unsafeRun(f: => ZIO[Any, Nothing, Any]): Unit = {
+      val _ = Unsafe.unsafe(implicit u => runtime.unsafe.run(f))
+    }
+
+    for {
+      done <- Promise.make[Throwable, RecordMetadata]
+      _ <- ZIO
+             .attempt[Any] {
+               p.send(
+                 record,
+                 (metadata: RecordMetadata, err: Exception) =>
+                   unsafeRun {
+                     if (err == null) done.succeed(metadata)
+                     else done.fail(err)
+                   }
+               )
+             }
+             .catchAll(err => done.fail(err))
+    } yield done
+  }
+
   private def serialize[R, K, V](
     r: ProducerRecord[K, V],
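For orientation, here is the enqueue side of the new design in isolation: `sendFromQueue` above drains `sendQueue`, whose elements pair a chunk of serialized records with a Promise that is completed once `sendChunk` has a result for every record. An editor's sketch under that assumption (the object and method names are hypothetical, not zio-kafka API; the queue element type is inferred from how `done.succeed` is used above):

    import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata }
    import zio._

    object EnqueueSketch {
      type ByteRecord = ProducerRecord[Array[Byte], Array[Byte]]
      type SendQueue  = Queue[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])]

      // The outer UIO completes as soon as the chunk is buffered; the inner
      // one completes once every record's callback has fired.
      def enqueueChunk(
        sendQueue: SendQueue,
        records: Chunk[ByteRecord]
      ): UIO[UIO[Chunk[Either[Throwable, RecordMetadata]]]] =
        for {
          done <- Promise.make[Nothing, Chunk[Either[Throwable, RecordMetadata]]]
          _    <- sendQueue.offer((records, done))
        } yield done.await
    }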
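The heart of the per-record error handling is the new `sendChunk`: it registers a Promise per record before awaiting any of them, and `.await.either` turns each outcome into an `Either`, so one broker error cannot abort its neighbours. A self-contained demo of that aggregation pattern (pure ZIO; nothing here touches Kafka, and the simulated failure is the editor's):

    import zio._

    object PerRecordDemo extends ZIOAppDefault {
      // One Promise per element, all registered before any is awaited; each
      // completion is captured as an Either instead of failing the whole batch.
      val run: ZIO[Any, Nothing, Unit] =
        for {
          promises <- ZIO.foreach(Chunk(1, 2, 3)) { n =>
                        Promise.make[String, Int].tap { p =>
                          if (n == 2) p.fail("broker rejected record") else p.succeed(n)
                        }
                      }
          results <- ZIO.foreach(promises)(_.await.either)
          _       <- ZIO.debug(results) // Chunk(Right(1), Left(broker rejected record), Right(3))
        } yield ()
    }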

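Finally, `sendRecord` replaces the earlier `ZIO.async`-based completion. Kafka invokes the send callback on its own I/O thread, outside any fiber, so the Promise has to be completed through the captured `Runtime` under `Unsafe`. The same idiom generalizes to any callback-based Java API; an editor's sketch (the `fromCallback` name is hypothetical, not zio-kafka API):

    import zio._

    object CallbackBridgeSketch {
      // Bridge a callback-style API into ZIO: the callback runs on a foreign
      // thread, so completing the Promise needs runtime.unsafe.run under Unsafe.
      def fromCallback[A](runtime: Runtime[Any])(
        register: (Either[Throwable, A] => Unit) => Unit
      ): Task[A] =
        Promise.make[Throwable, A].flatMap { p =>
          val registration = ZIO.attempt {
            register { result =>
              Unsafe.unsafe { implicit u =>
                val _ = runtime.unsafe.run(p.done(ZIO.fromEither(result)))
              }
            }
          }
          // Mirror sendRecord: if registration itself throws, fail the Promise.
          registration.catchAll(p.fail) *> p.await
        }
    }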