Commit

Produce fixes and code regeneration (#32)
* Add compression & producer tests

* Fix produce & deserialize

* Simplify decoding logic

* Extend test

* Add simple single broker behaviour testcase
Argonus authored Jan 24, 2024
1 parent ea5084b commit c2de899
Showing 7 changed files with 757 additions and 124 deletions.
27 changes: 20 additions & 7 deletions lib/kayrock/message_set.ex
@@ -81,13 +81,11 @@ defmodule Kayrock.MessageSet do
           }

         c ->
-          decompressed = Kayrock.Compression.decompress(c, value)
-
-          if magic == 1 do
-            Enum.reverse(do_deserialize(decompressed, [], offset))
-          else
-            Enum.reverse(do_deserialize(decompressed, [], 0))
-          end
+          c
+          |> Kayrock.Compression.decompress(value)
+          |> do_deserialize([], 0)
+          |> correct_offsets(offset)
+          |> Enum.reverse()
       end

     do_deserialize(orig_rest, [msg | acc], add_offset)
@@ -97,6 +97,21 @@
     Enum.reverse(List.flatten(acc))
   end

+  # In all other cases the inner messages of a compressed batch carry relative offsets:
+  # - the container message holds the 'real' offset
+  # - the container message's offset is the 'real' offset of the last message in the compressed batch
+  defp correct_offsets(messages, real_offset) do
+    max_relative_offset = messages |> List.last() |> Map.fetch!(:offset)
+
+    if max_relative_offset == real_offset do
+      messages
+    else
+      Enum.map(messages, fn msg ->
+        Map.update!(msg, :offset, &(&1 + real_offset))
+      end)
+    end
+  end
+
   defp create_message_set([], _compression_type), do: {"", 0}

   defp create_message_set(messages, :none) do
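For reference, a standalone sketch of the new offset correction, using a hypothetical OffsetExample module and bare maps in place of kayrock's message structs (illustration only, not part of the commit):

defmodule OffsetExample do
  # Same shape as correct_offsets/2 above: if the last inner offset already
  # matches the real offset, the messages are left alone; otherwise each
  # relative offset is shifted by the container's real offset.
  def correct_offsets(messages, real_offset) do
    max_relative_offset = messages |> List.last() |> Map.fetch!(:offset)

    if max_relative_offset == real_offset do
      messages
    else
      Enum.map(messages, fn msg ->
        Map.update!(msg, :offset, &(&1 + real_offset))
      end)
    end
  end
end

# Inner messages carry relative offsets 0..2; the container sits at real offset 100:
OffsetExample.correct_offsets([%{offset: 0}, %{offset: 1}, %{offset: 2}], 100)
# => [%{offset: 100}, %{offset: 101}, %{offset: 102}]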
20 changes: 9 additions & 11 deletions lib/kayrock/record_batch.ex
@@ -100,23 +100,23 @@ defmodule Kayrock.RecordBatch do
   @spec serialize(t) :: iodata
   def serialize(%__MODULE__{} = record_batch) do
     [first_record | _] = record_batch.records

     num_records = length(record_batch.records)

     max_timestamp =
       record_batch.records
       |> Enum.map(fn r -> r.timestamp end)
       |> Enum.max()

-    base_offset = first_record.offset
     base_timestamp = first_record.timestamp

     records =
-      for record <- record_batch.records do
+      record_batch.records
+      |> Enum.with_index()
+      |> Enum.map(fn {record, offset_delta} ->
         record
-        |> normalize_record(base_offset, base_timestamp)
-        |> serialize_record
-      end
+        |> normalize_record(offset_delta, base_timestamp)
+        |> serialize_record()
+      end)

     records =
       case compression_type(record_batch.attributes) do
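The substantive change here is that a record's offset delta is now simply its index within the batch, which matches how the broker reconstructs absolute offsets as base_offset + offset_delta. A minimal sketch with hypothetical bare maps (not kayrock structs):

[%{key: "a"}, %{key: "b"}, %{key: "c"}]
|> Enum.with_index()
|> Enum.map(fn {record, offset_delta} -> Map.put(record, :offset, offset_delta) end)
# => [%{key: "a", offset: 0}, %{key: "b", offset: 1}, %{key: "c", offset: 2}]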
@@ -163,11 +163,9 @@
       nil
     end

-  def deserialize(msg_set_size, msg_set_data)
-      when byte_size(msg_set_data) == msg_set_size do
+  def deserialize(msg_set_size, msg_set_data) when byte_size(msg_set_data) == msg_set_size do
     case get_magic_byte(msg_set_data) do
       {2, batch_offset, batch_length, partition_leader_epoch, rest} ->
-        <<>>
         deserialize(rest, [], batch_offset, batch_length, partition_leader_epoch)

       {magic, _, _, _, _} ->
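For orientation, a hedged sketch of what get_magic_byte/1 (defined elsewhere in this module, not shown in this diff) plausibly matches on: in the Kafka wire format the magic byte sits at byte 16 for both old message sets and v2 record batches, preceded by the base offset (int64), batch length (int32) and partition leader epoch (int32); for magic 0/1 that third int32 is actually the message CRC.

defmodule MagicByteSketch do
  # Hypothetical stand-in for kayrock's get_magic_byte/1, for illustration only.
  def get_magic_byte(<<batch_offset::64-signed, batch_length::32-signed,
                       partition_leader_epoch::32-signed, magic::8-signed,
                       rest::binary>>) do
    {magic, batch_offset, batch_length, partition_leader_epoch, rest}
  end
end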
@@ -405,11 +403,11 @@
     [encode_varint(IO.iodata_length(without_length)), without_length]
   end

-  defp normalize_record(record, base_offset, base_timestamp) do
+  defp normalize_record(record, offset_delta, base_timestamp) do
     %{
       record
       | timestamp: maybe_delta(record.timestamp, base_timestamp),
-        offset: maybe_delta(record.offset, base_offset)
+        offset: offset_delta
     }
   end
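A sketch of the full normalization step, assuming a maybe_delta/2 along the lines below (its real definition lives elsewhere in record_batch.ex and is not part of this diff):

defmodule NormalizeSketch do
  def normalize_record(record, offset_delta, base_timestamp) do
    %{record | timestamp: maybe_delta(record.timestamp, base_timestamp), offset: offset_delta}
  end

  # Assumed behaviour: nil passes through, anything else becomes a delta.
  defp maybe_delta(nil, _base), do: nil
  defp maybe_delta(value, base), do: value - base
end

NormalizeSketch.normalize_record(%{timestamp: 1_706_000_500, offset: nil}, 2, 1_706_000_000)
# => %{offset: 2, timestamp: 500}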
111 changes: 111 additions & 0 deletions test/integration/behaviour/single_broker_test.exs
@@ -0,0 +1,111 @@
defmodule Kayrock.Integration.Behaviour.SingleBrokerTest do
  use Kayrock.IntegrationCase
  use ExUnit.Case, async: true

  import Kayrock.TestSupport
  import Kayrock.RequestFactory

  container(:kafka, KafkaContainer.new(), shared: true)

test "for single broker lifecycle", %{kafka: kafka} do
# [WHEN] There is client connected to broker
{:ok, client_pid} = build_client(kafka)

# [AND] There is a topic created
topic_name = create_topic(client_pid, 0)

# [WHEN] Client can read from empty topic
fetch_request = fetch_messages_request([[topic: topic_name]], [], 5)
{:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller)

# [THEN] It receives empty response
[%{partition_responses: [%{record_set: record_set}]}] = resp.responses
assert is_nil(record_set)

# [WHEN] Client can write to topic
record_set = record_set([{"1", "test-one"}])
produce_request = produce_messages_request(topic_name, [[record_set: record_set]], 1, 5)
{:ok, _resp} = Kayrock.client_call(client_pid, produce_request, :controller)

# [AND] Fetch message from that topic
fetch_request = fetch_messages_request([[topic: topic_name]], [], 5)
{:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller)

# [THEN] It receives message
[%{partition_responses: [%{record_set: [record_set]}]}] = resp.responses
[record] = record_set.records
assert record.key == "1"
assert record.value == "test-one"
assert record.offset == 0

# [WHEN] Client can write multiple messages to topic
record_set = record_set([{"2", "test-two"}, {"3", "test-three"}])
produce_request = produce_messages_request(topic_name, [[record_set: record_set]], 1, 5)
{:ok, _resp} = Kayrock.client_call(client_pid, produce_request, :controller)

# [AND] Fetch messages from that topic
fetch_request = fetch_messages_request([[topic: topic_name, fetch_offset: 1]], [], 5)
{:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller)

# [THEN] It receives messages
[%{partition_responses: [%{record_set: [record_set]}]}] = resp.responses
[record_one, record_two] = record_set.records
assert record_one.key == "2"
assert record_one.value == "test-two"
assert record_one.offset == 1

assert record_two.key == "3"
assert record_two.value == "test-three"
assert record_two.offset == 2

# [WHEN] Client is closed
:ok = Kayrock.Client.stop(client_pid)

# [AND] New client is created
{:ok, client_pid} = build_client(kafka)

# [AND] Fetch messages from that topic
fetch_request = fetch_messages_request([[topic: topic_name, fetch_offset: 0]], [], 5)
{:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller)

# [THEN] It receives messages
[%{partition_responses: [%{record_set: [record_set_one, record_set_two]}]}] = resp.responses
[record] = record_set_one.records
assert record.key == "1"
assert record.value == "test-one"
assert record.offset == 0

[record_one, record_two] = record_set_two.records
assert record_one.key == "2"
assert record_one.value == "test-two"
assert record_one.offset == 1

assert record_two.key == "3"
assert record_two.value == "test-three"
assert record_two.offset == 2
end

  defp build_client(kafka) do
    uris = [{"localhost", Container.mapped_port(kafka, 9092)}]
    # CI annotation (GitHub Actions, Test 1.10/22.3 and 1.13/24.3, line 89):
    # Container.mapped_port/2 is undefined (module Container is not available or is yet to be defined)
    Kayrock.Client.start_link(uris)
  end

  defp create_topic(client_pid, api_version) do
    topic_name = unique_string()
    create_request = create_topic_request(topic_name, api_version)
    {:ok, _} = Kayrock.client_call(client_pid, create_request, :controller)
    topic_name
  end

  defp record_set(key_values) do
    %Kayrock.RecordBatch{
      records:
        Enum.map(key_values, fn {key, value} ->
          %Kayrock.RecordBatch.Record{
            key: key,
            value: value
          }
        end)
    }
  end
end