Merge pull request #369 from kafkaex/api_versions_streaming
Allow specifying kafka API versions for streaming
dantswain authored Oct 4, 2019
2 parents 173e3f7 + 8578095 commit 52863f4
Showing 3 changed files with 102 additions and 6 deletions.
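For context before the diff, a hypothetical call using the new option could look like the sketch below (the topic and consumer-group names are invented; the option and versions match the documentation added in this commit):

```elixir
# Hypothetical usage of the new :api_versions option (topic and group names are
# made up). With the Kayrock client, requesting v3 for fetch/offset_fetch/
# offset_commit stores offsets in Kafka rather than ZooKeeper and returns
# message timestamps.
stream =
  KafkaEx.stream("some_topic", 0,
    consumer_group: "some_consumer_group",
    auto_commit: true,
    api_versions: %{fetch: 3, offset_fetch: 3, offset_commit: 3}
  )

messages = Enum.take(stream, 10)
```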
20 changes: 17 additions & 3 deletions lib/kafka_ex.ex
@@ -502,6 +502,13 @@ defmodule KafkaEx do
- `auto_commit` (boolean): If true, the stream automatically commits offsets
of fetched messages. See discussion above.
- `api_versions` (map): Allows overriding api versions for `:fetch`,
`:offset_fetch`, and `:offset_commit` when using the Kayrock client. Defaults to
`%{fetch: 0, offset_fetch: 0, offset_commit: 0}`. Use
`%{fetch: 3, offset_fetch: 3, offset_commit: 3}` with the Kayrock client to
store offsets in Kafka (instead of ZooKeeper) and to fetch messages with
timestamps.
"""
@spec stream(binary, integer, Keyword.t()) :: KafkaEx.Stream.t()
def stream(topic, partition, opts \\ []) do
@@ -514,12 +521,17 @@
no_wait_at_logend = Keyword.get(opts, :no_wait_at_logend, false)
wait_time = Keyword.get(opts, :wait_time, @wait_time)

default_api_versions = %{fetch: 0, offset_fetch: 0, offset_commit: 0}
api_versions = Keyword.get(opts, :api_versions, %{})
api_versions = Map.merge(default_api_versions, api_versions)

retrieved_offset =
if consumer_group && !supplied_offset do
request = %OffsetFetchRequest{
topic: topic,
partition: partition,
- consumer_group: consumer_group
+ consumer_group: consumer_group,
+ api_version: Map.fetch!(api_versions, :offset_fetch)
}

fetched_offset =
@@ -539,14 +551,16 @@
offset: retrieved_offset,
wait_time: wait_time,
min_bytes: min_bytes,
- max_bytes: max_bytes
+ max_bytes: max_bytes,
+ api_version: Map.fetch!(api_versions, :fetch)
}

%Stream{
worker_name: worker_name,
fetch_request: fetch_request,
consumer_group: consumer_group,
- no_wait_at_logend: no_wait_at_logend
+ no_wait_at_logend: no_wait_at_logend,
+ api_versions: api_versions
}
end

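The `Map.merge/2` call in `stream/3` above means any version the caller omits falls back to 0. A standalone sketch of that behavior:

```elixir
# Standalone sketch of the default merging done in stream/3: keys the caller
# omits keep their version-0 defaults.
default_api_versions = %{fetch: 0, offset_fetch: 0, offset_commit: 0}
api_versions = Map.merge(default_api_versions, %{fetch: 3})
# => %{fetch: 3, offset_fetch: 0, offset_commit: 0}
```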
10 changes: 7 additions & 3 deletions lib/kafka_ex/stream.ex
@@ -9,7 +9,8 @@ defmodule KafkaEx.Stream do
defstruct worker_name: nil,
fetch_request: %FetchRequest{},
consumer_group: nil,
- no_wait_at_logend: false
+ no_wait_at_logend: false,
+ api_versions: %{fetch: 0, offset_fetch: 0, offset_commit: 0}

@type t :: %__MODULE__{}

@@ -134,7 +135,8 @@ defmodule KafkaEx.Stream do
topic: stream_data.fetch_request.topic,
partition: stream_data.fetch_request.partition,
offset: offset,
- metadata: ""
+ metadata: "",
+ api_version: Map.fetch!(stream_data.api_versions, :offset_commit)
}
})
end
@@ -145,8 +147,10 @@
defp fetch_response(data, offset) do
req = data.fetch_request

# note we set auto_commit: false in the actual request because the stream
# processor handles commits on its own
data.worker_name
- |> Server.call({:fetch, %{req | offset: offset}})
+ |> Server.call({:fetch, %{req | offset: offset, auto_commit: false}})
|> FetchResponse.partition_messages(req.topic, req.partition)
end
end
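The `auto_commit: false` override in `fetch_response/2` uses Elixir's map/struct update syntax; a minimal standalone sketch, with a plain map standing in for the `%FetchRequest{}` struct used in the real code:

```elixir
# Minimal sketch of the override in fetch_response/2; a plain map stands in for
# the %FetchRequest{} struct.
req = %{topic: "some_topic", partition: 0, offset: 0, auto_commit: true}

%{req | offset: 42, auto_commit: false}
# => %{topic: "some_topic", partition: 0, offset: 42, auto_commit: false}
# auto_commit is forced off on the raw fetch because the stream commits offsets
# itself (see the offset_commit call above), using the :offset_commit version.
```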
78 changes: 78 additions & 0 deletions test/integration/kayrock/compatibility_streaming_test.exs
@@ -0,0 +1,78 @@
defmodule KafkaEx.KayrockCompatibilityStreamingTest do
@moduledoc """
Tests for streaming using Kayrock for client implementation
"""

use ExUnit.Case

alias KafkaEx.New.Client
alias KafkaEx.Protocol.OffsetFetch.Request, as: OffsetFetchRequest
alias KafkaEx.Protocol.OffsetFetch.Response, as: OffsetFetchResponse

@moduletag :new_client

setup do
{:ok, pid} =
KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client)

{:ok, %{client: pid}}
end

test "stream with kafka offset storage and timestamps", %{client: client} do
topic = "kayrock_stream_test"
partition = 0
consumer_group = "streamers"

{:ok, topic} = TestHelper.ensure_append_timestamp_topic(client, topic)

KafkaEx.produce(topic, partition, "foo 1", api_version: 3)
KafkaEx.produce(topic, partition, "foo 2", api_version: 3)
KafkaEx.produce(topic, partition, "foo 3", api_version: 3)

stream =
KafkaEx.stream(topic, partition,
worker_name: client,
auto_commit: true,
no_wait_at_logend: true,
consumer_group: consumer_group,
api_versions: %{
fetch: 3,
offset_fetch: 3,
offset_commit: 3
}
)

TestHelper.wait_for(fn ->
length(Enum.take(stream, 3)) == 3
end)

[msg1, msg2, msg3] = Enum.take(stream, 3)

assert msg1.value == "foo 1"
assert msg2.value == "foo 2"
assert msg3.value == "foo 3"
assert is_integer(msg1.timestamp)
assert msg1.timestamp > 0
assert is_integer(msg2.timestamp)
assert msg2.timestamp > 0
assert is_integer(msg3.timestamp)
assert msg3.timestamp > 0

[
%OffsetFetchResponse{
partitions: [
%{error_code: :no_error, offset: offset}
]
}
] =
KafkaEx.offset_fetch(client, %OffsetFetchRequest{
topic: topic,
partition: partition,
consumer_group: consumer_group,
api_version: 3
})

assert is_integer(offset)
assert offset > 0
end
end
