diff --git a/src/v/pandaproxy/rest/test/CMakeLists.txt b/src/v/pandaproxy/rest/test/CMakeLists.txt
index 924dc18f24bff..da6c421693d5f 100644
--- a/src/v/pandaproxy/rest/test/CMakeLists.txt
+++ b/src/v/pandaproxy/rest/test/CMakeLists.txt
@@ -7,6 +7,6 @@ rp_test(
     produce.cc
     consumer_group.cc
   DEFINITIONS BOOST_TEST_DYN_LINK
-  LIBRARIES v::seastar_testing_main v::application v::http
+  LIBRARIES v::seastar_testing_main v::application v::http v::storage_test_utils
   LABELS pandaproxy
 )
diff --git a/src/v/pandaproxy/rest/test/consumer_group.cc b/src/v/pandaproxy/rest/test/consumer_group.cc
index befe9757b44b0..b3c23584c8c93 100644
--- a/src/v/pandaproxy/rest/test/consumer_group.cc
+++ b/src/v/pandaproxy/rest/test/consumer_group.cc
@@ -459,3 +459,124 @@ FIXTURE_TEST(
           R"({"error_code":40801,"message":"Unable to serialize value of record at offset 1 in topic:partition t:0"})");
     }
 }
+
+FIXTURE_TEST(
+  pandaproxy_consume_group_compressed_fetch, pandaproxy_test_fixture) {
+    using namespace std::chrono_literals;
+    info("Waiting for leadership");
+    wait_for_controller_leadership().get();
+
+    auto tp = model::topic_partition(model::topic("t"), model::partition_id(0));
+    auto ntp = make_default_ntp(tp.topic, tp.partition);
+    auto log_config = make_default_config();
+    {
+        using namespace storage;
+        storage::disk_log_builder builder(log_config);
+        storage::ntp_config ntp_cfg(
+          ntp,
+          log_config.base_dir,
+          nullptr,
+          get_next_partition_revision_id().get());
+        builder | start(std::move(ntp_cfg)) | add_segment(model::offset(0))
+          | add_random_batch(model::offset(0), 10, maybe_compress_batches::yes)
+          | stop();
+    }
+
+    add_topic(model::topic_namespace_view(ntp)).get();
+    auto shard = app.shard_table.local().shard_for(ntp);
+
+    tests::cooperative_spin_wait_with_timeout(10s, [this, shard, ntp] {
+        return app.partition_manager.invoke_on(
+          *shard, [ntp](cluster::partition_manager& mgr) {
+              auto partition = mgr.get(ntp);
+              return partition
+                     && partition->committed_offset() >= model::offset(1);
+          });
+    }).get();
+
+    info("Connecting client");
+    auto client = make_proxy_client();
+
+    kafka::group_id group_id{"test_group"};
+    kafka::member_id member_id{kafka::no_member};
+
+    {
+        info("Create consumer");
+        ss::sstring req_body(R"({
+  "name": "test_consumer",
+  "format": "json",
+  "auto.offset.reset": "earliest",
+  "auto.commit.enable": "false",
+  "fetch.min.bytes": "1",
+  "consumer.request.timeout.ms": "10000"
+})");
+        iobuf req_body_buf;
+        req_body_buf.append(req_body.data(), req_body.size());
+        auto res = http_request(
+          client,
+          fmt::format("/consumers/{}", group_id()),
+          std::move(req_body_buf),
+          boost::beast::http::verb::post,
+          ppj::serialization_format::v2,
+          ppj::serialization_format::v2);
+        BOOST_REQUIRE_EQUAL(
+          res.headers.result(), boost::beast::http::status::ok);
+
+        auto res_data = ppj::rjson_parse(
+          res.body.data(), ppj::create_consumer_response_handler());
+        BOOST_REQUIRE_EQUAL(res_data.instance_id, "test_consumer");
+        member_id = res_data.instance_id;
+        BOOST_REQUIRE_EQUAL(
+          res_data.base_uri,
+          fmt::format(
+            "http://{}:{}/consumers/{}/instances/{}",
+            "127.0.0.1",
+            "8082",
+            group_id(),
+            member_id()));
+        BOOST_REQUIRE_EQUAL(
+          res.headers.at(boost::beast::http::field::content_type),
+          to_header_value(ppj::serialization_format::v2));
+    }
+
+    info("Member id: {}", member_id);
+
+    {
+        info("Subscribe consumer");
+        ss::sstring req_body(R"(
+{
+  "topics": [
+    "t"
+  ]
+})");
+        iobuf req_body_buf;
+        req_body_buf.append(req_body.data(), req_body.size());
+        auto res = http_request(
+          client,
+          fmt::format(
+            "/consumers/{}/instances/{}/subscription", group_id(), member_id()),
+          std::move(req_body_buf),
+          boost::beast::http::verb::post,
+          ppj::serialization_format::v2,
+          ppj::serialization_format::v2);
+        BOOST_REQUIRE_EQUAL(
+          res.headers.result(), boost::beast::http::status::no_content);
+    }
+
+    {
+        info("Consume from topic");
+        auto res = http_request(
+          client,
+          fmt::format(
+            "/consumers/{}/instances/{}/records?timeout={}&max_bytes={}",
+            group_id(),
+            member_id(),
+            "1000",
+            "1000000"),
+          boost::beast::http::verb::get,
+          ppj::serialization_format::v2,
+          ppj::serialization_format::binary_v2);
+        BOOST_REQUIRE_EQUAL(
+          res.headers.result(), boost::beast::http::status::ok);
+    }
+}
diff --git a/src/v/pandaproxy/rest/test/fetch.cc b/src/v/pandaproxy/rest/test/fetch.cc
index f7bdc5ca73bbe..12b519a6995c5 100644
--- a/src/v/pandaproxy/rest/test/fetch.cc
+++ b/src/v/pandaproxy/rest/test/fetch.cc
@@ -386,3 +386,49 @@ FIXTURE_TEST(pandaproxy_fetch_binary_with_json2, pandaproxy_test_fixture) {
           R"({"error_code":40801,"message":"Unable to serialize value of record at offset 1 in topic:partition t:0"})");
     }
 }
