Skip to content

Commit

Permalink
Merge pull request #320 from euranova/use_api_versions_for_new_api_calls
Browse files Browse the repository at this point in the history
Use api versions for new api calls (depends on #319)
  • Loading branch information
joshuawscott authored Nov 12, 2018
2 parents ef63708 + 5b6dd4c commit a414095
Show file tree
Hide file tree
Showing 15 changed files with 248 additions and 144 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ The 0.9 client includes functionality that cannot be tested with older
clusters.

```
mix test --include integration --include consumer_group --include server_0_p_10_p_1 --include server_0_p_9_p_0 --include server_0_p_8_p_0
./all_tests.sh
```

##### Kafka >= 0.9.0
Expand Down
5 changes: 5 additions & 0 deletions all_tests.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#! /bin/sh

# WARN: when changing something here, there should probably also be a change in scripts/ci_tests.sh

mix test --include integration --include consumer_group --include server_0_p_10_and_later --include server_0_p_9_p_0 --include server_0_p_8_p_0
26 changes: 26 additions & 0 deletions lib/kafka_ex/api_versions.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
defmodule KafkaEx.ApiVersions do

def api_versions_map(api_versions) do
api_versions
|> Enum.reduce(%{}, fn version, version_map ->
version_map |> Map.put(version.api_key, version)
end)
end


def find_api_version(api_versions_map, message_type, {min_implemented_version, max_implemented_version}) do
if api_versions_map == [:unsupported] do
{:ok, min_implemented_version}
else
case KafkaEx.Protocol.api_key(message_type) do
nil -> :unknown_message_for_client
api_key -> case api_versions_map[api_key] do
%{min_version: min} when min > max_implemented_version -> :no_version_supported
%{max_version: max} when max < min_implemented_version -> :no_version_supported
%{max_version: max} -> {:ok, Enum.min([max_implemented_version, max])}
_ -> :unknown_message_for_server
end
end
end
end
end
2 changes: 1 addition & 1 deletion lib/kafka_ex/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ defmodule KafkaEx.Config do
defp server("0.8.0"), do: KafkaEx.Server0P8P0
defp server("0.8.2"), do: KafkaEx.Server0P8P2
defp server("0.9.0"), do: KafkaEx.Server0P9P0
defp server(_), do: KafkaEx.Server0P10P1
defp server(_), do: KafkaEx.Server0P10AndLater


# ssl_options should be an empty list by default if use_ssl is false
Expand Down
90 changes: 21 additions & 69 deletions lib/kafka_ex/protocol.ex
Original file line number Diff line number Diff line change
@@ -1,81 +1,33 @@
defmodule KafkaEx.Protocol do
@moduledoc false

@produce_request 0
@fetch_request 1
@offset_request 2
@metadata_request 3
@offset_commit_request 8
@offset_fetch_request 9
@consumer_metadata_request 10
@join_group_request 11
@heartbeat_request 12
@leave_group_request 13
@sync_group_request 14
@api_versions_request 18
@create_topics_request 19
@message_type_to_api_key %{
produce: 0,
fetch: 1,
offset: 2,
metadata: 3,
offset_commit: 8,
offset_fetch: 9,
consumer_metadata: 10,
join_group: 11,
heartbeat: 12,
leave_group: 13,
sync_group: 14,
api_versions: 18,
create_topics: 19,
}

# DescribeConfigs 32
# AlterConfigs 33 Valid resource types are "Topic" and "Broker".

@api_version 0

defp api_key(:produce) do
@produce_request
end

defp api_key(:fetch) do
@fetch_request
end

defp api_key(:offset) do
@offset_request
end

defp api_key(:metadata) do
@metadata_request
end

defp api_key(:offset_commit) do
@offset_commit_request
end

defp api_key(:offset_fetch) do
@offset_fetch_request
end

defp api_key(:consumer_metadata) do
@consumer_metadata_request
end

defp api_key(:join_group) do
@join_group_request
end

defp api_key(:heartbeat) do
@heartbeat_request
end

defp api_key(:leave_group) do
@leave_group_request
end

defp api_key(:sync_group) do
@sync_group_request
end

defp api_key(:api_versions) do
@api_versions_request
end

defp api_key(:create_topics) do
@create_topics_request
end
@default_api_version 0

def create_request(type, correlation_id, client_id) do
create_request(type, correlation_id, client_id, @api_version)
@spec api_key(atom) :: integer | nil
def api_key(type) do
Map.get(@message_type_to_api_key, type, nil)
end

def create_request(type, correlation_id, client_id, api_version) do
def create_request(type, correlation_id, client_id, api_version \\ @default_api_version) do
<< api_key(type) :: 16, api_version :: 16, correlation_id :: 32,
byte_size(client_id) :: 16, client_id :: binary >>
end
Expand Down
18 changes: 14 additions & 4 deletions lib/kafka_ex/protocol/create_topics.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@

defmodule KafkaEx.Protocol.CreateTopics do
alias KafkaEx.Protocol
@supported_versions_range {0, 0}
@default_api_version 0

@moduledoc """
Implementation of the Kafka CreateTopics request and response APIs
Expand Down Expand Up @@ -63,8 +65,14 @@ defmodule KafkaEx.Protocol.CreateTopics do
@type t :: %Response{topic_errors: [TopicError]}
end

@spec create_request(integer, binary, Request.t) :: binary
def create_request(correlation_id, client_id, create_topics_request) do
def api_version(api_versions) do
KafkaEx.ApiVersions.find_api_version(api_versions, :create_topics, @supported_versions_range)
end

@spec create_request(integer, binary, Request.t, integer) :: binary
def create_request(correlation_id, client_id, create_topics_request, api_version)

def create_request(correlation_id, client_id, create_topics_request, 0) do
Protocol.create_request(:create_topics, correlation_id, client_id) <>
encode_topic_requests(create_topics_request.create_topic_requests) <>
<< create_topics_request.timeout :: 32-signed >>
Expand Down Expand Up @@ -129,8 +137,10 @@ defmodule KafkaEx.Protocol.CreateTopics do
end
end

@spec parse_response(binary) :: [] | Response.t
def parse_response(<< _correlation_id :: 32-signed, topic_errors_count :: 32-signed, topic_errors :: binary >>) do
@spec parse_response(binary, integer) :: [] | Response.t
def parse_response(message, api_version)

def parse_response(<< _correlation_id :: 32-signed, topic_errors_count :: 32-signed, topic_errors :: binary >>, 0) do
%Response{topic_errors: parse_topic_errors(topic_errors_count, topic_errors)}
end

Expand Down
17 changes: 17 additions & 0 deletions lib/kafka_ex/protocol/metadata.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ defmodule KafkaEx.Protocol.Metadata do
alias KafkaEx.Protocol
import KafkaEx.Protocol.Common

@supported_versions_range {0, 1}
@default_api_version 0

@moduledoc """
Expand Down Expand Up @@ -97,8 +98,24 @@ defmodule KafkaEx.Protocol.Metadata do
}
end

