Skip to content

Commit

Permalink
Check api_version for create_topics to stay compatible 0.10.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
jbruggem committed Oct 22, 2018
1 parent a76557c commit 081de12
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 6 deletions.
18 changes: 14 additions & 4 deletions lib/kafka_ex/protocol/create_topics.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@

defmodule KafkaEx.Protocol.CreateTopics do
alias KafkaEx.Protocol
@supported_versions_range {0, 0}
@default_api_version 0

@moduledoc """
Implementation of the Kafka CreateTopics request and response APIs
Expand Down Expand Up @@ -63,8 +65,14 @@ defmodule KafkaEx.Protocol.CreateTopics do
@type t :: %Response{topic_errors: [TopicError]}
end

@spec create_request(integer, binary, Request.t) :: binary
def create_request(correlation_id, client_id, create_topics_request) do
def api_version(api_versions) do
KafkaEx.ApiVersions.find_api_version(api_versions, :create_topics, @supported_versions_range)
end

@spec create_request(integer, binary, Request.t, integer) :: binary
def create_request(correlation_id, client_id, create_topics_request, api_version)

def create_request(correlation_id, client_id, create_topics_request, 0) do
Protocol.create_request(:create_topics, correlation_id, client_id) <>
encode_topic_requests(create_topics_request.create_topic_requests) <>
<< create_topics_request.timeout :: 32-signed >>
Expand Down Expand Up @@ -129,8 +137,10 @@ defmodule KafkaEx.Protocol.CreateTopics do
end
end

@spec parse_response(binary) :: [] | Response.t
def parse_response(<< _correlation_id :: 32-signed, topic_errors_count :: 32-signed, topic_errors :: binary >>) do
@spec parse_response(binary, integer) :: [] | Response.t
def parse_response(message, api_version)

def parse_response(<< _correlation_id :: 32-signed, topic_errors_count :: 32-signed, topic_errors :: binary >>, 0) do
%Response{topic_errors: parse_topic_errors(topic_errors_count, topic_errors)}
end

Expand Down
5 changes: 5 additions & 0 deletions lib/kafka_ex/protocol/metadata.ex
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ defmodule KafkaEx.Protocol.Metadata do
def api_version(api_versions) do
case KafkaEx.ApiVersions.find_api_version(api_versions, :metadata, @supported_versions_range) do
{:ok, version} -> version
# those three should never happen since :metadata is part of the protocol since the beginning.
# they are left here as this will server as reference implementation
# :unknown_message_for_server ->
# :unknown_message_for_client ->
# :no_version_supported ->
_ -> @default_api_version
end
end
Expand Down
11 changes: 9 additions & 2 deletions lib/kafka_ex/server_0_p_10_and_later.ex
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,19 @@ defmodule KafkaEx.Server0P10AndLater do
end

def kafka_create_topics(requests, network_timeout, state) do
api_version = case CreateTopics.api_version(state.api_versions) do
{:ok, api_version} -> api_version
_ -> raise "CreateTopic is not supported in this version of Kafka, or the versions supported by the client do not match the ones supported by the server."
end

IO.puts "API version for create_topics: #{api_version}"

create_topics_request = %CreateTopics.Request{
create_topic_requests: requests,
timeout: network_timeout
}

mainRequest = CreateTopics.create_request(state.correlation_id, @client_id, create_topics_request)
mainRequest = CreateTopics.create_request(state.correlation_id, @client_id, create_topics_request, api_version)

broker = state.brokers |> Enum.find(&(&1.is_controller))

Expand All @@ -138,7 +145,7 @@ defmodule KafkaEx.Server0P10AndLater do
|> NetworkClient.send_sync_request(mainRequest, config_sync_timeout())
|> case do
{:error, reason} -> {:error, reason}
response -> CreateTopics.parse_response(response)
response -> CreateTopics.parse_response(response, api_version)
end
{response, %{state | correlation_id: state.correlation_id + 1}}
end
Expand Down

0 comments on commit 081de12

Please sign in to comment.