Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add documentation for handling rebalance #150

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 73 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,87 @@ val metadata: IO[RecordMetadata] = producer.use { producer =>
## Consumer usage example

```scala
import cats.data.{NonEmptySet => Nes}

val consumer = Consumer.of[IO, String, String](config, ecBlocking)
val records: IO[ConsumerRecords[String, String]] = consumer.use { consumer =>
for {
_ <- consumer.subscribe(Nel("topic"), None)
_ <- consumer.subscribe(Nes("topic"))
records <- consumer.poll(100.millis)
} yield records
}
```

## Handling consumer group rebalance
It's possible to provide a callback for a consumer group rebalance event, which can be useful if you want to do some computations,
save the state, commit some offsets (or do anything with the consumer whenever partition assignment changes).
This can be done by providing an implementation of `RebalanceListener1` (or a more convenient version - `RebalanceListener1WithConsumer`)
which requires you to return a `RebalanceCallback` structure which describes what actions should be performed in a certain situation.
It allows you to use some of consumer's methods as well as a way to run an arbitrary computation.
### Note on long-running computations in a rebalance callback
Please note that all the actions are executed on the consumer `poll` thread which means that running heavy or
long-running computations is discouraged. This is due to to the following reasons:
- if executing callback takes too long (longer than Kafka consumer `max.poll.interval.ms` setting), the consumer will be assumed
'failed' and the group will rebalance once again. The default value is 300000 (5 minutes). You can see the official documentation [here](https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_max.poll.interval.ms)
- since the callback is executed by the means of providing an instance of `ToTry` (which runs the computation synchronously in the Java callback), it dictates the timeout for the callback computation to complete.
The current default implementation for `cats.effect.IO` is 1 minute (see `ToTry#ioToTry`)

### What you can currently do:
- lift a pure value via `RebalanceCallback.pure(a)`. There's also an instance of `cats.Monad` for `RebalanceCallback` which you can use via syntax extensions or direct summoning
- raise an error which should be an instance of `Throwable`. If your effect `F[_]` has an instance of `MonadError[F, Throwable]` in scope, then an instance of `MonadError[RebalanceCallback[F, *], Throwable]` will be derived, so you can use it to `raiseError` via syntax extensions or direct summoning
- use consumer methods, for example `RebalanceCallback.commit` or `RebalanceCallback.seek`
(see `RebalanceCallbackApi` to discover allowed consumer methods)
- lift any arbitrary computation in the `F[_]` effect via `RebalanceCallback.lift(...)`
- handle occuring errors in the `F[_]` effect via `callback.handleErroWith(...)` or using `MonadError` instance, described above

These operations can be composed due to the presence of `map`/`flatMap` methods as well as the presence of `cats.Monad` instance.
### Example
```scala
import cats.data.{NonEmptySet => Nes}

class SaveOffsetsOnRebalance[F[_]: Applicative] extends RebalanceListener1WithConsumer[F] {

// import is needed to use `fa.lift` syntax where
// `fa: F[A]`
// `fa.lift: RebalanceCallback[F, A]`
import RebalanceCallback.syntax._

def onPartitionsAssigned(partitions: Nes[TopicPartition]) =
for {
// read the offsets from an external store using some custom code not described here
offsets <- readOffsetsFromExternalStore[F](partitions).lift
a <- offsets.toList.foldMapM { case (partition, offset) => consumer.seek(partition, offset) }
} yield a

def onPartitionsRevoked(partitions: Nes[TopicPartition]) =
for {
positions <- partitions.foldM(Map.empty[TopicPartition, Offset]) {
case (offsets, partition) =>
for {
position <- consumer.position(partition)
} yield offsets + (partition -> position)
}
// save the offsets in an external store using some custom code not described here
a <- saveOffsetsInExternalStore[F](positions).lift
} yield a

// do not need to save the offsets since these partitions are probably owned by other consumers already
def onPartitionsLost(partitions: Nes[TopicPartition]) = RebalanceCallback.empty

private def readOffsetsFromExternalStore[F[_]: Applicative](partitions: Nes[TopicPartition]): F[Map[TopicPartition, Offset]] = ???
private def saveOffsetsInExternalStore[F[_]: Applicative](offsets: Map[TopicPartition, Offset]): F[Unit] = ???
}

val consumer = Consumer.of[IO, String, String](config, ecBlocking)
val listener = new SaveOffsetsOnRebalance[IO]
val records: IO[ConsumerRecords[String, String]] = consumer.use { consumer =>
for {
_ <- consumer.subscribe(Nes("topic"), listener)
records <- consumer.poll(100.millis)
} yield records
}
```

## Setup

```scala
Expand Down