Skip to content

Commit

Permalink
Fix produce & deserialize
Browse files Browse the repository at this point in the history
s
  • Loading branch information
Argonus committed Jan 23, 2024
1 parent 0c6f3fa commit 611c597
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 24 deletions.
20 changes: 19 additions & 1 deletion lib/kayrock/message_set.ex
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ defmodule Kayrock.MessageSet do
decompressed = Kayrock.Compression.decompress(c, value)

if magic == 1 do
Enum.reverse(do_deserialize(decompressed, [], offset))
decompressed
|> do_deserialize([], 0)
|> correct_offsets(offset)
|> Enum.reverse()
else
Enum.reverse(do_deserialize(decompressed, [], 0))
end
Expand All @@ -97,6 +100,21 @@ defmodule Kayrock.MessageSet do
Enum.reverse(List.flatten(acc))
end

# Rewrites the relative offsets of the inner messages unpacked from a compressed
# (magic 1) container message into absolute offsets.
#
# - `messages` is the list of inner messages; it is assumed to be in ascending
#   offset order, because the last element is treated as the highest relative
#   offset. Each element must have an `:offset` key.
# - `real_offset` is the offset carried by the container message.
#
# The container message's offset is the 'real' offset of the LAST message in the
# compressed batch, so the shift to apply is `real_offset - max_relative_offset`
# (adding `real_offset` itself would push the last message past the container's
# own offset). When the last inner offset already equals the container offset the
# inner offsets are already absolute, the shift is 0 and the list is returned
# unchanged.
#
# An empty inner set is returned as-is rather than raising on the `nil` that
# `List.last/1` yields for `[]`.
defp correct_offsets([], _real_offset), do: []

defp correct_offsets(messages, real_offset) do
  max_relative_offset = messages |> List.last() |> Map.fetch!(:offset)

  case real_offset - max_relative_offset do
    0 ->
      messages

    shift ->
      Enum.map(messages, fn msg ->
        Map.update!(msg, :offset, &(&1 + shift))
      end)
  end
end

defp create_message_set([], _compression_type), do: {"", 0}

defp create_message_set(messages, :none) do
Expand Down
20 changes: 9 additions & 11 deletions lib/kayrock/record_batch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -100,23 +100,23 @@ defmodule Kayrock.RecordBatch do
@spec serialize(t) :: iodata
def serialize(%__MODULE__{} = record_batch) do
[first_record | _] = record_batch.records

num_records = length(record_batch.records)

max_timestamp =
record_batch.records
|> Enum.map(fn r -> r.timestamp end)
|> Enum.max()

base_offset = first_record.offset
base_timestamp = first_record.timestamp

records =
for record <- record_batch.records do
record_batch.records
|> Enum.with_index()
|> Enum.map(fn {record, offset_delta} ->
record
|> normalize_record(base_offset, base_timestamp)
|> serialize_record
end
|> normalize_record(offset_delta, base_timestamp)
|> serialize_record()
end)

records =
case compression_type(record_batch.attributes) do
Expand Down Expand Up @@ -163,11 +163,9 @@ defmodule Kayrock.RecordBatch do
nil
end

def deserialize(msg_set_size, msg_set_data)
when byte_size(msg_set_data) == msg_set_size do
def deserialize(msg_set_size, msg_set_data) when byte_size(msg_set_data) == msg_set_size do
case get_magic_byte(msg_set_data) do
{2, batch_offset, batch_length, partition_leader_epoch, rest} ->
<<>>
deserialize(rest, [], batch_offset, batch_length, partition_leader_epoch)

{magic, _, _, _, _} ->
Expand Down Expand Up @@ -405,11 +403,11 @@ defmodule Kayrock.RecordBatch do
[encode_varint(IO.iodata_length(without_length)), without_length]
end

defp normalize_record(record, base_offset, base_timestamp) do
defp normalize_record(record, offset_delta, base_timestamp) do
%{
record
| timestamp: maybe_delta(record.timestamp, base_timestamp),
offset: maybe_delta(record.offset, base_offset)
offset: offset_delta
}
end

Expand Down
24 changes: 12 additions & 12 deletions test/kayrock/message_serde_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -173,27 +173,27 @@ defmodule Kayrock.MessageSerdeTest do
attributes: 0,
headers: [],
key: nil,
offset: 0,
offset: 1,
timestamp: -1,
value: "bar"
},
%Kayrock.RecordBatch.Record{
attributes: 0,
headers: [],
key: nil,
offset: 0,
offset: 2,
timestamp: -1,
value: "baz"
}
]
}

expect =
<<0, 0, 0, 90, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 78, 255, 255, 255, 255, 2, 240, 195, 168,
31, 0, 2, 0, 0, 0, 2, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
<<0, 0, 0, 93, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 81, 255, 255, 255, 255, 2, 240, 3, 91,
168, 0, 2, 0, 0, 0, 2, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 0,
0, 0, 3, 30, 36, 18, 0, 0, 0, 1, 6, 102, 111, 111, 0, 9, 10, 52, 98, 97, 114, 0, 18, 0,
0, 0, 1, 6, 98, 97, 122, 0>>
0, 0, 3, 30, 116, 18, 0, 0, 0, 1, 6, 102, 111, 111, 0, 18, 0, 0, 2, 1, 6, 98, 97, 114,
0, 18, 0, 0, 4, 1, 6, 98, 97, 122, 0>>

got = IO.iodata_to_binary(RecordBatch.serialize(record_batch))
assert got == expect, compare_binaries(got, expect)
Expand Down Expand Up @@ -227,27 +227,27 @@ defmodule Kayrock.MessageSerdeTest do
attributes: 0,
headers: [],
key: nil,
offset: 0,
offset: 1,
timestamp: -1,
value: "bar"
},
%Kayrock.RecordBatch.Record{
attributes: 0,
headers: [],
key: nil,
offset: 0,
offset: 2,
timestamp: -1,
value: "baz"
}
]
}

expect =
<<0, 0, 0, 90, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 78, 255, 255, 255, 255, 2, 240, 195, 168,
31, 0, 2, 0, 0, 0, 2, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
<<0, 0, 0, 93, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 81, 255, 255, 255, 255, 2, 240, 3, 91,
168, 0, 2, 0, 0, 0, 2, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 0,
0, 0, 3, 30, 36, 18, 0, 0, 0, 1, 6, 102, 111, 111, 0, 9, 10, 52, 98, 97, 114, 0, 18, 0,
0, 0, 1, 6, 98, 97, 122, 0>>
0, 0, 3, 30, 116, 18, 0, 0, 0, 1, 6, 102, 111, 111, 0, 18, 0, 0, 2, 1, 6, 98, 97, 114,
0, 18, 0, 0, 4, 1, 6, 98, 97, 122, 0>>

got = IO.iodata_to_binary(RecordBatch.serialize(record_batch))
assert got == expect, compare_binaries(got, expect)
Expand Down

0 comments on commit 611c597

Please sign in to comment.