KafkaConsumerActor Configuration 0.7.0 to 0.8.x

Simon Souter edited this page Jul 28, 2016 · 1 revision

KafkaConsumerActor Configuration for versions 0.7-0.8.x

To create a KafkaConsumerActor, supply the arguments required by the KafkaConsumerActor.props() function. The simplest way is to pass a key deserializer and a value deserializer, with all other consumer properties supplied in a Typesafe configuration.

//application.conf
{
    // Standard KafkaConsumer properties:
    bootstrap.servers = "localhost:9092"
    group.id = "test"
    enable.auto.commit = false
    auto.offset.reset = "earliest"
    topics = ["topic1"]

    //KafkaConsumerActor config
    schedule.interval = 3000 milliseconds
    unconfirmed.timeout = 3000 milliseconds
    buffer.size = 8
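}

import cakesolutions.kafka.akka.KafkaConsumerActor
import org.apache.kafka.common.serialization.StringDeserializer

// `conf` is a com.typesafe.config.Config holding the settings shown above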

val consumer = system.actorOf(
  KafkaConsumerActor.props(conf, new StringDeserializer(), new StringDeserializer(), self)
)
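
To make the shape of this configuration concrete, the following sketch parses the same settings inline with the Typesafe Config library and reads a few of them back. The object name ConsumerConfSketch and the inline parsing are illustrative assumptions; an application would normally load application.conf from the classpath instead.

```scala
import java.util.concurrent.TimeUnit
import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical helper object, not part of the library.
object ConsumerConfSketch {
  // The same settings as the application.conf example, parsed inline.
  val conf: Config = ConfigFactory.parseString(
    """
      |bootstrap.servers = "localhost:9092"
      |group.id = "test"
      |enable.auto.commit = false
      |auto.offset.reset = "earliest"
      |topics = ["topic1"]
      |schedule.interval = 3000 milliseconds
      |unconfirmed.timeout = 3000 milliseconds
      |buffer.size = 8
    """.stripMargin)

  // Dotted keys are read back as paths.
  val bootstrapServers: String = conf.getString("bootstrap.servers")
  val autoCommit: Boolean = conf.getBoolean("enable.auto.commit")
  val topics: java.util.List[String] = conf.getStringList("topics")

  // Durations such as "3000 milliseconds" are parsed into the requested unit.
  val scheduleIntervalMs: Long =
    conf.getDuration("schedule.interval", TimeUnit.MILLISECONDS)
  val bufferSize: Int = conf.getInt("buffer.size")
}
```

The resulting conf value has the type that the props call above expects as its first argument.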

KafkaConsumer.Conf and KafkaConsumerActor.Conf

Alternatively, pass the KafkaConsumer.Conf and KafkaConsumerActor.Conf configuration case classes, which can be created as follows:

import scala.concurrent.duration._
import cakesolutions.kafka.KafkaConsumer
import cakesolutions.kafka.akka.KafkaConsumerActor
import org.apache.kafka.common.serialization.StringDeserializer

// Configuration for the KafkaConsumer
val consumerConf = KafkaConsumer.Conf(
    new StringDeserializer,
    new StringDeserializer,
    bootstrapServers = "localhost:9092",
    groupId = "groupId",
    enableAutoCommit = false)

// Configuration specific to the Async Consumer Actor
val actorConf = KafkaConsumerActor.Conf(List("topic1"), 1.seconds, 3.seconds)

// Create the Actor
val consumer = system.actorOf(
  KafkaConsumerActor.props(consumerConf, actorConf, self)
)

Receiver Actor

Both configuration examples above assume that the Consumer Actor is created from within a parent actor, which passes itself to the consumer as 'self'. This ActorRef is where consumed messages are delivered. It should expect to receive cakesolutions.kafka.akka.ConsumerRecords[K, V] messages, each containing a batch of the Java client's ConsumerRecords consumed from Kafka.

import akka.actor.Actor
import cakesolutions.kafka.akka.KafkaConsumerActor.Confirm
import cakesolutions.kafka.akka.ConsumerRecords

class ReceiverActor extends Actor {

  // Extractor for ensuring a type-safe cast of records.
  // The key and value types must match the deserializers given to the consumer
  // (StringDeserializer for both in the configuration examples above).
  val recordsExt = ConsumerRecords.extractor[String, String]

  override def receive: Receive = {
    // Type-safe cast of records to the correct serialisation types
    case recordsExt(records) =>
      processRecords(records.pairs)
      // Acknowledge the processed batch to the consumer actor
      sender() ! Confirm(records.offsets)
  }

  // Process the whole batch of received records.
  // The first value in the tuple is the optional key of a record.
  // The second value in the tuple is the actual value from a record.
  def processRecords(records: Seq[(Option[String], String)]): Unit = { ... }
}
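
The body of processRecords is left open above. As a rough illustration of working with the (optional key, value) pairs, the sketch below renders each pair as a line of text and handles records that carry no key. The object ProcessRecordsSketch and the method describe are hypothetical names, and the key type is left generic so the sketch applies whatever key deserializer is in use.

```scala
// Hypothetical helper, not part of the library.
object ProcessRecordsSketch {
  // Renders each (key, value) pair as text.
  // A record produced without a key arrives with None in the first position.
  def describe[K](records: Seq[(Option[K], String)]): Seq[String] =
    records.map {
      case (Some(key), value) => s"$key -> $value"
      case (None, value)      => s"(no key) -> $value"
    }
}
```

For example, describe applied to Seq((Some(1), "a"), (None, "b")) yields Seq("1 -> a", "(no key) -> b"). Pattern matching on the Option keeps the missing-key case explicit rather than relying on a null check.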