Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow specifying kafka API versions for streaming #369

Merged
merged 3 commits into from
Oct 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions lib/kafka_ex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,13 @@ defmodule KafkaEx do

- `auto_commit` (boolean): If true, the stream automatically commits offsets
of fetched messages. See discussion above.

- `api_versions` (map): Allows overriding API versions for `:fetch`,
`:offset_fetch`, and `:offset_commit` when using the Kayrock client. Defaults to
`%{fetch: 0, offset_fetch: 0, offset_commit: 0}`. Use
`%{fetch: 3, offset_fetch: 3, offset_commit: 3}` with the Kayrock client to
store offsets in Kafka (instead of Zookeeper) and to fetch messages
with timestamps.
"""
@spec stream(binary, integer, Keyword.t()) :: KafkaEx.Stream.t()
def stream(topic, partition, opts \\ []) do
Expand All @@ -514,12 +521,17 @@ defmodule KafkaEx do
no_wait_at_logend = Keyword.get(opts, :no_wait_at_logend, false)
wait_time = Keyword.get(opts, :wait_time, @wait_time)

default_api_versions = %{fetch: 0, offset_fetch: 0, offset_commit: 0}
api_versions = Keyword.get(opts, :api_versions, %{})
api_versions = Map.merge(default_api_versions, api_versions)

retrieved_offset =
if consumer_group && !supplied_offset do
request = %OffsetFetchRequest{
topic: topic,
partition: partition,
consumer_group: consumer_group
consumer_group: consumer_group,
api_version: Map.fetch!(api_versions, :offset_fetch)
}

fetched_offset =
Expand All @@ -539,14 +551,16 @@ defmodule KafkaEx do
offset: retrieved_offset,
wait_time: wait_time,
min_bytes: min_bytes,
max_bytes: max_bytes
max_bytes: max_bytes,
api_version: Map.fetch!(api_versions, :fetch)
}

%Stream{
worker_name: worker_name,
fetch_request: fetch_request,
consumer_group: consumer_group,
no_wait_at_logend: no_wait_at_logend
no_wait_at_logend: no_wait_at_logend,
api_versions: api_versions
}
end

Expand Down
10 changes: 7 additions & 3 deletions lib/kafka_ex/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ defmodule KafkaEx.Stream do
defstruct worker_name: nil,
fetch_request: %FetchRequest{},
consumer_group: nil,
no_wait_at_logend: false
no_wait_at_logend: false,
api_versions: %{fetch: 0, offset_fetch: 0, offset_commit: 0}

@type t :: %__MODULE__{}

Expand Down Expand Up @@ -134,7 +135,8 @@ defmodule KafkaEx.Stream do
topic: stream_data.fetch_request.topic,
partition: stream_data.fetch_request.partition,
offset: offset,
metadata: ""
metadata: "",
api_version: Map.fetch!(stream_data.api_versions, :offset_commit)
}
})
end
Expand All @@ -145,8 +147,10 @@ defmodule KafkaEx.Stream do
# Fetches the next batch of messages for the stream starting at `offset`.
#
# Takes the stream state (which carries the worker name and the template
# `FetchRequest`) and returns the messages for the request's topic/partition.
#
# The pasted diff had retained both the pre-change and post-change pipeline
# lines, yielding a duplicated `Server.call`; only the post-change call is kept.
defp fetch_response(data, offset) do
  req = data.fetch_request

  # note we set auto_commit: false in the actual request because the stream
  # processor handles commits on its own
  data.worker_name
  |> Server.call({:fetch, %{req | offset: offset, auto_commit: false}})
  |> FetchResponse.partition_messages(req.topic, req.partition)
end
end
Expand Down
78 changes: 78 additions & 0 deletions test/integration/kayrock/compatibility_streaming_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
defmodule KafkaEx.KayrockCompatibilityStreamingTest do
  @moduledoc """
  Tests for streaming using Kayrock for client implementation
  """

  use ExUnit.Case

  alias KafkaEx.New.Client
  alias KafkaEx.Protocol.OffsetFetch.Request, as: OffsetFetchRequest
  alias KafkaEx.Protocol.OffsetFetch.Response, as: OffsetFetchResponse

  @moduletag :new_client

  # Start an unnamed worker backed by the new (Kayrock-based) client
  # implementation; each test receives its pid as `client`.
  setup do
    {:ok, pid} =
      KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client)

    {:ok, %{client: pid}}
  end

  test "stream with kafka offset storage and timestamps", %{client: client} do
    topic = "kayrock_stream_test"
    partition = 0
    consumer_group = "streamers"

    # Rebinds `topic` to whatever name the helper settled on; presumably it
    # creates/configures the topic for append-time timestamps — verify in
    # TestHelper.
    {:ok, topic} = TestHelper.ensure_append_timestamp_topic(client, topic)

    # api_version: 3 on produce — assumed necessary so the broker records
    # message timestamps (record-batch format); TODO confirm against the
    # produce API docs.
    KafkaEx.produce(topic, partition, "foo 1", api_version: 3)
    KafkaEx.produce(topic, partition, "foo 2", api_version: 3)
    KafkaEx.produce(topic, partition, "foo 3", api_version: 3)

    # v3 for fetch/offset_fetch/offset_commit selects Kafka-based offset
    # storage (instead of Zookeeper) and timestamped fetches, per the
    # `KafkaEx.stream/3` docs in this change.
    stream =
      KafkaEx.stream(topic, partition,
        worker_name: client,
        auto_commit: true,
        no_wait_at_logend: true,
        consumer_group: consumer_group,
        api_versions: %{
          fetch: 3,
          offset_fetch: 3,
          offset_commit: 3
        }
      )

    # Poll until all three produced messages are visible to the stream.
    # NOTE(review): this consumes the stream with auto_commit enabled, so the
    # second take below depends on the committed-offset behavior — intentional
    # here, but worth confirming.
    TestHelper.wait_for(fn ->
      length(Enum.take(stream, 3)) == 3
    end)

    [msg1, msg2, msg3] = Enum.take(stream, 3)

    assert msg1.value == "foo 1"
    assert msg2.value == "foo 2"
    assert msg3.value == "foo 3"
    # Timestamps should be populated (positive integers) when fetching with
    # api_version 3.
    assert is_integer(msg1.timestamp)
    assert msg1.timestamp > 0
    assert is_integer(msg2.timestamp)
    assert msg2.timestamp > 0
    assert is_integer(msg3.timestamp)
    assert msg3.timestamp > 0

    # The stream's auto-commit should have stored a positive offset for the
    # consumer group in Kafka; fetch it back with offset_fetch v3 to verify.
    [
      %OffsetFetchResponse{
        partitions: [
          %{error_code: :no_error, offset: offset}
        ]
      }
    ] =
      KafkaEx.offset_fetch(client, %OffsetFetchRequest{
        topic: topic,
        partition: partition,
        consumer_group: consumer_group,
        api_version: 3
      })

    assert is_integer(offset)
    assert offset > 0
  end
end