From 751379207614577e41e282ebee99e0511c229ad0 Mon Sep 17 00:00:00 2001 From: Jehan Bruggeman Date: Fri, 19 Oct 2018 11:06:51 +0200 Subject: [PATCH 1/4] Add module to determine API version to use --- lib/kafka_ex/api_versions.ex | 25 ++++++++ lib/kafka_ex/protocol.ex | 90 +++++++---------------------- test/kafka_ex/api_versions_test.exs | 42 ++++++++++++++ 3 files changed, 88 insertions(+), 69 deletions(-) create mode 100644 lib/kafka_ex/api_versions.ex create mode 100644 test/kafka_ex/api_versions_test.exs diff --git a/lib/kafka_ex/api_versions.ex b/lib/kafka_ex/api_versions.ex new file mode 100644 index 00000000..11a33b1c --- /dev/null +++ b/lib/kafka_ex/api_versions.ex @@ -0,0 +1,25 @@ +defmodule KafkaEx.ApiVersions do + + def api_versions_map(api_versions) do + api_versions + |> Enum.map(fn version -> {version.api_key, version} end) + |> Map.new + end + + def find_api_version(api_versions_map, message_type, {min_implemented_version, max_implemented_version}) do + if api_versions_map == [:unsupported] do + {:ok, min_implemented_version} + else + case KafkaEx.Protocol.api_key(message_type) do + nil -> :unknown_message_for_client + api_key -> case api_versions_map[api_key] do + %{min_version: min} when min > max_implemented_version -> :no_version_supported + %{max_version: max} when max < min_implemented_version -> :no_version_supported + %{max_version: max} -> {:ok, Enum.min([max_implemented_version, max])} + _ -> :unknown_message_for_server + end + end + end + + end +end \ No newline at end of file diff --git a/lib/kafka_ex/protocol.ex b/lib/kafka_ex/protocol.ex index ce01b275..6003d978 100644 --- a/lib/kafka_ex/protocol.ex +++ b/lib/kafka_ex/protocol.ex @@ -1,81 +1,33 @@ defmodule KafkaEx.Protocol do @moduledoc false - @produce_request 0 - @fetch_request 1 - @offset_request 2 - @metadata_request 3 - @offset_commit_request 8 - @offset_fetch_request 9 - @consumer_metadata_request 10 - @join_group_request 11 - @heartbeat_request 12 - @leave_group_request 13 - @sync_group_request 14 - @api_versions_request 18 - @create_topics_request 19 + @message_type_to_api_key %{ + produce: 0, + fetch: 1, + offset: 2, + metadata: 3, + offset_commit: 8, + offset_fetch: 9, + consumer_metadata: 10, + join_group: 11, + heartbeat: 12, + leave_group: 13, + sync_group: 14, + api_versions: 18, + create_topics: 19, + } + # DescribeConfigs 32 # AlterConfigs 33 Valid resource types are "Topic" and "Broker". - @api_version 0 - - defp api_key(:produce) do - @produce_request - end - - defp api_key(:fetch) do - @fetch_request - end - - defp api_key(:offset) do - @offset_request - end - - defp api_key(:metadata) do - @metadata_request - end - - defp api_key(:offset_commit) do - @offset_commit_request - end - - defp api_key(:offset_fetch) do - @offset_fetch_request - end - - defp api_key(:consumer_metadata) do - @consumer_metadata_request - end - - defp api_key(:join_group) do - @join_group_request - end - - defp api_key(:heartbeat) do - @heartbeat_request - end - - defp api_key(:leave_group) do - @leave_group_request - end - - defp api_key(:sync_group) do - @sync_group_request - end - - defp api_key(:api_versions) do - @api_versions_request - end - - defp api_key(:create_topics) do - @create_topics_request - end + @default_api_version 0 - def create_request(type, correlation_id, client_id) do - create_request(type, correlation_id, client_id, @api_version) + @spec api_key(atom) :: integer | nil + def api_key(type) do + Map.get(@message_type_to_api_key, type, nil) end - def create_request(type, correlation_id, client_id, api_version) do + def create_request(type, correlation_id, client_id, api_version \\ @default_api_version) do << api_key(type) :: 16, api_version :: 16, correlation_id :: 32, byte_size(client_id) :: 16, client_id :: binary >> end diff --git a/test/kafka_ex/api_versions_test.exs b/test/kafka_ex/api_versions_test.exs new file mode 100644 index 00000000..7b96e9fc --- /dev/null +++ b/test/kafka_ex/api_versions_test.exs @@ -0,0 +1,42 @@ +defmodule ApiVersionsTest do + use ExUnit.Case + @api_versions %{ + 0 => %KafkaEx.Protocol.ApiVersions.ApiVersion { + api_key: 0, + min_version: 0, + max_version: 3, + }, + 1 => %KafkaEx.Protocol.ApiVersions.ApiVersion { + api_key: 1, + min_version: 7, + max_version: 8, + }, + 3 => %KafkaEx.Protocol.ApiVersions.ApiVersion { + api_key: 3, + min_version: 2, + max_version: 4, + } + } + + test "can correctly determine the adequate api version when api versions is not support" do + assert {:ok, 1} == KafkaEx.ApiVersions.find_api_version([:unsupported], :metadata, {1, 3}) + end + + test "KafkaEx.ApiVersions can correctly determine the adequate api version when a match exists" do + assert {:ok, 3} == KafkaEx.ApiVersions.find_api_version(@api_versions, :metadata, {1, 3}) + assert {:ok, 0} == KafkaEx.ApiVersions.find_api_version(@api_versions, :produce, {0, 0}) + end + + test "KafkaEx.ApiVersions replies an error when there's no version match" do + assert :no_version_supported == KafkaEx.ApiVersions.find_api_version(@api_versions, :fetch, {0, 6}) + end + + test "KafkaEx.ApiVersions replies an error when the api_key is unknown to the server" do + assert :unknown_message_for_server == KafkaEx.ApiVersions.find_api_version(@api_versions, :create_topics, {0, 1}) + end + + test "KafkaEx.ApiVersions replies an error when the api_key is unknown to the client" do + assert :unknown_message_for_client == KafkaEx.ApiVersions.find_api_version(@api_versions, :this_does_not_exist, {0, 1}) + end +end + From 450b1b0dbefbe282aa33d739cfd855e0ebe90a2b Mon Sep 17 00:00:00 2001 From: Jehan Bruggeman Date: Mon, 22 Oct 2018 10:24:55 +0200 Subject: [PATCH 2/4] Use api_versions to control the version of Medatadata being called. --- README.md | 2 +- all_tests.sh | 5 +++ lib/kafka_ex/api_versions.ex | 1 - lib/kafka_ex/config.ex | 2 +- lib/kafka_ex/protocol/metadata.ex | 12 ++++++ lib/kafka_ex/server.ex | 43 +++++++++++-------- ...p_10_p_1.ex => server_0_p_10_and_later.ex} | 29 ++++++++++--- lib/kafka_ex/server_0_p_8_p_2.ex | 11 ++++- lib/kafka_ex/server_0_p_9_p_0.ex | 13 +++++- scripts/ci_tests.sh | 2 +- .../server0_p_10_and_later_test.exs | 43 +++++++++++++++++++ test/integration/server0_p_10_p_1_test.exs | 40 +---------------- 12 files changed, 133 insertions(+), 70 deletions(-) create mode 100755 all_tests.sh rename lib/kafka_ex/{server_0_p_10_p_1.ex => server_0_p_10_and_later.ex} (87%) create mode 100644 test/integration/server0_p_10_and_later_test.exs diff --git a/README.md b/README.md index c207e9e4..100df378 100644 --- a/README.md +++ b/README.md @@ -357,7 +357,7 @@ The 0.9 client includes functionality that cannot be tested with older clusters. ``` -mix test --include integration --include consumer_group --include server_0_p_10_p_1 --include server_0_p_9_p_0 --include server_0_p_8_p_0 +./all_tests.sh ``` ##### Kafka >= 0.9.0 diff --git a/all_tests.sh b/all_tests.sh new file mode 100755 index 00000000..f0d1d2c4 --- /dev/null +++ b/all_tests.sh @@ -0,0 +1,5 @@ +#! /bin/sh + +# WARN: when changing something here, there should probably also be a change in scripts/ci_tests.sh + +mix test --include integration --include consumer_group --include server_0_p_10_and_later --include server_0_p_9_p_0 --include server_0_p_8_p_0 diff --git a/lib/kafka_ex/api_versions.ex b/lib/kafka_ex/api_versions.ex index 11a33b1c..03338297 100644 --- a/lib/kafka_ex/api_versions.ex +++ b/lib/kafka_ex/api_versions.ex @@ -20,6 +20,5 @@ defmodule KafkaEx.ApiVersions do end end end - end end \ No newline at end of file diff --git a/lib/kafka_ex/config.ex b/lib/kafka_ex/config.ex index 08bc916d..1b5ba86d 100644 --- a/lib/kafka_ex/config.ex +++ b/lib/kafka_ex/config.ex @@ -81,7 +81,7 @@ defmodule KafkaEx.Config do defp server("0.8.0"), do: KafkaEx.Server0P8P0 defp server("0.8.2"), do: KafkaEx.Server0P8P2 defp server("0.9.0"), do: KafkaEx.Server0P9P0 - defp server(_), do: KafkaEx.Server0P10P1 + defp server(_), do: KafkaEx.Server0P10AndLater # ssl_options should be an empty list by default if use_ssl is false diff --git a/lib/kafka_ex/protocol/metadata.ex b/lib/kafka_ex/protocol/metadata.ex index 7fd52a07..ca08ae8c 100644 --- a/lib/kafka_ex/protocol/metadata.ex +++ b/lib/kafka_ex/protocol/metadata.ex @@ -2,6 +2,7 @@ defmodule KafkaEx.Protocol.Metadata do alias KafkaEx.Protocol import KafkaEx.Protocol.Common + @supported_versions_range {0, 1} @default_api_version 0 @moduledoc """ @@ -97,8 +98,19 @@ defmodule KafkaEx.Protocol.Metadata do } end + def api_version(api_versions) do + case KafkaEx.ApiVersions.find_api_version(api_versions, :metadata, @supported_versions_range) do + {:ok, version} -> version + _ -> @default_api_version + end + end + def create_request(correlation_id, client_id, topics, api_version \\ @default_api_version) + def create_request(correlation_id, client_id, nil, api_version) do + create_request(correlation_id, client_id, "", api_version) + end + def create_request(correlation_id, client_id, "", api_version) do topic_count = if 0 == api_version, do: 0, else: -1 KafkaEx.Protocol.create_request(:metadata, correlation_id, client_id, api_version) <> << topic_count :: 32-signed >> diff --git a/lib/kafka_ex/server.ex b/lib/kafka_ex/server.ex index 87d5a5b8..acbb7142 100644 --- a/lib/kafka_ex/server.ex +++ b/lib/kafka_ex/server.ex @@ -37,7 +37,8 @@ defmodule KafkaEx.Server do consumer_group_update_interval: nil, worker_name: KafkaEx.Server, ssl_options: [], - use_ssl: false + use_ssl: false, + api_versions: [] ) @type t :: %State{ @@ -51,6 +52,7 @@ defmodule KafkaEx.Server do worker_name: atom, ssl_options: KafkaEx.ssl_options, use_ssl: boolean, + api_versions: [KafkaEx.Protocol.ApiVersions.ApiVersion], } @spec increment_correlation_id(t) :: t @@ -313,8 +315,8 @@ defmodule KafkaEx.Server do def kafka_server_produce_send_request(correlation_id, produce_request, produce_request_data, state) do {broker, state, corr_id} = case MetadataResponse.broker_for_topic(state.metadata, state.brokers, produce_request.topic, produce_request.partition) do nil -> - {retrieved_corr_id, _} = retrieve_metadata(state.brokers, state.correlation_id, config_sync_timeout(), produce_request.topic) - state = %{update_metadata(state) | correlation_id: retrieved_corr_id} + {retrieved_corr_id, _} = retrieve_metadata(state.brokers, state.correlation_id, config_sync_timeout(), produce_request.topic, state.api_versions) + state = update_metadata(%{state | correlation_id: retrieved_corr_id}) { MetadataResponse.broker_for_topic(state.metadata, state.brokers, produce_request.topic, produce_request.partition), state, @@ -383,7 +385,7 @@ defmodule KafkaEx.Server do end def kafka_server_metadata(topic, state) do - {correlation_id, metadata} = retrieve_metadata(state.brokers, state.correlation_id, config_sync_timeout(), topic) + {correlation_id, metadata} = retrieve_metadata(state.brokers, state.correlation_id, config_sync_timeout(), topic, state.api_versions) updated_state = %{state | metadata: metadata, correlation_id: correlation_id} {:reply, metadata, updated_state} end @@ -392,10 +394,8 @@ defmodule KafkaEx.Server do {:noreply, update_metadata(state)} end - def update_metadata(state), do: update_metadata(state, 0) - - def update_metadata(state, api_version) do - {correlation_id, metadata} = retrieve_metadata(state.brokers, state.correlation_id, config_sync_timeout(), [], api_version) + def update_metadata(state) do + {correlation_id, metadata} = retrieve_metadata(state.brokers, state.correlation_id, config_sync_timeout(), nil, state.api_versions) metadata_brokers = metadata.brokers |> Enum.map(&(%{&1 | is_controller: &1.node_id == metadata.controller_id})) brokers = state.brokers |> remove_stale_brokers(metadata_brokers) @@ -404,30 +404,27 @@ defmodule KafkaEx.Server do end # credo:disable-for-next-line Credo.Check.Refactor.FunctionArity - def retrieve_metadata(brokers, correlation_id, sync_timeout, topic \\ [], api_version \\ 0) do - retrieve_metadata(brokers, correlation_id, sync_timeout, topic, @retry_count, 0, api_version) + def retrieve_metadata(brokers, correlation_id, sync_timeout, topic \\ [], server_api_versions \\ [:unsupported]) do + retrieve_metadata(brokers, correlation_id, sync_timeout, topic, @retry_count, 0, server_api_versions) end - def retrieve_metadata(brokers, correlation_id, sync_timeout, topic, retry, error_code, api_version \\ 0) - # credo:disable-for-next-line Credo.Check.Refactor.FunctionArity - def retrieve_metadata(_, correlation_id, _sync_timeout, topic, 0, error_code, api_version) do - Logger.log(:error, "Metadata request for topic #{inspect topic} failed with error_code #{inspect error_code}") - {correlation_id, %Metadata.Response{}} + def retrieve_metadata(brokers, correlation_id, sync_timeout, topic, retry, error_code, server_api_versions \\ [:unsupported]) do + api_version = Metadata.api_version(server_api_versions) + retrieve_metadata_with_version(brokers, correlation_id, sync_timeout, topic, retry, error_code, api_version) end # credo:disable-for-next-line Credo.Check.Refactor.FunctionArity - def retrieve_metadata(brokers, correlation_id, sync_timeout, topic, retry, _error_code, api_version) do + def retrieve_metadata_with_version(brokers, correlation_id, sync_timeout, topic, retry, _error_code, api_version) do metadata_request = Metadata.create_request(correlation_id, @client_id, topic, api_version) data = first_broker_response(metadata_request, brokers, sync_timeout) if data do response = Metadata.parse_response(data, api_version) - case Enum.find(response.topic_metadatas, &(&1.error_code == :leader_not_available)) do nil -> {correlation_id + 1, response} topic_metadata -> :timer.sleep(300) - retrieve_metadata(brokers, correlation_id + 1, sync_timeout, topic, retry - 1, topic_metadata.error_code, api_version) + retrieve_metadata_with_version(brokers, correlation_id + 1, sync_timeout, topic, retry - 1, topic_metadata.error_code, api_version) end else message = "Unable to fetch metadata from any brokers. Timeout is #{sync_timeout}." @@ -437,6 +434,13 @@ defmodule KafkaEx.Server do end end + # credo:disable-for-next-line Credo.Check.Refactor.FunctionArity + def retrieve_metadata_with_version(_, correlation_id, _sync_timeout, topic, 0, error_code, server_api_versions) do + Logger.log(:error, "Metadata request for topic #{inspect topic} failed with error_code #{inspect error_code}") + {correlation_id, %Metadata.Response{}} + end + + defoverridable [ kafka_server_produce: 2, kafka_server_offset: 4, kafka_server_metadata: 2, kafka_server_update_metadata: 1, @@ -470,7 +474,8 @@ defmodule KafkaEx.Server do metadata_update_interval: metadata_update_interval, ssl_options: ssl_options, use_ssl: use_ssl, - worker_name: name + worker_name: name, + api_versions: [:unsupported] } state = update_metadata(state) diff --git a/lib/kafka_ex/server_0_p_10_p_1.ex b/lib/kafka_ex/server_0_p_10_and_later.ex similarity index 87% rename from lib/kafka_ex/server_0_p_10_p_1.ex rename to lib/kafka_ex/server_0_p_10_and_later.ex index fe56446a..8a958c4e 100644 --- a/lib/kafka_ex/server_0_p_10_p_1.ex +++ b/lib/kafka_ex/server_0_p_10_and_later.ex @@ -1,4 +1,4 @@ -defmodule KafkaEx.Server0P10P1 do +defmodule KafkaEx.Server0P10AndLater do @moduledoc """ Implements kafkaEx.Server behaviors for kafka 0.10.1 API. """ @@ -16,7 +16,6 @@ defmodule KafkaEx.Server0P10P1 do require Logger - @metadata_api_version 1 @consumer_group_update_interval 30_000 @@ -66,10 +65,26 @@ defmodule KafkaEx.Server0P10P1 do brokers = Enum.map(uris, fn({host, port}) -> %Broker{host: host, port: port, socket: NetworkClient.create_socket(host, port, ssl_options, use_ssl)} end) - {correlation_id, metadata} = retrieve_metadata(brokers, 0, config_sync_timeout(), [], @metadata_api_version) - state = %State{metadata: metadata, brokers: brokers, correlation_id: correlation_id, consumer_group: consumer_group, metadata_update_interval: metadata_update_interval, consumer_group_update_interval: consumer_group_update_interval, worker_name: name, ssl_options: ssl_options, use_ssl: use_ssl} + { _, %KafkaEx.Protocol.ApiVersions.Response{ api_versions: api_versions, error_code: :no_error }, state } = kafka_api_versions(%State{brokers: brokers}) + api_versions = KafkaEx.ApiVersions.api_versions_map(api_versions) + + {correlation_id, metadata} = retrieve_metadata(brokers, state.correlation_id, config_sync_timeout(), [], api_versions) + + state = %State{ + metadata: metadata, + brokers: brokers, + correlation_id: correlation_id, + consumer_group: consumer_group, + metadata_update_interval: metadata_update_interval, + consumer_group_update_interval: consumer_group_update_interval, + worker_name: name, + ssl_options: ssl_options, + use_ssl: use_ssl, + api_versions: api_versions, + } + # Get the initial "real" broker list and start a regular refresh cycle. - state = update_metadata(state, @metadata_api_version) + state = update_metadata(state) {:ok, _} = :timer.send_interval(state.metadata_update_interval, :update_metadata) state = @@ -86,13 +101,13 @@ defmodule KafkaEx.Server0P10P1 do end def kafka_server_metadata(topic, state) do - {correlation_id, metadata} = retrieve_metadata(state.brokers, state.correlation_id, config_sync_timeout(), topic, @metadata_api_version) + {correlation_id, metadata} = retrieve_metadata(state.brokers, state.correlation_id, config_sync_timeout(), topic, state.api_versions) updated_state = %{state | metadata: metadata, correlation_id: correlation_id} {:reply, metadata, updated_state} end def kafka_server_update_metadata(state) do - {:noreply, update_metadata(state, @metadata_api_version)} + {:noreply, update_metadata(state)} end def kafka_api_versions(state) do diff --git a/lib/kafka_ex/server_0_p_8_p_2.ex b/lib/kafka_ex/server_0_p_8_p_2.ex index 1847bc2b..ee4b97d1 100644 --- a/lib/kafka_ex/server_0_p_8_p_2.ex +++ b/lib/kafka_ex/server_0_p_8_p_2.ex @@ -53,7 +53,16 @@ defmodule KafkaEx.Server0P8P2 do brokers = Enum.map(uris, fn({host, port}) -> %Broker{host: host, port: port, socket: NetworkClient.create_socket(host, port)} end) {correlation_id, metadata} = retrieve_metadata(brokers, 0, config_sync_timeout()) - state = %State{metadata: metadata, brokers: brokers, correlation_id: correlation_id, consumer_group: consumer_group, metadata_update_interval: metadata_update_interval, consumer_group_update_interval: consumer_group_update_interval, worker_name: name} + state = %State{ + metadata: metadata, + brokers: brokers, + correlation_id: correlation_id, + consumer_group: consumer_group, + metadata_update_interval: metadata_update_interval, + consumer_group_update_interval: consumer_group_update_interval, + worker_name: name, + api_versions: [:unsupported] + } # Get the initial "real" broker list and start a regular refresh cycle. state = update_metadata(state) {:ok, _} = :timer.send_interval(state.metadata_update_interval, :update_metadata) diff --git a/lib/kafka_ex/server_0_p_9_p_0.ex b/lib/kafka_ex/server_0_p_9_p_0.ex index 88e43b83..20312e3b 100644 --- a/lib/kafka_ex/server_0_p_9_p_0.ex +++ b/lib/kafka_ex/server_0_p_9_p_0.ex @@ -69,7 +69,18 @@ defmodule KafkaEx.Server0P9P0 do brokers = Enum.map(uris, fn({host, port}) -> %Broker{host: host, port: port, socket: NetworkClient.create_socket(host, port, ssl_options, use_ssl)} end) {correlation_id, metadata} = retrieve_metadata(brokers, 0, config_sync_timeout()) - state = %State{metadata: metadata, brokers: brokers, correlation_id: correlation_id, consumer_group: consumer_group, metadata_update_interval: metadata_update_interval, consumer_group_update_interval: consumer_group_update_interval, worker_name: name, ssl_options: ssl_options, use_ssl: use_ssl} + state = %State{ + metadata: metadata, + brokers: brokers, + correlation_id: correlation_id, + consumer_group: consumer_group, + metadata_update_interval: metadata_update_interval, + consumer_group_update_interval: consumer_group_update_interval, + worker_name: name, + ssl_options: ssl_options, + use_ssl: use_ssl, + api_versions: [:unsupported] + } # Get the initial "real" broker list and start a regular refresh cycle. state = update_metadata(state) {:ok, _} = :timer.send_interval(state.metadata_update_interval, :update_metadata) diff --git a/scripts/ci_tests.sh b/scripts/ci_tests.sh index a005b302..756ad98d 100755 --- a/scripts/ci_tests.sh +++ b/scripts/ci_tests.sh @@ -16,7 +16,7 @@ else TEST_COMMAND=test fi -INCLUDED_TESTS="--include integration --include consumer_group --include server_0_p_10_p_1 --include server_0_p_9_p_0 --include server_0_p_8_p_0" +INCLUDED_TESTS="--include integration --include consumer_group --include server_0_p_10_and_later --include server_0_p_9_p_0 --include server_0_p_8_p_0" mix "$TEST_COMMAND" $INCLUDED_TESTS diff --git a/test/integration/server0_p_10_and_later_test.exs b/test/integration/server0_p_10_and_later_test.exs new file mode 100644 index 00000000..1eb54c19 --- /dev/null +++ b/test/integration/server0_p_10_and_later_test.exs @@ -0,0 +1,43 @@ +defmodule KafkaEx.Server0P10P1AndLater.Test do + use ExUnit.Case + import TestHelper + + @moduletag :server_0_p_10_and_later + + @tag :create_topic + test "can create a topic" do + name = "create_topic_#{:rand.uniform(2000000)}" + + request = %{ + topic: name, + num_partitions: 10, + replication_factor: 1, + replica_assignment: [], + config_entries: [ + %{config_name: "cleanup.policy", config_value: "compact"}, + %{config_name: "min.compaction.lag.ms", config_value: "0"} + ]} + + resp = KafkaEx.create_topics([request], timeout: 2000) + assert {:no_error, name} == parse_create_topic_resp(resp) + + resp = KafkaEx.create_topics([request], timeout: 2000) + assert {:topic_already_exists, name} == parse_create_topic_resp(resp) + + wait_for(fn -> + topics = KafkaEx.metadata.topic_metadatas |> Enum.map(&(&1.topic)) + assert Enum.member?(topics, name) + end) + end + + def parse_create_topic_resp(response) do + %KafkaEx.Protocol.CreateTopics.Response{ + topic_errors: [ + %KafkaEx.Protocol.CreateTopics.TopicError{ + error_code: error_code, + topic_name: topic_name + } + ]} = response + {error_code, topic_name} + end +end diff --git a/test/integration/server0_p_10_p_1_test.exs b/test/integration/server0_p_10_p_1_test.exs index d6bf3498..3211ecb1 100644 --- a/test/integration/server0_p_10_p_1_test.exs +++ b/test/integration/server0_p_10_p_1_test.exs @@ -1,46 +1,10 @@ defmodule KafkaEx.Server0P10P1.Test do use ExUnit.Case - import TestHelper + @moduletag :server_0_p_10_and_later @moduletag :server_0_p_10_p_1 - @tag :create_topic - test "can create a topic" do - name = "create_topic_#{:rand.uniform(2000000)}" - - request = %{ - topic: name, - num_partitions: 10, - replication_factor: 1, - replica_assignment: [], - config_entries: [ - %{config_name: "cleanup.policy", config_value: "compact"}, - %{config_name: "min.compaction.lag.ms", config_value: "0"} - ]} - - resp = KafkaEx.create_topics([request], timeout: 2000) - assert {:no_error, name} == parse_create_topic_resp(resp) - - resp = KafkaEx.create_topics([request], timeout: 2000) - assert {:topic_already_exists, name} == parse_create_topic_resp(resp) - - wait_for(fn -> - topics = KafkaEx.metadata.topic_metadatas |> Enum.map(&(&1.topic)) - assert Enum.member?(topics, name) - end) - end - - def parse_create_topic_resp(response) do - %KafkaEx.Protocol.CreateTopics.Response{ - topic_errors: [ - %KafkaEx.Protocol.CreateTopics.TopicError{ - error_code: error_code, - topic_name: topic_name - } - ]} = response - {error_code, topic_name} - end - + # specific to this server version because we want to test that the api_versions list is exact @tag :api_version test "can retrieve api versions" do From b33aa6061f34fdfd10494dd4214078a1b706a5ad Mon Sep 17 00:00:00 2001 From: Jehan Bruggeman Date: Mon, 22 Oct 2018 10:43:28 +0200 Subject: [PATCH 3/4] Check api_version for create_topics to stay compatible 0.10.0.0 --- lib/kafka_ex/protocol/create_topics.ex | 18 ++++++++++++++---- lib/kafka_ex/protocol/metadata.ex | 5 +++++ lib/kafka_ex/server_0_p_10_and_later.ex | 11 +++++++++-- 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/lib/kafka_ex/protocol/create_topics.ex b/lib/kafka_ex/protocol/create_topics.ex index 44531a54..4a37ce51 100644 --- a/lib/kafka_ex/protocol/create_topics.ex +++ b/lib/kafka_ex/protocol/create_topics.ex @@ -1,6 +1,8 @@ defmodule KafkaEx.Protocol.CreateTopics do alias KafkaEx.Protocol + @supported_versions_range {0, 0} + @default_api_version 0 @moduledoc """ Implementation of the Kafka CreateTopics request and response APIs @@ -63,8 +65,14 @@ defmodule KafkaEx.Protocol.CreateTopics do @type t :: %Response{topic_errors: [TopicError]} end - @spec create_request(integer, binary, Request.t) :: binary - def create_request(correlation_id, client_id, create_topics_request) do + def api_version(api_versions) do + KafkaEx.ApiVersions.find_api_version(api_versions, :create_topics, @supported_versions_range) + end + + @spec create_request(integer, binary, Request.t, integer) :: binary + def create_request(correlation_id, client_id, create_topics_request, api_version) + + def create_request(correlation_id, client_id, create_topics_request, 0) do Protocol.create_request(:create_topics, correlation_id, client_id) <> encode_topic_requests(create_topics_request.create_topic_requests) <> << create_topics_request.timeout :: 32-signed >> @@ -129,8 +137,10 @@ defmodule KafkaEx.Protocol.CreateTopics do end end - @spec parse_response(binary) :: [] | Response.t - def parse_response(<< _correlation_id :: 32-signed, topic_errors_count :: 32-signed, topic_errors :: binary >>) do + @spec parse_response(binary, integer) :: [] | Response.t + def parse_response(message, api_version) + + def parse_response(<< _correlation_id :: 32-signed, topic_errors_count :: 32-signed, topic_errors :: binary >>, 0) do %Response{topic_errors: parse_topic_errors(topic_errors_count, topic_errors)} end diff --git a/lib/kafka_ex/protocol/metadata.ex b/lib/kafka_ex/protocol/metadata.ex index ca08ae8c..d790d151 100644 --- a/lib/kafka_ex/protocol/metadata.ex +++ b/lib/kafka_ex/protocol/metadata.ex @@ -101,6 +101,11 @@ defmodule KafkaEx.Protocol.Metadata do def api_version(api_versions) do case KafkaEx.ApiVersions.find_api_version(api_versions, :metadata, @supported_versions_range) do {:ok, version} -> version + # those three should never happen since :metadata is part of the protocol since the beginning. + # they are left here as this will server as reference implementation + # :unknown_message_for_server -> + # :unknown_message_for_client -> + # :no_version_supported -> _ -> @default_api_version end end diff --git a/lib/kafka_ex/server_0_p_10_and_later.ex b/lib/kafka_ex/server_0_p_10_and_later.ex index 8a958c4e..dc8ce6c8 100644 --- a/lib/kafka_ex/server_0_p_10_and_later.ex +++ b/lib/kafka_ex/server_0_p_10_and_later.ex @@ -120,12 +120,19 @@ defmodule KafkaEx.Server0P10AndLater do end def kafka_create_topics(requests, network_timeout, state) do + api_version = case CreateTopics.api_version(state.api_versions) do + {:ok, api_version} -> api_version + _ -> raise "CreateTopic is not supported in this version of Kafka, or the versions supported by the client do not match the ones supported by the server." + end + + IO.puts "API version for create_topics: #{api_version}" + create_topics_request = %CreateTopics.Request{ create_topic_requests: requests, timeout: network_timeout } - mainRequest = CreateTopics.create_request(state.correlation_id, @client_id, create_topics_request) + mainRequest = CreateTopics.create_request(state.correlation_id, @client_id, create_topics_request, api_version) broker = state.brokers |> Enum.find(&(&1.is_controller)) @@ -138,7 +145,7 @@ defmodule KafkaEx.Server0P10AndLater do |> NetworkClient.send_sync_request(mainRequest, config_sync_timeout()) |> case do {:error, reason} -> {:error, reason} - response -> CreateTopics.parse_response(response) + response -> CreateTopics.parse_response(response, api_version) end {response, %{state | correlation_id: state.correlation_id + 1}} end From 5b6dd4ca0a0121dad9a56f7f896629a7340d582d Mon Sep 17 00:00:00 2001 From: Jehan Bruggeman Date: Thu, 8 Nov 2018 00:20:18 +0100 Subject: [PATCH 4/4] Make api_versions compatible with Elixir 1.1 --- lib/kafka_ex/api_versions.ex | 6 ++++-- lib/kafka_ex/server_0_p_10_and_later.ex | 2 -- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/kafka_ex/api_versions.ex b/lib/kafka_ex/api_versions.ex index 03338297..f29cc3ec 100644 --- a/lib/kafka_ex/api_versions.ex +++ b/lib/kafka_ex/api_versions.ex @@ -2,10 +2,12 @@ defmodule KafkaEx.ApiVersions do def api_versions_map(api_versions) do api_versions - |> Enum.map(fn version -> {version.api_key, version} end) - |> Map.new + |> Enum.reduce(%{}, fn version, version_map -> + version_map |> Map.put(version.api_key, version) + end) end + def find_api_version(api_versions_map, message_type, {min_implemented_version, max_implemented_version}) do if api_versions_map == [:unsupported] do {:ok, min_implemented_version} diff --git a/lib/kafka_ex/server_0_p_10_and_later.ex b/lib/kafka_ex/server_0_p_10_and_later.ex index dc8ce6c8..8d6d17d4 100644 --- a/lib/kafka_ex/server_0_p_10_and_later.ex +++ b/lib/kafka_ex/server_0_p_10_and_later.ex @@ -125,8 +125,6 @@ defmodule KafkaEx.Server0P10AndLater do _ -> raise "CreateTopic is not supported in this version of Kafka, or the versions supported by the client do not match the ones supported by the server." end - IO.puts "API version for create_topics: #{api_version}" - create_topics_request = %CreateTopics.Request{ create_topic_requests: requests, timeout: network_timeout