
KafkaConsumerActor Configuration

simonsouter edited this page Aug 3, 2018 · 9 revisions

To create a KafkaConsumerActor, the arguments of KafkaConsumerActor.props() must be supplied. This can be done by providing the KafkaConsumer.Conf and KafkaConsumerActor.Conf configuration case classes, which can be created as follows:

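import scala.concurrent.duration._
import cakesolutions.kafka.KafkaConsumer
import cakesolutions.kafka.akka.KafkaConsumerActor
import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetResetStrategy}
import org.apache.kafka.common.requests.IsolationLevel
import org.apache.kafka.common.serialization.StringDeserializer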

// Configuration for the KafkaConsumer
val consumerConf = KafkaConsumer.Conf(
  keyDeserializer = new StringDeserializer,
  valueDeserializer = new StringDeserializer,
  bootstrapServers = "localhost:9092",
  groupId = "group",
  enableAutoCommit = false,
  autoCommitInterval = 1000,    // milliseconds
  sessionTimeoutMs = 10000,
  maxPartitionFetchBytes = ConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES,
  maxPollRecords = 500,
  maxPollInterval = 300000,     // milliseconds
  maxMetaDataAge = 300000,      // milliseconds
  autoOffsetReset = OffsetResetStrategy.LATEST,
  isolationLevel = IsolationLevel.READ_UNCOMMITTED
)

// Configuration specific to the Async Consumer Actor
val actorConf = KafkaConsumerActor.Conf(
  scheduleInterval = 1.seconds,   // scheduling interval for Kafka polling when consumer is inactive
  unconfirmedTimeout = 3.seconds, // duration for how long to wait for a confirmation before redelivery
  maxRedeliveries = 3             // maximum number of times an unconfirmed message will be redelivered
)

// Create the actor (assumes an ActorSystem `system` and a receiver ActorRef `receiverActor` are in scope)
val consumer = system.actorOf(
  KafkaConsumerActor.props(consumerConf, actorConf, receiverActor)
)

KafkaConsumerActor.Conf

Schedule Interval

The schedule interval is the delay before Akka dispatches a new poll() request when the consumer is not busy. Leaving a margin of delay between polls significantly reduces demand on the thread dispatcher, at the cost of slightly higher latency. The default of 1 second is a reasonable compromise, although '500 millis' is acceptable if latency must be kept as low as possible.

schedule.interval = 1 second
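To make the trade-off concrete, the sketch below models an idle consumer that issues exactly one poll() per schedule interval. It is a simplified, hypothetical model: `ScheduleIntervalTradeOff`, `idlePollsPerMinute` and `worstCaseAddedLatency` are illustrative names, not part of scala-kafka-client, and the model ignores how long each poll itself takes. It only shows that halving the interval doubles the idle poll rate while halving the worst-case wait for a newly arrived record.

```scala
import scala.concurrent.duration._

object ScheduleIntervalTradeOff {
  // Hypothetical model: an idle consumer issues exactly one poll() per schedule interval.
  def idlePollsPerMinute(interval: FiniteDuration): Long = {
    require(interval.toMillis > 0, "interval must be at least 1 millisecond")
    1.minute.toMillis / interval.toMillis
  }

  // In this model, a record that arrives just after an empty poll waits up to
  // one full interval before the next poll() picks it up.
  def worstCaseAddedLatency(interval: FiniteDuration): FiniteDuration = interval

  def main(args: Array[String]): Unit =
    Seq(500.millis, 1.second).foreach { interval =>
      println(s"$interval: ${idlePollsPerMinute(interval)} idle polls per minute, " +
        s"up to ${worstCaseAddedLatency(interval)} added latency")
    }
}
```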

Unconfirmed Timeout

The consumer actor dispatches messages (batches of records) asynchronously to a downstream user component and expects a confirmation for each. If the confirmation is not received within the unconfirmed timeout, the actor assumes the message was lost and redelivers it. This is an at-least-once acknowledgement pattern. If processing a message may take longer than this timeout, the timeout must be increased; otherwise the message will be redelivered while it is still being processed.

unconfirmed.timeout = 3 seconds

It is possible to disable redeliveries by specifying an unconfirmed timeout of 0.
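The timeout rule can be expressed as a small predicate. The sketch below is a hypothetical helper (`redeliveryDue` is not a library function) that treats a delivery as due for redelivery once the timeout has elapsed without a confirmation, and treats a timeout of 0 as "redelivery disabled", as described above. Timestamps are plain milliseconds to keep the example self-contained.

```scala
import scala.concurrent.duration._

object UnconfirmedTimeoutCheck {
  // Hypothetical helper: true when a delivery sent at sentAtMillis has gone unconfirmed
  // for at least unconfirmedTimeout. A timeout of zero disables redelivery entirely.
  def redeliveryDue(sentAtMillis: Long, nowMillis: Long, unconfirmedTimeout: FiniteDuration): Boolean =
    unconfirmedTimeout > Duration.Zero && nowMillis - sentAtMillis >= unconfirmedTimeout.toMillis

  def main(args: Array[String]): Unit = {
    val timeout = 3.seconds
    println(redeliveryDue(0L, 2000L, timeout))   // false: still within the timeout
    println(redeliveryDue(0L, 3500L, timeout))   // true: timed out, redeliver
    println(redeliveryDue(0L, 3500L, 0.seconds)) // false: redelivery disabled
  }
}
```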

Max Redeliveries

Specifies the maximum number of times an unconfirmed message will be redelivered to an unresponsive client. This prevents overwhelming a client with redeliveries if it is unable to make progress. If a client continues to fail to confirm a delivery beyond the max redeliveries, a [BackingOff](https://github.com/cakesolutions/scala-kafka-client/wiki/Message%20Exchange%20Patterns#backingoff) message will be sent.
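The interplay between redelivery and backing off can be sketched as a decision function. This is a hypothetical model: the `RedeliveryPolicy` object and its `Redeliver`/`BackOff` outcomes are illustrative stand-ins, not the library's internal types. While a message has been redelivered fewer than maxRedeliveries times it is redelivered again; after that, the client is signalled to back off. The sketch does not attempt to model what the actor does after sending BackingOff.

```scala
object RedeliveryPolicy {
  sealed trait Action
  case object Redeliver extends Action
  case object BackOff extends Action // illustrative stand-in for sending a BackingOff message

  // Hypothetical rule: redeliver while fewer than maxRedeliveries redeliveries have
  // been attempted for this message; after that, signal the client to back off.
  def onUnconfirmedTimeout(redeliveriesSoFar: Int, maxRedeliveries: Int): Action =
    if (redeliveriesSoFar < maxRedeliveries) Redeliver else BackOff

  def main(args: Array[String]): Unit = {
    val maxRedeliveries = 3
    (0 to 4).foreach { n =>
      println(s"after $n redeliveries: ${onUnconfirmedTimeout(n, maxRedeliveries)}")
    }
  }
}
```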

Typesafe Configuration

An alternative configuration approach is to provide only the key and value deserializers in code, and supply all of the other consumer properties in a Typesafe configuration file:

//application.conf
{
    // Standard KafkaConsumer properties:
    bootstrap.servers = "localhost:9092"
    group.id = "group"
    enable.auto.commit = false
    auto.offset.reset = "earliest"

    // KafkaConsumerActor config
    schedule.interval = 1 second
    unconfirmed.timeout = 3 seconds
    max.redeliveries = 3
}
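import cakesolutions.kafka.akka.KafkaConsumerActor
import com.typesafe.config.ConfigFactory
import org.apache.kafka.common.serialization.StringDeserializer

// Loads application.conf (and any reference.conf defaults) from the classpath
val conf = ConfigFactory.load()

// Construct the KafkaConsumerActor with Typesafe config
// (assumes an ActorSystem `system` and a receiver ActorRef `receiverActor` are in scope)
val consumer = system.actorOf(
  KafkaConsumerActor.props(conf, new StringDeserializer(), new StringDeserializer(), receiverActor)
)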
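The single file above carries two kinds of settings: standard KafkaConsumer properties and the three KafkaConsumerActor keys. The library reads these from the Config object itself; the sketch below deliberately does not use Typesafe Config (to stay dependency-free) and instead assumes a hypothetical flattened Map[String, String] view of the file. It separates the actor keys from the Kafka properties and parses the duration values with the standard library's `Duration(String)`, which accepts strings such as "1 second" or "500 millis". `ConfigSplitSketch`, `ActorSettings` and `split` are illustrative names only.

```scala
import scala.concurrent.duration._

object ConfigSplitSketch {
  // Hypothetical holder for the three KafkaConsumerActor settings shown above.
  final case class ActorSettings(
    scheduleInterval: FiniteDuration,
    unconfirmedTimeout: FiniteDuration,
    maxRedeliveries: Int
  )

  private val actorKeys = Set("schedule.interval", "unconfirmed.timeout", "max.redeliveries")

  // Duration(String) parses values such as "1 second", "3 seconds" or "500 millis".
  private def finite(value: String): FiniteDuration = Duration(value) match {
    case f: FiniteDuration => f
    case other             => throw new IllegalArgumentException(s"expected a finite duration, got $other")
  }

  // Splits a flattened view of the config into the plain Kafka consumer
  // properties and the actor-specific settings.
  def split(flat: Map[String, String]): (Map[String, String], ActorSettings) = {
    val kafkaProps = flat.filter { case (key, _) => !actorKeys.contains(key) }
    val settings = ActorSettings(
      scheduleInterval = finite(flat("schedule.interval")),
      unconfirmedTimeout = finite(flat("unconfirmed.timeout")),
      maxRedeliveries = flat("max.redeliveries").trim.toInt
    )
    (kafkaProps, settings)
  }

  def main(args: Array[String]): Unit = {
    val flat = Map(
      "bootstrap.servers" -> "localhost:9092",
      "group.id" -> "group",
      "enable.auto.commit" -> "false",
      "auto.offset.reset" -> "earliest",
      "schedule.interval" -> "1 second",
      "unconfirmed.timeout" -> "3 seconds",
      "max.redeliveries" -> "3"
    )
    val (kafkaProps, settings) = split(flat)
    println(kafkaProps.keys.toList.sorted)
    println(settings)
  }
}
```

Keeping the actor keys in an explicit set makes it obvious which entries would otherwise be passed through to the Kafka client as unknown properties.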

Receiver Actor

Each of the above configuration examples assumes that the consumer actor is created from the context of a parent actor, which passes the consumer a receiverActor ActorRef. This is the ActorRef to which consumed messages will be delivered. The user is expected to implement this receiver actor and provide their own processing logic. The receiver actor should expect deliveries of cakesolutions.kafka.akka.ConsumerRecords[K, V], each containing a batch of the Java client's ConsumerRecords consumed from Kafka.

import akka.actor.Actor
import cakesolutions.kafka.akka.KafkaConsumerActor.Confirm
import cakesolutions.kafka.akka.ConsumerRecords

class ReceiverActor extends Actor {

  // Extractor for ensuring type safe cast of records
  val recordsExt = ConsumerRecords.extractor[Int, String]

  // Akka will dispatch messages here sequentially for processing.  The next batch is prepared in parallel and can be dispatched immediately
  // after the Confirm.  Performance is only limited by processing time and network bandwidth. 
  override def receive: Receive = {
    // Type safe cast of records to correct serialisation type
    case recordsExt(records) =>
    
      // Either provide the records for processing as a sequence of tuples...
      processRecords(records.pairs)

      // ...or provide them using the raw Java type of ConsumerRecords[Key, Value].
      // In practice use one of the two; calling both processes the batch twice.
      processRecords(records.records)
      
      // Confirm and commit back to Kafka (commit = true requests an offset commit)
      sender() ! Confirm(records.offsets, commit = true)
  }

  // Process the whole batch of received records.
  // The first value in the tuple is the optional key of a record.
  // The second value in the tuple is the actual value from a record.
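  def processRecords(records: Seq[(Option[Int], String)]): Unit =
    records.foreach { case (key, value) => println(s"Received [$key, $value]") } // placeholder: replace with real processing

  // Or process the batch of records via the raw Kafka client records model
  def processRecords(records: org.apache.kafka.clients.consumer.ConsumerRecords[Int, String]): Unit =
    println(s"Received a batch of ${records.count()} records") // placeholder: replace with real processing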
}
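Kafka records may carry a null key, which is why the key in each tuple is most naturally modelled as an Option. A minimal sketch of how a possibly-null key can be turned into the (Option[K], V) pairs described above follows. `Record` is a hypothetical stand-in for the Java client's ConsumerRecord, and `pairs` here is an illustrative function, not the library's implementation. String keys are used because a Scala Int cannot hold null.

```scala
object PairsSketch {
  // Hypothetical stand-in for the Java client's ConsumerRecord; the key may be null.
  final case class Record[K, V](key: K, value: V)

  // Wraps each possibly-null key in an Option: Option(null) is None.
  def pairs[K, V](records: Seq[Record[K, V]]): Seq[(Option[K], V)] =
    records.map(r => (Option(r.key), r.value))

  def main(args: Array[String]): Unit = {
    val batch = Seq(Record[String, String]("user-1", "login"), Record[String, String](null, "heartbeat"))
    pairs(batch).foreach { case (key, value) =>
      println(s"key=${key.getOrElse("<none>")} value=$value")
    }
  }
}
```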

The interaction between the KafkaConsumerActor and the user's custom receiver actor is described in the next section: [Message Exchange Patterns](https://github.com/cakesolutions/scala-kafka-client/wiki/Message%20Exchange%20Patterns)