From f8d8c6442cc0d05bdc3e02d5dc1cb553741e7ec1 Mon Sep 17 00:00:00 2001 From: Gabriel Oliveira Date: Tue, 10 Oct 2023 10:04:38 -0300 Subject: [PATCH] chore: move shared_client logic from brod_client to producer --- lib/broadway_kafka/brod_client.ex | 33 ++++++++---------------------- lib/broadway_kafka/kafka_client.ex | 5 ++--- lib/broadway_kafka/producer.ex | 30 +++++++++++++++++++-------- test/brod_client_test.exs | 24 ++++++---------------- test/producer_test.exs | 25 +++++++++------------- 5 files changed, 48 insertions(+), 69 deletions(-) diff --git a/lib/broadway_kafka/brod_client.ex b/lib/broadway_kafka/brod_client.ex index 7daa4a1..ab19e84 100644 --- a/lib/broadway_kafka/brod_client.ex +++ b/lib/broadway_kafka/brod_client.ex @@ -154,17 +154,15 @@ defmodule BroadwayKafka.BrodClient do end @impl true - def prepare_for_start(broadway_opts) do - {_, producer_opts} = broadway_opts[:producer][:module] - init_opts = Keyword.put(producer_opts, :broadway, broadway_opts) - - case init(init_opts) do - {:error, message} -> - raise ArgumentError, "invalid options given to #{__MODULE__}.init/1, " <> message - - {:ok, config} -> - {child_specs(config), broadway_opts} - end + def shared_client_child_spec(config) do + [ + %{ + id: config.shared_client_id, + start: + {:brod, :start_link_client, + [config.hosts, config.shared_client_id, config.client_config]} + } + ] end defp lookup_offset(hosts, topic, partition, policy, client_config) do @@ -194,19 +192,6 @@ defmodule BroadwayKafka.BrodClient do ) end - defp child_specs(%{shared_client: false} = _config), do: [] - - defp child_specs(%{shared_client: true} = config) do - [ - %{ - id: config.shared_client_id, - start: - {:brod, :start_link_client, - [config.hosts, config.shared_client_id, config.client_config]} - } - ] - end - defp validate(opts, key, options \\ []) when is_list(opts) do has_key = Keyword.has_key?(opts, key) required = Keyword.get(options, :required, false) diff --git a/lib/broadway_kafka/kafka_client.ex b/lib/broadway_kafka/kafka_client.ex index 39bec74..d458b44 100644 --- a/lib/broadway_kafka/kafka_client.ex +++ b/lib/broadway_kafka/kafka_client.ex @@ -55,9 +55,8 @@ defmodule BroadwayKafka.KafkaClient do @callback connected?(:brod.client()) :: boolean @callback disconnect(:brod.client()) :: :ok - @callback prepare_for_start(broadway_opts :: keyword()) :: - {[child_spec], updated_opts :: keyword()} + @callback shared_client_child_spec(config()) :: [child_spec] when child_spec: :supervisor.child_spec() | {module, any} | module - @optional_callbacks prepare_for_start: 1 + @optional_callbacks shared_client_child_spec: 1 end diff --git a/lib/broadway_kafka/producer.ex b/lib/broadway_kafka/producer.ex index a0c7075..a676b55 100644 --- a/lib/broadway_kafka/producer.ex +++ b/lib/broadway_kafka/producer.ex @@ -237,7 +237,7 @@ defmodule BroadwayKafka.Producer do client = opts[:client] || BroadwayKafka.BrodClient - case client.init(opts) do + case opts[:initialized_client_config] || client.init(opts) do {:error, message} -> raise ArgumentError, "invalid options given to #{inspect(client)}.init/1, " <> message @@ -278,7 +278,7 @@ defmodule BroadwayKafka.Producer do shutting_down?: false, buffer: :queue.new(), max_demand: max_demand, - shared_client: config[:shared_client] + shared_client: config.shared_client } {:producer, connect(state)} @@ -516,17 +516,29 @@ defmodule BroadwayKafka.Producer do |> Keyword.put(:processors, [updated_processor_entry | other_processors_entries]) |> Keyword.put(:batchers, updated_batchers_entries) - {_, kafka_producer_opts} = opts[:producer][:module] - client = kafka_producer_opts[:client] || BroadwayKafka.BrodClient + {producer_mod, producer_opts} = opts[:producer][:module] - {client_child_specs, updated_opts} = - if function_exported?(client, :prepare_for_start, 1) do - client.prepare_for_start(updated_opts) + {extra_child_specs, initialized_client_config} = + if producer_opts[:shared_client] do + client = producer_opts[:client] || BroadwayKafka.BrodClient + + case client.init(Keyword.put(producer_opts, :broadway, opts)) do + {:error, message} -> + raise ArgumentError, "invalid options given to #{client}.init/1, " <> message + + {:ok, config} = result -> + {client.shared_client_child_spec(config), result} + end else - {[], updated_opts} + {[], nil} end - {allocators ++ client_child_specs, updated_opts} + new_producer_opts = + Keyword.put(producer_opts, :initialized_client_config, initialized_client_config) + + updated_opts = put_in(updated_opts, [:producer, :module], {producer_mod, new_producer_opts}) + + {allocators ++ extra_child_specs, updated_opts} end @impl :brod_group_member diff --git a/test/brod_client_test.exs b/test/brod_client_test.exs index 201c7f2..ef5722e 100644 --- a/test/brod_client_test.exs +++ b/test/brod_client_test.exs @@ -408,32 +408,20 @@ defmodule BroadwayKafka.BrodClientTest do end end - describe "prepare for start" do - test "should return an empty list and unchanged opts when shared_client is not true" do - broadway_opts = [ - name: :my_broadway, - producer: [ - module: {BroadwayKafka.Producer, @opts} - ] - ] - - assert {[], ^broadway_opts} = BrodClient.prepare_for_start(broadway_opts) - end - - test "should return :brod_client child spec and unchanged opts when shared_client is true" do + describe "shared_client_child_spec" do + test "should return child spec" do module_opts = @opts |> Keyword.put(:shared_client, true) |> Keyword.put(:client_config, client_id_prefix: "my_prefix.") broadway_opts = [ - name: :my_broadway, - producer: [ - module: {BroadwayKafka.Producer, module_opts} - ] + name: :my_broadway ] - assert {child_specs, ^broadway_opts} = BrodClient.prepare_for_start(broadway_opts) + {:ok, config} = BrodClient.init(Keyword.put(module_opts, :broadway, broadway_opts)) + + assert child_specs = BrodClient.shared_client_child_spec(config) assert [ %{ diff --git a/test/producer_test.exs b/test/producer_test.exs index f1ed538..4cc9dd3 100644 --- a/test/producer_test.exs +++ b/test/producer_test.exs @@ -121,22 +121,17 @@ defmodule BroadwayKafka.ProducerTest do end @impl true - def prepare_for_start(broadway_opts) do - {_, kafka_producer_opts} = broadway_opts[:producer][:module] - parent_pid = kafka_producer_opts[:test_pid] - - child_specs = [ + def shared_client_child_spec(config) do + [ Supervisor.child_spec( - {Task, fn -> send(parent_pid, :prepare_for_start_1) end}, - id: :prepare_for_start_1 + {Task, fn -> send(config.test_pid, :child_started_1) end}, + id: :child_started_1 ), Supervisor.child_spec( - {Task, fn -> send(parent_pid, :prepare_for_start_2) end}, - id: :prepare_for_start_2 + {Task, fn -> send(config.test_pid, :child_started_2) end}, + id: :child_started_2 ) ] - - {child_specs, broadway_opts} end end @@ -260,12 +255,12 @@ defmodule BroadwayKafka.ProducerTest do stop_broadway(pid) end - test "start all child processes defined in prepare_for_start/1 callback" do + test "start all child processes defined in shared_client_child_spec/1 callback" do {:ok, message_server} = MessageServer.start_link() - {:ok, pid} = start_broadway(message_server) + {:ok, pid} = start_broadway(message_server, shared_client: true) - assert_receive :prepare_for_start_1 - assert_receive :prepare_for_start_2 + assert_receive :child_started_1 + assert_receive :child_started_2 stop_broadway(pid) end