+
+FIXTURE_TEST(pandaproxy_fetch_compressed, pandaproxy_test_fixture) {
+    wait_for_controller_leadership().get();
+
+    auto tp = model::topic_partition(model::topic("t"), model::partition_id(0));
+    auto ntp = make_default_ntp(tp.topic, tp.partition);
+    auto log_config = make_default_config();
+    {
+        using namespace storage;
+        storage::disk_log_builder builder(log_config);
+        storage::ntp_config ntp_cfg(
+          ntp,
+          log_config.base_dir,
+          nullptr,
+          get_next_partition_revision_id().get());
+        builder | start(std::move(ntp_cfg)) | add_segment(model::offset(0))
+          | add_random_batch(model::offset(0), 10, maybe_compress_batches::yes)
+          | stop();
+    }
+
+    add_topic(model::topic_namespace_view(ntp)).get();
+    auto shard = app.shard_table.local().shard_for(ntp);
+
+    tests::cooperative_spin_wait_with_timeout(10s, [this, shard, ntp] {
+        return app.partition_manager.invoke_on(
+          *shard, [ntp](cluster::partition_manager& mgr) {
+              auto partition = mgr.get(ntp);
+              return partition
+                     && partition->committed_offset() >= model::offset(1);
+          });
+    }).get();
+
+    auto client = make_proxy_client();
+    {
+        info("Fetching offset 0 from topic {}", tp);
+        auto res = http_request(
+          client,
+          "/topics/t/partitions/0/records?offset=0&max_bytes=1024&timeout=5000",
+          boost::beast::http::verb::get,
+          ppj::serialization_format::v2,
+          ppj::serialization_format::binary_v2);
+
+        BOOST_REQUIRE_EQUAL(
+          res.headers.result(), boost::beast::http::status::ok);
+    }
+}