Skip to content

Commit

Permalink
[POC] Interrupible version
Browse files Browse the repository at this point in the history
  • Loading branch information
guizmaii committed Jan 11, 2023
1 parent 9aec63c commit 9e0457d
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 34 deletions.
58 changes: 27 additions & 31 deletions zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
Original file line number Diff line number Diff line change
@@ -1,20 +1,13 @@
package zio.kafka.producer

import org.apache.kafka.clients.producer.{
Callback,
KafkaProducer,
Producer => JProducer,
ProducerRecord,
RecordMetadata
}
import org.apache.kafka.clients.producer.{ KafkaProducer, Producer => JProducer, ProducerRecord, RecordMetadata }
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.{ Metric, MetricName }
import zio._
import zio.kafka.serde.Serializer
import zio.stream.{ ZPipeline, ZStream }

import java.util.concurrent.atomic.AtomicLong
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._

trait Producer {
Expand Down Expand Up @@ -127,7 +120,6 @@ object Producer {
private[producer] final case class Live(
p: JProducer[Array[Byte], Array[Byte]],
producerSettings: ProducerSettings,
runtime: Runtime[Any],
sendQueue: Queue[(Chunk[ByteRecord], Promise[Throwable, Chunk[RecordMetadata]])]
) extends Producer {

Expand Down Expand Up @@ -164,30 +156,35 @@ object Producer {
ZStream
.fromQueueWithShutdown(sendQueue)
.mapZIO { case (serializedRecords, done) =>
ZIO.attempt {
ZIO.suspendSucceed {
val it: Iterator[(ByteRecord, Int)] = serializedRecords.iterator.zipWithIndex
val res: Array[RecordMetadata] = new Array[RecordMetadata](serializedRecords.length)
val length = serializedRecords.length
val res: Array[RecordMetadata] = new Array[RecordMetadata](length)
val count: AtomicLong = new AtomicLong

while (it.hasNext) {
val (rec, idx): (ByteRecord, Int) = it.next()

p.send(
rec,
new Callback {
def onCompletion(metadata: RecordMetadata, err: Exception): Unit =
Unsafe.unsafe { implicit u =>
(if (err != null) runtime.unsafe.run(done.fail(err)).getOrThrowFiberFailure(): Unit
else {
res(idx) = metadata
if (count.incrementAndGet == serializedRecords.length)
runtime.unsafe.run(done.succeed(Chunk.fromArray(res))).getOrThrowFiberFailure(): Unit
}): @nowarn("msg=discarded non-Unit value")
()
}
}
)
val send: ((ByteRecord, Int)) => Task[Unit] = { case (rec: ByteRecord, idx: Int) =>
ZIO.asyncInterrupt { callback =>
val future =
p.send(
rec,
(metadata: RecordMetadata, err: Exception) =>
if (err != null) callback.apply(done.fail(err).unit)
else {
res(idx) = metadata
if (count.incrementAndGet == length) {
callback.apply(done.succeed(Chunk.fromArray(res)).unit)
}
}
)

Left(ZIO.attempt(future.cancel(false)).unit.orDie)
}
}

// Code copied from `ZIO.foreachDiscard`
//
// `ZIO.foreachDiscard` takes an Iterable, which forces us to allocate an additional Chunk when we `zipWithIndex`
ZIO.whileLoop(it.hasNext)(send.apply(it.next()))(_ => ())
}
.foldCauseZIO(done.failCause, _ => ZIO.unit)
}
Expand Down Expand Up @@ -260,10 +257,9 @@ object Producer {
new ByteArraySerializer()
)
)
runtime <- ZIO.runtime[Any]
sendQueue <-
Queue.bounded[(Chunk[ByteRecord], Promise[Throwable, Chunk[RecordMetadata]])](settings.sendBufferSize)
producer <- ZIO.acquireRelease(ZIO.succeed(Live(rawProducer, settings, runtime, sendQueue)))(_.close)
producer <- ZIO.acquireRelease(ZIO.succeed(Live(rawProducer, settings, sendQueue)))(_.close)
_ <- ZIO.blocking(producer.sendFromQueue).forkScoped
} yield producer

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import org.apache.kafka.clients.producer.{ KafkaProducer, RecordMetadata }
import org.apache.kafka.common.errors.InvalidGroupIdException
import org.apache.kafka.common.serialization.ByteArraySerializer
import zio.Cause.Fail
import zio.kafka.consumer.OffsetBatch
import zio._
import zio.kafka.consumer.OffsetBatch

import scala.jdk.CollectionConverters._

Expand Down Expand Up @@ -84,13 +84,12 @@ object TransactionalProducer {
)
_ <- ZIO.attemptBlocking(rawProducer.initTransactions())
semaphore <- Semaphore.make(1)
runtime <- ZIO.runtime[Any]
sendQueue <-
Queue.bounded[(Chunk[ByteRecord], Promise[Throwable, Chunk[RecordMetadata]])](
settings.producerSettings.sendBufferSize
)
live <- ZIO.acquireRelease(
ZIO.succeed(Producer.Live(rawProducer, settings.producerSettings, runtime, sendQueue))
ZIO.succeed(Producer.Live(rawProducer, settings.producerSettings, sendQueue))
)(_.close)
_ <- ZIO.blocking(live.sendFromQueue).forkScoped
} yield LiveTransactionalProducer(live, semaphore)
Expand Down

0 comments on commit 9e0457d

Please sign in to comment.