From 136aed2d51cbef1e60ad8acf4f2b011af54a6e29 Mon Sep 17 00:00:00 2001
From: Dominik Dorn
Date: Tue, 16 Jul 2024 17:26:23 +0200
Subject: [PATCH] producer: alternative impl without async

---
 .../scala/zio/kafka/producer/Producer.scala   | 71 +++++++------------
 1 file changed, 27 insertions(+), 44 deletions(-)

diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
index 12d40c7abf..abbfe0d978 100644
--- a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
@@ -477,50 +477,33 @@ final private[producer] class ProducerLive(
   private def sendChunk(
     serializedRecords: Chunk[ByteRecord]
   ): ZIO[Any, Nothing, Chunk[Either[Throwable, RecordMetadata]]] =
-    ZIO
-      .async[Any, Nothing, Chunk[Either[Throwable, RecordMetadata]]] { callback =>
-        try {
-          val it: Iterator[(ByteRecord, Int)] = serializedRecords.iterator.zipWithIndex
-          val res: Array[Either[Throwable, RecordMetadata]] =
-            new Array[Either[Throwable, RecordMetadata]](serializedRecords.length)
-          val count: AtomicLong = new AtomicLong
-          val length = serializedRecords.length
-
-          while (it.hasNext) {
-            val (rec, idx): (ByteRecord, Int) = it.next()
-
-            // Since we might be sending to multiple partitions, the callbacks
-            // are _not_ necessarily called in order.
-            val _ = p.send(
-              rec,
-              (metadata: RecordMetadata, err: Exception) => {
-                res(idx) = Either.cond(err == null, metadata, err)
-
-                if (count.incrementAndGet == length) {
-                  callback(ZIO.succeed(Chunk.fromArray(res)))
-                }
-              }
-            )
-          }
-        } catch {
-          case NonFatal(e) =>
-            callback(ZIO.succeed(Chunk.fill(serializedRecords.size)(Left(e))))
-        }
-      }
-      .flatMap(r =>
-        r.headOption match {
-          case Some(value) =>
-            value match {
-              case Left(value) => ZIO.fail(value)
-              case _           => ZIO.succeed(r)
-            }
-          case None => ZIO.succeed(r)
-        })
-      .retry(Schedule.recurWhile[Throwable] {
-        case _: AuthorizationException | _: AuthenticationException => settings.retryOnAuthFailures
-        case _                                                      => false
-      } && settings.authErrorRetrySchedule)
-      .catchAll(e => ZIO.succeed(Chunk.fill(serializedRecords.size)(Left(e))))
+    for {
+      fibers <-
+        ZIO
+          .foreach(serializedRecords.zipWithIndex)(record =>
+            ZIO
+              .fromFutureJava(p.send(record._1))
+              .retry(Schedule.recurWhileZIO[Any, Throwable] {
+                case _: AuthorizationException | _: AuthenticationException =>
+                  ZIO
+                    .logDebug("retrying Kafka send due to AuthorizationException or AuthenticationException")
+                    .when(settings.retryOnAuthFailures)
+                    .as(settings.retryOnAuthFailures)
+                case _ => ZIO.succeed(false)
+              } && settings.authErrorRetrySchedule)
+              .mapError(e => Chunk.fill(serializedRecords.length)(Left(e)): Chunk[Either[Throwable, RecordMetadata]])
+              .fork
+              .map(fiber => (fiber, record._2))
+          )
+      result <- ZIO
+                  .foreach(fibers)(f => f._1.join.map(v => (v, f._2)))
+                  .map(_.sortBy(_._2))
+                  .map(_.map(_._1))
+                  .map(v => v.map(Right(_)): Chunk[Either[Throwable, RecordMetadata]])
+                  .fold(identity, identity)
+    } yield result
+
+
   private def serialize[R, K, V](
     r: ProducerRecord[K, V],
     keySerializer: Serializer[R, K],
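
For reference, a minimal standalone sketch of the fork/join pattern the new sendChunk builds on, assuming ZIO 2.x and kafka-clients on the classpath. The `SendChunkSketch` object, `sendAll` name, `producer` parameter, and `Record` alias are illustrative stand-ins, not part of the patch or of zio-kafka. Note one deliberate simplification: where the patch fails the entire batch when any send still fails after retries, this sketch keeps per-record outcomes as Either values.

  import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord, RecordMetadata }
  import zio._

  object SendChunkSketch {
    // Illustrative stand-in for the patch's ByteRecord alias.
    type Record = ProducerRecord[Array[Byte], Array[Byte]]

    // Fork one fiber per record, then join the fibers in input order.
    def sendAll(
      producer: KafkaProducer[Array[Byte], Array[Byte]],
      records: Chunk[Record]
    ): UIO[Chunk[Either[Throwable, RecordMetadata]]] =
      for {
        // KafkaProducer#send returns a java.util.concurrent.Future, which
        // ZIO.fromFutureJava adapts into a Task; .fork starts each send on
        // its own fiber so the in-flight sends proceed concurrently.
        fibers <- ZIO.foreach(records)(r => ZIO.fromFutureJava(producer.send(r)).fork)
        // ZIO.foreach preserves the order of its input, so joining the fibers
        // in sequence yields results aligned with the input chunk; .either
        // captures each send's failure as a Left instead of failing the batch.
        results <- ZIO.foreach(fibers)(_.join.either)
      } yield results
  }

Compared with the callback-based ZIO.async version being removed, this shape leans on fromFutureJava and fiber supervision for completion and ordering instead of hand-rolled index and counter bookkeeping.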