
Commit

Merge pull request #10178 from vbotbuildovich/backport-3062-8546-v22.2.x-194

[v22.2.x] Support compressed batches in kafka client
michael-redpanda authored Apr 19, 2023
2 parents cf6ebac + bde9552 commit 7984cf2
Showing 7 changed files with 212 additions and 24 deletions.
35 changes: 21 additions & 14 deletions src/v/pandaproxy/json/requests/fetch.h
@@ -26,6 +26,7 @@
#include "pandaproxy/json/rjson_util.h"
#include "pandaproxy/json/types.h"
#include "seastarx.h"
#include "storage/parser_utils.h"

#include <seastar/core/sstring.hh>

@@ -121,20 +122,26 @@ class rjson_serialize_impl<kafka::fetch_response> {
auto rjs = rjson_serialize_impl<model::record>(
_fmt, tpv, adapter.batch->base_offset());

adapter.batch->for_each_record(
[&rjs, &w](model::record record) {
auto offset = record.offset_delta() + rjs.base_offset()();
if (!rjs(w, std::move(record))) {
throw serialize_error(
make_error_code(error_code::unable_to_serialize),
fmt::format(
"Unable to serialize record at offset {} in "
"topic:partition {}:{}",
offset,
rjs.tpv().topic(),
rjs.tpv().partition()));
}
});
auto batch = std::move(*adapter.batch);

if (batch.compressed()) {
batch = storage::internal::maybe_decompress_batch_sync(
batch);
}

batch.for_each_record([&rjs, &w](model::record record) {
auto offset = record.offset_delta() + rjs.base_offset()();
if (!rjs(w, std::move(record))) {
throw serialize_error(
make_error_code(error_code::unable_to_serialize),
fmt::format(
"Unable to serialize record at offset {} in "
"topic:partition {}:{}",
offset,
rjs.tpv().topic(),
rjs.tpv().partition()));
}
});
}
}
w.EndArray();
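The hunk above moves the batch out of the adapter and, when its header marks it as compressed, decompresses it with the synchronous storage helper before serializing each record. A minimal sketch of that guard pattern in isolation (the standalone helper name ensure_decompressed is illustrative only, not part of the change):

#include "model/record.h"
#include "storage/parser_utils.h"

// Illustrative helper, not part of the diff: return a batch whose records can
// be iterated directly. maybe_decompress_batch_sync throws on an uncompressed
// input, so it is only called behind the compressed() check.
model::record_batch ensure_decompressed(model::record_batch batch) {
    if (batch.compressed()) {
        batch = storage::internal::maybe_decompress_batch_sync(batch);
    }
    return batch;
}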
2 changes: 1 addition & 1 deletion src/v/pandaproxy/rest/test/CMakeLists.txt
@@ -7,6 +7,6 @@ rp_test(
produce.cc
consumer_group.cc
DEFINITIONS BOOST_TEST_DYN_LINK
LIBRARIES v::seastar_testing_main v::application v::http
LIBRARIES v::seastar_testing_main v::application v::http v::storage_test_utils
LABELS pandaproxy
)
121 changes: 121 additions & 0 deletions src/v/pandaproxy/rest/test/consumer_group.cc
@@ -459,3 +459,124 @@ FIXTURE_TEST(
R"({"error_code":40801,"message":"Unable to serialize value of record at offset 1 in topic:partition t:0"})");
}
}

FIXTURE_TEST(
pandaproxy_consume_group_compressed_fetch, pandaproxy_test_fixture) {
using namespace std::chrono_literals;
info("Waiting for leadership");
wait_for_controller_leadership().get();

auto tp = model::topic_partition(model::topic("t"), model::partition_id(0));
auto ntp = make_default_ntp(tp.topic, tp.partition);
auto log_config = make_default_config();
{
using namespace storage;
storage::disk_log_builder builder(log_config);
storage::ntp_config ntp_cfg(
ntp,
log_config.base_dir,
nullptr,
get_next_partition_revision_id().get());
builder | start(std::move(ntp_cfg)) | add_segment(model::offset(0))
| add_random_batch(model::offset(0), 10, maybe_compress_batches::yes)
| stop();
}

add_topic(model::topic_namespace_view(ntp)).get();
auto shard = app.shard_table.local().shard_for(ntp);

tests::cooperative_spin_wait_with_timeout(10s, [this, shard, ntp] {
return app.partition_manager.invoke_on(
*shard, [ntp](cluster::partition_manager& mgr) {
auto partition = mgr.get(ntp);
return partition
&& partition->committed_offset() >= model::offset(1);
});
}).get();

info("Connecting client");
auto client = make_proxy_client();

kafka::group_id group_id{"test_group"};
kafka::member_id member_id{kafka::no_member};

{
info("Create consumer");
ss::sstring req_body(R"({
"name": "test_consumer",
"format": "json",
"auto.offset.reset": "earliest",
"auto.commit.enable": "false",
"fetch.min.bytes": "1",
"consumer.request.timeout.ms": "10000"
})");
iobuf req_body_buf;
req_body_buf.append(req_body.data(), req_body.size());
auto res = http_request(
client,
fmt::format("/consumers/{}", group_id()),
std::move(req_body_buf),
boost::beast::http::verb::post,
ppj::serialization_format::v2,
ppj::serialization_format::v2);
BOOST_REQUIRE_EQUAL(
res.headers.result(), boost::beast::http::status::ok);

auto res_data = ppj::rjson_parse(
res.body.data(), ppj::create_consumer_response_handler());
BOOST_REQUIRE_EQUAL(res_data.instance_id, "test_consumer");
member_id = res_data.instance_id;
BOOST_REQUIRE_EQUAL(
res_data.base_uri,
fmt::format(
"http://{}:{}/consumers/{}/instances/{}",
"127.0.0.1",
"8082",
group_id(),
member_id()));
BOOST_REQUIRE_EQUAL(
res.headers.at(boost::beast::http::field::content_type),
to_header_value(ppj::serialization_format::v2));
}

info("Member id: {}", member_id);

{
info("Subscribe consumer");
ss::sstring req_body(R"(
{
"topics": [
"t"
]
})");
iobuf req_body_buf;
req_body_buf.append(req_body.data(), req_body.size());
auto res = http_request(
client,
fmt::format(
"/consumers/{}/instances/{}/subscription", group_id(), member_id()),
std::move(req_body_buf),
boost::beast::http::verb::post,
ppj::serialization_format::v2,
ppj::serialization_format::v2);
BOOST_REQUIRE_EQUAL(
res.headers.result(), boost::beast::http::status::no_content);
}

{
info("Consume from topic");
auto res = http_request(
client,
fmt::format(
"/consumers/{}/instances/{}/records?timeout={}&max_bytes={}",
group_id(),
member_id(),
"1000",
"1000000"),
boost::beast::http::verb::get,
ppj::serialization_format::v2,
ppj::serialization_format::binary_v2);
BOOST_REQUIRE_EQUAL(
res.headers.result(), boost::beast::http::status::ok);
}
}
46 changes: 46 additions & 0 deletions src/v/pandaproxy/rest/test/fetch.cc
@@ -386,3 +386,49 @@ FIXTURE_TEST(pandaproxy_fetch_binary_with_json2, pandaproxy_test_fixture) {
R"({"error_code":40801,"message":"Unable to serialize value of record at offset 1 in topic:partition t:0"})");
}
}

FIXTURE_TEST(pandaproxy_fetch_compressed, pandaproxy_test_fixture) {
wait_for_controller_leadership().get();

auto tp = model::topic_partition(model::topic("t"), model::partition_id(0));
auto ntp = make_default_ntp(tp.topic, tp.partition);
auto log_config = make_default_config();
{
using namespace storage;
storage::disk_log_builder builder(log_config);
storage::ntp_config ntp_cfg(
ntp,
log_config.base_dir,
nullptr,
get_next_partition_revision_id().get());
builder | start(std::move(ntp_cfg)) | add_segment(model::offset(0))
| add_random_batch(model::offset(0), 10, maybe_compress_batches::yes)
| stop();
}

add_topic(model::topic_namespace_view(ntp)).get();
auto shard = app.shard_table.local().shard_for(ntp);

tests::cooperative_spin_wait_with_timeout(10s, [this, shard, ntp] {
return app.partition_manager.invoke_on(
*shard, [ntp](cluster::partition_manager& mgr) {
auto partition = mgr.get(ntp);
return partition
&& partition->committed_offset() >= model::offset(1);
});
}).get();

auto client = make_proxy_client();
{
info("Fetching offset 0 from topic {}", tp);
auto res = http_request(
client,
"/topics/t/partitions/0/records?offset=0&max_bytes=1024&timeout=5000",
boost::beast::http::verb::get,
ppj::serialization_format::v2,
ppj::serialization_format::binary_v2);

BOOST_REQUIRE_EQUAL(
res.headers.result(), boost::beast::http::status::ok);
}
}
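Both new tests seed the log the same way before touching the proxy: a storage::disk_log_builder pipeline writes a single segment containing one randomly generated, compressed batch of ten records at offset 0. Condensed from the tests above (all identifiers come from the test fixture; only the comments are added):

{
    using namespace storage;
    storage::disk_log_builder builder(log_config);
    storage::ntp_config ntp_cfg(
      ntp,
      log_config.base_dir,
      nullptr,
      get_next_partition_revision_id().get());
    // Pipe stages: start the log, open a segment at offset 0, append one
    // compressed batch of 10 random records, then flush and close the builder.
    builder | start(std::move(ntp_cfg)) | add_segment(model::offset(0))
      | add_random_batch(model::offset(0), 10, maybe_compress_batches::yes)
      | stop();
}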
1 change: 1 addition & 0 deletions src/v/pandaproxy/schema_registry/storage.h
@@ -1113,6 +1113,7 @@ struct consume_to_store {

ss::future<ss::stop_iteration> operator()(model::record_batch b) {
if (!b.header().attrs.is_control()) {
b = co_await storage::internal::decompress_batch(std::move(b));
auto base_offset = b.base_offset();
co_await model::for_each_record(
b, [this, base_offset](model::record& rec) {
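The schema registry consumer runs inside a coroutine, so it uses the awaited decompress_batch overload rather than the synchronous helpers before walking the records. A condensed, hedged sketch of that pattern (apply is a stand-in name; the real consume_to_store also tracks offsets and does more per-record work than shown):

ss::future<ss::stop_iteration> operator()(model::record_batch b) {
    if (!b.header().attrs.is_control()) {
        // Hand downstream record handlers an uncompressed batch.
        b = co_await storage::internal::decompress_batch(std::move(b));
        auto base_offset = b.base_offset();
        co_await model::for_each_record(
          b, [this, base_offset](model::record& rec) {
              // Per-record handling elided: the real handler applies the record
              // (keyed by base_offset + offset_delta) to the in-memory store.
              return apply(std::move(rec), base_offset);
          });
    }
    co_return ss::stop_iteration::no; // keep consuming (simplified)
}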
26 changes: 17 additions & 9 deletions src/v/storage/parser_utils.cc
@@ -34,19 +34,27 @@ model::record_batch_reader decompress_batch_consumer::end_of_stream() {
}

ss::future<model::record_batch> decompress_batch(model::record_batch&& b) {
return ss::futurize_invoke(decompress_batch_sync, std::move(b));
}

ss::future<model::record_batch> decompress_batch(const model::record_batch& b) {
return ss::futurize_invoke(maybe_decompress_batch_sync, b);
}

model::record_batch decompress_batch_sync(model::record_batch&& b) {
if (!b.compressed()) {
return ss::make_ready_future<model::record_batch>(std::move(b));
return std::move(b);
}
return decompress_batch(b);

return maybe_decompress_batch_sync(b);
}

ss::future<model::record_batch> decompress_batch(const model::record_batch& b) {
model::record_batch maybe_decompress_batch_sync(const model::record_batch& b) {
if (unlikely(!b.compressed())) {
return ss::make_exception_future<model::record_batch>(
std::runtime_error(fmt_with_ctx(
fmt::format,
"Asked to decompressed a non-compressed batch:{}",
b.header())));
throw std::runtime_error(fmt_with_ctx(
fmt::format,
"Asked to decompressed a non-compressed batch:{}",
b.header()));
}
iobuf body_buf = compression::compressor::uncompress(
b.data(), b.header().attrs.compression());
@@ -56,7 +64,7 @@ ss::future<model::record_batch> decompress_batch(const model::record_batch& b) {
reset_size_checksum_metadata(h, body_buf);
auto batch = model::record_batch(
h, std::move(body_buf), model::record_batch::tag_ctor_ng{});
return ss::make_ready_future<model::record_batch>(std::move(batch));
return batch;
}

compress_batch_consumer::compress_batch_consumer(
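The refactor keeps the future-returning decompress_batch overloads as thin wrappers over the new synchronous functions: ss::futurize_invoke runs a plain function and folds anything it throws into a failed future. A self-contained sketch of that wrapping idiom with toy names (ss is the usual alias for the seastar namespace; the include path assumes a current Seastar where futurize_invoke lives in core/future.hh):

#include <seastar/core/future.hh>
#include <stdexcept>

namespace ss = seastar;

// Toy synchronous function that reports failure by throwing.
int halve_or_throw(int x) {
    if (x % 2 != 0) {
        throw std::runtime_error("odd input");
    }
    return x / 2;
}

// futurize_invoke calls halve_or_throw immediately; a thrown exception becomes
// an exceptional future<int> instead of propagating out of halve_async.
ss::future<int> halve_async(int x) {
    return ss::futurize_invoke(halve_or_throw, x);
}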
5 changes: 5 additions & 0 deletions src/v/storage/parser_utils.h
@@ -44,6 +44,11 @@ class compress_batch_consumer {
ss::future<model::record_batch> decompress_batch(model::record_batch&&);
/// \brief batch decompression
ss::future<model::record_batch> decompress_batch(const model::record_batch&);
/// \brief synchronous batch decompression
model::record_batch decompress_batch_sync(model::record_batch&&);
/// \brief synchronous batch decompression
/// \throw std::runtime_error If provided batch is not compressed
model::record_batch maybe_decompress_batch_sync(const model::record_batch&);

/// \brief batch compression
ss::future<model::record_batch>
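A hedged usage sketch for the expanded API above (caller-side code, not from the diff): decompress_batch_sync is the tolerant entry point and returns an uncompressed batch unchanged, whereas maybe_decompress_batch_sync insists on a compressed input and throws std::runtime_error otherwise; the const-reference decompress_batch overload surfaces that same error as a failed future instead of a throw.

// b1 and b2 are model::record_batch values owned by the caller (assumed).
auto plain = storage::internal::decompress_batch_sync(std::move(b1));

if (b2.compressed()) {
    // Guard first: this variant throws when handed an uncompressed batch.
    auto visible = storage::internal::maybe_decompress_batch_sync(b2);
}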
