Skip to content

Commit

Permalink
chore: move shared_client logic from brod_client to producer
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah committed Oct 10, 2023
1 parent 9367887 commit f8d8c64
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 69 deletions.
33 changes: 9 additions & 24 deletions lib/broadway_kafka/brod_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -154,17 +154,15 @@ defmodule BroadwayKafka.BrodClient do
end

@impl true
def prepare_for_start(broadway_opts) do
{_, producer_opts} = broadway_opts[:producer][:module]
init_opts = Keyword.put(producer_opts, :broadway, broadway_opts)

case init(init_opts) do
{:error, message} ->
raise ArgumentError, "invalid options given to #{__MODULE__}.init/1, " <> message

{:ok, config} ->
{child_specs(config), broadway_opts}
end
def shared_client_child_spec(config) do
[
%{
id: config.shared_client_id,
start:
{:brod, :start_link_client,
[config.hosts, config.shared_client_id, config.client_config]}
}
]
end

defp lookup_offset(hosts, topic, partition, policy, client_config) do
Expand Down Expand Up @@ -194,19 +192,6 @@ defmodule BroadwayKafka.BrodClient do
)
end

defp child_specs(%{shared_client: false} = _config), do: []

defp child_specs(%{shared_client: true} = config) do
[
%{
id: config.shared_client_id,
start:
{:brod, :start_link_client,
[config.hosts, config.shared_client_id, config.client_config]}
}
]
end

defp validate(opts, key, options \\ []) when is_list(opts) do
has_key = Keyword.has_key?(opts, key)
required = Keyword.get(options, :required, false)
Expand Down
5 changes: 2 additions & 3 deletions lib/broadway_kafka/kafka_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ defmodule BroadwayKafka.KafkaClient do
@callback connected?(:brod.client()) :: boolean
@callback disconnect(:brod.client()) :: :ok

@callback prepare_for_start(broadway_opts :: keyword()) ::
{[child_spec], updated_opts :: keyword()}
@callback shared_client_child_spec(config()) :: [child_spec]
when child_spec: :supervisor.child_spec() | {module, any} | module

@optional_callbacks prepare_for_start: 1
@optional_callbacks shared_client_child_spec: 1
end
30 changes: 21 additions & 9 deletions lib/broadway_kafka/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ defmodule BroadwayKafka.Producer do

client = opts[:client] || BroadwayKafka.BrodClient

case client.init(opts) do
case opts[:initialized_client_config] || client.init(opts) do
{:error, message} ->
raise ArgumentError, "invalid options given to #{inspect(client)}.init/1, " <> message

Expand Down Expand Up @@ -278,7 +278,7 @@ defmodule BroadwayKafka.Producer do
shutting_down?: false,
buffer: :queue.new(),
max_demand: max_demand,
shared_client: config[:shared_client]
shared_client: config.shared_client
}

{:producer, connect(state)}
Expand Down Expand Up @@ -516,17 +516,29 @@ defmodule BroadwayKafka.Producer do
|> Keyword.put(:processors, [updated_processor_entry | other_processors_entries])
|> Keyword.put(:batchers, updated_batchers_entries)

{_, kafka_producer_opts} = opts[:producer][:module]
client = kafka_producer_opts[:client] || BroadwayKafka.BrodClient
{producer_mod, producer_opts} = opts[:producer][:module]

{client_child_specs, updated_opts} =
if function_exported?(client, :prepare_for_start, 1) do
client.prepare_for_start(updated_opts)
{extra_child_specs, initialized_client_config} =
if producer_opts[:shared_client] do
client = producer_opts[:client] || BroadwayKafka.BrodClient

case client.init(Keyword.put(producer_opts, :broadway, opts)) do
{:error, message} ->
raise ArgumentError, "invalid options given to #{client}.init/1, " <> message

{:ok, config} = result ->
{client.shared_client_child_spec(config), result}
end
else
{[], updated_opts}
{[], nil}
end

{allocators ++ client_child_specs, updated_opts}
new_producer_opts =
Keyword.put(producer_opts, :initialized_client_config, initialized_client_config)

updated_opts = put_in(updated_opts, [:producer, :module], {producer_mod, new_producer_opts})

{allocators ++ extra_child_specs, updated_opts}
end

@impl :brod_group_member
Expand Down
24 changes: 6 additions & 18 deletions test/brod_client_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -408,32 +408,20 @@ defmodule BroadwayKafka.BrodClientTest do
end
end

describe "prepare for start" do
test "should return an empty list and unchanged opts when shared_client is not true" do
broadway_opts = [
name: :my_broadway,
producer: [
module: {BroadwayKafka.Producer, @opts}
]
]

assert {[], ^broadway_opts} = BrodClient.prepare_for_start(broadway_opts)
end

test "should return :brod_client child spec and unchanged opts when shared_client is true" do
describe "shared_client_child_spec" do
test "should return child spec" do
module_opts =
@opts
|> Keyword.put(:shared_client, true)
|> Keyword.put(:client_config, client_id_prefix: "my_prefix.")

broadway_opts = [
name: :my_broadway,
producer: [
module: {BroadwayKafka.Producer, module_opts}
]
name: :my_broadway
]

assert {child_specs, ^broadway_opts} = BrodClient.prepare_for_start(broadway_opts)
{:ok, config} = BrodClient.init(Keyword.put(module_opts, :broadway, broadway_opts))

assert child_specs = BrodClient.shared_client_child_spec(config)

assert [
%{
Expand Down
25 changes: 10 additions & 15 deletions test/producer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -121,22 +121,17 @@ defmodule BroadwayKafka.ProducerTest do
end

@impl true
def prepare_for_start(broadway_opts) do
{_, kafka_producer_opts} = broadway_opts[:producer][:module]
parent_pid = kafka_producer_opts[:test_pid]

child_specs = [
def shared_client_child_spec(config) do
[
Supervisor.child_spec(
{Task, fn -> send(parent_pid, :prepare_for_start_1) end},
id: :prepare_for_start_1
{Task, fn -> send(config.test_pid, :child_started_1) end},
id: :child_started_1
),
Supervisor.child_spec(
{Task, fn -> send(parent_pid, :prepare_for_start_2) end},
id: :prepare_for_start_2
{Task, fn -> send(config.test_pid, :child_started_2) end},
id: :child_started_2
)
]

{child_specs, broadway_opts}
end
end

Expand Down Expand Up @@ -260,12 +255,12 @@ defmodule BroadwayKafka.ProducerTest do
stop_broadway(pid)
end

test "start all child processes defined in prepare_for_start/1 callback" do
test "start all child processes defined in shared_client_child_spec/1 callback" do
{:ok, message_server} = MessageServer.start_link()
{:ok, pid} = start_broadway(message_server)
{:ok, pid} = start_broadway(message_server, shared_client: true)

assert_receive :prepare_for_start_1
assert_receive :prepare_for_start_2
assert_receive :child_started_1
assert_receive :child_started_2

stop_broadway(pid)
end
Expand Down

0 comments on commit f8d8c64

Please sign in to comment.