Kafka publish & commit
adamw committed Aug 22, 2023
1 parent 3a93022 commit e3a922e
Showing 5 changed files with 174 additions and 23 deletions.
4 changes: 3 additions & 1 deletion kafka/src/main/scala/ox/kafka/ConsumerSettings.scala
@@ -1,6 +1,6 @@
package ox.kafka

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer}
import ox.kafka.ConsumerSettings.AutoOffsetReset

@@ -29,6 +29,8 @@ case class ConsumerSettings[K, V](
otherProperties.foreach { case (key, value) => props.put(key, value) }
props

def toConsumer: KafkaConsumer[K, V] = KafkaConsumer(toProperties, keyDeserializer, valueDeserializer)

object ConsumerSettings:
private val StringDeserializerInstance = new StringDeserializer
def default(groupId: String): ConsumerSettings[String, String] =
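The new `toConsumer` helper builds a `KafkaConsumer` directly from the settings. A minimal sketch of how it might be used (the group id and bootstrap server below are placeholders, for illustration only):

import ox.kafka.ConsumerSettings
import ox.kafka.ConsumerSettings.AutoOffsetReset.Earliest

// hypothetical values, for illustration only
val settings = ConsumerSettings
  .default("my-group")
  .bootstrapServers("localhost:9092")
  .autoOffsetReset(Earliest)

// creates a KafkaConsumer[String, String] configured from the settings' properties and deserializers
val consumer = settings.toConsumer
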
124 changes: 115 additions & 9 deletions kafka/src/main/scala/ox/kafka/KafkaSink.scala
@@ -1,13 +1,19 @@
package ox.kafka

import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.TopicPartition
import ox.*
import ox.channels.{ChannelClosed, Sink, StageCapacity}
import ox.channels.*

import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable
import scala.concurrent.duration.*
import scala.jdk.CollectionConverters.*

object KafkaSink:
def publish[K, V](settings: ProducerSettings[K, V])(using StageCapacity, Ox): Sink[ProducerRecord[K, V]] =
val producer = new KafkaProducer(settings.toProperties, settings.keySerializer, settings.valueSerializer)
publish(producer, closeWhenComplete = true)
publish(settings.toProducer, closeWhenComplete = true)

def publish[K, V](producer: KafkaProducer[K, V], closeWhenComplete: Boolean)(using StageCapacity, Ox): Sink[ProducerRecord[K, V]] =
val c = StageCapacity.newChannel[ProducerRecord[K, V]]
@@ -19,14 +25,114 @@ object KafkaSink:
case e: ChannelClosed.Error => c.error(e.toThrowable); false
case ChannelClosed.Done => false
case record: ProducerRecord[K, V] @unchecked =>
producer.send(
record,
(_: RecordMetadata, exception: Exception) => {
if exception != null then c.error(exception)
}
); true
tapException {
producer.send(
record,
(_: RecordMetadata, exception: Exception) => {
if exception != null then c.error(exception)
}
)
}(c.error)
true
}
finally if closeWhenComplete then producer.close()
}

c

/** @return
* A sink, which accepts send packets. For each packet, first all `send` messages (producer records) are sent. Then, the offsets of
* the `commit` messages (consumer records) are committed.
*/
def publishAndCommit[K, V](consumerSettings: ConsumerSettings[K, V], producerSettings: ProducerSettings[K, V])(using
StageCapacity,
Ox
): Sink[SendPacket[K, V]] =
publishAndCommit(consumerSettings.toConsumer, producerSettings.toProducer, closeWhenComplete = true)

/** @param consumer
* The consumer that is used to commit offsets.
* @param producer
* The producer that is used to send messages.
* @return
* A sink, which accepts send packets. For each packet, first all `send` messages (producer records) are sent. Then, all `commit`
* messages (consumer records) up to their offsets are committed.
*/
def publishAndCommit[K, V](consumer: KafkaConsumer[K, V], producer: KafkaProducer[K, V], closeWhenComplete: Boolean)(using
StageCapacity,
Ox
): Sink[SendPacket[K, V]] =
val c = StageCapacity.newChannel[SendPacket[K, V]]
val toCommit = Channel[SendPacket[_, _]](128)

fork {
try
// starting a nested scope, so that the committer is interrupted when the main process ends
scoped {
// committer
fork(tapException(doCommit(consumer, toCommit))(c.error))

repeatWhile {
c.receive() match
case e: ChannelClosed.Error => c.error(e.toThrowable); false
case ChannelClosed.Done => false
case packet: SendPacket[K, V] @unchecked =>
tapException(sendPacket(producer, packet, toCommit, c.error))(c.error)
true
}
}
finally
if closeWhenComplete then
consumer.close()
producer.close()
}

c

private def sendPacket[K, V](
producer: KafkaProducer[K, V],
packet: SendPacket[K, V],
toCommit: Sink[SendPacket[_, _]],
onSendException: Exception => Unit
) =
val leftToSend = new AtomicInteger(packet.send.size)
packet.send.foreach { toSend =>
producer.send(
toSend,
(_: RecordMetadata, exception: Exception) => {
if exception == null
then
if leftToSend.decrementAndGet() == 0 then toCommit.send(packet)
else onSendException(exception)
}
)
}

private def doCommit(consumer: KafkaConsumer[_, _], packets: Source[SendPacket[_, _]])(using Ox) =
val commitInterval = 1.second
val ticks = Source.tick(commitInterval)
val toCommit = mutable.Map[TopicPartition, Long]()

forever {
select(ticks, packets).orThrow match
case () =>
consumer.commitSync(toCommit.view.mapValues(new OffsetAndMetadata(_)).toMap.asJava)
toCommit.clear()
case packet: SendPacket[_, _] =>
packet.commit.foreach { record =>
val tp = new TopicPartition(record.topic(), record.partition())
toCommit.updateWith(tp) {
case Some(offset) => Some(math.max(offset, record.offset()))
case None => Some(record.offset())
}
}
}

case class SendPacket[K, V](send: List[ProducerRecord[K, V]], commit: List[ConsumerRecord[_, _]])

object SendPacket:
def apply[K, V](send: ProducerRecord[K, V], commit: ConsumerRecord[_, _]): SendPacket[K, V] =
SendPacket(List(send), List(commit))

def apply[K, V](send: List[ProducerRecord[K, V]], commit: ConsumerRecord[_, _]): SendPacket[K, V] =
SendPacket(send, List(commit))
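
A minimal sketch of how the new `publishAndCommit` sink might be wired up, mirroring the test below (the topic names, group id and bootstrap server are made up for illustration): records consumed from a source topic are transformed, the results are published to a destination topic, and the consumed offsets are committed once the corresponding sends succeed.

import org.apache.kafka.clients.producer.ProducerRecord
import ox.*
import ox.channels.*
import ox.kafka.*
import ox.kafka.ConsumerSettings.AutoOffsetReset.Earliest

scoped {
  val consumerSettings = ConsumerSettings.default("my-group").bootstrapServers("localhost:9092").autoOffsetReset(Earliest)
  val producerSettings = ProducerSettings.default.bootstrapServers("localhost:9092")

  KafkaSource
    .subscribe(consumerSettings, "source-topic")
    .map(in => SendPacket(ProducerRecord[String, String]("dest-topic", in.value().toUpperCase), in))
    .pipeTo(KafkaSink.publishAndCommit(consumerSettings, producerSettings))
}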
7 changes: 1 addition & 6 deletions kafka/src/main/scala/ox/kafka/KafkaSource.scala
@@ -12,12 +12,7 @@ object KafkaSource:
StageCapacity,
Ox
): Source[ConsumerRecord[K, V]] =
subscribe(
new KafkaConsumer(settings.toProperties, settings.keyDeserializer, settings.valueDeserializer),
closeWhenComplete = true,
topic,
otherTopics: _*
)
subscribe(settings.toConsumer, closeWhenComplete = true, topic, otherTopics: _*)

def subscribe[K, V](kafkaConsumer: KafkaConsumer[K, V], closeWhenComplete: Boolean, topic: String, otherTopics: String*)(using
StageCapacity,
4 changes: 3 additions & 1 deletion kafka/src/main/scala/ox/kafka/ProducerSettings.scala
@@ -1,6 +1,6 @@
package ox.kafka

import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.common.serialization.{Serializer, StringSerializer}

import java.util.Properties
@@ -22,6 +22,8 @@ case class ProducerSettings[K, V](
otherProperties.foreach { case (key, value) => props.put(key, value) }
props

def toProducer: KafkaProducer[K, V] = KafkaProducer(toProperties, keySerializer, valueSerializer)

object ProducerSettings:
private val StringSerializerInstance = new StringSerializer
def default: ProducerSettings[String, String] =
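Analogously, `toProducer` creates a `KafkaProducer` from the producer settings; such a producer can also be handed to the `publish` overload that takes a pre-built producer. A sketch, assuming a placeholder bootstrap server:

import ox.kafka.ProducerSettings

val producerSettings = ProducerSettings.default.bootstrapServers("localhost:9092")

// a KafkaProducer[String, String] configured from the settings' properties and serializers
val producer = producerSettings.toProducer

// within an Ox scope, the producer could then be passed to the overload that accepts it directly:
//   KafkaSink.publish(producer, closeWhenComplete = true)
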
58 changes: 52 additions & 6 deletions kafka/src/test/scala/ox/kafka/KafkaTest.scala
@@ -8,14 +8,14 @@ import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.channels.*
import ox.kafka.ConsumerSettings.AutoOffsetReset.Earliest
import ox.scoped
import ox.*

class KafkaTest extends AnyFlatSpec with Matchers with EmbeddedKafka with BeforeAndAfterAll {

private var kafkaPort: Int = _
private var bootstrapServer: String = _

override def beforeAll(): Unit =
kafkaPort = EmbeddedKafka.start().config.kafkaPort
bootstrapServer = s"localhost:${EmbeddedKafka.start().config.kafkaPort}"

override def afterAll(): Unit =
EmbeddedKafka.stop()
@@ -32,11 +32,10 @@ class KafkaTest extends AnyFlatSpec with Matchers with EmbeddedKafka with Before

scoped {
// then
val settings = ConsumerSettings.default(group).bootstrapServers(s"localhost:$kafkaPort").autoOffsetReset(Earliest)
val settings = ConsumerSettings.default(group).bootstrapServers(bootstrapServer).autoOffsetReset(Earliest)
val source = KafkaSource.subscribe(settings, topic)

source.receive().orThrow.value() shouldBe "msg1"

source.receive().orThrow.value() shouldBe "msg2"
source.receive().orThrow.value() shouldBe "msg3"

@@ -55,7 +54,7 @@ class KafkaTest extends AnyFlatSpec with Matchers with EmbeddedKafka with Before

// when
scoped {
val settings = ProducerSettings.default.bootstrapServers(s"localhost:$kafkaPort")
val settings = ProducerSettings.default.bootstrapServers(bootstrapServer)
Source
.fromIterable(List("a", "b", "c"))
.mapAsView(msg => ProducerRecord[String, String](topic, msg))
@@ -66,4 +65,51 @@ class KafkaTest extends AnyFlatSpec with Matchers with EmbeddedKafka with Before
given Deserializer[String] = new StringDeserializer()
consumeNumberMessagesFrom[String](topic, 3) shouldBe List("a", "b", "c")
}

it should "commit offsets of processed messages" in {
// given
val sourceTopic = "t3_1"
val destTopic = "t3_2"
val group1 = "g3_1"
val group2 = "g3_2"

val consumerSettings = ConsumerSettings.default(group1).bootstrapServers(bootstrapServer).autoOffsetReset(Earliest)
val producerSettings = ProducerSettings.default.bootstrapServers(bootstrapServer)

// when
publishStringMessageToKafka(sourceTopic, "10")
publishStringMessageToKafka(sourceTopic, "25")
publishStringMessageToKafka(sourceTopic, "92")

scoped {
// then
fork {
KafkaSource
.subscribe(consumerSettings, sourceTopic)
.map(in => (in.value().toLong * 2, in))
.map((value, original) => SendPacket(ProducerRecord[String, String](destTopic, value.toString), original))
.pipeTo(KafkaSink.publishAndCommit(consumerSettings, producerSettings))
}

val inDest = KafkaSource.subscribe(consumerSettings, destTopic)
inDest.receive().orThrow.value() shouldBe "20"
inDest.receive().orThrow.value() shouldBe "50"
inDest.receive().orThrow.value() shouldBe "184"

// interrupting the stream processing
}

// sending some more messages to source
publishStringMessageToKafka(sourceTopic, "4")

scoped {
// reading from source, using the same consumer group as before, should start from the last committed offset
val inSource = KafkaSource.subscribe(consumerSettings, sourceTopic)
inSource.receive().orThrow.value() shouldBe "4"

// while reading using another group, should start from the earliest offset
val inSource2 = KafkaSource.subscribe(consumerSettings.groupId(group2), sourceTopic)
inSource2.receive().orThrow.value() shouldBe "10"
}
}
}
