Producer alternative send chunk #1284

Closed · wants to merge 5 commits
4 changes: 2 additions & 2 deletions zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala
@@ -129,8 +129,8 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
test("produces messages") {
for {
topic <- randomTopic
firstMessage = "toto"
secondMessage = "tata"
firstMessage = "firstMessage"
secondMessage = "secondMessage"
consume = (n: Int) =>
Consumer
.plainStream(Subscription.topics(topic), Serde.byteArray, Serde.byteArray)
90 changes: 57 additions & 33 deletions zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
@@ -1,16 +1,16 @@
package zio.kafka.producer

import org.apache.kafka.clients.producer.{ KafkaProducer, Producer => JProducer, ProducerRecord, RecordMetadata }
import org.apache.kafka.common.errors.AuthenticationException
import org.apache.kafka.common.errors.AuthorizationException
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.{ Metric, MetricName, PartitionInfo }
import zio._
import zio.kafka.serde.Serializer
import zio.kafka.utils.SslHelper
import zio.stream.{ ZPipeline, ZStream }

import java.util.concurrent.atomic.AtomicLong
import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal

trait Producer {

@@ -224,7 +224,7 @@ object Producer {
Queue.bounded[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])](
settings.sendBufferSize
)
producer = new ProducerLive(javaProducer, sendQueue)
producer = new ProducerLive(javaProducer, sendQueue, settings)
_ <- producer.sendFromQueue.forkScoped
} yield producer

@@ -361,7 +361,8 @@ object Producer {

private[producer] final class ProducerLive(
private[producer] val p: JProducer[Array[Byte], Array[Byte]],
sendQueue: Queue[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])]
sendQueue: Queue[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])],
settings: ProducerSettings
) extends Producer {

override def produce(record: ProducerRecord[Array[Byte], Array[Byte]]): Task[RecordMetadata] =
@@ -471,39 +472,62 @@ private[producer] final class ProducerLive(
.runDrain
}

private def sendRecordCancellable(
record: ByteRecord
)(cb: Either[Throwable, RecordMetadata] => Unit): () => Unit = {
val f = p.send(
record,
(metadata: RecordMetadata, exception: Exception) =>
if (exception != null)
cb(Left(exception))
else cb(Right(metadata))
)

() => {
val _ = f.cancel(false) // prevent warning for not using the result
()
}
}

private def send(record: ByteRecord)(implicit trace: Trace): Task[RecordMetadata] =
ZIO.asyncInterrupt[Any, Throwable, RecordMetadata] { cb =>
val cancel = sendRecordCancellable(record) {
case Left(e) => cb(ZIO.fail(e))
case Right(v) => cb(ZIO.succeed(v))
}

Left(ZIO.succeed(cancel()))
}

private def sendChunk(
serializedRecords: Chunk[ByteRecord]
): ZIO[Any, Nothing, Chunk[Either[Throwable, RecordMetadata]]] =
ZIO
.async[Any, Nothing, Chunk[Either[Throwable, RecordMetadata]]] { callback =>
try {
val it: Iterator[(ByteRecord, Int)] = serializedRecords.iterator.zipWithIndex
val res: Array[Either[Throwable, RecordMetadata]] =
new Array[Either[Throwable, RecordMetadata]](serializedRecords.length)
val count: AtomicLong = new AtomicLong
val length = serializedRecords.length

while (it.hasNext) {
val (rec, idx): (ByteRecord, Int) = it.next()

// Since we might be sending to multiple partitions, the callbacks
// are _not_ necessarily called in order.
val _ = p.send(
rec,
(metadata: RecordMetadata, err: Exception) => {
res(idx) = Either.cond(err == null, metadata, err)

if (count.incrementAndGet == length) {
callback(ZIO.succeed(Chunk.fromArray(res)))
}
}
)
(for {
fibers <-
// Since we might be sending to multiple partitions,
// the results might not return in order. Because of this,
// we add a zipWithIndex to keep track of the original order
ZIO
.foreach(serializedRecords.zipWithIndex) { record =>
val e: URIO[Any, Either[Throwable, RecordMetadata]] =
send(record._1)
.retry(Schedule.recurWhileZIO[Any, Throwable] {
case _: AuthorizationException | _: AuthenticationException =>
ZIO
.logDebug("retrying Kafka schedule due to AuthorizationException or AuthenticationException")
.when(settings.retryOnAuthFailures)
.as(settings.retryOnAuthFailures)
case _ => ZIO.succeed(false)
} && settings.authErrorRetrySchedule)
.either

e.map(v => (v, record._2)).fork
}
} catch {
case NonFatal(e) =>
callback(ZIO.succeed(Chunk.fill(serializedRecords.size)(Left(e))))
}
}
result <- Fiber
.collectAll(fibers)
.join
.map(_.sortBy(_._2).map(_._1))
} yield result)

private def serialize[R, K, V](
r: ProducerRecord[K, V],
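A note on the retry logic introduced in sendChunk above: the per-record schedule is the intersection (`&&`) of a Schedule.recurWhileZIO predicate that keeps recurring only for AuthorizationException/AuthenticationException (and only when retryOnAuthFailures is enabled), and the configurable authErrorRetrySchedule, so a retry only happens while both schedules agree to continue. The sketch below is not part of the PR; it mirrors that composition against a hypothetical flakySend effect (names AuthRetrySketch and flakySend are invented for illustration, ZIO 2 assumed) to show the resulting behaviour.

import org.apache.kafka.common.errors.{ AuthenticationException, AuthorizationException }
import zio._

object AuthRetrySketch extends ZIOAppDefault {

  // Hypothetical stand-in for a single producer send:
  // fails with AuthenticationException on the first two attempts, then succeeds.
  def flakySend(attempts: Ref[Int]): Task[String] =
    attempts.updateAndGet(_ + 1).flatMap {
      case n if n <= 2 => ZIO.fail(new AuthenticationException(s"auth failure on attempt $n"))
      case n           => ZIO.succeed(s"sent on attempt $n")
    }

  // Same shape as the schedule built in ProducerLive.sendChunk: continue only for
  // auth errors, intersected with the bounded, spaced default of
  // ProducerSettings.authErrorRetrySchedule.
  val authRetrySchedule: Schedule[Any, Throwable, Any] =
    Schedule.recurWhileZIO[Any, Throwable] {
      case _: AuthorizationException | _: AuthenticationException => ZIO.succeed(true)
      case _                                                      => ZIO.succeed(false)
    } && (Schedule.recurs(5) && Schedule.spaced(500.millis))

  // A non-auth failure would not be retried, because the predicate schedule stops immediately.
  override def run: ZIO[Any, Throwable, Unit] =
    for {
      attempts <- Ref.make(0)
      result   <- flakySend(attempts).retry(authRetrySchedule)
      _        <- Console.printLine(result) // prints "sent on attempt 3"
    } yield ()
}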
zio-kafka/src/main/scala/zio/kafka/producer/ProducerSettings.scala
@@ -7,7 +7,9 @@ import zio.kafka.security.KafkaCredentialStore
final case class ProducerSettings(
closeTimeout: Duration = 30.seconds,
sendBufferSize: Int = 4096,
properties: Map[String, AnyRef] = Map.empty
properties: Map[String, AnyRef] = Map.empty,
retryOnAuthFailures: Boolean = false,
authErrorRetrySchedule: Schedule[Any, Throwable, Any] = Schedule.recurs(5) && Schedule.spaced(500.millis)
) {
def driverSettings: Map[String, AnyRef] = properties

@@ -33,6 +35,10 @@ final case class ProducerSettings(
withProperties(credentialsStore.properties)

def withSendBufferSize(sendBufferSize: Int) = copy(sendBufferSize = sendBufferSize)

def withRetryOnAuthFailures(retry: Boolean) = copy(retryOnAuthFailures = retry)

def withAuthErrorRetrySchedule(schedule: Schedule[Any, Throwable, Any]) = copy(authErrorRetrySchedule = schedule)
}

object ProducerSettings {
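The two new ProducerSettings fields are opt-in via the builders added above. A possible wiring is sketched below; it is not part of the PR. The object name ProducerWiringSketch and the broker address are invented, and withBootstrapServers / Producer.make are the existing zio-kafka APIs assumed here — only the last two builder calls come from this change.

import zio._
import zio.kafka.producer.{ Producer, ProducerSettings }

object ProducerWiringSketch {

  // Enable auth-failure retries and override the default recurs(5) && spaced(500.millis) schedule.
  val settings: ProducerSettings =
    ProducerSettings()
      .withBootstrapServers(List("localhost:9092")) // assumed existing builder, not part of this diff
      .withRetryOnAuthFailures(true)
      .withAuthErrorRetrySchedule(Schedule.recurs(10) && Schedule.spaced(1.second))

  // Producer.make builds a scoped Producer; wrap it in a layer for use elsewhere.
  val producerLayer: ZLayer[Any, Throwable, Producer] =
    ZLayer.scoped(Producer.make(settings))
}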
zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala
@@ -100,7 +100,7 @@ object TransactionalProducer {
Queue.bounded[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])](
settings.producerSettings.sendBufferSize
)
live = new ProducerLive(rawProducer, sendQueue)
live = new ProducerLive(rawProducer, sendQueue, settings.producerSettings)
_ <- live.sendFromQueue.forkScoped
} yield new LiveTransactionalProducer(live, semaphore)
}