Error Handling and Supervision

Simon Souter edited this page Nov 23, 2016 · 1 revision

When using Kafka's Java driver directly, it can be tricky to implement Exception handling that covers every error scenario, so that an application retries where possible and escalates the problem when it is unrecoverable.

The KafkaConsumerActor provides a much simpler failure-handling model: it builds on Akka's supervision mechanism and offers an error handling strategy that is easy to reason about.

A streaming application is typically expected to recover automatically from transient failures, such as network issues or a Kafka cluster failure, once the cause of the failure is resolved. Another reasonable approach is to attempt to restore service a given number of times before escalating the error or terminating, which gives a monitoring system the opportunity to notice the error, or a cloud management system the opportunity to relocate the node to another server. The worst possible scenario is that the application hangs and never recovers.

Once a Subscription message has been sent to the KafkaConsumerActor, it can be assumed that it will do its best to establish the subscription to Kafka and deliver any received messages. If it experiences an Exception from the Kafka driver, it will propagate this to its parent Actor via the Actor supervision mechanism.

If no specific supervision strategy is defined, the default supervision strategy will apply. That is, for any transient error, identified by a KafkaConsumerActor.ConsumerException (in the cakesolutions.kafka.akka package), the KafkaConsumerActor will automatically be restarted. On restart it attempts to reestablish the original subscription from the last confirmed positions in the Kafka topic, regardless of Subscription mode or commit strategy.

If a more serious, unrecoverable Exception has occurred, this will be propagated as a RuntimeException, which by default will be escalated, potentially causing the application to terminate.

This default behavior is already reasonable for many use cases. It can easily be changed, though, by providing a custom Supervision Strategy. The following example restarts the consumer on a transient Exception up to a maximum number of times and escalates any other Exception. Note that once the restart limit is exceeded, Akka stops the child actor instead of restarting it again:

import akka.actor.{Actor, ActorLogging, OneForOneStrategy, SupervisorStrategy}
import cakesolutions.kafka._
import cakesolutions.kafka.akka.KafkaConsumerActor._
import cakesolutions.kafka.akka._

class Consumer(
  kafkaConfig: KafkaConsumer.Conf[String, String],
  actorConfig: KafkaConsumerActor.Conf) extends Actor with ActorLogging {

  // Restart the consumer on a ConsumerException, at most 10 times; escalate anything else.
  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10) {
    case _: KafkaConsumerActor.ConsumerException =>
      log.info("Consumer exception caught. Restarting consumer.")
      SupervisorStrategy.Restart
    case _ =>
      SupervisorStrategy.Escalate
  }

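  // Extractor used in receive to match batches of String-keyed, String-valued records
  val recordsExt = ConsumerRecords.extractor[String, String]

  // Create the consumer as a child so that its failures reach supervisorStrategy above
  val consumer = context.actorOf(
    KafkaConsumerActor.props(kafkaConfig, actorConfig, self)
  )

  // Subscribe with automatic partition assignment
  consumer ! Subscribe.AutoPartition(List("topic1"))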

  override def receive: Receive = {

    // Records from Kafka
    case recordsExt(records) =>
      records.pairs.foreach { case (key, value) =>
        log.info(s"Received [$key,$value]")
      }
      // Confirm the processed offsets and ask the consumer to commit them
      sender() ! Confirm(records.offsets, commit = true)
  }
}
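
A possible variation on the strategy above, shown here only as a sketch: `OneForOneStrategy` also accepts a `withinTimeRange` argument, so that restarts are counted within a time window rather than over the whole lifetime of the child. The `TransientFailure` class and the `WindowedSupervision` object are hypothetical names used so the snippet compiles without the Kafka client on the classpath; in the `Consumer` actor the first case would match `KafkaConsumerActor.ConsumerException` instead.

```scala
import akka.actor.{OneForOneStrategy, SupervisorStrategy}
import scala.concurrent.duration._

// Hypothetical stand-in for KafkaConsumerActor.ConsumerException (assumption for this sketch).
final case class TransientFailure(message: String) extends Exception(message)

object WindowedSupervision {

  // Allow at most 10 restarts within any one-minute window.
  // If the limit is exceeded, Akka stops the failing child.
  val strategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      case _: TransientFailure => SupervisorStrategy.Restart
      case _                   => SupervisorStrategy.Escalate
    }
}
```

With a window, an occasional failure spread over days does not use up the restart budget, while a rapid burst of failures still does. The values 10 and one minute are arbitrary and would need tuning for a real deployment.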

One thing to be aware of is that connection issues are not reliably propagated by the Kafka driver, which silently attempts to reconnect in the background. A lost connection may therefore never surface as an Exception to the supervising Actor.
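
One way an application might guard against such silent stalls, sketched here under assumptions rather than taken from the library, is Akka's receive timeout: the receiving actor fails itself if no message arrives for a configured period, which hands the decision to its own supervisor. `IdleWatchdog` and `idleLimit` are hypothetical names. Note that silence can also simply mean the topic is idle, so this only suits topics with a steady flow of records.

```scala
import akka.actor.{Actor, ActorLogging, ReceiveTimeout}
import scala.concurrent.duration._

// Hypothetical receiving actor that treats prolonged silence as a failure.
class IdleWatchdog(idleLimit: FiniteDuration) extends Actor with ActorLogging {

  // Akka delivers ReceiveTimeout to this actor after idleLimit without any message.
  context.setReceiveTimeout(idleLimit)

  override def receive: Receive = {
    case ReceiveTimeout =>
      log.warning("No messages for {}; the Kafka connection may be down.", idleLimit)
      // Throwing lets this actor's supervisor decide what to do next.
      throw new IllegalStateException(s"No records received within $idleLimit")

    case other =>
      // Placeholder for real record handling; each message resets the timeout.
      log.debug("Received {}", other)
  }
}
```

In the `Consumer` actor above, the same two lines (`context.setReceiveTimeout` and the `ReceiveTimeout` case) could be added alongside the existing record handling.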