
Produce fixes and code regeneration #32

Merged: 5 commits, Jan 24, 2024
27 changes: 20 additions & 7 deletions lib/kayrock/message_set.ex
@@ -27,7 +27,7 @@
      @type t :: %__MODULE__{}
    end

    use Bitwise

Check warning on line 30 in lib/kayrock/message_set.ex (GitHub Actions: Static Code Analysis, Test, and Integration Test on 1.15/26.1): use Bitwise is deprecated. import Bitwise instead
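The suggested fix is mechanical (the same warning repeats below for lib/kayrock/record_batch.ex). A minimal sketch of what that follow-up would look like; note it is not part of this PR's diff:

# Hypothetical follow-up, not in this diff: replace the deprecated
# `use Bitwise` with an explicit import of the bitwise functions.
import Bitwise

band(0b1010, 0b0110) # => 2
bor(0b0001, 0b0100)  # => 5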

    defstruct messages: [], magic: 0

@@ -81,13 +81,11 @@
          }

        c ->
-          decompressed = Kayrock.Compression.decompress(c, value)
-
-          if magic == 1 do
-            Enum.reverse(do_deserialize(decompressed, [], offset))
-          else
-            Enum.reverse(do_deserialize(decompressed, [], 0))
-          end
+          c
+          |> Kayrock.Compression.decompress(value)
+          |> do_deserialize([], 0)
+          |> correct_offsets(offset)
+          |> Enum.reverse()
      end

    do_deserialize(orig_rest, [msg | acc], add_offset)
@@ -97,6 +95,21 @@
    Enum.reverse(List.flatten(acc))
  end

+  # In message format v1, messages inside a compressed batch carry
+  # relative offsets, while the container message carries the 'real'
+  # (absolute) offset of the last message in the compressed batch. If
+  # the last inner offset already equals the real offset, the offsets
+  # are already absolute and are left unchanged.
+  defp correct_offsets(messages, real_offset) do
+    max_relative_offset = messages |> List.last() |> Map.fetch!(:offset)
+
+    if max_relative_offset == real_offset do
+      messages
+    else
+      Enum.map(messages, fn msg ->
+        Map.update!(msg, :offset, &(&1 + real_offset))
+      end)
+    end
+  end
+
  defp create_message_set([], _compression_type), do: {"", 0}

  defp create_message_set(messages, :none) do
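To make the new correct_offsets/2 above concrete, here is an illustrative walk-through with invented values; the message structs are simplified to bare maps:

# Three inner messages came out of do_deserialize/3 with relative
# offsets 0..2, and the real offset passed in is 10. The last relative
# offset (2) differs from the real offset, so each inner offset is
# shifted by the real offset.
messages = [%{offset: 0}, %{offset: 1}, %{offset: 2}]

Enum.map(messages, fn msg -> Map.update!(msg, :offset, &(&1 + 10)) end)
# => [%{offset: 10}, %{offset: 11}, %{offset: 12}]

Had the inner offsets already been absolute, the last one would equal the real offset and the list would be returned unchanged.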
20 changes: 9 additions & 11 deletions lib/kayrock/record_batch.ex
@@ -60,7 +60,7 @@

  @type t :: %__MODULE__{}

  use Bitwise

Check warning on line 63 in lib/kayrock/record_batch.ex (GitHub Actions: Static Code Analysis, Test, and Integration Test on 1.15/26.1): use Bitwise is deprecated. import Bitwise instead

  alias Kayrock.Compression
  alias Kayrock.MessageSet
@@ -100,23 +100,23 @@
  @spec serialize(t) :: iodata
  def serialize(%__MODULE__{} = record_batch) do
    [first_record | _] = record_batch.records

    num_records = length(record_batch.records)

    max_timestamp =
      record_batch.records
      |> Enum.map(fn r -> r.timestamp end)
      |> Enum.max()

    base_offset = first_record.offset
    base_timestamp = first_record.timestamp

    records =
-      for record <- record_batch.records do
+      record_batch.records
+      |> Enum.with_index()
+      |> Enum.map(fn {record, offset_delta} ->
        record
-        |> normalize_record(base_offset, base_timestamp)
-        |> serialize_record
-      end
+        |> normalize_record(offset_delta, base_timestamp)
+        |> serialize_record()
+      end)

    records =
      case compression_type(record_batch.attributes) do
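The rewrite above derives each record's offset_delta from its position in the batch (via Enum.with_index/1) instead of subtracting base_offset from the record's own offset field. A minimal sketch of the effect, assuming records arrive with arbitrary pre-set offsets:

# Whatever offsets the caller set, serialization re-bases them to
# positional deltas 0, 1, 2, ... relative to the batch's base offset.
records = [%{offset: 100}, %{offset: 101}, %{offset: 102}]

records
|> Enum.with_index()
|> Enum.map(fn {record, offset_delta} -> %{record | offset: offset_delta} end)
# => [%{offset: 0}, %{offset: 1}, %{offset: 2}]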
@@ -163,11 +163,9 @@
        nil
    end

-  def deserialize(msg_set_size, msg_set_data)
-      when byte_size(msg_set_data) == msg_set_size do
+  def deserialize(msg_set_size, msg_set_data) when byte_size(msg_set_data) == msg_set_size do
    case get_magic_byte(msg_set_data) do
      {2, batch_offset, batch_length, partition_leader_epoch, rest} ->
-        <<>>
        deserialize(rest, [], batch_offset, batch_length, partition_leader_epoch)

      {magic, _, _, _, _} ->
@@ -405,11 +403,11 @@
    [encode_varint(IO.iodata_length(without_length)), without_length]
  end

-  defp normalize_record(record, base_offset, base_timestamp) do
+  defp normalize_record(record, offset_delta, base_timestamp) do
    %{
      record
      | timestamp: maybe_delta(record.timestamp, base_timestamp),
-        offset: maybe_delta(record.offset, base_offset)
+        offset: offset_delta
    }
  end

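For context on why a bare positional delta suffices: in the v2 record batch format the batch header carries the base offset and each record stores only its delta, so a reader recovers absolute offsets by addition. A hypothetical round-trip with invented values:

# base_offset travels in the batch header; records carry only deltas.
base_offset = 42
offset_deltas = [0, 1, 2]

Enum.map(offset_deltas, &(base_offset + &1))
# => [42, 43, 44]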
111 changes: 111 additions & 0 deletions test/integration/behaviour/single_broker_test.exs
@@ -0,0 +1,111 @@
defmodule Kayrock.Integration.Behaviour.SingleBrokerTest do
  use Kayrock.IntegrationCase
  use ExUnit.Case, async: true

  import Kayrock.TestSupport
  import Kayrock.RequestFactory

  container(:kafka, KafkaContainer.new(), shared: true)

  test "for single broker lifecycle", %{kafka: kafka} do
    # [WHEN] There is a client connected to the broker
    {:ok, client_pid} = build_client(kafka)

    # [AND] There is a topic created
    topic_name = create_topic(client_pid, 0)

    # [WHEN] Client reads from the empty topic
    fetch_request = fetch_messages_request([[topic: topic_name]], [], 5)
    {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller)

    # [THEN] It receives an empty response
    [%{partition_responses: [%{record_set: record_set}]}] = resp.responses
    assert is_nil(record_set)

    # [WHEN] Client writes to the topic
    record_set = record_set([{"1", "test-one"}])
    produce_request = produce_messages_request(topic_name, [[record_set: record_set]], 1, 5)
    {:ok, _resp} = Kayrock.client_call(client_pid, produce_request, :controller)

    # [AND] Fetches a message from that topic
    fetch_request = fetch_messages_request([[topic: topic_name]], [], 5)
    {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller)

    # [THEN] It receives the message
    [%{partition_responses: [%{record_set: [record_set]}]}] = resp.responses
    [record] = record_set.records
    assert record.key == "1"
    assert record.value == "test-one"
    assert record.offset == 0

    # [WHEN] Client writes multiple messages to the topic
    record_set = record_set([{"2", "test-two"}, {"3", "test-three"}])
    produce_request = produce_messages_request(topic_name, [[record_set: record_set]], 1, 5)
    {:ok, _resp} = Kayrock.client_call(client_pid, produce_request, :controller)

    # [AND] Fetches messages from that topic
    fetch_request = fetch_messages_request([[topic: topic_name, fetch_offset: 1]], [], 5)
    {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller)

    # [THEN] It receives the messages
    [%{partition_responses: [%{record_set: [record_set]}]}] = resp.responses
    [record_one, record_two] = record_set.records
    assert record_one.key == "2"
    assert record_one.value == "test-two"
    assert record_one.offset == 1

    assert record_two.key == "3"
    assert record_two.value == "test-three"
    assert record_two.offset == 2

    # [WHEN] Client is closed
    :ok = Kayrock.Client.stop(client_pid)

    # [AND] A new client is created
    {:ok, client_pid} = build_client(kafka)

    # [AND] Fetches messages from that topic
    fetch_request = fetch_messages_request([[topic: topic_name, fetch_offset: 0]], [], 5)
    {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller)

    # [THEN] It receives the messages
    [%{partition_responses: [%{record_set: [record_set_one, record_set_two]}]}] = resp.responses
    [record] = record_set_one.records
    assert record.key == "1"
    assert record.value == "test-one"
    assert record.offset == 0

    [record_one, record_two] = record_set_two.records
    assert record_one.key == "2"
    assert record_one.value == "test-two"
    assert record_one.offset == 1

    assert record_two.key == "3"
    assert record_two.value == "test-three"
    assert record_two.offset == 2
  end

  defp build_client(kafka) do
    uris = [{"localhost", Container.mapped_port(kafka, 9092)}]

Check warning on line 89 in test/integration/behaviour/single_broker_test.exs (GitHub Actions: Test on 1.10/22.3 and 1.13/24.3): Container.mapped_port/2 is undefined (module Container is not available or is yet to be defined)

    Kayrock.Client.start_link(uris)
  end

  defp create_topic(client_pid, api_version) do
    topic_name = unique_string()
    create_request = create_topic_request(topic_name, api_version)
    {:ok, _} = Kayrock.client_call(client_pid, create_request, :controller)
    topic_name
  end

  defp record_set(key_values) do
    %Kayrock.RecordBatch{
      records:
        Enum.map(key_values, fn {key, value} ->
          %Kayrock.RecordBatch.Record{
            key: key,
            value: value
          }
        end)
    }
  end
end