diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex index 1a3994f7..fdb702b7 100644 --- a/lib/kafka_ex.ex +++ b/lib/kafka_ex.ex @@ -67,7 +67,7 @@ defmodule KafkaEx do def create_worker(name, worker_init \\ []) do case build_worker_options(worker_init) do {:ok, worker_init} -> - Supervisor.start_child(KafkaEx.Supervisor, [worker_init, name]) + KafkaEx.Supervisor.start_child([worker_init, name]) {:error, error} -> {:error, error} end diff --git a/lib/kafka_ex/gen_consumer.ex b/lib/kafka_ex/gen_consumer.ex index 89c2ce64..4c5d8b0f 100644 --- a/lib/kafka_ex/gen_consumer.ex +++ b/lib/kafka_ex/gen_consumer.ex @@ -60,7 +60,7 @@ defmodule KafkaEx.GenConsumer do * `{:sync_commit, new_state}` causes synchronous offset commits. * `{:async_commit, new_state}` causes asynchronous offset commits. - Note that with both of the offset commit strategies, only of the final offset + Note that with both of the offset commit strategies, only if the final offset in the message set is committed and this is done after the messages are consumed. If you want to commit the offset of every message consumed, use the synchronous offset commit strategy and implement calls to @@ -141,7 +141,7 @@ defmodule KafkaEx.GenConsumer do {:async_commit, %{state | messages: state.messages ++ message_set}} end - def handle_call(:messages, _from, state) + def handle_call(:messages, _from, state) do {:reply, state.messages, %{state | calls: state.calls + 1}} end end diff --git a/lib/kafka_ex/server.ex b/lib/kafka_ex/server.ex index 4a006581..e8e84cd0 100644 --- a/lib/kafka_ex/server.ex +++ b/lib/kafka_ex/server.ex @@ -39,18 +39,18 @@ defmodule KafkaEx.Server do use_ssl: false ) - @type t :: %State{ - metadata: Metadata.Response.t, - brokers: [Broker.t], - event_pid: nil | pid, - consumer_metadata: ConsumerMetadata.Response.t, - correlation_id: integer, - metadata_update_interval: nil | integer, - consumer_group_update_interval: nil | integer, - worker_name: atom, - ssl_options: KafkaEx.ssl_options, - use_ssl: boolean - } + @type t :: %State{ + metadata: Metadata.Response.t, + brokers: [Broker.t], + event_pid: nil | pid, + consumer_metadata: ConsumerMetadata.Response.t, + correlation_id: integer, + metadata_update_interval: nil | integer, + consumer_group_update_interval: nil | integer, + worker_name: atom, + ssl_options: KafkaEx.ssl_options, + use_ssl: boolean, + } @spec increment_correlation_id(t) :: t def increment_correlation_id(%State{correlation_id: cid} = state) do @@ -287,7 +287,7 @@ defmodule KafkaEx.Server do correlation_id = state.correlation_id + 1 produce_request_data = Produce.create_request(correlation_id, @client_id, produce_request) {broker, state, corr_id} = case MetadataResponse.broker_for_topic(state.metadata, state.brokers, produce_request.topic, produce_request.partition) do - nil -> + nil -> {retrieved_corr_id, _} = retrieve_metadata(state.brokers, state.correlation_id, config_sync_timeout(), produce_request.topic) state = %{update_metadata(state) | correlation_id: retrieved_corr_id} { @@ -387,20 +387,20 @@ defmodule KafkaEx.Server do def retrieve_metadata(brokers, correlation_id, sync_timeout, topic, retry, _error_code) do metadata_request = Metadata.create_request(correlation_id, @client_id, topic) data = first_broker_response(metadata_request, brokers, sync_timeout) - response = case data do - nil -> - Logger.log(:error, "Unable to fetch metadata from any brokers. Timeout is #{sync_timeout}.") - raise "Unable to fetch metadata from any brokers. Timeout is #{sync_timeout}." - :no_metadata_available - data -> - Metadata.parse_response(data) - end - - case Enum.find(response.topic_metadatas, &(&1.error_code == :leader_not_available)) do - nil -> {correlation_id + 1, response} - topic_metadata -> - :timer.sleep(300) - retrieve_metadata(brokers, correlation_id + 1, sync_timeout, topic, retry - 1, topic_metadata.error_code) + if data do + response = Metadata.parse_response(data) + + case Enum.find(response.topic_metadatas, &(&1.error_code == :leader_not_available)) do + nil -> {correlation_id + 1, response} + topic_metadata -> + :timer.sleep(300) + retrieve_metadata(brokers, correlation_id + 1, sync_timeout, topic, retry - 1, topic_metadata.error_code) + end + else + message = "Unable to fetch metadata from any brokers. Timeout is #{sync_timeout}." + Logger.log(:error, message) + raise message + :no_metadata_available end end diff --git a/lib/kafka_ex/server_0_p_8_p_2.ex b/lib/kafka_ex/server_0_p_8_p_2.ex index 3591bc5f..4a0fd3e2 100644 --- a/lib/kafka_ex/server_0_p_8_p_2.ex +++ b/lib/kafka_ex/server_0_p_8_p_2.ex @@ -57,17 +57,15 @@ defmodule KafkaEx.Server0P8P2 do state = update_metadata(state) {:ok, _} = :timer.send_interval(state.metadata_update_interval, :update_metadata) - state = - if consumer_group?(state) do - # If we are using consumer groups then initialize the state and start the update cycle - {_, updated_state} = update_consumer_metadata(state) - {:ok, _} = :timer.send_interval(state.consumer_group_update_interval, :update_consumer_metadata) - updated_state - else - state - end + if consumer_group?(state) do + # If we are using consumer groups then initialize the state and start the update cycle + {_, updated_state} = update_consumer_metadata(state) + {:ok, _} = :timer.send_interval(state.consumer_group_update_interval, :update_consumer_metadata) + {:ok, updated_state} + else + {:ok, state} + end - {:ok, state} end def kafka_server_consumer_group(state) do @@ -225,12 +223,8 @@ defmodule KafkaEx.Server0P8P2 do def consumer_group?(%State{consumer_group: :no_consumer_group}), do: false def consumer_group?(_), do: true - def consumer_group_if_auto_commit?(true, state) do - consumer_group?(state) - end - def consumer_group_if_auto_commit?(false, _state) do - true - end + def consumer_group_if_auto_commit?(true, state), do: consumer_group?(state) + def consumer_group_if_auto_commit?(false, _state), do: true defp first_broker_response(request, state) do first_broker_response(request, state.brokers, config_sync_timeout()) diff --git a/lib/kafka_ex/supervisor.ex b/lib/kafka_ex/supervisor.ex index 57687f36..285aca1d 100644 --- a/lib/kafka_ex/supervisor.ex +++ b/lib/kafka_ex/supervisor.ex @@ -8,6 +8,10 @@ defmodule KafkaEx.Supervisor do {:ok, pid} end + def start_child(opts) do + Supervisor.start_child(__MODULE__, opts) + end + def stop_child(child) do Supervisor.terminate_child(__MODULE__, child) end