Skip to content

Commit

Permalink
updating to brod 3.16 and new kafka resp schemas (#100)
Browse files Browse the repository at this point in the history
* updating to brod 3.16 and new kafka resp schemas

* bumping minimum supported elixir version
  • Loading branch information
jeffgrunewald authored Nov 21, 2021
1 parent 8a461b8 commit 53eb831
Show file tree
Hide file tree
Showing 11 changed files with 70 additions and 62 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ jobs:
- uses: actions/checkout@v2
- uses: erlef/setup-beam@v1
with:
otp-version: 21.3
elixir-version: 1.8.2
otp-version: 22.3
elixir-version: 1.10.4
- name: Get depedencies
run: |
mix local.rebar --force
Expand All @@ -28,8 +28,8 @@ jobs:
- uses: actions/checkout@v2
- uses: erlef/setup-beam@v1
with:
otp-version: 21.3
elixir-version: 1.8.2
otp-version: 22.3
elixir-version: 1.10.4
- name: Get dependencies
run: |
mix local.rebar --force
Expand All @@ -45,8 +45,8 @@ jobs:
- uses: actions/checkout@v2
- uses: erlef/setup-beam@v1
with:
otp-version: 21.3
elixir-version: 1.8.2
otp-version: 22.3
elixir-version: 1.10.4
- name: Retrieve cached PLT
uses: actions/cache@v1
with:
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/master.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ jobs:
- uses: actions/checkout@v2
- uses: erlef/setup-beam@v1
with:
otp-version: 21.3
elixir-version: 1.8.2
otp-version: 22.3
elixir-version: 1.10.4
- name: Get dependencies
run: |
mix local.rebar --force
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ jobs:
- uses: actions/checkout@v2
- uses: erlef/setup-beam@v1
with:
otp-version: 21.3
elixir-version: 1.8.2
otp-version: 22.3
elixir-version: 1.10.4
- name: Build
run: |
mix local.rebar --force
Expand Down
2 changes: 2 additions & 0 deletions .tool-versions
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
erlang 22.3.4.21
elixir 1.10.4-otp-22
2 changes: 1 addition & 1 deletion lib/elsa/partitioner/random.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ defmodule Elsa.Partitioner.Random do
@behaviour Elsa.Partitioner

def partition(count, _key) do
:crypto.rand_uniform(0, count)
:rand.uniform(count) - 1
end
end
27 changes: 15 additions & 12 deletions lib/elsa/topic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ defmodule Elsa.Topic do
{:ok, metadata} = :brod.get_metadata(reformat_endpoints(endpoints), :all)

topics =
metadata.topic_metadata
metadata.topics
|> Enum.map(fn topic_metadata ->
{topic_metadata.topic, Enum.count(topic_metadata.partition_metadata)}
{topic_metadata.name, Enum.count(topic_metadata.partitions)}
end)

{:ok, topics}
Expand Down Expand Up @@ -48,14 +48,14 @@ defmodule Elsa.Topic do
config =
opts
|> Keyword.get(:config, [])
|> Enum.map(fn {key, val} -> %{config_name: to_string(key), config_value: val} end)
|> Enum.map(fn {key, val} -> %{name: to_string(key), value: val} end)

create_topic_args = %{
topic: topic,
name: topic,
num_partitions: Keyword.get(opts, :partitions, 1),
replication_factor: Keyword.get(opts, :replicas, 1),
replica_assignment: [],
config_entries: config
assignments: [],
configs: config
}

version = Elsa.Util.get_api_version(connection, :create_topics)
Expand Down Expand Up @@ -88,15 +88,18 @@ defmodule Elsa.Topic do
defp check_response(response) do
message = kpro_rsp(response, :msg)

error_key =
case Map.has_key?(message, :topic_errors) do
true -> :topic_errors
false -> :topic_error_codes
response_key =
case Map.has_key?(message, :topics) do
true -> :topics
false -> :responses
end

case Enum.find(message[error_key], fn error -> error.error_code != :no_error end) do
case Enum.find(message[response_key], fn response -> response.error_code != :no_error end) do
nil -> :ok
error -> {:error, {error.error_code, error[:error_message]}}
response -> {:error, {response.error_code, resp_error_msg(response, response_key)}}
end
end

defp resp_error_msg(response, :topics), do: response.error_message
defp resp_error_msg(_response, :responses), do: :delete_topic_error
end
24 changes: 14 additions & 10 deletions lib/elsa/util.ex
Original file line number Diff line number Diff line change
Expand Up @@ -109,21 +109,25 @@ defmodule Elsa.Util do
def partition_count(endpoints, topic) when is_list(endpoints) do
{:ok, metadata} = :brod.get_metadata(reformat_endpoints(endpoints), [topic])

metadata.topic_metadata
|> Enum.map(fn topic_metadata ->
Enum.count(topic_metadata.partition_metadata)
end)
|> hd()
count_partitions(metadata)
end

def partition_count(connection, topic) when is_atom(connection) or is_pid(connection) do
{:ok, metadata} = :brod_client.get_metadata(connection, topic)

metadata.topic_metadata
|> Enum.map(fn topic_metadata ->
Enum.count(topic_metadata.partition_metadata)
end)
|> hd()
count_partitions(metadata)
end

# Handle brod < 3.16
defp count_partitions(%{topic_metadata: topic_metadatas}) do
[count | _] = for %{partition_metadata: metadata} <- topic_metadatas, do: Enum.count(metadata)
count
end

# Handle brod 3.16+
defp count_partitions(%{topics: topics}) do
[count | _] = for %{partitions: partitions} <- topics, do: Enum.count(partitions)
count
end

defp connect(endpoints, :controller), do: :kpro.connect_controller(endpoints, [])
Expand Down
6 changes: 3 additions & 3 deletions mix.exs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
defmodule Elsa.MixProject do
use Mix.Project

@version "1.0.0-rc.2"
@version "1.0.0-rc.3"
@github "https://github.com/bbalser/elsa"

def project do
[
app: :elsa,
name: "Elsa",
version: @version,
elixir: "~> 1.8",
elixir: "~> 1.10",
start_permanent: Mix.env() == :prod,
description: description(),
package: package(),
Expand All @@ -30,7 +30,7 @@ defmodule Elsa.MixProject do

defp deps do
[
{:brod, "~> 3.14.0"},
{:brod, "~> 3.16"},
{:patiently, "~> 0.2", only: [:dev, :test, :integration]},
{:divo, "~> 1.3", only: [:dev, :test, :integration], override: true},
{:divo_kafka, "~> 0.1.7", only: [:dev, :test, :integration]},
Expand Down
17 changes: 8 additions & 9 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
%{
"brod": {:hex, :brod, "3.14.0", "f959408e88acd0feca22f6a43ca26e70201c6e5c57dc74b87f08ef65a5e7fe18", [:rebar3], [{:kafka_protocol, "2.3.6", [hex: :kafka_protocol, repo: "hexpm", optional: false]}, {:supervisor3, "1.1.11", [hex: :supervisor3, repo: "hexpm", optional: false]}], "hexpm", "a0153437835b810d93e79c7000f55b0c4bd62c281a3224a2615f6aa72b52d484"},
"brod": {:hex, :brod, "3.16.1", "1c7b03f99c7cc310de5511cadad9879ab0cc5f1a2612211e68c26dad517d31b0", [:rebar3], [{:kafka_protocol, "4.0.1", [hex: :kafka_protocol, repo: "hexpm", optional: false]}, {:snappyer, "1.2.8", [hex: :snappyer, repo: "hexpm", optional: false]}, {:supervisor3, "1.1.11", [hex: :supervisor3, repo: "hexpm", optional: false]}], "hexpm", "8297c47cd1ff0657955027fa1beb62edfaab1cc5e09b714cc29bd7f1c8d40083"},
"checkov": {:hex, :checkov, "1.0.0", "cecf1be22ea506b2fbd6741d7c00f4876bb2be76ea1b95493c25b51028f24410", [:mix], [], "hexpm", "9fa85e6fdf1bcec2dd0d996d0c1e5a83e336dafb97c931232af1cb1e7ef4420a"},
"crc32cer": {:hex, :crc32cer, "0.1.4", "a656dff19474d1a1fc5bb0081610ab6b0695b23affc47fa90abeb079a8ef9752", [:rebar3], [], "hexpm", "964735a5422cf65bbc5354860a560fff546f0026f83f8860525bd58ab5bade5d"},
"crc32cer": {:hex, :crc32cer, "0.1.8", "c6c2275c5fb60a95f4935d414f30b50ee9cfed494081c9b36ebb02edfc2f48db", [:rebar3], [], "hexpm", "251499085482920deb6c9b7aadabf9fb4c432f96add97ab42aee4501e5b6f591"},
"dialyxir": {:hex, :dialyxir, "1.1.0", "c5aab0d6e71e5522e77beff7ba9e08f8e02bad90dfbeffae60eaf0cb47e29488", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "07ea8e49c45f15264ebe6d5b93799d4dd56a44036cf42d0ad9c960bc266c0b9a"},
"divo": {:hex, :divo, "1.3.1", "a7cdb05d4525a9703e11dbcf40567d426b546f8e816b9c9465232c10bc6a257b", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:patiently, "~> 0.2", [hex: :patiently, repo: "hexpm", optional: false]}], "hexpm", "b3edef7baf068bbf864c01c8f3ed06fcf81c08e8d4adcc1cfc4b7d6eb69c6a18"},
"divo_kafka": {:hex, :divo_kafka, "0.1.7", "e8253bb735e001c41f35645ac0429740b6b6350ceb0ae268609f769f0b3883c5", [:mix], [{:divo, "~> 1.1", [hex: :divo, repo: "hexpm", optional: false]}], "hexpm", "25f9b89a1f59f6801b8b1e044eaa8cdce4e0756b4a8512458ea31f9c99ec338f"},
"earmark": {:hex, :earmark, "1.4.5", "62ffd3bd7722fb7a7b1ecd2419ea0b458c356e7168c1f5d65caf09b4fbdd13c8", [:mix], [], "hexpm", "b7d0e6263d83dc27141a523467799a685965bf8b13b6743413f19a7079843f4f"},
"earmark_parser": {:hex, :earmark_parser, "1.4.15", "b29e8e729f4aa4a00436580dcc2c9c5c51890613457c193cc8525c388ccb2f06", [:mix], [], "hexpm", "044523d6438ea19c1b8ec877ec221b008661d3c27e3b848f4c879f500421ca5c"},
"earmark_parser": {:hex, :earmark_parser, "1.4.17", "6f3c7e94170377ba45241d394389e800fb15adc5de51d0a3cd52ae766aafd63f", [:mix], [], "hexpm", "f93ac89c9feca61c165b264b5837bf82344d13bebc634cd575cb711e2e342023"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.25.3", "3edf6a0d70a39d2eafde030b8895501b1c93692effcbd21347296c18e47618ce", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "9ebebc2169ec732a38e9e779fd0418c9189b3ca93f4a676c961be6c1527913f5"},
"ex_doc": {:hex, :ex_doc, "0.25.5", "ac3c5425a80b4b7c4dfecdf51fa9c23a44877124dd8ca34ee45ff608b1c6deb9", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "688cfa538cdc146bc4291607764a7f1fcfa4cce8009ecd62de03b27197528350"},
"jason": {:hex, :jason, "1.2.2", "ba43e3f2709fd1aa1dce90aaabfd039d000469c05c56f0b8e31978e03fa39052", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "18a228f5f0058ee183f29f9eae0805c6e59d61c3b006760668d8d18ff0d12179"},
"kafka_protocol": {:hex, :kafka_protocol, "2.3.6", "df076a8ef49fffae3535c805cb00f3a057ce1895e63398bf8a10569eeeac02f8", [:rebar, :rebar3], [{:crc32cer, "0.1.4", [hex: :crc32cer, repo: "hexpm", optional: false]}, {:snappyer, "1.2.5", [hex: :snappyer, repo: "hexpm", optional: false]}], "hexpm", "7cb061fe46babc7fd269d2c0e5b4dba5d1efc4f7dacce85b17a9cca973106b23"},
"kafka_protocol": {:hex, :kafka_protocol, "4.0.1", "fc696880c73483c8b032c4bb60f2873046035c7824e1edcb924cfce643cf23dd", [:rebar3], [{:crc32cer, "0.1.8", [hex: :crc32cer, repo: "hexpm", optional: false]}], "hexpm", "687bfd9989998ec8fbbc3ed50d1239a6c07a7dc15b52914ad477413b89ecb621"},
"makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"},
"makeup_elixir": {:hex, :makeup_elixir, "0.15.1", "b5888c880d17d1cc3e598f05cdb5b5a91b7b17ac4eaf5f297cb697663a1094dd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "db68c173234b07ab2a07f645a5acdc117b9f99d69ebf521821d89690ae6c6ec8"},
"makeup_elixir": {:hex, :makeup_elixir, "0.15.2", "dc72dfe17eb240552857465cc00cce390960d9a0c055c4ccd38b70629227e97c", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "fd23ae48d09b32eff49d4ced2b43c9f086d402ee4fd4fcb2d7fad97fa8823e75"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"},
"meck": {:hex, :meck, "0.9.2", "85ccbab053f1db86c7ca240e9fc718170ee5bda03810a6292b5306bf31bae5f5", [:rebar3], [], "hexpm", "81344f561357dc40a8344afa53767c32669153355b626ea9fcbc8da6b3045826"},
"nimble_parsec": {:hex, :nimble_parsec, "1.1.0", "3a6fca1550363552e54c216debb6a9e95bd8d32348938e13de5eda962c0d7f89", [:mix], [], "hexpm", "08eb32d66b706e913ff748f11694b17981c0b04a33ef470e33e11b3d3ac8f54b"},
"nimble_parsec": {:hex, :nimble_parsec, "1.2.0", "b44d75e2a6542dcb6acf5d71c32c74ca88960421b6874777f79153bbbbd7dccc", [:mix], [], "hexpm", "52b2871a7515a5ac49b00f214e4165a40724cf99798d8e4a65e4fd64ebd002c1"},
"patiently": {:hex, :patiently, "0.2.0", "67eb139591e10c4b363ae0198e832552f191c58894731efd3bf124ec4722267a", [:mix], [], "hexpm", "c08cc5edc27def565647a9b55a0bea8025a5f81a4472e57692f28f2292c44c94"},
"placebo": {:hex, :placebo, "2.0.0", "c0e773dec77e941bcbcc14d10b759f2d66775aff9b75051f3e41939b64300e81", [:mix], [{:meck, "~> 0.9", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "e0872cec8848d7e59ba96396f45ee1ad34662c689c86ba6190694d38b4289844"},
"snappyer": {:hex, :snappyer, "1.2.5", "9154b9ac84031f0a799f72a4aa87df23ab2193b5631475fa2cdc304382d2df77", [:rebar3], [], "hexpm", "d2adc26a81efd5f138397a38a0bb545188d302972721f8be0de37fa452c8aed7"},
"snappyer": {:hex, :snappyer, "1.2.8", "201ce9067a33c71a6a5087c0c3a49a010b17112d461e6df696c722dcb6d0934a", [:rebar3], [], "hexpm", "35518e79a28548b56d8fd6aee2f565f12f51c2d3d053f9cfa817c83be88c4f3d"},
"supervisor3": {:hex, :supervisor3, "1.1.11", "d81cdec31d102fde407423e1d05b569572850deebed86b951d5233c387cba80b", [:rebar3], [], "hexpm", "e6c2dedbcabcba24995a218aca12db5e208b80d3252692b22ef0f1a266104b50"},
}
4 changes: 2 additions & 2 deletions test/integration/elsa/consumer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ defmodule Elsa.ConsumerTest do

send_messages(topic, ["message1", "message2"])

assert_receive {:message, %{topic: topic, partition: 0, offset: _, key: "", value: "message1"}}, 5_000
assert_receive {:message, %{topic: topic, partition: 1, offset: _, key: "", value: "message2"}}, 5_000
assert_receive {:message, %{topic: ^topic, partition: 0, offset: _, key: "", value: "message1"}}, 5_000
assert_receive {:message, %{topic: ^topic, partition: 1, offset: _, key: "", value: "message2"}}, 5_000
end

test "Elsa.Consumer will hand messages to the handler without state" do
Expand Down
30 changes: 15 additions & 15 deletions test/unit/elsa/topic_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ defmodule Elsa.TopicTest do
allow :kpro_req_lib.create_topics(any(), any(), any()), return: :topic_request

message = %{
topic_errors: [
topics: [
%{
error_code: :topic_already_exists,
error_message: "Topic 'elsa-topic' already exists.",
topic: "elsa-topic"
name: "elsa-topic"
}
]
}
Expand All @@ -54,10 +54,10 @@ defmodule Elsa.TopicTest do
allow :kpro_req_lib.delete_topics(any(), any(), any()), return: :topic_request

message = %{
topic_error_codes: [
responses: [
%{
error_code: :topic_doesnt_exist,
topic: "elsa-topic"
name: "elsa-topic"
}
]
}
Expand All @@ -70,21 +70,21 @@ defmodule Elsa.TopicTest do

internal_result = function.(:connection)

assert {:error, {:topic_doesnt_exist, nil}} == internal_result
assert {:error, {:topic_doesnt_exist, :delete_topic_error}} == internal_result
end
end

describe "list_topics/1" do
test "extracts topics and partitions as a list of tuples" do
metadata = %{
topic_metadata: [
topics: [
%{
partition_metadata: [%{partition: 0}],
topic: "elsa-other-topic"
partitions: [%{partition: 0}],
name: "elsa-other-topic"
},
%{
partition_metadata: [%{partition: 0}, %{partition: 1}],
topic: "elsa-topic"
partitions: [%{partition: 0}, %{partition: 1}],
name: "elsa-topic"
}
]
}
Expand All @@ -102,14 +102,14 @@ defmodule Elsa.TopicTest do
describe "exists?/2" do
test "returns a boolean identifying the presence of a given topic" do
metadata = %{
topic_metadata: [
topics: [
%{
partition_metadata: [%{partition: 0}],
topic: "elsa-other-topic"
partitions: [%{partition: 0}],
name: "elsa-other-topic"
},
%{
partition_metadata: [%{partition: 0}, %{partition: 1}],
topic: "elsa-topic"
partitions: [%{partition: 0}, %{partition: 1}],
name: "elsa-topic"
}
]
}
Expand Down

0 comments on commit 53eb831

Please sign in to comment.