[v22.3.x] Support compressed batches in kafka client #10176

Merged
35 changes: 21 additions & 14 deletions src/v/pandaproxy/json/requests/fetch.h
@@ -26,6 +26,7 @@
#include "pandaproxy/json/rjson_util.h"
#include "pandaproxy/json/types.h"
#include "seastarx.h"
#include "storage/parser_utils.h"

#include <seastar/core/sstring.hh>

@@ -121,20 +122,26 @@ class rjson_serialize_impl<kafka::fetch_response> {
auto rjs = rjson_serialize_impl<model::record>(
_fmt, tpv, adapter.batch->base_offset());

adapter.batch->for_each_record(
[&rjs, &w](model::record record) {
auto offset = record.offset_delta() + rjs.base_offset()();
if (!rjs(w, std::move(record))) {
throw serialize_error(
make_error_code(error_code::unable_to_serialize),
fmt::format(
"Unable to serialize record at offset {} in "
"topic:partition {}:{}",
offset,
rjs.tpv().topic(),
rjs.tpv().partition()));
}
});
auto batch = std::move(*adapter.batch);

if (batch.compressed()) {
batch = storage::internal::maybe_decompress_batch_sync(
batch);
}

batch.for_each_record([&rjs, &w](model::record record) {
auto offset = record.offset_delta() + rjs.base_offset()();
if (!rjs(w, std::move(record))) {
throw serialize_error(
make_error_code(error_code::unable_to_serialize),
fmt::format(
"Unable to serialize record at offset {} in "
"topic:partition {}:{}",
offset,
rjs.tpv().topic(),
rjs.tpv().partition()));
}
});
}
}
w.EndArray();
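
The fetch.h change above moves the handler to a decompress-then-serialize flow: a fetched batch that reports compressed() is decompressed in place before its records are walked. A minimal sketch of the same pattern outside the PR, assuming a model::record_batch in hand (the wrapper function name is hypothetical):

#include "model/record.h"
#include "storage/parser_utils.h"

// Sketch only: mirrors the change in rjson_serialize_impl<kafka::fetch_response>.
void serialize_records(model::record_batch batch) {
    if (batch.compressed()) {
        // maybe_decompress_batch_sync throws on an uncompressed batch, so it
        // is only reached behind the compressed() check.
        batch = storage::internal::maybe_decompress_batch_sync(batch);
    }
    batch.for_each_record([](model::record record) {
        // Each record now carries uncompressed key/value data and can be
        // serialized (e.g. to JSON) without looking at batch compression.
    });
}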
2 changes: 1 addition & 1 deletion src/v/pandaproxy/rest/test/CMakeLists.txt
@@ -7,6 +7,6 @@ rp_test(
produce.cc
consumer_group.cc
DEFINITIONS BOOST_TEST_DYN_LINK
LIBRARIES v::seastar_testing_main v::application v::http
LIBRARIES v::seastar_testing_main v::application v::http v::storage_test_utils
LABELS pandaproxy
)
121 changes: 121 additions & 0 deletions src/v/pandaproxy/rest/test/consumer_group.cc
@@ -459,3 +459,124 @@ FIXTURE_TEST(
R"({"error_code":40801,"message":"Unable to serialize value of record at offset 1 in topic:partition t:0"})");
}
}

FIXTURE_TEST(
pandaproxy_consume_group_compressed_fetch, pandaproxy_test_fixture) {
using namespace std::chrono_literals;
info("Waiting for leadership");
wait_for_controller_leadership().get();

auto tp = model::topic_partition(model::topic("t"), model::partition_id(0));
auto ntp = make_default_ntp(tp.topic, tp.partition);
auto log_config = make_default_config();
{
using namespace storage;
storage::disk_log_builder builder(log_config);
storage::ntp_config ntp_cfg(
ntp,
log_config.base_dir,
nullptr,
get_next_partition_revision_id().get());
builder | start(std::move(ntp_cfg)) | add_segment(model::offset(0))
| add_random_batch(model::offset(0), 10, maybe_compress_batches::yes)
| stop();
}

add_topic(model::topic_namespace_view(ntp)).get();
auto shard = app.shard_table.local().shard_for(ntp);

tests::cooperative_spin_wait_with_timeout(10s, [this, shard, ntp] {
return app.partition_manager.invoke_on(
*shard, [ntp](cluster::partition_manager& mgr) {
auto partition = mgr.get(ntp);
return partition
&& partition->committed_offset() >= model::offset(1);
});
}).get();

info("Connecting client");
auto client = make_proxy_client();

kafka::group_id group_id{"test_group"};
kafka::member_id member_id{kafka::no_member};

{
info("Create consumer");
ss::sstring req_body(R"({
"name": "test_consumer",
"format": "json",
"auto.offset.reset": "earliest",
"auto.commit.enable": "false",
"fetch.min.bytes": "1",
"consumer.request.timeout.ms": "10000"
})");
iobuf req_body_buf;
req_body_buf.append(req_body.data(), req_body.size());
auto res = http_request(
client,
fmt::format("/consumers/{}", group_id()),
std::move(req_body_buf),
boost::beast::http::verb::post,
ppj::serialization_format::v2,
ppj::serialization_format::v2);
BOOST_REQUIRE_EQUAL(
res.headers.result(), boost::beast::http::status::ok);

auto res_data = ppj::rjson_parse(
res.body.data(), ppj::create_consumer_response_handler());
BOOST_REQUIRE_EQUAL(res_data.instance_id, "test_consumer");
member_id = res_data.instance_id;
BOOST_REQUIRE_EQUAL(
res_data.base_uri,
fmt::format(
"http://{}:{}/consumers/{}/instances/{}",
"127.0.0.1",
"8082",
group_id(),
member_id()));
BOOST_REQUIRE_EQUAL(
res.headers.at(boost::beast::http::field::content_type),
to_header_value(ppj::serialization_format::v2));
}

info("Member id: {}", member_id);

{
info("Subscribe consumer");
ss::sstring req_body(R"(
{
"topics": [
"t"
]
})");
iobuf req_body_buf;
req_body_buf.append(req_body.data(), req_body.size());
auto res = http_request(
client,
fmt::format(
"/consumers/{}/instances/{}/subscription", group_id(), member_id()),
std::move(req_body_buf),
boost::beast::http::verb::post,
ppj::serialization_format::v2,
ppj::serialization_format::v2);
BOOST_REQUIRE_EQUAL(
res.headers.result(), boost::beast::http::status::no_content);
}

{
info("Consume from topic");
auto res = http_request(
client,
fmt::format(
"/consumers/{}/instances/{}/records?timeout={}&max_bytes={}",
group_id(),
member_id(),
"1000",
"1000000"),
boost::beast::http::verb::get,
ppj::serialization_format::v2,
ppj::serialization_format::binary_v2);
BOOST_REQUIRE_EQUAL(
res.headers.result(), boost::beast::http::status::ok);
}
}
46 changes: 46 additions & 0 deletions src/v/pandaproxy/rest/test/fetch.cc
@@ -386,3 +386,49 @@ FIXTURE_TEST(pandaproxy_fetch_binary_with_json2, pandaproxy_test_fixture) {
R"({"error_code":40801,"message":"Unable to serialize value of record at offset 1 in topic:partition t:0"})");
}
}

FIXTURE_TEST(pandaproxy_fetch_compressed, pandaproxy_test_fixture) {
wait_for_controller_leadership().get();

auto tp = model::topic_partition(model::topic("t"), model::partition_id(0));
auto ntp = make_default_ntp(tp.topic, tp.partition);
auto log_config = make_default_config();
{
using namespace storage;
storage::disk_log_builder builder(log_config);
storage::ntp_config ntp_cfg(
ntp,
log_config.base_dir,
nullptr,
get_next_partition_revision_id().get());
builder | start(std::move(ntp_cfg)) | add_segment(model::offset(0))
| add_random_batch(model::offset(0), 10, maybe_compress_batches::yes)
| stop();
}

add_topic(model::topic_namespace_view(ntp)).get();
auto shard = app.shard_table.local().shard_for(ntp);

tests::cooperative_spin_wait_with_timeout(10s, [this, shard, ntp] {
return app.partition_manager.invoke_on(
*shard, [ntp](cluster::partition_manager& mgr) {
auto partition = mgr.get(ntp);
return partition
&& partition->committed_offset() >= model::offset(1);
});
}).get();

auto client = make_proxy_client();
{
info("Fetching offset 0 from topic {}", tp);
auto res = http_request(
client,
"/topics/t/partitions/0/records?offset=0&max_bytes=1024&timeout=5000",
boost::beast::http::verb::get,
ppj::serialization_format::v2,
ppj::serialization_format::binary_v2);

BOOST_REQUIRE_EQUAL(
res.headers.result(), boost::beast::http::status::ok);
}
}
1 change: 1 addition & 0 deletions src/v/pandaproxy/schema_registry/storage.h
@@ -1113,6 +1113,7 @@ struct consume_to_store {

ss::future<ss::stop_iteration> operator()(model::record_batch b) {
if (!b.header().attrs.is_control()) {
b = co_await storage::internal::decompress_batch(std::move(b));
auto base_offset = b.base_offset();
co_await model::for_each_record(
b, [this, base_offset](model::record& rec) {
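
The single added line in consume_to_store::operator() above awaits decompression before the per-record loop. A sketch of that control flow in isolation, assuming a Seastar coroutine of the same shape (the function name, header paths, and record handling here are placeholders):

#include "model/record.h"
#include "model/record_utils.h"
#include "storage/parser_utils.h"

// Sketch only: decompress inside the coroutine, then visit records.
ss::future<ss::stop_iteration> consume_batch(model::record_batch b) {
    if (!b.header().attrs.is_control()) {
        // decompress_batch(model::record_batch&&) returns the batch unchanged
        // when it is not compressed, so it is safe to call unconditionally.
        b = co_await storage::internal::decompress_batch(std::move(b));
        co_await model::for_each_record(b, [](model::record& rec) {
            // apply the (now uncompressed) record to the store here
        });
    }
    co_return ss::stop_iteration::no;
}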
26 changes: 17 additions & 9 deletions src/v/storage/parser_utils.cc
@@ -34,19 +34,27 @@ model::record_batch_reader decompress_batch_consumer::end_of_stream() {
}

ss::future<model::record_batch> decompress_batch(model::record_batch&& b) {
return ss::futurize_invoke(decompress_batch_sync, std::move(b));
}

ss::future<model::record_batch> decompress_batch(const model::record_batch& b) {
return ss::futurize_invoke(maybe_decompress_batch_sync, b);
}

model::record_batch decompress_batch_sync(model::record_batch&& b) {
if (!b.compressed()) {
return ss::make_ready_future<model::record_batch>(std::move(b));
return std::move(b);
}
return decompress_batch(b);

return maybe_decompress_batch_sync(b);
}

ss::future<model::record_batch> decompress_batch(const model::record_batch& b) {
model::record_batch maybe_decompress_batch_sync(const model::record_batch& b) {
if (unlikely(!b.compressed())) {
return ss::make_exception_future<model::record_batch>(
std::runtime_error(fmt_with_ctx(
fmt::format,
"Asked to decompressed a non-compressed batch:{}",
b.header())));
throw std::runtime_error(fmt_with_ctx(
fmt::format,
"Asked to decompressed a non-compressed batch:{}",
b.header()));
}
iobuf body_buf = compression::compressor::uncompress(
b.data(), b.header().attrs.compression());
@@ -56,7 +64,7 @@ ss::future<model::record_batch> decompress_batch(const model::record_batch& b) {
reset_size_checksum_metadata(h, body_buf);
auto batch = model::record_batch(
h, std::move(body_buf), model::record_batch::tag_ctor_ng{});
return ss::make_ready_future<model::record_batch>(std::move(batch));
return batch;
}

compress_batch_consumer::compress_batch_consumer(
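
In the parser_utils.cc refactor above, the future-returning decompress_batch overloads become thin wrappers over the new synchronous helpers: ss::futurize_invoke returns a ready future on normal return and captures a thrown exception (the uncompressed-batch case in maybe_decompress_batch_sync) into an exceptional future instead of letting it propagate out of a future-returning function. A small sketch of that wrapper shape, with a hypothetical function name:

// Sketch only: exposing a synchronous helper behind a future-returning API.
ss::future<model::record_batch> decompress_async(model::record_batch b) {
    // futurize_invoke calls the helper immediately; a thrown exception
    // becomes an exceptional future rather than escaping to the caller.
    return ss::futurize_invoke(
      storage::internal::decompress_batch_sync, std::move(b));
}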
5 changes: 5 additions & 0 deletions src/v/storage/parser_utils.h
@@ -44,6 +44,11 @@ class compress_batch_consumer {
ss::future<model::record_batch> decompress_batch(model::record_batch&&);
/// \brief batch decompression
ss::future<model::record_batch> decompress_batch(const model::record_batch&);
/// \brief synchronous batch decompression
model::record_batch decompress_batch_sync(model::record_batch&&);
/// \brief synchronous batch decompression
/// \throw std::runtime_error If provided batch is not compressed
model::record_batch maybe_decompress_batch_sync(const model::record_batch&);

/// \brief batch compression
ss::future<model::record_batch>
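
A brief caller-side note on the two declarations documented above (function names below are hypothetical): decompress_batch_sync accepts any batch and returns uncompressed data either way, while maybe_decompress_batch_sync is for call sites that have already checked compressed() and throws otherwise.

// Sketch only: choosing between the new synchronous helpers.
model::record_batch ensure_uncompressed(model::record_batch b) {
    // Handles both cases: compressed input is decompressed, uncompressed
    // input is returned unchanged.
    return storage::internal::decompress_batch_sync(std::move(b));
}

model::record_batch decompress_known_compressed(const model::record_batch& b) {
    // Precondition: b.compressed() is true; otherwise this throws
    // std::runtime_error, as documented above.
    return storage::internal::maybe_decompress_batch_sync(b);
}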