diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex
index 9095bce9..19712040 100644
--- a/lib/kafka_ex.ex
+++ b/lib/kafka_ex.ex
@@ -502,6 +502,13 @@ defmodule KafkaEx do

   - `auto_commit` (boolean): If true, the stream automatically commits offsets
     of fetched messages. See discussion above.
+
+  - `api_versions` (map): Allows overriding the API versions used for `:fetch`,
+    `:offset_fetch`, and `:offset_commit` when using the Kayrock client. Defaults
+    to `%{fetch: 0, offset_fetch: 0, offset_commit: 0}`. Use
+    `%{fetch: 3, offset_fetch: 3, offset_commit: 3}` with the Kayrock client to
+    store offsets in Kafka (instead of Zookeeper) and to fetch messages with
+    their timestamps.
  """
  @spec stream(binary, integer, Keyword.t()) :: KafkaEx.Stream.t()
  def stream(topic, partition, opts \\ []) do
@@ -514,12 +521,17 @@ defmodule KafkaEx do
    no_wait_at_logend = Keyword.get(opts, :no_wait_at_logend, false)
    wait_time = Keyword.get(opts, :wait_time, @wait_time)

+    default_api_versions = %{fetch: 0, offset_fetch: 0, offset_commit: 0}
+    api_versions = Keyword.get(opts, :api_versions, %{})
+    api_versions = Map.merge(default_api_versions, api_versions)
+
    retrieved_offset =
      if consumer_group && !supplied_offset do
        request = %OffsetFetchRequest{
          topic: topic,
          partition: partition,
-          consumer_group: consumer_group
+          consumer_group: consumer_group,
+          api_version: Map.fetch!(api_versions, :offset_fetch)
        }

        fetched_offset =
@@ -539,14 +551,16 @@ defmodule KafkaEx do
      offset: retrieved_offset,
      wait_time: wait_time,
      min_bytes: min_bytes,
-      max_bytes: max_bytes
+      max_bytes: max_bytes,
+      api_version: Map.fetch!(api_versions, :fetch)
    }

    %Stream{
      worker_name: worker_name,
      fetch_request: fetch_request,
      consumer_group: consumer_group,
-      no_wait_at_logend: no_wait_at_logend
+      no_wait_at_logend: no_wait_at_logend,
+      api_versions: api_versions
    }
  end

diff --git a/lib/kafka_ex/stream.ex b/lib/kafka_ex/stream.ex
index 5aeeed5f..466bb45b 100644
--- a/lib/kafka_ex/stream.ex
+++ b/lib/kafka_ex/stream.ex
@@ -9,7 +9,8 @@ defmodule KafkaEx.Stream do
  defstruct worker_name: nil,
            fetch_request: %FetchRequest{},
            consumer_group: nil,
-            no_wait_at_logend: false
+            no_wait_at_logend: false,
+            api_versions: %{fetch: 0, offset_fetch: 0, offset_commit: 0}

  @type t :: %__MODULE__{}

@@ -134,7 +135,8 @@ defmodule KafkaEx.Stream do
          topic: stream_data.fetch_request.topic,
          partition: stream_data.fetch_request.partition,
          offset: offset,
-          metadata: ""
+          metadata: "",
+          api_version: Map.fetch!(stream_data.api_versions, :offset_commit)
        }
      })
    end

@@ -145,8 +147,10 @@ defmodule KafkaEx.Stream do
  defp fetch_response(data, offset) do
    req = data.fetch_request

+    # note: we set auto_commit: false in the actual request because the stream
+    # processor handles commits on its own
    data.worker_name
-    |> Server.call({:fetch, %{req | offset: offset}})
+    |> Server.call({:fetch, %{req | offset: offset, auto_commit: false}})
    |> FetchResponse.partition_messages(req.topic, req.partition)
  end
end
diff --git a/test/integration/kayrock/compatibility_streaming_test.exs b/test/integration/kayrock/compatibility_streaming_test.exs
new file mode 100644
index 00000000..643422cb
--- /dev/null
+++ b/test/integration/kayrock/compatibility_streaming_test.exs
@@ -0,0 +1,78 @@
+defmodule KafkaEx.KayrockCompatibilityStreamingTest do
+  @moduledoc """
+  Tests for streaming using the Kayrock client implementation
+  """
+
+  use ExUnit.Case
+
+  alias KafkaEx.New.Client
+  alias KafkaEx.Protocol.OffsetFetch.Request, as: OffsetFetchRequest
+  alias KafkaEx.Protocol.OffsetFetch.Response, as: OffsetFetchResponse
+
+  @moduletag :new_client
+
+  setup do
+    {:ok, pid} =
+      KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client)
+
+    {:ok, %{client: pid}}
+  end
+
+  test "stream with kafka offset storage and timestamps", %{client: client} do
+    topic = "kayrock_stream_test"
+    partition = 0
+    consumer_group = "streamers"
+
+    {:ok, topic} = TestHelper.ensure_append_timestamp_topic(client, topic)
+
+    KafkaEx.produce(topic, partition, "foo 1", api_version: 3)
+    KafkaEx.produce(topic, partition, "foo 2", api_version: 3)
+    KafkaEx.produce(topic, partition, "foo 3", api_version: 3)
+
+    stream =
+      KafkaEx.stream(topic, partition,
+        worker_name: client,
+        auto_commit: true,
+        no_wait_at_logend: true,
+        consumer_group: consumer_group,
+        api_versions: %{
+          fetch: 3,
+          offset_fetch: 3,
+          offset_commit: 3
+        }
+      )
+
+    TestHelper.wait_for(fn ->
+      length(Enum.take(stream, 3)) == 3
+    end)
+
+    [msg1, msg2, msg3] = Enum.take(stream, 3)
+
+    assert msg1.value == "foo 1"
+    assert msg2.value == "foo 2"
+    assert msg3.value == "foo 3"
+    assert is_integer(msg1.timestamp)
+    assert msg1.timestamp > 0
+    assert is_integer(msg2.timestamp)
+    assert msg2.timestamp > 0
+    assert is_integer(msg3.timestamp)
+    assert msg3.timestamp > 0
+
+    [
+      %OffsetFetchResponse{
+        partitions: [
+          %{error_code: :no_error, offset: offset}
+        ]
+      }
+    ] =
+      KafkaEx.offset_fetch(client, %OffsetFetchRequest{
+        topic: topic,
+        partition: partition,
+        consumer_group: consumer_group,
+        api_version: 3
+      })
+
+    assert is_integer(offset)
+    assert offset > 0
+  end
+end
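
For reference, a minimal usage sketch of the new `api_versions` option. The topic, consumer group, and worker names below are illustrative, and it assumes a reachable broker plus the Kayrock-based client; only calls that appear in this diff are used.

# Start a worker backed by the Kayrock client implementation.
{:ok, worker} =
  KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client)

# Request v3 of fetch/offset_fetch/offset_commit so that consumer offsets are
# stored in Kafka (rather than Zookeeper) and fetched messages carry timestamps.
stream =
  KafkaEx.stream("example_topic", 0,
    worker_name: worker,
    consumer_group: "example_group",
    auto_commit: true,
    no_wait_at_logend: true,
    api_versions: %{fetch: 3, offset_fetch: 3, offset_commit: 3}
  )

# Consume a few messages; each message exposes value and timestamp fields.
for message <- Enum.take(stream, 10) do
  IO.puts("#{message.timestamp} #{message.value}")
end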