Skip to content

Commit

Permalink
pp/tests: Added test to consume compressed batch
Browse files Browse the repository at this point in the history
Added two tests, one for direct partition fetch and an additional one
that will use consumer groups.

Signed-off-by: Michael Boquard <[email protected]>
  • Loading branch information
michael-redpanda committed Apr 18, 2023
1 parent aa69b3e commit 8bc0c58
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/v/pandaproxy/rest/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ rp_test(
produce.cc
consumer_group.cc
DEFINITIONS BOOST_TEST_DYN_LINK
LIBRARIES v::seastar_testing_main v::application v::http
LIBRARIES v::seastar_testing_main v::application v::http v::storage_test_utils
LABELS pandaproxy
)
121 changes: 121 additions & 0 deletions src/v/pandaproxy/rest/test/consumer_group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -459,3 +459,124 @@ FIXTURE_TEST(
R"({"error_code":40801,"message":"Unable to serialize value of record at offset 1 in topic:partition t:0"})");
}
}

FIXTURE_TEST(
pandaproxy_consume_group_compressed_fetch, pandaproxy_test_fixture) {
using namespace std::chrono_literals;
info("Waiting for leadership");
wait_for_controller_leadership().get();

auto tp = model::topic_partition(model::topic("t"), model::partition_id(0));
auto ntp = make_default_ntp(tp.topic, tp.partition);
auto log_config = make_default_config();
{
using namespace storage;
storage::disk_log_builder builder(log_config);
storage::ntp_config ntp_cfg(
ntp,
log_config.base_dir,
nullptr,
get_next_partition_revision_id().get());
builder | start(std::move(ntp_cfg)) | add_segment(model::offset(0))
| add_random_batch(model::offset(0), 10, maybe_compress_batches::yes)
| stop();
}

add_topic(model::topic_namespace_view(ntp)).get();
auto shard = app.shard_table.local().shard_for(ntp);

tests::cooperative_spin_wait_with_timeout(10s, [this, shard, ntp] {
return app.partition_manager.invoke_on(
*shard, [ntp](cluster::partition_manager& mgr) {
auto partition = mgr.get(ntp);
return partition
&& partition->committed_offset() >= model::offset(1);
});
}).get();

info("Connecting client");
auto client = make_proxy_client();

kafka::group_id group_id{"test_group"};
kafka::member_id member_id{kafka::no_member};

{
info("Create consumer");
ss::sstring req_body(R"({
"name": "test_consumer",
"format": "json",
"auto.offset.reset": "earliest",
"auto.commit.enable": "false",
"fetch.min.bytes": "1",
"consumer.request.timeout.ms": "10000"
})");
iobuf req_body_buf;
req_body_buf.append(req_body.data(), req_body.size());
auto res = http_request(
client,
fmt::format("/consumers/{}", group_id()),
std::move(req_body_buf),
boost::beast::http::verb::post,
ppj::serialization_format::v2,
ppj::serialization_format::v2);
BOOST_REQUIRE_EQUAL(
res.headers.result(), boost::beast::http::status::ok);

auto res_data = ppj::rjson_parse(
res.body.data(), ppj::create_consumer_response_handler());
BOOST_REQUIRE_EQUAL(res_data.instance_id, "test_consumer");
member_id = res_data.instance_id;
BOOST_REQUIRE_EQUAL(
res_data.base_uri,
fmt::format(
"http://{}:{}/consumers/{}/instances/{}",
"127.0.0.1",
"8082",
group_id(),
member_id()));
BOOST_REQUIRE_EQUAL(
res.headers.at(boost::beast::http::field::content_type),
to_header_value(ppj::serialization_format::v2));
}

info("Member id: {}", member_id);

{
info("Subscribe consumer");
ss::sstring req_body(R"(
{
"topics": [
"t"
]
})");
iobuf req_body_buf;
req_body_buf.append(req_body.data(), req_body.size());
auto res = http_request(
client,
fmt::format(
"/consumers/{}/instances/{}/subscription", group_id(), member_id()),
std::move(req_body_buf),
boost::beast::http::verb::post,
ppj::serialization_format::v2,
ppj::serialization_format::v2);
BOOST_REQUIRE_EQUAL(
res.headers.result(), boost::beast::http::status::no_content);
}

{
info("Consume from topic");
auto res = http_request(
client,
fmt::format(
"/consumers/{}/instances/{}/records?timeout={}&max_bytes={}",
group_id(),
member_id(),
"1000",
"1000000"),
boost::beast::http::verb::get,
ppj::serialization_format::v2,
ppj::serialization_format::binary_v2);
BOOST_REQUIRE_EQUAL(
res.headers.result(), boost::beast::http::status::ok);
}
}
46 changes: 46 additions & 0 deletions src/v/pandaproxy/rest/test/fetch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -386,3 +386,49 @@ FIXTURE_TEST(pandaproxy_fetch_binary_with_json2, pandaproxy_test_fixture) {
R"({"error_code":40801,"message":"Unable to serialize value of record at offset 1 in topic:partition t:0"})");
}
}

FIXTURE_TEST(pandaproxy_fetch_compressed, pandaproxy_test_fixture) {
wait_for_controller_leadership().get();

auto tp = model::topic_partition(model::topic("t"), model::partition_id(0));
auto ntp = make_default_ntp(tp.topic, tp.partition);
auto log_config = make_default_config();
{
using namespace storage;
storage::disk_log_builder builder(log_config);
storage::ntp_config ntp_cfg(
ntp,
log_config.base_dir,
nullptr,
get_next_partition_revision_id().get());
builder | start(std::move(ntp_cfg)) | add_segment(model::offset(0))
| add_random_batch(model::offset(0), 10, maybe_compress_batches::yes)
| stop();
}

add_topic(model::topic_namespace_view(ntp)).get();
auto shard = app.shard_table.local().shard_for(ntp);

tests::cooperative_spin_wait_with_timeout(10s, [this, shard, ntp] {
return app.partition_manager.invoke_on(
*shard, [ntp](cluster::partition_manager& mgr) {
auto partition = mgr.get(ntp);
return partition
&& partition->committed_offset() >= model::offset(1);
});
}).get();

auto client = make_proxy_client();
{
info("Fetching offset 0 from topic {}", tp);
auto res = http_request(
client,
"/topics/t/partitions/0/records?offset=0&max_bytes=1024&timeout=5000",
boost::beast::http::verb::get,
ppj::serialization_format::v2,
ppj::serialization_format::binary_v2);

BOOST_REQUIRE_EQUAL(
res.headers.result(), boost::beast::http::status::ok);
}
}

0 comments on commit 8bc0c58

Please sign in to comment.