diff --git a/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala
index 202dc44b3..96e193a5e 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala
@@ -129,8 +129,8 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
       test("produces messages") {
         for {
          topic        <- randomTopic
-          firstMessage  = "toto"
-          secondMessage = "tata"
+          firstMessage  = "firstMessage"
+          secondMessage = "secondMessage"
           consume = (n: Int) =>
             Consumer
               .plainStream(Subscription.topics(topic), Serde.byteArray, Serde.byteArray)
diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
index 939f36d31..199a11aa5 100644
--- a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
@@ -1,6 +1,8 @@
 package zio.kafka.producer

 import org.apache.kafka.clients.producer.{ KafkaProducer, Producer => JProducer, ProducerRecord, RecordMetadata }
+import org.apache.kafka.common.errors.AuthenticationException
+import org.apache.kafka.common.errors.AuthorizationException
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.apache.kafka.common.{ Metric, MetricName, PartitionInfo }
 import zio._
@@ -8,9 +10,7 @@ import zio.kafka.serde.Serializer
 import zio.kafka.utils.SslHelper
 import zio.stream.{ ZPipeline, ZStream }

-import java.util.concurrent.atomic.AtomicLong
 import scala.jdk.CollectionConverters._
-import scala.util.control.NonFatal

 trait Producer {
@@ -224,7 +224,7 @@ object Producer {
                      Queue.bounded[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])](
                        settings.sendBufferSize
                      )
-        producer = new ProducerLive(javaProducer, sendQueue)
+        producer = new ProducerLive(javaProducer, sendQueue, settings)
         _        <- producer.sendFromQueue.forkScoped
       } yield producer
@@ -361,7 +361,8 @@ private[producer] final class ProducerLive(
   private[producer] val p: JProducer[Array[Byte], Array[Byte]],
-  sendQueue: Queue[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])]
+  sendQueue: Queue[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])],
+  settings: ProducerSettings
 ) extends Producer {

   override def produce(record: ProducerRecord[Array[Byte], Array[Byte]]): Task[RecordMetadata] =
@@ -471,39 +472,62 @@ private[producer] final class ProducerLive(
       .runDrain
   }

+  private def sendRecordCancellable(
+    record: ByteRecord
+  )(cb: Either[Throwable, RecordMetadata] => Unit): () => Unit = {
+    val f = p.send(
+      record,
+      (metadata: RecordMetadata, exception: Exception) =>
+        if (exception != null)
+          cb(Left(exception))
+        else cb(Right(metadata))
+    )
+
+    () => {
+      val _ = f.cancel(false) // prevent warning for not using the result
+      ()
+    }
+  }
+
+  private def send(record: ByteRecord)(implicit trace: Trace): Task[RecordMetadata] =
+    ZIO.asyncInterrupt[Any, Throwable, RecordMetadata] { cb =>
+      val cancel = sendRecordCancellable(record) {
+        case Left(e)  => cb(ZIO.fail(e))
+        case Right(v) => cb(ZIO.succeed(v))
+      }
+
+      Left(ZIO.succeed(cancel()))
+    }
+
   private def sendChunk(
     serializedRecords: Chunk[ByteRecord]
   ): ZIO[Any, Nothing, Chunk[Either[Throwable, RecordMetadata]]] =
-    ZIO
-      .async[Any, Nothing, Chunk[Either[Throwable, RecordMetadata]]] { callback =>
-        try {
-          val it: Iterator[(ByteRecord, Int)] = serializedRecords.iterator.zipWithIndex
-          val res: Array[Either[Throwable, RecordMetadata]] =
-            new Array[Either[Throwable, RecordMetadata]](serializedRecords.length)
-          val count: AtomicLong = new AtomicLong
-          val length            = serializedRecords.length
-
-          while (it.hasNext) {
-            val (rec, idx): (ByteRecord, Int) = it.next()
-
-            // Since we might be sending to multiple partitions, the callbacks
-            // are _not_ necessarily called in order.
-            val _ = p.send(
-              rec,
-              (metadata: RecordMetadata, err: Exception) => {
-                res(idx) = Either.cond(err == null, metadata, err)
-
-                if (count.incrementAndGet == length) {
-                  callback(ZIO.succeed(Chunk.fromArray(res)))
-                }
-              }
-            )
+    (for {
+      fibers <-
+        // Since we might be sending to multiple partitions,
+        // the results might not come back in order. We therefore
+        // zipWithIndex so the original order can be restored below.
+        ZIO
+          .foreach(serializedRecords.zipWithIndex) { record =>
+            val e: URIO[Any, Either[Throwable, RecordMetadata]] =
+              send(record._1)
+                .retry(Schedule.recurWhileZIO[Any, Throwable] {
+                  case _: AuthorizationException | _: AuthenticationException =>
+                    ZIO
+                      .logDebug("retrying Kafka send due to AuthorizationException or AuthenticationException")
+                      .when(settings.retryOnAuthFailures)
+                      .as(settings.retryOnAuthFailures)
+                  case _ => ZIO.succeed(false)
+                } && settings.authErrorRetrySchedule)
+                .either
+
+            e.map(v => (v, record._2)).fork
           }
-        } catch {
-          case NonFatal(e) =>
-            callback(ZIO.succeed(Chunk.fill(serializedRecords.size)(Left(e))))
-        }
-      }
+      result <- Fiber
+                  .collectAll(fibers)
+                  .join
+                  .map(_.sortBy(_._2).map(_._1))
+    } yield result)

   private def serialize[R, K, V](
     r: ProducerRecord[K, V],
diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/ProducerSettings.scala b/zio-kafka/src/main/scala/zio/kafka/producer/ProducerSettings.scala
index 06f185bda..f0071b2d1 100644
--- a/zio-kafka/src/main/scala/zio/kafka/producer/ProducerSettings.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/producer/ProducerSettings.scala
@@ -7,7 +7,9 @@ import zio.kafka.security.KafkaCredentialStore
 final case class ProducerSettings(
   closeTimeout: Duration = 30.seconds,
   sendBufferSize: Int = 4096,
-  properties: Map[String, AnyRef] = Map.empty
+  properties: Map[String, AnyRef] = Map.empty,
+  retryOnAuthFailures: Boolean = false,
+  authErrorRetrySchedule: Schedule[Any, Throwable, Any] = Schedule.recurs(5) && Schedule.spaced(500.millis)
 ) {
   def driverSettings: Map[String, AnyRef] =
     properties
@@ -33,6 +35,10 @@ final case class ProducerSettings(
     withProperties(credentialsStore.properties)

   def withSendBufferSize(sendBufferSize: Int) = copy(sendBufferSize = sendBufferSize)
+
+  def withRetryOnAuthFailures(retry: Boolean) = copy(retryOnAuthFailures = retry)
+
+  def withAuthErrorRetrySchedule(schedule: Schedule[Any, Throwable, Any]) = copy(authErrorRetrySchedule = schedule)
 }

 object ProducerSettings {
diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala
index c27f90633..cfb29e6b8 100644
--- a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala
@@ -100,7 +100,7 @@ object TransactionalProducer {
                     Queue.bounded[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])](
                       settings.producerSettings.sendBufferSize
                     )
-      live = new ProducerLive(rawProducer, sendQueue)
+      live = new ProducerLive(rawProducer, sendQueue, settings.producerSettings)
       _    <- live.sendFromQueue.forkScoped
     } yield new LiveTransactionalProducer(live, semaphore)
   }
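
For context, a minimal sketch of how the new `retryOnAuthFailures` flag and `authErrorRetrySchedule` introduced in this diff might be used from application code. It assumes zio-kafka's existing `ProducerSettings(bootstrapServers: List[String])` constructor and scoped `Producer.make`; the broker address, topic, and schedule values are placeholders.

```scala
import zio._
import zio.kafka.producer.{ Producer, ProducerSettings }
import zio.kafka.serde.Serde

object AuthRetryExample extends ZIOAppDefault {

  // Opt in to retrying sends that fail with AuthenticationException or
  // AuthorizationException. Broker address and schedule are placeholders;
  // retryOnAuthFailures defaults to false.
  val settings: ProducerSettings =
    ProducerSettings(List("localhost:9092"))
      .withRetryOnAuthFailures(true)
      .withAuthErrorRetrySchedule(Schedule.recurs(10) && Schedule.spaced(1.second))

  val producerLayer: ZLayer[Any, Throwable, Producer] =
    ZLayer.scoped(Producer.make(settings))

  // A send that hits a transient auth failure is retried per the schedule
  // instead of failing the produced record immediately.
  val run =
    Producer
      .produce("example-topic", "key", "value", Serde.string, Serde.string)
      .provideLayer(producerLayer)
}
```

Note that `&&` intersects the two schedules, so a retry happens only while both allow it, mirroring the default of `Schedule.recurs(5) && Schedule.spaced(500.millis)` added to `ProducerSettings` above.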