From 1e9968e6606a30362de0da1183b4d9c54910f861 Mon Sep 17 00:00:00 2001
From: Michael Boquard
Date: Mon, 17 Apr 2023 13:53:30 -0400
Subject: [PATCH 1/4] storage: Added synchronous record_batch decompression

Added calls that decompress a record_batch synchronously.
---
 src/v/storage/parser_utils.cc | 26 +++++++++++++++++---------
 src/v/storage/parser_utils.h  |  5 +++++
 2 files changed, 22 insertions(+), 9 deletions(-)

diff --git a/src/v/storage/parser_utils.cc b/src/v/storage/parser_utils.cc
index c51b1333b99a..0723f9f9cdf2 100644
--- a/src/v/storage/parser_utils.cc
+++ b/src/v/storage/parser_utils.cc
@@ -34,19 +34,27 @@ model::record_batch_reader decompress_batch_consumer::end_of_stream() {
 }
 
 ss::future<model::record_batch> decompress_batch(model::record_batch&& b) {
+    return ss::futurize_invoke(decompress_batch_sync, std::move(b));
+}
+
+ss::future<model::record_batch> decompress_batch(const model::record_batch& b) {
+    return ss::futurize_invoke(maybe_decompress_batch_sync, b);
+}
+
+model::record_batch decompress_batch_sync(model::record_batch&& b) {
     if (!b.compressed()) {
-        return ss::make_ready_future<model::record_batch>(std::move(b));
+        return std::move(b);
     }
-    return decompress_batch(b);
+
+    return maybe_decompress_batch_sync(b);
 }
 
-ss::future<model::record_batch> decompress_batch(const model::record_batch& b) {
+model::record_batch maybe_decompress_batch_sync(const model::record_batch& b) {
     if (unlikely(!b.compressed())) {
-        return ss::make_exception_future<model::record_batch>(
-          std::runtime_error(fmt_with_ctx(
-            fmt::format,
-            "Asked to decompressed a non-compressed batch:{}",
-            b.header())));
+        throw std::runtime_error(fmt_with_ctx(
+          fmt::format,
+          "Asked to decompressed a non-compressed batch:{}",
+          b.header()));
     }
     iobuf body_buf = compression::compressor::uncompress(
       b.data(), b.header().attrs.compression());
@@ -56,7 +64,7 @@ ss::future<model::record_batch> decompress_batch(const model::record_batch& b) {
     reset_size_checksum_metadata(h, body_buf);
     auto batch = model::record_batch(
       h, std::move(body_buf), model::record_batch::tag_ctor_ng{});
-    return ss::make_ready_future<model::record_batch>(std::move(batch));
+    return batch;
 }
 
 compress_batch_consumer::compress_batch_consumer(
diff --git a/src/v/storage/parser_utils.h b/src/v/storage/parser_utils.h
index 46059bd6a81c..0cfff7761403 100644
--- a/src/v/storage/parser_utils.h
+++ b/src/v/storage/parser_utils.h
@@ -44,6 +44,11 @@ class compress_batch_consumer {
 ss::future<model::record_batch> decompress_batch(model::record_batch&&);
 /// \brief batch decompression
 ss::future<model::record_batch> decompress_batch(const model::record_batch&);
+/// \brief synchronous batch decompression
+model::record_batch decompress_batch_sync(model::record_batch&&);
+/// \brief synchronous batch decompression
+/// \throw std::runtime_error If provided batch is not compressed
+model::record_batch maybe_decompress_batch_sync(const model::record_batch&);
 /// \brief batch compression
 ss::future<model::record_batch>

From aa69b3ec7362daab335f9374bfb60b7f77e8584a Mon Sep 17 00:00:00 2001
From: Michael Boquard
Date: Mon, 17 Apr 2023 14:12:27 -0400
Subject: [PATCH 2/4] pp: Decompressing compressed batches

Added calls that decompress record batches synchronously when they are
fetched via the kafka client.
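
For reviewers, the guarded call pattern this change introduces on the fetch
path looks roughly like this (a minimal sketch, not part of the diff below;
take_uncompressed is a hypothetical helper and the includes are assumed):

    #include "model/record.h"
    #include "storage/parser_utils.h"

    // Sketch: decompress only when the batch is actually compressed, because
    // maybe_decompress_batch_sync() throws std::runtime_error when handed a
    // batch that is not compressed.
    model::record_batch take_uncompressed(model::record_batch batch) {
        if (batch.compressed()) {
            batch = storage::internal::maybe_decompress_batch_sync(batch);
        }
        return batch;
    }
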
---
 src/v/pandaproxy/json/requests/fetch.h | 35 +++++++++++++++++++++++-----------
 1 file changed, 21 insertions(+), 14 deletions(-)

diff --git a/src/v/pandaproxy/json/requests/fetch.h b/src/v/pandaproxy/json/requests/fetch.h
index e8b260a2d1ae..50680093feb0 100644
--- a/src/v/pandaproxy/json/requests/fetch.h
+++ b/src/v/pandaproxy/json/requests/fetch.h
@@ -26,6 +26,7 @@
 #include "pandaproxy/json/rjson_util.h"
 #include "pandaproxy/json/types.h"
 #include "seastarx.h"
+#include "storage/parser_utils.h"
 
 #include
 
@@ -121,20 +122,26 @@
             auto rjs = rjson_serialize_impl(
               _fmt, tpv, adapter.batch->base_offset());
-            adapter.batch->for_each_record(
-              [&rjs, &w](model::record record) {
-                  auto offset = record.offset_delta() + rjs.base_offset()();
-                  if (!rjs(w, std::move(record))) {
-                      throw serialize_error(
-                        make_error_code(error_code::unable_to_serialize),
-                        fmt::format(
-                          "Unable to serialize record at offset {} in "
-                          "topic:partition {}:{}",
-                          offset,
-                          rjs.tpv().topic(),
-                          rjs.tpv().partition()));
-                  }
-              });
+            auto batch = std::move(*adapter.batch);
+
+            if (batch.compressed()) {
+                batch = storage::internal::maybe_decompress_batch_sync(
+                  batch);
+            }
+
+            batch.for_each_record([&rjs, &w](model::record record) {
+                auto offset = record.offset_delta() + rjs.base_offset()();
+                if (!rjs(w, std::move(record))) {
+                    throw serialize_error(
+                      make_error_code(error_code::unable_to_serialize),
+                      fmt::format(
+                        "Unable to serialize record at offset {} in "
+                        "topic:partition {}:{}",
+                        offset,
+                        rjs.tpv().topic(),
+                        rjs.tpv().partition()));
+                }
+            });
         }
     }
     w.EndArray();

From 09deb8610470aa8998c96ef6797292a7664718dd Mon Sep 17 00:00:00 2001
From: Michael Boquard
Date: Tue, 11 Apr 2023 15:46:59 -0400
Subject: [PATCH 3/4] pp/tests: Added test to consume compressed batch

Added two tests: one for a direct partition fetch and one that uses
consumer groups.
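
Both tests build their input with storage::disk_log_builder; the detail that
matters here is maybe_compress_batches::yes, which makes the fixture write a
compressed batch for the proxy to decompress. A minimal sketch of that setup
(log_config and ntp_cfg are prepared by the existing fixture helpers, exactly
as in the diffs below):

    // Sketch only: one segment holding a single randomly generated,
    // compressed batch at offset 0.
    using namespace storage;
    disk_log_builder builder(log_config);
    builder | start(std::move(ntp_cfg)) | add_segment(model::offset(0))
      | add_random_batch(model::offset(0), 10, maybe_compress_batches::yes)
      | stop();
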
Signed-off-by: Michael Boquard
---
 src/v/pandaproxy/rest/test/CMakeLists.txt    |   2 +-
 src/v/pandaproxy/rest/test/consumer_group.cc | 121 +++++++++++++++++++
 src/v/pandaproxy/rest/test/fetch.cc          |  46 +++++++
 3 files changed, 168 insertions(+), 1 deletion(-)

diff --git a/src/v/pandaproxy/rest/test/CMakeLists.txt b/src/v/pandaproxy/rest/test/CMakeLists.txt
index 924dc18f24bf..da6c421693d5 100644
--- a/src/v/pandaproxy/rest/test/CMakeLists.txt
+++ b/src/v/pandaproxy/rest/test/CMakeLists.txt
@@ -7,6 +7,6 @@ rp_test(
     produce.cc
     consumer_group.cc
   DEFINITIONS BOOST_TEST_DYN_LINK
-  LIBRARIES v::seastar_testing_main v::application v::http
+  LIBRARIES v::seastar_testing_main v::application v::http v::storage_test_utils
   LABELS pandaproxy
 )
diff --git a/src/v/pandaproxy/rest/test/consumer_group.cc b/src/v/pandaproxy/rest/test/consumer_group.cc
index befe9757b44b..b3c23584c8c9 100644
--- a/src/v/pandaproxy/rest/test/consumer_group.cc
+++ b/src/v/pandaproxy/rest/test/consumer_group.cc
@@ -459,3 +459,124 @@ FIXTURE_TEST(
          R"({"error_code":40801,"message":"Unable to serialize value of record at offset 1 in topic:partition t:0"})");
     }
 }
+
+FIXTURE_TEST(
+  pandaproxy_consume_group_compressed_fetch, pandaproxy_test_fixture) {
+    using namespace std::chrono_literals;
+    info("Waiting for leadership");
+    wait_for_controller_leadership().get();
+
+    auto tp = model::topic_partition(model::topic("t"), model::partition_id(0));
+    auto ntp = make_default_ntp(tp.topic, tp.partition);
+    auto log_config = make_default_config();
+    {
+        using namespace storage;
+        storage::disk_log_builder builder(log_config);
+        storage::ntp_config ntp_cfg(
+          ntp,
+          log_config.base_dir,
+          nullptr,
+          get_next_partition_revision_id().get());
+        builder | start(std::move(ntp_cfg)) | add_segment(model::offset(0))
+          | add_random_batch(model::offset(0), 10, maybe_compress_batches::yes)
+          | stop();
+    }
+
+    add_topic(model::topic_namespace_view(ntp)).get();
+    auto shard = app.shard_table.local().shard_for(ntp);
+
+    tests::cooperative_spin_wait_with_timeout(10s, [this, shard, ntp] {
+        return app.partition_manager.invoke_on(
+          *shard, [ntp](cluster::partition_manager& mgr) {
+              auto partition = mgr.get(ntp);
+              return partition
+                     && partition->committed_offset() >= model::offset(1);
+          });
+    }).get();
+
+    info("Connecting client");
+    auto client = make_proxy_client();
+
+    kafka::group_id group_id{"test_group"};
+    kafka::member_id member_id{kafka::no_member};
+
+    {
+        info("Create consumer");
+        ss::sstring req_body(R"({
+  "name": "test_consumer",
+  "format": "json",
+  "auto.offset.reset": "earliest",
+  "auto.commit.enable": "false",
+  "fetch.min.bytes": "1",
+  "consumer.request.timeout.ms": "10000"
+})");
+        iobuf req_body_buf;
+        req_body_buf.append(req_body.data(), req_body.size());
+        auto res = http_request(
+          client,
+          fmt::format("/consumers/{}", group_id()),
+          std::move(req_body_buf),
+          boost::beast::http::verb::post,
+          ppj::serialization_format::v2,
+          ppj::serialization_format::v2);
+        BOOST_REQUIRE_EQUAL(
+          res.headers.result(), boost::beast::http::status::ok);
+
+        auto res_data = ppj::rjson_parse(
+          res.body.data(), ppj::create_consumer_response_handler());
+        BOOST_REQUIRE_EQUAL(res_data.instance_id, "test_consumer");
+        member_id = res_data.instance_id;
+        BOOST_REQUIRE_EQUAL(
+          res_data.base_uri,
+          fmt::format(
+            "http://{}:{}/consumers/{}/instances/{}",
+            "127.0.0.1",
+            "8082",
+            group_id(),
+            member_id()));
+        BOOST_REQUIRE_EQUAL(
+          res.headers.at(boost::beast::http::field::content_type),
+          to_header_value(ppj::serialization_format::v2));
+    }
+
info("Member id: {}", member_id); + + { + info("Subscribe consumer"); + ss::sstring req_body(R"( +{ + "topics": [ + "t" + ] +})"); + iobuf req_body_buf; + req_body_buf.append(req_body.data(), req_body.size()); + auto res = http_request( + client, + fmt::format( + "/consumers/{}/instances/{}/subscription", group_id(), member_id()), + std::move(req_body_buf), + boost::beast::http::verb::post, + ppj::serialization_format::v2, + ppj::serialization_format::v2); + BOOST_REQUIRE_EQUAL( + res.headers.result(), boost::beast::http::status::no_content); + } + + { + info("Consume from topic"); + auto res = http_request( + client, + fmt::format( + "/consumers/{}/instances/{}/records?timeout={}&max_bytes={}", + group_id(), + member_id(), + "1000", + "1000000"), + boost::beast::http::verb::get, + ppj::serialization_format::v2, + ppj::serialization_format::binary_v2); + BOOST_REQUIRE_EQUAL( + res.headers.result(), boost::beast::http::status::ok); + } +} diff --git a/src/v/pandaproxy/rest/test/fetch.cc b/src/v/pandaproxy/rest/test/fetch.cc index f7bdc5ca73bb..12b519a6995c 100644 --- a/src/v/pandaproxy/rest/test/fetch.cc +++ b/src/v/pandaproxy/rest/test/fetch.cc @@ -386,3 +386,49 @@ FIXTURE_TEST(pandaproxy_fetch_binary_with_json2, pandaproxy_test_fixture) { R"({"error_code":40801,"message":"Unable to serialize value of record at offset 1 in topic:partition t:0"})"); } } + +FIXTURE_TEST(pandaproxy_fetch_compressed, pandaproxy_test_fixture) { + wait_for_controller_leadership().get(); + + auto tp = model::topic_partition(model::topic("t"), model::partition_id(0)); + auto ntp = make_default_ntp(tp.topic, tp.partition); + auto log_config = make_default_config(); + { + using namespace storage; + storage::disk_log_builder builder(log_config); + storage::ntp_config ntp_cfg( + ntp, + log_config.base_dir, + nullptr, + get_next_partition_revision_id().get()); + builder | start(std::move(ntp_cfg)) | add_segment(model::offset(0)) + | add_random_batch(model::offset(0), 10, maybe_compress_batches::yes) + | stop(); + } + + add_topic(model::topic_namespace_view(ntp)).get(); + auto shard = app.shard_table.local().shard_for(ntp); + + tests::cooperative_spin_wait_with_timeout(10s, [this, shard, ntp] { + return app.partition_manager.invoke_on( + *shard, [ntp](cluster::partition_manager& mgr) { + auto partition = mgr.get(ntp); + return partition + && partition->committed_offset() >= model::offset(1); + }); + }).get(); + + auto client = make_proxy_client(); + { + info("Fetching offset 0 from topic {}", tp); + auto res = http_request( + client, + "/topics/t/partitions/0/records?offset=0&max_bytes=1024&timeout=5000", + boost::beast::http::verb::get, + ppj::serialization_format::v2, + ppj::serialization_format::binary_v2); + + BOOST_REQUIRE_EQUAL( + res.headers.result(), boost::beast::http::status::ok); + } +} From 641c437c1caf70fd4e15d0a99e13af47742f6a62 Mon Sep 17 00:00:00 2001 From: Michael Boquard Date: Tue, 18 Apr 2023 10:27:12 -0400 Subject: [PATCH 4/4] sr: Decompressing compressed batches in SR Fixes: #8546 Signed-off-by: Michael Boquard --- src/v/pandaproxy/schema_registry/storage.h | 1 + 1 file changed, 1 insertion(+) diff --git a/src/v/pandaproxy/schema_registry/storage.h b/src/v/pandaproxy/schema_registry/storage.h index 153a8cfd34b1..6727bd620d23 100644 --- a/src/v/pandaproxy/schema_registry/storage.h +++ b/src/v/pandaproxy/schema_registry/storage.h @@ -1112,6 +1112,7 @@ struct consume_to_store { ss::future operator()(model::record_batch b) { if (!b.header().attrs.is_control()) { + b = co_await 
             auto base_offset = b.base_offset();
             co_await model::for_each_record(
               b, [this, base_offset](model::record& rec) {
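
Taken together, the series leaves two ways to decompress in storage::internal:
the futurized decompress_batch(), convenient inside coroutines as in the
schema_registry change above, and the synchronous helpers used on the
pandaproxy fetch path. A minimal usage sketch (process_batch is a hypothetical
caller; the includes are assumed):

    #include "model/record.h"
    #include "seastarx.h"
    #include "storage/parser_utils.h"

    #include <seastar/core/coroutine.hh>

    // Sketch: decompress before iterating the records; decompress_batch()
    // forwards to decompress_batch_sync(), which is a no-op for batches that
    // are not compressed.
    ss::future<> process_batch(model::record_batch b) {
        b = co_await storage::internal::decompress_batch(std::move(b));
        b.for_each_record([](model::record r) {
            // inspect r.key() / r.value() here
        });
        co_return;
    }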