Skip to content

Commit

Permalink
Enable graceful shutdown of a GenConsumer
Browse files Browse the repository at this point in the history
Add a user-configurable :shutdown to GenConsumer's child_spec.

Addresses kafkaex#434.
  • Loading branch information
b1az committed Aug 30, 2022
1 parent 97b577f commit eb1c242
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 1 deletion.
3 changes: 3 additions & 0 deletions lib/kafka_ex/gen_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ defmodule KafkaEx.GenConsumer do
| {:commit_threshold, non_neg_integer}
| {:auto_offset_reset, :none | :earliest | :latest}
| {:api_versions, map()}
| {:shutdown, timeout()}
| {:extra_consumer_args, map()}

@typedoc """
Expand Down Expand Up @@ -438,6 +439,8 @@ defmodule KafkaEx.GenConsumer do
* `:fetch_options` - Optional keyword list that is passed along to the
`KafkaEx.fetch` call.
* `:shutdown` - Optional amount of time in milliseconds that the supervisor will wait for a `GenConsumer` to terminate after emitting a `Process.exit(child, :shutdown)` signal. Defaults to `5_000`.
* `:extra_consumer_args` - Optional parameter that is passed along to the
`GenConsumer.init` call in the consumer module. Note that if `init/3` is not
implemented, the default implementation calls to `init/2`, dropping the extra
Expand Down
5 changes: 4 additions & 1 deletion lib/kafka_ex/gen_consumer/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ defmodule KafkaEx.GenConsumer.Supervisor do

use DynamicSupervisor

@default_worker_shutdown 5_000

@doc """
Starts a `GenConsumer.Supervisor` process linked to the current process.
Expand Down Expand Up @@ -57,7 +59,8 @@ defmodule KafkaEx.GenConsumer.Supervisor do
id: gen_consumer_module,
start:
{gen_consumer_module, :start_link,
[consumer_module, group_name, topic, partition, opts]}
[consumer_module, group_name, topic, partition, opts]},
shutdown: Keyword.get(opts, :shutdown, @default_worker_shutdown)
}
end

Expand Down

0 comments on commit eb1c242

Please sign in to comment.