Skip to content

Commit

Permalink
next approach on send including cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
domdorn committed Jul 17, 2024
1 parent 7bca691 commit 62f5689
Showing 1 changed file with 48 additions and 30 deletions.
78 changes: 48 additions & 30 deletions zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
package zio.kafka.producer

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata, Producer => JProducer}
import org.apache.kafka.clients.producer.{ KafkaProducer, Producer => JProducer, ProducerRecord, RecordMetadata }
import org.apache.kafka.common.errors.AuthenticationException
import org.apache.kafka.common.errors.AuthorizationException
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.{Metric, MetricName, PartitionInfo}
import org.apache.kafka.common.{ Metric, MetricName, PartitionInfo }
import zio._
import zio.kafka.serde.Serializer
import zio.kafka.utils.SslHelper
import zio.stream.{ZPipeline, ZStream}
import zio.stream.{ ZPipeline, ZStream }

import java.util.concurrent.atomic.AtomicLong
import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal

trait Producer {

Expand Down Expand Up @@ -59,7 +57,7 @@ trait Producer {
* Produces a single record. The effect returned from this method has two layers and describes the completion of two
* actions:
* 1. The outer layer describes the enqueueing of the record to the Producer's internal buffer.
* 1. The inner layer describes receiving an acknowledgement from the broker for the transmission of the record.
* 2. The inner layer describes receiving an acknowledgement from the broker for the transmission of the record.
*
* It is usually recommended to not await the inner layer of every individual record, but enqueue a batch of records
* and await all of their acknowledgements at once. That amortizes the cost of sending requests to Kafka and increases
Expand All @@ -73,7 +71,7 @@ trait Producer {
* Produces a single record. The effect returned from this method has two layers and describes the completion of two
* actions:
* 1. The outer layer describes the enqueueing of the record to the Producer's internal buffer.
* 1. The inner layer describes receiving an acknowledgement from the broker for the transmission of the record.
* 2. The inner layer describes receiving an acknowledgement from the broker for the transmission of the record.
*
* It is usually recommended to not await the inner layer of every individual record, but enqueue a batch of records
* and await all of their acknowledgements at once. That amortizes the cost of sending requests to Kafka and increases
Expand Down Expand Up @@ -139,7 +137,7 @@ trait Producer {
* Produces a chunk of records. The effect returned from this method has two layers and describes the completion of
* two actions:
* 1. The outer layer describes the enqueueing of all the records to the Producer's internal buffer.
* 1. The inner layer describes receiving an acknowledgement from the broker for the transmission of the records.
* 2. The inner layer describes receiving an acknowledgement from the broker for the transmission of the records.
*
* It is possible that for chunks that exceed the producer's internal buffer size, the outer layer will also signal
* the transmission of part of the chunk. Regardless, awaiting the inner layer guarantees the transmission of the
Expand All @@ -155,7 +153,7 @@ trait Producer {
* Produces a chunk of records. The effect returned from this method has two layers and describes the completion of
* two actions:
* 1. The outer layer describes the enqueueing of all the records to the Producer's internal buffer.
* 1. The inner layer describes receiving an acknowledgement from the broker for the transmission of the records.
* 2. The inner layer describes receiving an acknowledgement from the broker for the transmission of the records.
*
* It is possible that for chunks that exceed the producer's internal buffer size, the outer layer will also signal
* the transmission of part of the chunk. Regardless, awaiting the inner layer guarantees the transmission of the
Expand Down Expand Up @@ -474,36 +472,56 @@ final private[producer] class ProducerLive(
.runDrain
}

private def sendRecordCancellable(record: ByteRecord)(cb: Either[Throwable, RecordMetadata] => Unit): () => Unit = {
val f = p.send(
record,
(metadata: RecordMetadata, exception: Exception) =>
if (exception != null)
cb(Left(exception))
else cb(Right(metadata))
)

() => f.cancel(false): Unit
}

private def send(record: ByteRecord)(implicit trace: Trace): Task[RecordMetadata] =
ZIO.asyncInterrupt[Any, Throwable, RecordMetadata] { cb =>
val cancel = sendRecordCancellable(record) {
case Left(e) => cb(ZIO.fail(e))
case Right(v) => cb(ZIO.succeed(v))
}

Left(ZIO.succeed(cancel()))
}

private def sendChunk(
serializedRecords: Chunk[ByteRecord]
): ZIO[Any, Nothing, Chunk[Either[Throwable, RecordMetadata]]] =
for {
fibers <-
ZIO
.foreach(serializedRecords.zipWithIndex)(record =>
ZIO
.fromFutureJava(p.send(record._1))
.retry(Schedule.recurWhileZIO[Any, Throwable] {
case _: AuthorizationException | _: AuthenticationException =>
ZIO
.logDebug("retrying Kafka schedule due to AuthorizationException or AuthenticationException")
.when(settings.retryOnAuthFailures)
.as(settings.retryOnAuthFailures)
case _ => ZIO.succeed(false)
} && settings.authErrorRetrySchedule)
.mapError(e => Chunk.fill(serializedRecords.length)(Left(e)): Chunk[Either[Throwable, RecordMetadata]])
.fork
.map(fiber => (fiber, record._2))
)
.foreach(serializedRecords.zipWithIndex) { record =>
val e: URIO[Any, Either[Throwable, RecordMetadata]] =
send(record._1)
.retry(Schedule.recurWhileZIO[Any, Throwable] {
case _: AuthorizationException | _: AuthenticationException =>
ZIO
.logDebug("retrying Kafka schedule due to AuthorizationException or AuthenticationException")
.when(settings.retryOnAuthFailures)
.as(settings.retryOnAuthFailures)
case _ => ZIO.succeed(false)
} && settings.authErrorRetrySchedule)
.either

e.map(v => (v, record._2)).fork
}
_ <- Fiber.awaitAll(fibers)
result <- ZIO
.foreach(fibers)(f => f._1.join.map(v => (v, f._2)))
.map(_.sortBy(_._2).reverse)
.map(_.map(_._1))
.map(v => v.map(Right(_)): Chunk[Either[Throwable, RecordMetadata]])
.fold(identity, identity)
.foreach(fibers)(_.join)
.map(_.sortBy(_._2))
.map(_.map(_._1))
} yield result


private def serialize[R, K, V](
r: ProducerRecord[K, V],
keySerializer: Serializer[R, K],
Expand Down

0 comments on commit 62f5689

Please sign in to comment.