Migrate compression tests
Argonus committed Jan 9, 2024
1 parent c4bbe84 commit f96878a
Showing 2 changed files with 117 additions and 109 deletions.
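
For orientation, the round trip these tests exercise is: build a Kayrock.RecordBatch from a list of binaries with a compression codec, produce it to a partition, look up the partition's last offset, fetch from just before that offset, and assert on the decompressed record values. Below is a minimal sketch of that flow using only the Kayrock calls that appear in the diff; the wrapper module and function names are hypothetical, and a reachable broker, a started client, and an existing topic are assumed.

# Sketch only: produce a compressed batch and read it back (codec may be :gzip or :snappy).
# Assumes `client` is an already-started Kayrock client and `topic` already exists.
defmodule CompressionRoundTrip do
  alias Kayrock.RecordBatch

  def produce_and_fetch(client, topic, codec) do
    # Build a compressed record batch from plain binaries and produce it to partition 0.
    batch = RecordBatch.from_binary_list(["foo", "bar", "baz"], codec)
    {:ok, _resp} = Kayrock.produce(client, batch, topic, 0)

    # Find the partition's last offset and fetch starting just before it.
    offset = Kayrock.Convenience.partition_last_offset(client, topic, 0)
    {:ok, resp} = Kayrock.fetch(client, topic, 0, offset - 1)

    # Dig the record values out of the fetch response, mirroring the assertions in the tests below.
    [%{partition_responses: [%{record_set: [%RecordBatch{records: records} | _]}]}] =
      resp.responses

    Enum.map(records, & &1.value)
  end
end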
test/integration/compression_test.exs
@@ -1,95 +1,13 @@
- defmodule Kayrock.Client.ProduceTest do
- use Kayrock.ClientCase
+ defmodule Kayrock.Client.CompressionTest do
+ use Kayrock.IntegrationCase
+ use ExUnit.Case, async: true

- alias Kayrock.RecordBatch
- alias Kayrock.RecordBatch.Record
- alias Kayrock.RecordBatch.RecordHeader
+ import Kayrock.TestSupport
+ import Kayrock.RequestFactory

test "gzip produce works", %{client: client} do
{:ok, topic} = ensure_test_topic(client, "simple_produce")
container(:kafka, KafkaContainer.new(), shared: true)

- record_batch = RecordBatch.from_binary_list(["foo", "bar", "baz"], :gzip)
- {:ok, _resp} = Kayrock.produce(client, record_batch, topic, 0)

- offset = Kayrock.Convenience.partition_last_offset(client, topic, 0)

- {:ok, resp} = Kayrock.fetch(client, topic, 0, offset - 1)

- [main_resp] = resp.responses
- [partition_resp] = main_resp.partition_responses

- [
- %RecordBatch{
- partition_leader_epoch: partition_leader_epoch,
- records: [%Record{offset: first_offset} | _]
- }
- | _
- ] = partition_resp.record_set

- assert resp == %Kayrock.Fetch.V4.Response{
- correlation_id: 4,
- responses: [
- %{
- partition_responses: [
- %{
- partition_header: %{
- aborted_transactions: [],
- error_code: 0,
- high_watermark: offset,
- last_stable_offset: offset,
- partition: 0
- },
- record_set: [
- %Kayrock.RecordBatch{
- attributes: 1,
- base_sequence: -1,
- batch_length: 94,
- batch_offset: first_offset,
- crc: 1_821_682_799,
- first_timestamp: -1,
- last_offset_delta: 2,
- max_timestamp: -1,
- partition_leader_epoch: partition_leader_epoch,
- producer_epoch: -1,
- producer_id: -1,
- records: [
- %Kayrock.RecordBatch.Record{
- attributes: 0,
- headers: [],
- key: nil,
- offset: first_offset,
- timestamp: -1,
- value: "foo"
- },
- %Kayrock.RecordBatch.Record{
- attributes: 0,
- headers: [],
- key: nil,
- offset: first_offset + 1,
- timestamp: -1,
- value: "bar"
- },
- %Kayrock.RecordBatch.Record{
- attributes: 0,
- headers: [],
- key: nil,
- offset: first_offset + 2,
- timestamp: -1,
- value: "baz"
- }
- ]
- }
- ]
- }
- ],
- topic: "simple_produce"
- }
- ],
- throttle_time_ms: 0
- }
- end

describe "with snappy compression" do
describe "with compression" do
setup do
on_exit(fn ->
Application.put_env(:kayrock, :snappy_module, :snappy)
@@ -98,25 +16,105 @@ defmodule Kayrock.Client.ProduceTest do
:ok
end

test "using snappyer produce works", %{client: client} do
Application.put_env(:kayrock, :snappy_module, :snappyer)
test "gzip produce works", %{kafka: kafka} do

Check failure on line 19 in test/integration/compression_test.exs, GitHub Actions / Integration Test (1.15, 26.1): test with compression gzip produce works (Kayrock.Client.CompressionTest)
+ {:ok, client_pid} = build_client(kafka)
+ topic_name = create_topic(client_pid)

- {:ok, topic} = ensure_test_topic(client, "simple_produce")
+ record_batch = Kayrock.RecordBatch.from_binary_list(["foo", "bar", "baz"], :gzip)
+ {:ok, _resp} = Kayrock.produce(client_pid, record_batch, topic_name, 0)
+ offset = Kayrock.Convenience.partition_last_offset(client_pid, topic_name, 0)
+ {:ok, resp} = Kayrock.fetch(client_pid, topic_name, 0, offset - 1)
+ [main_resp] = resp.responses
+ [partition_resp] = main_resp.partition_responses

- record_batch = RecordBatch.from_binary_list(["foo", "bar", "baz"], :snappy)
- {:ok, _resp} = Kayrock.produce(client, record_batch, topic, 0)
+ [
+ %Kayrock.RecordBatch{
+ partition_leader_epoch: partition_leader_epoch,
+ records: [%Kayrock.RecordBatch.Record{offset: first_offset} | _]
+ }
+ | _
+ ] = partition_resp.record_set

- offset = Kayrock.Convenience.partition_last_offset(client, topic, 0)
+ assert resp == %Kayrock.Fetch.V4.Response{
+ correlation_id: 4,
+ responses: [
+ %{
+ partition_responses: [
+ %{
+ partition_header: %{
+ aborted_transactions: [],
+ error_code: 0,
+ high_watermark: offset,
+ last_stable_offset: offset,
+ partition: 0
+ },
+ record_set: [
+ %Kayrock.RecordBatch{
+ attributes: 1,
+ base_sequence: -1,
+ batch_length: 94,
+ batch_offset: first_offset,
+ crc: 1_821_682_799,
+ first_timestamp: -1,
+ last_offset_delta: 2,
+ max_timestamp: -1,
+ partition_leader_epoch: partition_leader_epoch,
+ producer_epoch: -1,
+ producer_id: -1,
+ records: [
+ %Kayrock.RecordBatch.Record{
+ attributes: 0,
+ headers: [],
+ key: nil,
+ offset: first_offset,
+ timestamp: -1,
+ value: "foo"
+ },
+ %Kayrock.RecordBatch.Record{
+ attributes: 0,
+ headers: [],
+ key: nil,
+ offset: first_offset + 1,
+ timestamp: -1,
+ value: "bar"
+ },
+ %Kayrock.RecordBatch.Record{
+ attributes: 0,
+ headers: [],
+ key: nil,
+ offset: first_offset + 2,
+ timestamp: -1,
+ value: "baz"
+ }
+ ]
+ }
+ ]
+ }
+ ],
+ topic: topic_name
+ }
+ ],
+ throttle_time_ms: 0
+ }
+ end

- {:ok, resp} = Kayrock.fetch(client, topic, 0, offset - 1)
+ test "using snappyer produce works", %{kafka: kafka} do

Check failure on line 101 in test/integration/compression_test.exs, GitHub Actions / Integration Test (1.15, 26.1): test with compression using snappyer produce works (Kayrock.Client.CompressionTest)
+ Application.put_env(:kayrock, :snappy_module, :snappyer)
+ {:ok, client_pid} = build_client(kafka)
+ topic_name = create_topic(client_pid)

+ record_batch = Kayrock.RecordBatch.from_binary_list(["foo", "bar", "baz"], :snappy)
+ {:ok, _resp} = Kayrock.produce(client_pid, record_batch, topic_name, 0)
+ offset = Kayrock.Convenience.partition_last_offset(client_pid, topic_name, 0)

+ {:ok, resp} = Kayrock.fetch(client_pid, topic_name, 0, offset - 1)
[main_resp] = resp.responses
[partition_resp] = main_resp.partition_responses

[
- %RecordBatch{
+ %Kayrock.RecordBatch{
partition_leader_epoch: partition_leader_epoch,
- records: [%Record{offset: first_offset} | _]
+ records: [%Kayrock.RecordBatch.Record{offset: first_offset} | _]
}
| _
] = partition_resp.record_set
@@ -184,23 +182,22 @@ defmodule Kayrock.Client.ProduceTest do
}
end

test "using snappy-erlang-nif produce works", %{client: client} do
{:ok, topic} = ensure_test_topic(client, "simple_produce")

record_batch = RecordBatch.from_binary_list(["foo", "bar", "baz"], :snappy)
{:ok, _resp} = Kayrock.produce(client, record_batch, topic, 0)
test "using snappy-erlang-nif produce works", %{kafka: kafka} do

Check failure on line 185 in test/integration/compression_test.exs, GitHub Actions / Integration Test (1.15, 26.1): test with compression using snappy-erlang-nif produce works (Kayrock.Client.CompressionTest)
+ {:ok, client_pid} = build_client(kafka)
+ topic_name = create_topic(client_pid)

- offset = Kayrock.Convenience.partition_last_offset(client, topic, 0)

- {:ok, resp} = Kayrock.fetch(client, topic, 0, offset - 1)
+ record_batch = Kayrock.RecordBatch.from_binary_list(["foo", "bar", "baz"], :snappy)
+ {:ok, _resp} = Kayrock.produce(client_pid, record_batch, topic_name, 0)
+ offset = Kayrock.Convenience.partition_last_offset(client_pid, topic_name, 0)

+ {:ok, resp} = Kayrock.fetch(client_pid, topic_name, 0, offset - 1)
[main_resp] = resp.responses
[partition_resp] = main_resp.partition_responses

[
- %RecordBatch{
+ %Kayrock.RecordBatch{
partition_leader_epoch: partition_leader_epoch,
- records: [%Record{offset: first_offset} | _]
+ records: [%Kayrock.RecordBatch.Record{offset: first_offset} | _]
}
| _
] = partition_resp.record_set
@@ -268,4 +265,16 @@ defmodule Kayrock.Client.ProduceTest do
}
end
end

+ defp build_client(kafka) do
+ uris = [{"localhost", Container.mapped_port(kafka, 9092)}]

Check warning on line 270 in test/integration/compression_test.exs, GitHub Actions / runner / Test (1.13, 24.3) and Test (1.10, 22.3): Container.mapped_port/2 is undefined (module Container is not available or is yet to be defined)
+ Kayrock.Client.start_link(uris)
+ end

+ defp create_topic(client_pid) do
+ topic_name = unique_string()
+ create_request = create_topic_request(topic_name, 5)
+ {:ok, _} = Kayrock.client_call(client_pid, create_request, :controller)
+ topic_name
+ end
end
test/integration/producer_test.exs (1 change: 0 additions & 1 deletion)
@@ -3,7 +3,6 @@ defmodule Kayrock.Integration.ProducerTest do
use ExUnit.Case, async: true

import Kayrock.TestSupport
- import Kayrock.Convenience
import Kayrock.RequestFactory

container(:kafka, KafkaContainer.new(), shared: true)
