diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex index cd8bd7b6..0ee0631c 100644 --- a/lib/kafka_ex.ex +++ b/lib/kafka_ex.ex @@ -133,7 +133,7 @@ defmodule KafkaEx do def describe_group(consumer_group_name, opts \\ []) do worker_name = Keyword.get(opts, :worker_name, Config.default_worker()) - case Server.call(worker_name, {:describe_groups, [consumer_group_name]}) do + case Server.call(worker_name, {:describe_groups, [consumer_group_name], opts}) do {:ok, [group]} -> {:ok, group} {:error, error} -> {:error, error} end @@ -227,8 +227,9 @@ defmodule KafkaEx do """ @spec latest_offset(binary, integer, atom | pid) :: [OffsetResponse.t()] | :topic_not_found - def latest_offset(topic, partition, name \\ Config.default_worker()), - do: offset(topic, partition, :latest, name) + def latest_offset(topic, partition, name \\ Config.default_worker()) do + offset(topic, partition, :latest, name) + end @doc """ Get the offset of the earliest message still persistent in Kafka @@ -240,10 +241,10 @@ defmodule KafkaEx do [%KafkaEx.Protocol.Offset.Response{partition_offsets: [%{error_code: 0, offset: [0], partition: 0}], topic: "foo"}] ``` """ - @spec earliest_offset(binary, integer, atom | pid) :: - [OffsetResponse.t()] | :topic_not_found - def earliest_offset(topic, partition, name \\ Config.default_worker()), - do: offset(topic, partition, :earliest, name) + @spec earliest_offset(binary, integer, atom | pid) :: [OffsetResponse.t()] | :topic_not_found + def earliest_offset(topic, partition, name \\ Config.default_worker()) do + offset(topic, partition, :earliest, name) + end @doc """ Get the offset of the message sent at the specified date/time @@ -255,14 +256,14 @@ defmodule KafkaEx do [%KafkaEx.Protocol.Offset.Response{partition_offsets: [%{error_code: 0, offset: [256], partition: 0}], topic: "foo"}] ``` """ - @spec offset( - binary, - number, - :calendar.datetime() | :earliest | :latest, - atom | pid - ) :: [OffsetResponse.t()] | :topic_not_found + @type valid_timestamp :: :earliest | :latest | :calendar.datetime() + @spec offset(binary, number, valid_timestamp, atom | pid) :: [OffsetResponse.t()] | :topic_not_found def offset(topic, partition, time, name \\ Config.default_worker()) do - Server.call(name, {:offset, topic, partition, time}) + case Server.call(name, {:offset, topic, partition, time}) do + {:ok, response} -> parse_offset_value(response) + {:error, :topic_not_found} -> :topic_not_found + result -> result + end end @wait_time 10 @@ -812,4 +813,23 @@ defmodule KafkaEx do end end end + + # ------------------------------------------------------------------- + # Backwards compatibility + # ------------------------------------------------------------------- + defp parse_offset_value([%KafkaEx.New.Structs.Offset{} | _] = offsets) do + Enum.map(offsets, fn offset -> + %OffsetResponse{ + topic: offset.topic, + partition_offsets: + Enum.map(offset.partition_offsets, fn value -> + %{ + partition: value.partition, + error_code: value.error_code, + offset: [value.offset] + } + end) + } + end) + end end diff --git a/lib/kafka_ex/new/protocols/kayrock/list_offsets/v0_response_impl.ex b/lib/kafka_ex/new/protocols/kayrock/list_offsets/v0_response_impl.ex index 17f852c9..62275e83 100644 --- a/lib/kafka_ex/new/protocols/kayrock/list_offsets/v0_response_impl.ex +++ b/lib/kafka_ex/new/protocols/kayrock/list_offsets/v0_response_impl.ex @@ -13,12 +13,12 @@ defimpl KafkaEx.New.Protocols.Kayrock.ListOffsets.Response, for: Kayrock.ListOff end defp build_offset(topic, %{partition: partition, error_code: 0, offsets: []}) do - data = %{partition: partition, offset: 0} + data = %{partition: partition, offset: 0, error_code: :no_error} {:ok, KafkaEx.New.Structs.Offset.from_list_offset(topic, [data])} end defp build_offset(topic, %{partition: partition, error_code: 0, offsets: [offset | _]}) do - data = %{partition: partition, offset: offset} + data = %{partition: partition, offset: offset, error_code: :no_error} {:ok, KafkaEx.New.Structs.Offset.from_list_offset(topic, [data])} end diff --git a/lib/kafka_ex/new/protocols/kayrock/list_offsets/v1_response_impl.ex b/lib/kafka_ex/new/protocols/kayrock/list_offsets/v1_response_impl.ex index 8560b945..0787d460 100644 --- a/lib/kafka_ex/new/protocols/kayrock/list_offsets/v1_response_impl.ex +++ b/lib/kafka_ex/new/protocols/kayrock/list_offsets/v1_response_impl.ex @@ -13,7 +13,7 @@ defimpl KafkaEx.New.Protocols.Kayrock.ListOffsets.Response, for: Kayrock.ListOff end defp build_offset(topic, %{error_code: 0, partition: p, offset: o}) do - data = %{partition: p, offset: o} + data = %{partition: p, offset: o, error_code: :no_error} {:ok, KafkaEx.New.Structs.Offset.from_list_offset(topic, [data])} end diff --git a/lib/kafka_ex/new/protocols/kayrock/list_offsets/v2_response_impl.ex b/lib/kafka_ex/new/protocols/kayrock/list_offsets/v2_response_impl.ex index 5ea0e5c7..d8a73ee5 100644 --- a/lib/kafka_ex/new/protocols/kayrock/list_offsets/v2_response_impl.ex +++ b/lib/kafka_ex/new/protocols/kayrock/list_offsets/v2_response_impl.ex @@ -13,7 +13,7 @@ defimpl KafkaEx.New.Protocols.Kayrock.ListOffsets.Response, for: Kayrock.ListOff end defp build_offset(topic, %{error_code: 0, partition: p, offset: o, timestamp: t}) do - data = %{partition: p, offset: o, timestamp: t} + data = %{partition: p, offset: o, timestamp: t, error_code: :no_error} {:ok, KafkaEx.New.Structs.Offset.from_list_offset(topic, [data])} end diff --git a/lib/kafka_ex/new/structs/offset/partition_offset.ex b/lib/kafka_ex/new/structs/offset/partition_offset.ex index b1bd5a33..391b8293 100644 --- a/lib/kafka_ex/new/structs/offset/partition_offset.ex +++ b/lib/kafka_ex/new/structs/offset/partition_offset.ex @@ -2,19 +2,22 @@ defmodule KafkaEx.New.Structs.Offset.PartitionOffset do @moduledoc """ This module represents Offset value for a specific partition """ - defstruct [:partition, :offset, :timestamp] + defstruct [:partition, :offset, :error_code, :timestamp] @type partition :: KafkaEx.Types.partition() @type offset :: KafkaEx.Types.offset() @type timestamp :: KafkaEx.Types.timestamp() + @type error_code :: KafkaEx.Types.error_code() | atom @type partition_response :: %{ required(:partition) => partition, + required(:error_code) => error_code, required(:offset) => offset, optional(:timestamp) => timestamp } @type t :: %__MODULE__{ partition: partition, + error_code: error_code, offset: offset, timestamp: timestamp } @@ -24,12 +27,14 @@ defmodule KafkaEx.New.Structs.Offset.PartitionOffset do For backward compatibility with kafka_ex, we will replace this nil values with -1 """ @spec build(partition_response) :: __MODULE__.t() - def build(%{partition: p, offset: o, timestamp: t}), do: do_build(p, o, t) - def build(%{partition: p, offset: o}), do: do_build(p, o, -1) + def build(%{partition: p, offset: o, error_code: e, timestamp: t}), do: do_build(p, o, e, t) + def build(%{partition: p, offset: o, error_code: e}), do: do_build(p, o, e, -1) + def build(%{partition: p, offset: o}), do: do_build(p, o, :no_error, -1) - defp do_build(partition, offset, timestamp) do + defp do_build(partition, offset, error_code, timestamp) do %__MODULE__{ partition: partition, + error_code: error_code, offset: offset, timestamp: timestamp } diff --git a/test/integration/kayrock/compatibility_test.exs b/test/integration/kayrock/compatibility_test.exs index aa8fcd79..3d9bc28c 100644 --- a/test/integration/kayrock/compatibility_test.exs +++ b/test/integration/kayrock/compatibility_test.exs @@ -42,11 +42,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do {:ok, %{consumer_group: consumer_group, topic: topic}} end - test "with new client - returns group metadata", %{ - client: client, - consumer_group: consumer_group, - topic: topic - } do + test "with new client - returns group metadata", %{client: client, consumer_group: consumer_group, topic: topic} do join_to_group(client, topic, consumer_group) {:ok, group_metadata} = KafkaExAPI.describe_group(client, consumer_group) @@ -57,11 +53,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do assert length(group_metadata.members) == 1 end - test "with old client - returns group metadata", %{ - client: client, - consumer_group: consumer_group, - topic: topic - } do + test "with old client - returns group metadata", %{client: client, consumer_group: consumer_group, topic: topic} do join_to_group(client, topic, consumer_group) {:ok, group_metadata} = KafkaEx.describe_group(consumer_group, worker_name: client) @@ -109,21 +101,11 @@ defmodule KafkaEx.KayrockCompatibilityTest do end test "produce/4 without an acq required returns :ok", %{client: client} do - assert KafkaEx.produce("food", 0, "hey", - worker_name: client, - required_acks: 0 - ) == :ok + assert KafkaEx.produce("food", 0, "hey", worker_name: client, required_acks: 0) == :ok end test "produce/4 with ack required returns an ack", %{client: client} do - {:ok, offset} = - KafkaEx.produce( - "food", - 0, - "hey", - worker_name: client, - required_acks: 1 - ) + {:ok, offset} = KafkaEx.produce("food", 0, "hey", worker_name: client, required_acks: 1) assert is_integer(offset) refute offset == nil @@ -306,9 +288,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do assert offset == 0 end - test "latest_offset retrieves offset of 0 for non-existing topic", %{ - client: client - } do + test "latest_offset retrieves offset of 0 for non-existing topic", %{client: client} do random_string = KafkaEx.TestHelpers.generate_random_string() {:ok, produce_offset} = diff --git a/test/kafka_ex/new/protocols/kayrock/list_offsets/response_test.exs b/test/kafka_ex/new/protocols/kayrock/list_offsets/response_test.exs index c8ed9d46..6d2c6bf1 100644 --- a/test/kafka_ex/new/protocols/kayrock/list_offsets/response_test.exs +++ b/test/kafka_ex/new/protocols/kayrock/list_offsets/response_test.exs @@ -13,6 +13,7 @@ defmodule KafkaEx.New.Protocols.Kayrock.ListOffsets.ResponseTest do partition_offsets: [ %KafkaEx.New.Structs.Offset.PartitionOffset{ partition: 1, + error_code: :no_error, offset: 0, timestamp: -1 } diff --git a/test/kafka_ex/new/structs/offset/partition_offset_test.exs b/test/kafka_ex/new/structs/offset/partition_offset_test.exs index b720d0bf..2a4ad46c 100644 --- a/test/kafka_ex/new/structs/offset/partition_offset_test.exs +++ b/test/kafka_ex/new/structs/offset/partition_offset_test.exs @@ -9,16 +9,18 @@ defmodule KafkaEx.New.Structs.Offset.PartitionOffsetTest do assert result == %PartitionOffset{ partition: 1, + error_code: :no_error, offset: 2, timestamp: -1 } end test "returns struct with timestamp" do - result = PartitionOffset.build(%{partition: 1, offset: 2, timestamp: 123}) + result = PartitionOffset.build(%{partition: 1, offset: 2, error_code: :no_error, timestamp: 123}) assert result == %PartitionOffset{ partition: 1, + error_code: :no_error, offset: 2, timestamp: 123 } diff --git a/test/kafka_ex/new/structs/offset_test.exs b/test/kafka_ex/new/structs/offset_test.exs index fe222f44..3a72449b 100644 --- a/test/kafka_ex/new/structs/offset_test.exs +++ b/test/kafka_ex/new/structs/offset_test.exs @@ -5,34 +5,34 @@ defmodule KafkaEx.New.Structs.OffsetTest do describe "from_list_offset/2" do test "creates offset with v0 partition responses" do - result = Offset.from_list_offset("test-topic", [%{offset: 1, partition: 2}]) + result = Offset.from_list_offset("test-topic", [%{offset: 1, error_code: :no_error, partition: 2}]) assert result == %Offset{ topic: "test-topic", partition_offsets: [ - %Offset.PartitionOffset{offset: 1, partition: 2, timestamp: -1} + %Offset.PartitionOffset{offset: 1, partition: 2, error_code: :no_error, timestamp: -1} ] } end test "creates offset with v1 partition responses" do - result = Offset.from_list_offset("test-topic", [%{offset: 1, partition: 2}]) + result = Offset.from_list_offset("test-topic", [%{offset: 1, error_code: :no_error, partition: 2}]) assert result == %Offset{ topic: "test-topic", partition_offsets: [ - %Offset.PartitionOffset{offset: 1, partition: 2, timestamp: -1} + %Offset.PartitionOffset{offset: 1, partition: 2, error_code: :no_error, timestamp: -1} ] } end test "creates offset with v2 partition responses" do - result = Offset.from_list_offset("test-topic", [%{offset: 1, partition: 2, timestamp: 3}]) + result = Offset.from_list_offset("test-topic", [%{offset: 1, error_code: :no_error, partition: 2, timestamp: 3}]) assert result == %Offset{ topic: "test-topic", partition_offsets: [ - %Offset.PartitionOffset{offset: 1, partition: 2, timestamp: 3} + %Offset.PartitionOffset{offset: 1, partition: 2, error_code: :no_error, timestamp: 3} ] } end