From 71eef869229d37bc595f4571295b63222117971c Mon Sep 17 00:00:00 2001 From: Blaz Merela Date: Thu, 25 Aug 2022 10:01:40 +0200 Subject: [PATCH 1/5] Move `ConsumerGroup.Manager.start_link/4` to conventional `start_link/1` The change is acceptable as `ConsumerGroup` is the only client of `ConsumerGroup.Manager`. The defaults on `opts` parameter aren't used and their loss is thus likewise acceptable. Fixes a compiler warning due to a deprecated usage of `Supervisor.Spec.worker/3`. --- lib/kafka_ex/consumer_group.ex | 6 ++---- lib/kafka_ex/consumer_group/manager.ex | 10 +++++----- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/lib/kafka_ex/consumer_group.ex b/lib/kafka_ex/consumer_group.ex index 2ab5c5a2..0ae9fadc 100644 --- a/lib/kafka_ex/consumer_group.ex +++ b/lib/kafka_ex/consumer_group.ex @@ -363,10 +363,8 @@ defmodule KafkaEx.ConsumerGroup do opts = Keyword.put(opts, :supervisor_pid, self()) children = [ - worker( - KafkaEx.ConsumerGroup.Manager, - [{gen_consumer_module, consumer_module}, group_name, topics, opts] - ) + {KafkaEx.ConsumerGroup.Manager, + {{gen_consumer_module, consumer_module}, group_name, topics, opts}} ] Supervisor.init(children, diff --git a/lib/kafka_ex/consumer_group/manager.ex b/lib/kafka_ex/consumer_group/manager.ex index ec22e041..3cbf4873 100644 --- a/lib/kafka_ex/consumer_group/manager.ex +++ b/lib/kafka_ex/consumer_group/manager.ex @@ -53,18 +53,18 @@ defmodule KafkaEx.ConsumerGroup.Manager do @doc false # use `KafkaEx.ConsumerGroup.start_link/4` instead - @spec start_link( + @spec start_link({ {module, module}, binary, [binary], KafkaEx.GenConsumer.options() - ) :: GenServer.on_start() - def start_link( + }) :: GenServer.on_start() + def start_link({ {gen_consumer_module, consumer_module}, group_name, topics, - opts \\ [] - ) do + opts + }) do gen_server_opts = Keyword.get(opts, :gen_server_opts, []) consumer_opts = Keyword.drop(opts, [:gen_server_opts]) From b025ac6502497db9d2fe444e3cba66990821ec5e Mon Sep 17 00:00:00 2001 From: Blaz Merela Date: Thu, 25 Aug 2022 12:33:55 +0200 Subject: [PATCH 2/5] Add the conventional `GenConsumer.Supervisor.start_link/1` This enables the usage of `child_spec/1` and removal of the deprecated `Supervisor.Spec.supervisor/3` call. Fixes #390. --- lib/kafka_ex/consumer_group.ex | 14 ++++- lib/kafka_ex/gen_consumer/supervisor.ex | 73 ++++++++++++++++++++----- 2 files changed, 69 insertions(+), 18 deletions(-) diff --git a/lib/kafka_ex/consumer_group.ex b/lib/kafka_ex/consumer_group.ex index 0ae9fadc..1020f9fe 100644 --- a/lib/kafka_ex/consumer_group.ex +++ b/lib/kafka_ex/consumer_group.ex @@ -334,9 +334,17 @@ defmodule KafkaEx.ConsumerGroup do opts ) do child = - supervisor( - KafkaEx.GenConsumer.Supervisor, - [{gen_consumer_module, consumer_module}, group_name, assignments, opts], + Supervisor.child_spec( + { + KafkaEx.GenConsumer.Supervisor, + %{ + gen_consumer_module: gen_consumer_module, + consumer_module: consumer_module, + group_name: group_name, + assignments: assignments, + opts: opts + } + }, id: :consumer ) diff --git a/lib/kafka_ex/gen_consumer/supervisor.ex b/lib/kafka_ex/gen_consumer/supervisor.ex index f69f6ade..e47bf036 100644 --- a/lib/kafka_ex/gen_consumer/supervisor.ex +++ b/lib/kafka_ex/gen_consumer/supervisor.ex @@ -15,15 +15,23 @@ defmodule KafkaEx.GenConsumer.Supervisor do use DynamicSupervisor + if Version.match?(System.version(), ">= 1.7.0") do + @doc since: "0.14.0" + end + @doc """ Starts a `GenConsumer.Supervisor` process linked to the current process. `gen_consumer_module` is a module that implements the `GenServer` behaviour which consumes events from Kafka. + `consumer_module` is a module that implements the `GenConsumer` behaviour. - `group_name` is the name of a consumer group, and `assignments` is a list of - partitions for the `GenConsumer`s to consume. `opts` accepts the same - options as `KafkaEx.GenConsumer.start_link/5`. + + `group_name` is the name of a consumer group. + + `assignments` is a list of partitions for the `GenConsumer`s to consume. + + `opts` accepts the same options as `KafkaEx.GenConsumer.start_link/5`. ### Return Values @@ -32,20 +40,22 @@ defmodule KafkaEx.GenConsumer.Supervisor do If the supervisor and its consumers are successfully created, this function returns `{:ok, pid}`, where `pid` is the PID of the supervisor. """ - @spec start_link( - {gen_consumer_module :: module, consumer_module :: module}, - consumer_group_name :: binary, - assigned_partitions :: [ + @spec start_link(%{ + gen_consumer_module: module, + consumer_module: module, + group_name: binary, + assignments: [ {topic_name :: binary, partition_id :: non_neg_integer} ], - KafkaEx.GenConsumer.options() - ) :: Elixir.Supervisor.on_start() - def start_link( - {gen_consumer_module, consumer_module}, - group_name, - assignments, - opts \\ [] - ) do + opts: KafkaEx.GenConsumer.options() + }) :: Supervisor.on_start() + def start_link(%{ + gen_consumer_module: gen_consumer_module, + consumer_module: consumer_module, + group_name: group_name, + assignments: assignments, + opts: opts + }) do start_link_result = DynamicSupervisor.start_link( __MODULE__, @@ -71,6 +81,39 @@ defmodule KafkaEx.GenConsumer.Supervisor do end end + @deprecated "Use start_link/1 instead" + @doc """ + Starts a `GenConsumer.Supervisor` process linked to the current process. + + Refer to `start_link/1` for documentation of each parameter. + + ### Return Values + + Same as `start_link/1`. + """ + @spec start_link( + {gen_consumer_module :: module, consumer_module :: module}, + consumer_group_name :: binary, + assigned_partitions :: [ + {topic_name :: binary, partition_id :: non_neg_integer} + ], + KafkaEx.GenConsumer.options() + ) :: Elixir.Supervisor.on_start() + def start_link( + {gen_consumer_module, consumer_module}, + group_name, + assignments, + opts \\ [] + ) do + start_link(%{ + gen_consumer_module: gen_consumer_module, + consumer_module: consumer_module, + group_name: group_name, + assignments: assignments, + opts: opts + }) + end + @doc """ Returns a list of child pids From 573ef07695ce3ffb21a24c0b821bdeae93fdb719 Mon Sep 17 00:00:00 2001 From: Blaz Merela Date: Thu, 25 Aug 2022 12:35:31 +0200 Subject: [PATCH 3/5] Shorten boolean logic --- lib/kafka_ex/consumer_group.ex | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/lib/kafka_ex/consumer_group.ex b/lib/kafka_ex/consumer_group.ex index 1020f9fe..05ce8b39 100644 --- a/lib/kafka_ex/consumer_group.ex +++ b/lib/kafka_ex/consumer_group.ex @@ -297,11 +297,9 @@ defmodule KafkaEx.ConsumerGroup do def active?(supervisor_pid, timeout \\ 5000) do consumer_supervisor = consumer_supervisor_pid(supervisor_pid, timeout) - if consumer_supervisor && Process.alive?(consumer_supervisor) do + consumer_supervisor && + Process.alive?(consumer_supervisor) && GenConsumer.Supervisor.active?(consumer_supervisor) - else - false - end end @doc """ From e513413f11ae5804aeb9a6407b7c944c4d5c6aeb Mon Sep 17 00:00:00 2001 From: Blaz Merela Date: Thu, 25 Aug 2022 12:39:09 +0200 Subject: [PATCH 4/5] Improve README --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index c0c84d34..20b3073f 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ KafkaEx [![API Docs](https://img.shields.io/badge/api-docs-yellow.svg?style=flat)](http://hexdocs.pm/kafka_ex/) KafkaEx is an Elixir client for [Apache Kafka](http://kafka.apache.org/) with -support for Kafka versions 0.8.0 and newer. KafkaEx requires Elixir 1.5+ and +support for Kafka versions 0.8.0 and newer. KafkaEx requires Elixir 1.6+ and Erlang OTP 19+. See [http://hexdocs.pm/kafka_ex/](http://hexdocs.pm/kafka_ex/) for @@ -18,7 +18,7 @@ documentation, [https://github.com/kafkaex/kafka_ex/](https://github.com/kafkaex/kafka_ex/) for code. -KakfaEx supports the following Kafka features: +KafkaEx supports the following Kafka features: * Broker and Topic Metadata * Produce Messages From d7552e7a9c7724eadb5a664e29fa471280dbb7a3 Mon Sep 17 00:00:00 2001 From: Blaz Merela Date: Mon, 29 Aug 2022 11:56:25 +0200 Subject: [PATCH 5/5] Ignore Dialyzer's PLTs --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index 279b41ef..ffc324db 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,6 @@ doc .elixir_ls cover + +# Dialyzer's Persistent Lookup Table +priv/plts/ \ No newline at end of file