Skip to content

Commit

Permalink
Fix producint multiple messages in record batch
Browse files Browse the repository at this point in the history
  • Loading branch information
Argonus committed Jan 19, 2024
1 parent f16d63b commit 506e919
Show file tree
Hide file tree
Showing 2 changed files with 287 additions and 9 deletions.
18 changes: 9 additions & 9 deletions lib/kayrock/record_batch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ defmodule Kayrock.RecordBatch do
@spec serialize(t) :: iodata
def serialize(%__MODULE__{} = record_batch) do
[first_record | _] = record_batch.records

num_records = length(record_batch.records)

max_timestamp =
Expand All @@ -112,11 +111,13 @@ defmodule Kayrock.RecordBatch do
base_timestamp = first_record.timestamp

records =
for record <- record_batch.records do
record_batch.records
|> Enum.with_index()
|> Enum.map(fn {record, offset_delta} ->
record
|> normalize_record(base_offset, base_timestamp)
|> serialize_record
end
|> normalize_record(offset_delta, base_timestamp)
|> serialize_record()
end)

records =
case compression_type(record_batch.attributes) do
Expand Down Expand Up @@ -163,8 +164,7 @@ defmodule Kayrock.RecordBatch do
nil
end

def deserialize(msg_set_size, msg_set_data)
when byte_size(msg_set_data) == msg_set_size do
def deserialize(msg_set_size, msg_set_data) when byte_size(msg_set_data) == msg_set_size do
case get_magic_byte(msg_set_data) do
{2, batch_offset, batch_length, partition_leader_epoch, rest} ->
<<>>
Expand Down Expand Up @@ -405,11 +405,11 @@ defmodule Kayrock.RecordBatch do
[encode_varint(IO.iodata_length(without_length)), without_length]
end

defp normalize_record(record, base_offset, base_timestamp) do
defp normalize_record(record, offset_delta, base_timestamp) do
%{
record
| timestamp: maybe_delta(record.timestamp, base_timestamp),
offset: maybe_delta(record.offset, base_offset)
offset: offset_delta
}
end

Expand Down
278 changes: 278 additions & 0 deletions test/integration/producer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,65 @@ defmodule Kayrock.Integration.ProducerTest do
[message] = List.first(response.partition_responses).record_set.messages
assert message.value == "test"
end

test "v#{version} - produce and reads data using message set with multiple messages", %{
kafka: kafka
} do
api_version = unquote(version)
{:ok, client_pid} = build_client(kafka)

# Create Topic
topic_name = create_topic(client_pid, api_version)

# [GIVEN] MessageSet with timestamp
record_set = %Kayrock.MessageSet{
messages: [
%Kayrock.MessageSet.Message{
key: "1",
value: "foo",
attributes: 0
},
%Kayrock.MessageSet.Message{
key: "1",
value: "bar",
attributes: 0
},
%Kayrock.MessageSet.Message{
key: "1",
value: "baz",
attributes: 0
}
]
}

# [WHEN] Produce message with timestamp
produce_message_request =
produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version)

{:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller)
[response] = resp.responses
assert response.topic == topic_name

[partition_response] = response.partition_responses
assert partition_response.error_code == 0
offset = partition_response.base_offset

# [THEN] Fetch message from topic
partition_data = [[topic: topic_name, partition: 0, fetch_offset: offset]]
fetch_request = fetch_messages_request(partition_data, [], api_version)
{:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller)

[response] = resp.responses
assert response.topic == topic_name

# [THEN] Verify message data
[message_one, message_two, message_three] =
List.first(response.partition_responses).record_set.messages

assert message_one.value == "foo"
assert message_two.value == "bar"
assert message_three.value == "baz"
end
end

for version <- [2, 3] do
Expand Down Expand Up @@ -101,6 +160,112 @@ defmodule Kayrock.Integration.ProducerTest do
assert message.value == "test"
assert message.timestamp == timestamp
end

test "v#{version} - produce and reads data using message set with multiple messages", %{
kafka: kafka
} do
api_version = unquote(version)
{:ok, client_pid} = build_client(kafka)

# Create Topic
topic_name = create_topic(client_pid, api_version)
timestamp = DateTime.utc_now() |> DateTime.to_unix(:millisecond)

# [GIVEN] MessageSet with timestamp
records = [
%Kayrock.RecordBatch.Record{
key: "1",
value: "foo",
timestamp: timestamp,
attributes: 0
},
%Kayrock.RecordBatch.Record{
key: "1",
value: "bar",
timestamp: timestamp,
attributes: 0
},
%Kayrock.RecordBatch.Record{
key: "1",
value: "baz",
timestamp: timestamp,
attributes: 0
}
]

# [WHEN] Produce message with timestamp
record_set = %Kayrock.RecordBatch{records: records}

produce_message_request =
produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version)

{:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller)
[response] = resp.responses
assert response.topic == topic_name

