diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
index abbfe0d97..5a1b8fc56 100644
--- a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
@@ -1,18 +1,16 @@
 package zio.kafka.producer
 
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata, Producer => JProducer}
+import org.apache.kafka.clients.producer.{ KafkaProducer, Producer => JProducer, ProducerRecord, RecordMetadata }
 import org.apache.kafka.common.errors.AuthenticationException
 import org.apache.kafka.common.errors.AuthorizationException
 import org.apache.kafka.common.serialization.ByteArraySerializer
-import org.apache.kafka.common.{Metric, MetricName, PartitionInfo}
+import org.apache.kafka.common.{ Metric, MetricName, PartitionInfo }
 import zio._
 import zio.kafka.serde.Serializer
 import zio.kafka.utils.SslHelper
-import zio.stream.{ZPipeline, ZStream}
+import zio.stream.{ ZPipeline, ZStream }
 
-import java.util.concurrent.atomic.AtomicLong
 import scala.jdk.CollectionConverters._
-import scala.util.control.NonFatal
 
 trait Producer {
 
@@ -59,7 +57,7 @@ trait Producer {
    * Produces a single record. The effect returned from this method has two layers and describes the completion of two
    * actions:
    *   1. The outer layer describes the enqueueing of the record to the Producer's internal buffer.
-   *   1. The inner layer describes receiving an acknowledgement from the broker for the transmission of the record.
+   *   2. The inner layer describes receiving an acknowledgement from the broker for the transmission of the record.
    *
    * It is usually recommended to not await the inner layer of every individual record, but enqueue a batch of records
    * and await all of their acknowledgements at once. That amortizes the cost of sending requests to Kafka and increases
@@ -73,7 +71,7 @@ trait Producer {
    * Produces a single record. The effect returned from this method has two layers and describes the completion of two
    * actions:
    *   1. The outer layer describes the enqueueing of the record to the Producer's internal buffer.
-   *   1. The inner layer describes receiving an acknowledgement from the broker for the transmission of the record.
+   *   2. The inner layer describes receiving an acknowledgement from the broker for the transmission of the record.
    *
    * It is usually recommended to not await the inner layer of every individual record, but enqueue a batch of records
    * and await all of their acknowledgements at once. That amortizes the cost of sending requests to Kafka and increases
@@ -139,7 +137,7 @@ trait Producer {
    * Produces a chunk of records. The effect returned from this method has two layers and describes the completion of
    * two actions:
    *   1. The outer layer describes the enqueueing of all the records to the Producer's internal buffer.
-   *   1. The inner layer describes receiving an acknowledgement from the broker for the transmission of the records.
+   *   2. The inner layer describes receiving an acknowledgement from the broker for the transmission of the records.
    *
    * It is possible that for chunks that exceed the producer's internal buffer size, the outer layer will also signal
    * the transmission of part of the chunk. Regardless, awaiting the inner layer guarantees the transmission of the
@@ -155,7 +153,7 @@ trait Producer {
    * Produces a chunk of records. The effect returned from this method has two layers and describes the completion of
    * two actions:
    *   1. The outer layer describes the enqueueing of all the records to the Producer's internal buffer.
-   *   1. The inner layer describes receiving an acknowledgement from the broker for the transmission of the records.
+   *   2. The inner layer describes receiving an acknowledgement from the broker for the transmission of the records.
    *
    * It is possible that for chunks that exceed the producer's internal buffer size, the outer layer will also signal
    * the transmission of part of the chunk. Regardless, awaiting the inner layer guarantees the transmission of the
@@ -474,36 +472,56 @@ final private[producer] class ProducerLive(
       .runDrain
   }
 
+  private def sendRecordCancellable(record: ByteRecord)(cb: Either[Throwable, RecordMetadata] => Unit): () => Unit = {
+    val f = p.send(
+      record,
+      (metadata: RecordMetadata, exception: Exception) =>
+        if (exception != null)
+          cb(Left(exception))
+        else cb(Right(metadata))
+    )
+
+    () => f.cancel(false): Unit
+  }
+
+  private def send(record: ByteRecord)(implicit trace: Trace): Task[RecordMetadata] =
+    ZIO.asyncInterrupt[Any, Throwable, RecordMetadata] { cb =>
+      val cancel = sendRecordCancellable(record) {
+        case Left(e)  => cb(ZIO.fail(e))
+        case Right(v) => cb(ZIO.succeed(v))
+      }
+
+      Left(ZIO.succeed(cancel()))
+    }
+
   private def sendChunk(
     serializedRecords: Chunk[ByteRecord]
   ): ZIO[Any, Nothing, Chunk[Either[Throwable, RecordMetadata]]] =
     for {
       fibers <- ZIO
-                  .foreach(serializedRecords.zipWithIndex)(record =>
-                    ZIO
-                      .fromFutureJava(p.send(record._1))
-                      .retry(Schedule.recurWhileZIO[Any, Throwable] {
-                        case _: AuthorizationException | _: AuthenticationException =>
-                          ZIO
-                            .logDebug("retrying Kafka schedule due to AuthorizationException or AuthenticationException")
-                            .when(settings.retryOnAuthFailures)
-                            .as(settings.retryOnAuthFailures)
-                        case _ => ZIO.succeed(false)
-                      } && settings.authErrorRetrySchedule)
-                      .mapError(e => Chunk.fill(serializedRecords.length)(Left(e)): Chunk[Either[Throwable, RecordMetadata]])
-                      .fork
-                      .map(fiber => (fiber, record._2))
-                  )
+                  .foreach(serializedRecords.zipWithIndex) { record =>
+                    val e: URIO[Any, Either[Throwable, RecordMetadata]] =
+                      send(record._1)
+                        .retry(Schedule.recurWhileZIO[Any, Throwable] {
+                          case _: AuthorizationException | _: AuthenticationException =>
+                            ZIO
+                              .logDebug("retrying Kafka schedule due to AuthorizationException or AuthenticationException")
+                              .when(settings.retryOnAuthFailures)
+                              .as(settings.retryOnAuthFailures)
+                          case _ => ZIO.succeed(false)
+                        } && settings.authErrorRetrySchedule)
+                        .either
+
+                    e.map(v => (v, record._2)).fork
+                  }
+      _      <- Fiber.awaitAll(fibers)
       result <- ZIO
-                  .foreach(fibers)(f => f._1.join.map(v => (v, f._2)))
-                  .map(_.sortBy(_._2).reverse)
-                  .map(_.map(_._1))
-                  .map(v => v.map(Right(_)): Chunk[Either[Throwable, RecordMetadata]])
-                  .fold(identity, identity)
+                  .foreach(fibers)(_.join)
+                  .map(_.sortBy(_._2))
+                  .map(_.map(_._1))
     } yield result
 
-
   private def serialize[R, K, V](
     r: ProducerRecord[K, V],
     keySerializer: Serializer[R, K],
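A note on the two-layer effects documented in the scaladoc above: callers sequence the returned effect twice, once to await enqueueing into the producer's buffer and once to await broker acknowledgements for the whole batch. A minimal sketch of the recommended batch-and-await usage, assuming zio-kafka's public `Producer.produceChunkAsync` accessor with explicit serdes; the topic name and payloads are made up for illustration:

```scala
import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata }
import zio._
import zio.kafka.producer.Producer
import zio.kafka.serde.Serde

object ProduceBatchSketch {
  // Hypothetical batch of records; "my-topic" is a made-up topic name.
  val records: Chunk[ProducerRecord[String, String]] =
    Chunk.fromIterable((1 to 100).map(i => new ProducerRecord("my-topic", s"key-$i", s"value-$i")))

  val sendBatch: RIO[Producer, Chunk[RecordMetadata]] =
    for {
      // Outer layer: completes once the whole chunk is enqueued in the producer's buffer.
      ack      <- Producer.produceChunkAsync(records, Serde.string, Serde.string)
      // Inner layer: completes once the broker has acknowledged every record.
      metadata <- ack
    } yield metadata
}
```

Awaiting the inner layer once per chunk, rather than per record, amortizes the request cost that the scaladoc describes.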
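The interruptibility added by the new `send` comes from `ZIO.asyncInterrupt`: the registration function returns `Left(canceler)`, and ZIO runs that canceler, which calls `Future#cancel` on the Java future returned by `KafkaProducer#send`, whenever the waiting fiber is interrupted. A self-contained sketch of the same bridging pattern against a plain executor; `AsyncInterruptSketch` and `submitCancellable` are made-up names for illustration, not part of this PR:

```scala
import java.util.concurrent.{ Executors, Future => JFuture }
import zio._

object AsyncInterruptSketch extends ZIOAppDefault {
  private val pool = Executors.newSingleThreadExecutor()

  // Stand-in for KafkaProducer#send: runs work on another thread, reports
  // the outcome through a callback, and returns a cancellation token.
  private def submitCancellable(cb: Either[Throwable, Int] => Unit): () => Unit = {
    val f: JFuture[_] = pool.submit(new Runnable {
      def run(): Unit =
        try { Thread.sleep(1000); cb(Right(42)) }
        catch { case e: InterruptedException => cb(Left(e)) }
    })
    () => { f.cancel(true); () }
  }

  // Same shape as the PR's `send`: register the callback and hand ZIO the
  // canceler (wrapped in Left) to run if the fiber is interrupted.
  private val compute: Task[Int] =
    ZIO.asyncInterrupt[Any, Throwable, Int] { cb =>
      val cancel = submitCancellable {
        case Left(e)  => cb(ZIO.fail(e))
        case Right(v) => cb(ZIO.succeed(v))
      }
      Left(ZIO.succeed(cancel()))
    }

  // Timing out the effect interrupts the fiber, which invokes the canceler.
  val run: Task[Unit] =
    compute
      .timeout(100.millis)
      .debug("result")
      .unit
      .ensuring(ZIO.succeed(pool.shutdown()))
}
```

By contrast, the old `ZIO.fromFutureJava(p.send(record._1))` offered no hook to cancel an in-flight send, which is what this PR removes.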
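The retry policy in `sendChunk` intersects two schedules with `&&`: recurrence continues only while both the auth-failure predicate and `settings.authErrorRetrySchedule` allow it. A sketch of the same composition with a made-up fallback budget; `RetryPolicySketch` and `authRetryPolicy` are hypothetical names, not part of this PR:

```scala
import org.apache.kafka.common.errors.{ AuthenticationException, AuthorizationException }
import zio._

object RetryPolicySketch {
  // `&&` is schedule intersection: it recurs only while BOTH operands recur,
  // so retries stop as soon as the error is not auth-related OR the
  // exponential/recurs budget below is exhausted.
  val authRetryPolicy: Schedule[Any, Throwable, Any] =
    Schedule.recurWhile[Throwable] {
      case _: AuthorizationException | _: AuthenticationException => true
      case _                                                      => false
    } && (Schedule.exponential(50.millis) && Schedule.recurs(5))
}
```

An effect retried with such a policy would wait roughly 50ms, 100ms, 200ms, ... between attempts, for at most five retries, and only while the failure remains auth-related.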