Kafka stage
adamw committed Aug 24, 2023
1 parent 472d563 commit 90c6df9
Showing 7 changed files with 262 additions and 198 deletions.
28 changes: 24 additions & 4 deletions README.md
@@ -581,10 +581,10 @@ Dependency:
"com.softwaremill.ox" %% "kafka" % "0.0.10"
```

`Source`s which read from a Kafka topic, and drains which publish to Kafka topics are available through the `KafkaSource`
and `KafkaDrain` objects. In all cases either a manually constructed instance of a `KafkaProducer` / `KafkaConsumer`
is needed, or `ProducerSettings` / `ConsumerSetttings` need to be provided with the bootstrap servers, consumer group id,
key / value serializers, etc.
`Source`s which read from a Kafka topic, mapping stages, and drains which publish to Kafka topics are available through
the `KafkaSource`, `KafkaStage` and `KafkaDrain` objects. In all cases either a manually constructed instance of a
`KafkaProducer` / `KafkaConsumer` is needed, or `ProducerSettings` / `ConsumerSettings` need to be provided with the
bootstrap servers, consumer group id, key / value serializers, etc.

To read from a Kafka topic, use:
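
A minimal sketch of what this can look like, assuming the `ConsumerSettings.default(groupId)` builder,
`AutoOffsetReset.Earliest` and `KafkaSource.subscribe` as described in the library docs; `my_group` and `my_topic` are
placeholder names:

```scala
import ox.channels.Source
import ox.kafka.{ConsumerSettings, KafkaSource, ReceivedMessage}
import ox.kafka.ConsumerSettings.AutoOffsetReset
import ox.scoped

scoped {
  // the settings carry the bootstrap servers, consumer group id and deserializers
  val settings = ConsumerSettings.default("my_group")
    .bootstrapServers("localhost:9092")
    .autoOffsetReset(AutoOffsetReset.Earliest)

  // each received message exposes the record's key, value, topic, partition and offset
  val source: Source[ReceivedMessage[String, String]] = KafkaSource.subscribe(settings, "my_topic")

  source.receive()
}
```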

@@ -641,6 +641,26 @@ scoped {

The offsets are committed every second in a background process.

To publish data as a mapping stage:

```scala
import ox.channels.Source
import ox.kafka.ProducerSettings
import ox.kafka.KafkaStage.*
import ox.scoped
import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}

scoped {
  val settings = ProducerSettings.default.bootstrapServers("localhost:9092")
  val metadatas: Source[RecordMetadata] = Source
    .fromIterable(List("a", "b", "c"))
    .mapAsView(msg => ProducerRecord[String, String]("my_topic", msg))
    .mapPublish(settings)

  // process the metadatas source further
}
```
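
Each incoming `ProducerRecord` is sent to Kafka, and the corresponding `RecordMetadata` is emitted downstream once the
send completes, so the resulting topic, partition and offset can be consumed like any other source.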

# Performance

Performance is unknown, hasn't been measured and the code hasn't been optimized. We'd welcome contributions in this
area!
24 changes: 0 additions & 24 deletions kafka/src/main/scala/ox/kafka/KafkaDrain.scala
@@ -109,27 +109,3 @@ object KafkaDrain:
}
)
}

  private def doCommit(packets: Source[SendPacket[_, _]])(using Ox) =
    val commitInterval = 1.second
    val ticks = Source.tick(commitInterval)
    val toCommit = mutable.Map[TopicPartition, Long]()
    var consumer: Sink[KafkaConsumerRequest[_, _]] = null // assuming all packets come from the same consumer

    forever {
      select(ticks, packets).orThrow match
        case () =>
          if consumer != null && toCommit.nonEmpty then
            logger.trace(s"Committing ${toCommit.size} offsets.")
            consumer.send(KafkaConsumerRequest.Commit(toCommit.toMap))
            toCommit.clear()
        case packet: SendPacket[_, _] =>
          packet.commit.foreach { receivedMessage =>
            if consumer == null then consumer = receivedMessage.consumer.asInstanceOf[Sink[KafkaConsumerRequest[_, _]]]
            val tp = new TopicPartition(receivedMessage.topic, receivedMessage.partition)
            toCommit.updateWith(tp) {
              case Some(offset) => Some(math.max(offset, receivedMessage.offset))
              case None => Some(receivedMessage.offset)
            }
          }
    }
157 changes: 0 additions & 157 deletions kafka/src/main/scala/ox/kafka/KafkaSink.scala

This file was deleted.

4 changes: 1 addition & 3 deletions kafka/src/main/scala/ox/kafka/KafkaSource.scala
@@ -1,12 +1,10 @@
package ox.kafka

import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer}
import org.slf4j.LoggerFactory
import ox.*
import ox.channels.*

import scala.jdk.CollectionConverters.*
import scala.util.control.NonFatal

object KafkaSource: