Skip to content

Commit

Permalink
Add a Producer sink for producing record chunks (#35)
Browse files Browse the repository at this point in the history
* Draft of Producer sink

* Remove unused import

* Remove passthrough functionality that was not going to work

* Remove inefficient non-chunked sink
  • Loading branch information
svroonland authored and iravid committed Sep 29, 2019
1 parent ddbed58 commit 2e3afdd
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 5 deletions.
22 changes: 18 additions & 4 deletions src/main/scala/zio/kafka/client/Producer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package zio.kafka.client
import java.util.concurrent.Future
import java.util.concurrent.atomic.AtomicLong

import scala.collection.JavaConverters._

import org.apache.kafka.clients.producer.{ Callback, KafkaProducer, ProducerRecord, RecordMetadata }
import org.apache.kafka.common.serialization.Serde

import zio.{ Chunk, Promise, UIO, ZIO, ZManaged }
import zio._
import zio.blocking._
import zio.stream.ZSink

import scala.collection.JavaConverters._

trait Producer[K, V] {
def produce(record: ProducerRecord[K, V]): BlockingTask[RecordMetadata]
Expand Down Expand Up @@ -96,4 +96,18 @@ object Producer {
.map(unsafeMake)
}

/**
* Sink that produces records to Kafka in chunks
*
* @param settings
* @tparam K
* @tparam V
* @return
*/
def sink[K: Serde, V: Serde](
settings: ProducerSettings
): ZManaged[Blocking, Throwable, ZSink[Blocking, Throwable, Nothing, Chunk[ProducerRecord[K, V]], Unit]] =
make[K, V](settings).map { producer =>
ZSink.drain.contramapM(producer.produceChunk)
}
}
2 changes: 1 addition & 1 deletion src/main/scala/zio/kafka/client/ProducerSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import org.apache.kafka.clients.producer.ProducerConfig
case class ProducerSettings(
bootstrapServers: List[String],
closeTimeout: Duration,
extraDriverSettings: Map[String, String]
extraDriverSettings: Map[String, AnyRef]
) {
def driverSettings: Map[String, AnyRef] =
Map(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers.mkString(",")) ++
Expand Down

0 comments on commit 2e3afdd

Please sign in to comment.