Skip to content

Commit

Permalink
Kafka sink
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw committed Aug 22, 2023
1 parent c5072a0 commit 3a93022
Showing 1 changed file with 32 additions and 0 deletions.
32 changes: 32 additions & 0 deletions kafka/src/main/scala/ox/kafka/KafkaSink.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package ox.kafka

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
import ox.*
import ox.channels.{ChannelClosed, Sink, StageCapacity}

object KafkaSink:
def publish[K, V](settings: ProducerSettings[K, V])(using StageCapacity, Ox): Sink[ProducerRecord[K, V]] =
val producer = new KafkaProducer(settings.toProperties, settings.keySerializer, settings.valueSerializer)
publish(producer, closeWhenComplete = true)

def publish[K, V](producer: KafkaProducer[K, V], closeWhenComplete: Boolean)(using StageCapacity, Ox): Sink[ProducerRecord[K, V]] =
val c = StageCapacity.newChannel[ProducerRecord[K, V]]

fork {
try
repeatWhile {
c.receive() match
case e: ChannelClosed.Error => c.error(e.toThrowable); false
case ChannelClosed.Done => false
case record: ProducerRecord[K, V] @unchecked =>
producer.send(
record,
(_: RecordMetadata, exception: Exception) => {
if exception != null then c.error(exception)
}
); true
}
finally if closeWhenComplete then producer.close()
}

c

0 comments on commit 3a93022

Please sign in to comment.