def api_version(api_versions) do
case KafkaEx.ApiVersions.find_api_version(api_versions, :metadata, @supported_versions_range) do
{:ok, version} -> version
# those three should never happen since :metadata is part of the protocol since the beginning.
# they are left here as this will server as reference implementation
# :unknown_message_for_server ->
# :unknown_message_for_client ->
# :no_version_supported ->
_ -> @default_api_version
end
end

def create_request(correlation_id, client_id, topics, api_version \\ @default_api_version)

def create_request(correlation_id, client_id, nil, api_version) do
create_request(correlation_id, client_id, "", api_version)
end

def create_request(correlation_id, client_id, "", api_version) do
topic_count = if 0 == api_version, do: 0, else: -1
KafkaEx.Protocol.create_request(:metadata, correlation_id, client_id, api_version) <> << topic_count :: 32-signed >>
Expand Down
43 changes: 24 additions & 19 deletions lib/kafka_ex/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ defmodule KafkaEx.Server do
consumer_group_update_interval: nil,
worker_name: KafkaEx.Server,
ssl_options: [],
use_ssl: false
use_ssl: false,
api_versions: []
)

@type t :: %State{
Expand All @@ -51,6 +52,7 @@ defmodule KafkaEx.Server do
worker_name: atom,
ssl_options: KafkaEx.ssl_options,
use_ssl: boolean,
api_versions: [KafkaEx.Protocol.ApiVersions.ApiVersion],
}

