From eb1c24262942cab0a9079060b78461274e9702e3 Mon Sep 17 00:00:00 2001 From: Blaz Merela Date: Tue, 30 Aug 2022 10:04:34 +0200 Subject: [PATCH] Enable graceful shutdown of a GenConsumer Add a user-configurable :shutdown to GenConsumer's child_spec. Addresses #434. --- lib/kafka_ex/gen_consumer.ex | 3 +++ lib/kafka_ex/gen_consumer/supervisor.ex | 5 ++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/lib/kafka_ex/gen_consumer.ex b/lib/kafka_ex/gen_consumer.ex index 7a67ff88..c65b7ac0 100644 --- a/lib/kafka_ex/gen_consumer.ex +++ b/lib/kafka_ex/gen_consumer.ex @@ -209,6 +209,7 @@ defmodule KafkaEx.GenConsumer do | {:commit_threshold, non_neg_integer} | {:auto_offset_reset, :none | :earliest | :latest} | {:api_versions, map()} + | {:shutdown, timeout()} | {:extra_consumer_args, map()} @typedoc """ @@ -438,6 +439,8 @@ defmodule KafkaEx.GenConsumer do * `:fetch_options` - Optional keyword list that is passed along to the `KafkaEx.fetch` call. + * `:shutdown` - Optional amount of time in milliseconds that the supervisor will wait for a `GenConsumer` to terminate after emitting a `Process.exit(child, :shutdown)` signal. Defaults to `5_000`. + * `:extra_consumer_args` - Optional parameter that is passed along to the `GenConsumer.init` call in the consumer module. Note that if `init/3` is not implemented, the default implementation calls to `init/2`, dropping the extra diff --git a/lib/kafka_ex/gen_consumer/supervisor.ex b/lib/kafka_ex/gen_consumer/supervisor.ex index f69f6ade..4a7ce739 100644 --- a/lib/kafka_ex/gen_consumer/supervisor.ex +++ b/lib/kafka_ex/gen_consumer/supervisor.ex @@ -15,6 +15,8 @@ defmodule KafkaEx.GenConsumer.Supervisor do use DynamicSupervisor + @default_worker_shutdown 5_000 + @doc """ Starts a `GenConsumer.Supervisor` process linked to the current process. @@ -57,7 +59,8 @@ defmodule KafkaEx.GenConsumer.Supervisor do id: gen_consumer_module, start: {gen_consumer_module, :start_link, - [consumer_module, group_name, topic, partition, opts]} + [consumer_module, group_name, topic, partition, opts]}, + shutdown: Keyword.get(opts, :shutdown, @default_worker_shutdown) } end