Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Misc tidy ups #302

Merged
merged 5 commits into from
Aug 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/kafka_ex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ defmodule KafkaEx do
def create_worker(name, worker_init \\ []) do
case build_worker_options(worker_init) do
{:ok, worker_init} ->
Supervisor.start_child(KafkaEx.Supervisor, [worker_init, name])
KafkaEx.Supervisor.start_child([worker_init, name])
{:error, error} ->
{:error, error}
end
Expand Down
4 changes: 2 additions & 2 deletions lib/kafka_ex/gen_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ defmodule KafkaEx.GenConsumer do
* `{:sync_commit, new_state}` causes synchronous offset commits.
* `{:async_commit, new_state}` causes asynchronous offset commits.

Note that with both of the offset commit strategies, only of the final offset
Note that with both of the offset commit strategies, only the final offset
in the message set is committed and this is done after the messages are
consumed. If you want to commit the offset of every message consumed, use
the synchronous offset commit strategy and implement calls to
Expand Down Expand Up @@ -141,7 +141,7 @@ defmodule KafkaEx.GenConsumer do
{:async_commit, %{state | messages: state.messages ++ message_set}}
end

def handle_call(:messages, _from, state)
# Replies with the accumulated messages and bumps the call counter in state.
def handle_call(:messages, _from, state) do
  reply = state.messages
  updated_state = %{state | calls: state.calls + 1}
  {:reply, reply, updated_state}
end
end
Expand Down
54 changes: 27 additions & 27 deletions lib/kafka_ex/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,18 @@ defmodule KafkaEx.Server do
use_ssl: false
)

@type t :: %State{
metadata: Metadata.Response.t,
brokers: [Broker.t],
event_pid: nil | pid,
consumer_metadata: ConsumerMetadata.Response.t,
correlation_id: integer,
metadata_update_interval: nil | integer,
consumer_group_update_interval: nil | integer,
worker_name: atom,
ssl_options: KafkaEx.ssl_options,
use_ssl: boolean
}
# State carried by a KafkaEx server process.
# NOTE: the trailing comma after `use_ssl: boolean` was removed — Elixir map
# and struct literals do not permit trailing commas; it is a syntax error.
@type t :: %State{
  metadata: Metadata.Response.t,
  brokers: [Broker.t],
  event_pid: nil | pid,
  consumer_metadata: ConsumerMetadata.Response.t,
  correlation_id: integer,
  metadata_update_interval: nil | integer,
  consumer_group_update_interval: nil | integer,
  worker_name: atom,
  ssl_options: KafkaEx.ssl_options,
  use_ssl: boolean
}

@spec increment_correlation_id(t) :: t
def increment_correlation_id(%State{correlation_id: cid} = state) do
Expand Down Expand Up @@ -287,7 +287,7 @@ defmodule KafkaEx.Server do
correlation_id = state.correlation_id + 1
produce_request_data = Produce.create_request(correlation_id, @client_id, produce_request)
{broker, state, corr_id} = case MetadataResponse.broker_for_topic(state.metadata, state.brokers, produce_request.topic, produce_request.partition) do
nil ->
nil ->
{retrieved_corr_id, _} = retrieve_metadata(state.brokers, state.correlation_id, config_sync_timeout(), produce_request.topic)
state = %{update_metadata(state) | correlation_id: retrieved_corr_id}
{
Expand Down Expand Up @@ -387,20 +387,20 @@ defmodule KafkaEx.Server do
# Fetches cluster metadata from the first broker that responds.
#
# If a topic's leader is not yet available (e.g. leader election in
# progress), sleeps briefly and retries with `retry - 1`.
# Raises when no broker responds within `sync_timeout`.
# Returns `{next_correlation_id, parsed_metadata_response}`.
def retrieve_metadata(brokers, correlation_id, sync_timeout, topic, retry, _error_code) do
  metadata_request = Metadata.create_request(correlation_id, @client_id, topic)

  case first_broker_response(metadata_request, brokers, sync_timeout) do
    nil ->
      message = "Unable to fetch metadata from any brokers. Timeout is #{sync_timeout}."
      Logger.log(:error, message)
      # `raise` never returns, so no value is placed after it — the
      # `:no_metadata_available` atom that followed it was unreachable dead code.
      raise message

    data ->
      response = Metadata.parse_response(data)

      case Enum.find(response.topic_metadatas, &(&1.error_code == :leader_not_available)) do
        nil ->
          {correlation_id + 1, response}

        topic_metadata ->
          # Leader may still be electing; back off briefly before retrying.
          :timer.sleep(300)
          retrieve_metadata(brokers, correlation_id + 1, sync_timeout, topic, retry - 1, topic_metadata.error_code)
      end
  end
end

Expand Down
26 changes: 10 additions & 16 deletions lib/kafka_ex/server_0_p_8_p_2.ex
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,15 @@ defmodule KafkaEx.Server0P8P2 do
state = update_metadata(state)
{:ok, _} = :timer.send_interval(state.metadata_update_interval, :update_metadata)

state =
if consumer_group?(state) do
# If we are using consumer groups then initialize the state and start the update cycle
{_, updated_state} = update_consumer_metadata(state)
{:ok, _} = :timer.send_interval(state.consumer_group_update_interval, :update_consumer_metadata)
updated_state
else
state
end
if consumer_group?(state) do
# If we are using consumer groups then initialize the state and start the update cycle
{_, updated_state} = update_consumer_metadata(state)
{:ok, _} = :timer.send_interval(state.consumer_group_update_interval, :update_consumer_metadata)
{:ok, updated_state}
else
{:ok, state}
end

{:ok, state}
end

def kafka_server_consumer_group(state) do
Expand Down Expand Up @@ -225,12 +223,8 @@ defmodule KafkaEx.Server0P8P2 do
# A worker participates in a consumer group unless it was explicitly
# configured with :no_consumer_group.
def consumer_group?(%State{consumer_group: :no_consumer_group}) do
  false
end

def consumer_group?(_state) do
  true
end

def consumer_group_if_auto_commit?(true, state) do
consumer_group?(state)
end
def consumer_group_if_auto_commit?(false, _state) do
true
end
# When auto-commit is enabled, committing requires a consumer group;
# when auto-commit is disabled, the check trivially passes.
def consumer_group_if_auto_commit?(true, state) do
  consumer_group?(state)
end

def consumer_group_if_auto_commit?(false, _state) do
  true
end

defp first_broker_response(request, state) do
first_broker_response(request, state.brokers, config_sync_timeout())
Expand Down
4 changes: 4 additions & 0 deletions lib/kafka_ex/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ defmodule KafkaEx.Supervisor do
{:ok, pid}
end

# Starts a new worker child under this supervisor with the given
# child-process arguments.
def start_child(opts), do: Supervisor.start_child(__MODULE__, opts)

# Terminates the given worker child supervised by this module.
def stop_child(child), do: Supervisor.terminate_child(__MODULE__, child)
Expand Down