@spec increment_correlation_id(t) :: t
Expand Down Expand Up @@ -313,8 +315,8 @@ defmodule KafkaEx.Server do
def kafka_server_produce_send_request(correlation_id, produce_request, produce_request_data, state) do
{broker, state, corr_id} = case MetadataResponse.broker_for_topic(state.metadata, state.brokers, produce_request.topic, produce_request.partition) do
nil ->
{retrieved_corr_id, _} = retrieve_metadata(state.brokers, state.correlation_id, config_sync_timeout(), produce_request.topic)
state = %{update_metadata(state) | correlation_id: retrieved_corr_id}
{retrieved_corr_id, _} = retrieve_metadata(state.brokers, state.correlation_id, config_sync_timeout(), produce_request.topic, state.api_versions)
state = update_metadata(%{state | correlation_id: retrieved_corr_id})
{
MetadataResponse.broker_for_topic(state.metadata, state.brokers, produce_request.topic, produce_request.partition),
state,
Expand Down Expand Up @@ -383,7 +385,7 @@ defmodule KafkaEx.Server do
end

def kafka_server_metadata(topic, state) do
{correlation_id, metadata} = retrieve_metadata(state.brokers, state.correlation_id, config_sync_timeout(), topic)
{correlation_id, metadata} = retrieve_metadata(state.brokers, state.correlation_id, config_sync_timeout(), topic, state.api_versions)
updated_state = %{state | metadata: metadata, correlation_id: correlation_id}
{:reply, metadata, updated_state}
end
Expand All @@ -392,10 +394,8 @@ defmodule KafkaEx.Server do
{:noreply, update_metadata(state)}
end

def update_metadata(state), do: update_metadata(state, 0)

def update_metadata(state, api_version) do
{correlation_id, metadata} = retrieve_metadata(state.brokers, state.correlation_id, config_sync_timeout(), [], api_version)
def update_metadata(state) do
{correlation_id, metadata} = retrieve_metadata(state.brokers, state.correlation_id, config_sync_timeout(), nil, state.api_versions)
metadata_brokers = metadata.brokers |> Enum.map(&(%{&1 | is_controller: &1.node_id == metadata.controller_id}))
brokers = state.brokers
|> remove_stale_brokers(metadata_brokers)
Expand All @@ -404,30 +404,27 @@ defmodule KafkaEx.Server do
end

# credo:disable-for-next-line Credo.Check.Refactor.FunctionArity
def retrieve_metadata(brokers, correlation_id, sync_timeout, topic \\ [], api_version \\ 0) do
retrieve_metadata(brokers, correlation_id, sync_timeout, topic, @retry_count, 0, api_version)
def retrieve_metadata(brokers, correlation_id, sync_timeout, topic \\ [], server_api_versions \\ [:unsupported]) do
retrieve_metadata(brokers, correlation_id, sync_timeout, topic, @retry_count, 0, server_api_versions)
end

def retrieve_metadata(brokers, correlation_id, sync_timeout, topic, retry, error_code, api_version \\ 0)

# credo:disable-for-next-line Credo.Check.Refactor.FunctionArity
def retrieve_metadata(_, correlation_id, _sync_timeout, topic, 0, error_code, api_version) do
Logger.log(:error, "Metadata request for topic #{inspect topic} failed with error_code #{inspect error_code}")
{correlation_id, %Metadata.Response{}}
def retrieve_metadata(brokers, correlation_id, sync_timeout, topic, retry, error_code, server_api_versions \\ [:unsupported]) do
api_version = Metadata.api_version(server_api_versions)
retrieve_metadata_with_version(brokers, correlation_id, sync_timeout, topic, retry, error_code, api_version)
end

# credo:disable-for-next-line Credo.Check.Refactor.FunctionArity
def retrieve_metadata(brokers, correlation_id, sync_timeout, topic, retry, _error_code, api_version) do
def retrieve_metadata_with_version(brokers, correlation_id, sync_timeout, topic, retry, _error_code, api_version) do
metadata_request = Metadata.create_request(correlation_id, @client_id, topic, api_version)
data = first_broker_response(metadata_request, brokers, sync_timeout)
if data do
response = Metadata.parse_response(data, api_version)

case Enum.find(response.topic_metadatas, &(&1.error_code == :leader_not_available)) do
nil -> {correlation_id + 1, response}
topic_metadata ->
:timer.sleep(300)
retrieve_metadata(brokers, correlation_id + 1, sync_timeout, topic, retry - 1, topic_metadata.error_code, api_version)
retrieve_metadata_with_version(brokers, correlation_id + 1, sync_timeout, topic, retry - 1, topic_metadata.error_code, api_version)
end
else
message = "Unable to fetch metadata from any brokers. Timeout is #{sync_timeout}."
Expand All @@ -437,6 +434,13 @@ defmodule KafkaEx.Server do
end
end

# credo:disable-for-next-line Credo.Check.Refactor.FunctionArity
def retrieve_metadata_with_version(_, correlation_id, _sync_timeout, topic, 0, error_code, server_api_versions) do
Logger.log(:error, "Metadata request for topic #{inspect topic} failed with error_code #{inspect error_code}")
{correlation_id, %Metadata.Response{}}
end


defoverridable [
kafka_server_produce: 2, kafka_server_offset: 4,
kafka_server_metadata: 2, kafka_server_update_metadata: 1,
Expand Down Expand Up @@ -470,7 +474,8 @@ defmodule KafkaEx.Server do
metadata_update_interval: metadata_update_interval,
ssl_options: ssl_options,
use_ssl: use_ssl,
worker_name: name
worker_name: name,
api_versions: [:unsupported]
}

state = update_metadata(state)
Expand Down
Loading

0 comments on commit a414095

Please sign in to comment.