[partition_response] = response.partition_responses
assert partition_response.error_code == 0

# [THEN] Fetch message from topic
partition_data = [[topic: topic_name, partition: 0, fetch_offset: 0, log_start_offset: 0]]
fetch_request = fetch_messages_request(partition_data, [], api_version)
{:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller)

[response] = resp.responses
assert response.topic == topic_name

# [THEN] Verify message data
[message_one, message_two, message_three] =
List.first(response.partition_responses).record_set.messages

assert message_one.value == "foo"
assert message_one.offset == 0
assert message_one.timestamp == timestamp

assert message_two.value == "bar"
assert message_two.offset == 1
assert message_two.timestamp == timestamp

assert message_three.value == "baz"
assert message_three.offset == 2
assert message_three.timestamp == timestamp

# [THEN] Produce another message
record = %Kayrock.RecordBatch.Record{
key: "1",
value: "zab",
timestamp: timestamp,
attributes: 0
}

record_set = %Kayrock.RecordBatch{records: [record]}

# [WHEN] Produce message with timestamp
produce_message_request =
produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version)

{:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller)
[response] = resp.responses
assert response.topic == topic_name

[partition_response] = response.partition_responses
assert partition_response.error_code == 0
offset = partition_response.base_offset

# [THEN] Fetch message from topic
partition_data = [[topic: topic_name, partition: 0, fetch_offset: offset]]
fetch_request = fetch_messages_request(partition_data, [], api_version)
{:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller)

[response] = resp.responses
assert response.topic == topic_name

# [THEN] Verify message data
[message] = List.first(response.partition_responses).record_set.messages
assert message.value == "zab"
assert message.offset == 3
assert message.timestamp == timestamp
end
end

for version <- [4, 5, 6, 7] do
Expand Down Expand Up @@ -154,6 +319,119 @@ defmodule Kayrock.Integration.ProducerTest do
assert message.timestamp == timestamp
assert message.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}]
end

test "v#{version} - produce and reads data using message set with multiple messages", %{
kafka: kafka
} do
api_version = unquote(version)
{:ok, client_pid} = build_client(kafka)

# Create Topic
topic_name = create_topic(client_pid, api_version)
timestamp = DateTime.utc_now() |> DateTime.to_unix(:millisecond)

# [GIVEN] MessageSet with timestamp
record_set = %Kayrock.RecordBatch{
records: [
%Kayrock.RecordBatch.Record{
key: "1",
value: "foo",
timestamp: timestamp,
headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}],
attributes: 0
},
%Kayrock.RecordBatch.Record{
key: "1",
value: "bar",
timestamp: timestamp,
headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}],
attributes: 0
},
%Kayrock.RecordBatch.Record{
key: "1",
value: "baz",
timestamp: timestamp,
headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}],
attributes: 0
}
]
}

# [WHEN] Produce message with timestamp
produce_message_request =
produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version)

{:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller)
[response] = resp.responses
assert response.topic == topic_name

[partition_response] = response.partition_responses
assert partition_response.error_code == 0
offset = partition_response.base_offset

# [THEN] Fetch message from topic
partition_data = [[topic: topic_name, partition: 0, fetch_offset: offset]]
fetch_request = fetch_messages_request(partition_data, [], api_version)
{:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller)

[response] = resp.responses
assert response.topic == topic_name

# [THEN] Verify message data
[message_one, message_two, message_three] =
List.first(response.partition_responses).record_set |> List.first() |> Map.get(:records)

assert message_one.value == "foo"
assert message_one.offset == 0
assert message_one.timestamp == timestamp
assert message_one.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}]

assert message_two.value == "bar"
assert message_two.offset == 1
assert message_two.timestamp == timestamp
assert message_two.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}]

assert message_three.value == "baz"
assert message_three.offset == 2
assert message_three.timestamp == timestamp
assert message_three.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}]

# [THEN] Produce another message
record = %Kayrock.RecordBatch.Record{
key: "1",
value: "zab",
timestamp: timestamp,
attributes: 0
}

record_set = %Kayrock.RecordBatch{records: [record]}

# [WHEN] Produce message with timestamp
produce_message_request =
produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version)

{:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller)
[response] = resp.responses
assert response.topic == topic_name

[partition_response] = response.partition_responses
assert partition_response.error_code == 0
offset = partition_response.base_offset

# [THEN] Fetch message from topic
partition_data = [[topic: topic_name, partition: 0, fetch_offset: offset]]
fetch_request = fetch_messages_request(partition_data, [], api_version)
{:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller)

[response] = resp.responses
assert response.topic == topic_name

# [THEN] Verify message data
[message] = List.first(response.partition_responses).record_set |> List.first() |> Map.get(:records)
assert message.value == "zab"
assert message.offset == 3
assert message.timestamp == timestamp
end
end
end

Expand Down

0 comments on commit 506e919

Please sign in to comment.