Message Exchange Patterns
Once the KafkaConsumerActor is created with the required configuration as described in [Consumer Configuration](KafkaConsumerActor Configuration), communication between the client/receiver actor and the consumer actor is via Actor messaging.
The first step required after configuration of the KafkaConsumerActor is to initiate a Subscription.
A Subscribe message is sent to the ConsumerActor to initiate a Kafka topic subscription in a specific subscription mode. The subscription modes
are roughly modelled as:
import cakesolutions.kafka.akka.KafkaConsumerActor.Subscribe._
// Subscribe to the provided topics with Auto Partition Assignment
case class AutoPartition(topics: Iterable[String])
// Subscribe to the provided topics with Auto Partition Assignment with client managed offset commit positions for each partition.
case class AutoPartitionWithManualOffset(topics: Iterable[String],
assignedListener: List[TopicPartition] => Offsets,
revokedListener: List[TopicPartition] => Unit)
// Subscribe to the provided set of Topic and Partitions in Manual Partition mode
case class ManualPartition(topicPartitions: Iterable[TopicPartition])
// Subscribe to the provided set of Topic and Partitions in Manual Partition mode, and seek from the provided topic/partition offsets.
case class ManualOffset(offsets: Offsets)
The most common use case is to subscribe in Auto Partition Assignment mode to a given set of topics:
import cakesolutions.kafka.akka.KafkaConsumerActor._
// Send the initiating subscription to the consumer actor
consumer ! Subscribe.AutoPartition(Seq("topic1"))
In this mode, a number of separate clients, potentially on different physical nodes, can subscribe to the topic with a common group-id. Kafka assigns a share of the partitions to each member of the group, and each partition is consumed in parallel. Partition assignments can be expected to change as members join and leave the group during the rebalance process. Usually there is an opportunity for the client to commit in-flight messages before the rebalancing occurs.
The AutoPartitionWithManualOffset mode also uses Auto Partition Assignment, but allows the client to manage its own offset positions for each Topic+Partition. This subscription mode is typically used when performing some parallel stateful computation and storing the offset position along with the state in a persistent store. This allows for exactly-once state manipulation against an at-least-once delivery stream. The approach is discussed in the Kafka Consumer API documentation, in the section "Manual Offset Control".
import cakesolutions.kafka.akka.KafkaConsumerActor._
// Send the initiating subscription to the consumer actor
consumer ! Subscribe.AutoPartitionWithManualOffset(List("topic1"), assignedListener, revokedListener)
// Client provided callback
def assignedListener(tps: List[TopicPartition]): Offsets = {
...
}
// Client provided callback
def revokedListener(tps: List[TopicPartition]): Unit = {
...
}
The client should provide callbacks to be notified when partitions have been assigned or revoked. When partitions have been assigned, the client should look up the latest offsets for those partitions in its persistent store and return them. The KafkaConsumerActor will seek to the specified positions.
The client should confirm received records with 'commit = false' so that consumed records are not committed back to Kafka.
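As a rough illustration of the two callbacks, the sketch below keeps offsets in an in-memory map. `OffsetStore` is hypothetical, and `TopicPartition` and `Offsets` are stand-ins with assumed shapes, declared locally only so that the sketch compiles on its own. A real client would use the Kafka and library types and a durable store that writes state and offsets together.

```scala
object ManualOffsetSketch {
  // Stand-ins (assumed shapes) for org.apache.kafka.common.TopicPartition
  // and the library's Offsets type.
  final case class TopicPartition(topic: String, partition: Int)
  final case class Offsets(offsetsMap: Map[TopicPartition, Long])

  // Hypothetical store; in-memory here, persistent in a real client.
  object OffsetStore {
    private var saved = Map.empty[TopicPartition, Long]

    // Record the position to resume from for one partition.
    def save(tp: TopicPartition, position: Long): Unit =
      synchronized { saved = saved.updated(tp, position) }

    // Return stored positions for the given partitions, omitting unknown ones.
    def load(tps: Iterable[TopicPartition]): Map[TopicPartition, Long] =
      synchronized { tps.flatMap(tp => saved.get(tp).map(p => tp -> p)).toMap }
  }

  // Called on assignment: hand back the stored positions to seek to.
  def assignedListener(tps: List[TopicPartition]): Offsets =
    Offsets(OffsetStore.load(tps))

  // Called on revocation: nothing to flush in this in-memory sketch.
  def revokedListener(tps: List[TopicPartition]): Unit = ()
}
```

Partitions without a stored position are simply left out of the returned offsets here. How the consumer treats such partitions is not covered by this sketch.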
Auto partition assignment is not always required. It is reasonable to have a single consumer on a single-partition topic which, despite not being parallelisable, does provide an ordering guarantee in the processing of the topic. The throughput of a single consumer is usually high enough with Kafka that this mode can often be considered a primary consumption pattern.
An additional topology, when using a multi-partition topic, involves a group of cooperating nodes that each subscribe to a fixed partition of the topic. It then becomes the responsibility of the client environment to ensure that a consumer process exists for each partition.
This subscription mode assumes that processed messages will be committed to Kafka i.e. confirmed with a commit flag of true. Resubscriptions will consume from the last commit point.
import cakesolutions.kafka.akka.KafkaConsumerActor._
import org.apache.kafka.common.TopicPartition
consumer ! Subscribe.ManualPartition(Seq(new TopicPartition("Topic1", 0)))
The ManualOffset mode allows the client to self-manage commit offsets, as described in the Kafka Consumer API documentation, in the section "Manual Offset Control".
An initial subscription will usually be made with ManualPartition, but subsequent resubscriptions will provide the required topic/partitions as well as offset positions indicating the initial seek point. Responsibility for persisting the commit points and seeking to them is moved to the client.
import cakesolutions.kafka.akka.KafkaConsumerActor._
// offsets represents a set of topic/partitions and offset positions.
consumer ! Subscribe.ManualOffset(offsets)
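One way a client might build the offsets for a resubscription is sketched below. It assumes the client tracks the offset of the last record it processed per partition, rather than storing the Offsets value delivered with each batch. `resumeFrom` is a hypothetical helper, and `TopicPartition` and `Offsets` are local stand-ins with assumed shapes. The only Kafka fact relied on is that a seek position names the next record to read, so the resume point is one past the last processed record.

```scala
object ResumeSketch {
  // Stand-ins (assumed shapes) so the sketch compiles on its own.
  final case class TopicPartition(topic: String, partition: Int)
  final case class Offsets(offsetsMap: Map[TopicPartition, Long])

  // Turn "last processed" offsets into seek positions (last processed + 1).
  def resumeFrom(lastProcessed: Map[TopicPartition, Long]): Offsets =
    Offsets(lastProcessed.map { case (tp, offset) => tp -> (offset + 1) })
}
```

With the real types in scope, the result would be passed to the consumer as the `offsets` value in the subscription message above.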
Once the client has sent the consumer a Subscribe message, it can assume that the subscription will be made and that any messages consumed will be delivered to the provided ActorRef callback. By default, in typical exception scenarios such as a loss of connection to Kafka, the consumer will keep retrying until the problem is resolved. For any unrecoverable error, a fatal exception propagates to the supervisor actor.
import cakesolutions.kafka.akka.KafkaConsumerActor.Confirm
case class Confirm(offsets: Offsets, commit: Boolean = false)
consumer ! Confirm(offsets)
For each set of records received by the client, a corresponding Confirm(offsets) message should be sent back to the consumer to acknowledge the records once the receiver has finished processing them. If the client subsequently delivers the messages downstream, for example to another Kafka topic or a database, then to ensure a complete end-to-end at-least-once delivery pipeline the commit should happen only once the downstream system has confirmed the messages.
If the records are not confirmed within the configured timeout ("unconfirmed.timeout"), they are redelivered (by default).
If commit is provided as true, the offsets are committed to Kafka. If commit is false, the records are removed from the Consumer Actor's buffer, but no commit to Kafka is made.
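The two outcomes of a confirmation can be pictured with a small toy model. This is not the library's implementation: `State` and its fields are hypothetical, the types are local stand-ins, and the rule that an unrelated confirmation is ignored is an assumption made only for the sketch.

```scala
object ConfirmSketch {
  // Stand-ins (assumed shapes), declared so the sketch compiles on its own.
  final case class TopicPartition(topic: String, partition: Int)
  final case class Offsets(offsetsMap: Map[TopicPartition, Long])
  final case class Confirm(offsets: Offsets, commit: Boolean = false)

  // Hypothetical view of the consumer side: one batch awaiting confirmation,
  // plus the last offsets committed to Kafka.
  final case class State(unconfirmed: Option[Offsets], committed: Option[Offsets]) {
    def handle(confirm: Confirm): State =
      if (!unconfirmed.contains(confirm.offsets)) this // assumption: unrelated confirmations are ignored
      else if (confirm.commit) State(None, Some(confirm.offsets)) // buffer cleared, offsets committed
      else State(None, committed) // buffer cleared, nothing committed
  }
}
```

In the model, confirming with `commit = false` only clears the buffered batch, which matches the manual offset modes where the client persists positions itself.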
A BackingOff message is sent to the client when the max.redelivery count is exceeded, i.e. the client has failed to confirm delivery of a message and it has been redelivered the maximum number of times. This hook can be useful for a client to trigger a shutdown or restart of the KafkaConsumerActor in case no further progress can be made.
import cakesolutions.kafka.akka.BackingOff
case class BackingOff(redeliveryCount: Int)
import cakesolutions.kafka.akka.ConsumerRecords
case class ConsumerRecords[K, V](offsets: Offsets, records: ConsumerRecords[K, V])
The payload delivered to the client contains the offsets for the records sent and the Java client's ConsumerRecords, which contains a sequence of Records for each Topic Partition. The ConsumerRecords can be iterated and read as described in the Kafka Client docs.
The Offsets can be used when confirming the message to commit the offsets to Kafka.
import cakesolutions.kafka.akka.KafkaConsumerActor.Unsubscribe
case object Unsubscribe
consumer ! Unsubscribe
The Consumer Actor clears its state and disconnects from Kafka.
Produced by Cake Solutions