Skip to content

Commit

Permalink
Merge pull request #181 from mmaslankaprv/faust-fetch-assertion
Browse files Browse the repository at this point in the history
fixed assertion triggered when fetching multiple topics in one request
  • Loading branch information
mmaslankaprv authored Nov 26, 2020
2 parents ecda48b + ee87535 commit f54d86a
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 21 deletions.
2 changes: 1 addition & 1 deletion src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ configuration::configuration()
"heartbeats at the cost of a longer time to detect failures. "
"Default quota tracking window size in milliseconds",
required::no,
30'000ms)
300s)
, group_initial_rebalance_delay(
*this,
"group_initial_rebalance_delay",
Expand Down
6 changes: 2 additions & 4 deletions src/v/kafka/requests/fetch_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,6 @@ static ss::future<> fetch_topic_partitions(op_context& octx) {

octx.for_each_fetch_partition(
[&resp_it, &octx, &fetches](fetch_partition p) {
// we use gate as we may not wait for all the fetch results
return fetches.push_back(fetch_topic_partition(octx, p, resp_it++));
});

Expand All @@ -540,7 +539,7 @@ static ss::future<> fetch_topic_partitions(op_context& octx) {
fetch_reads_debounce_timeout, octx.request.max_wait_time));
});
});
} // namespace kafka
}

ss::future<response_ptr>
fetch_api::process(request_context&& rctx, ss::smp_service_group ssg) {
Expand Down Expand Up @@ -710,7 +709,6 @@ op_context::response_iterator::response_iterator(

void op_context::response_iterator::set(
fetch_response::partition_response&& response) {
// we are already done, for now ignore what we read
vassert(
response.id == _it->partition_response->id,
"Response and current partition ids have to be the same. Current "
Expand Down Expand Up @@ -752,7 +750,7 @@ void op_context::response_iterator::set(
}

op_context::response_iterator& op_context::response_iterator::operator++() {
_it->partition_response++;
_it++;
return *this;
}

Expand Down
132 changes: 116 additions & 16 deletions src/v/kafka/tests/fetch_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -253,49 +253,60 @@ FIXTURE_TEST(fetch_one, redpanda_thread_fixture) {
}
}

SEASTAR_THREAD_TEST_CASE(fetch_response_iterator_test) {
kafka::fetch_response response;

auto make_partition = [](ss::sstring topic) {
FIXTURE_TEST(fetch_response_iterator_test, redpanda_thread_fixture) {
static auto make_partition = [](ss::sstring topic) {
return kafka::fetch_response::partition(model::topic(std::move(topic)));
};

auto make_partition_response = [](int id) {
static auto make_partition_response = [](int id) {
kafka::fetch_response::partition_response resp;
resp.error = kafka::error_code::none;
resp.id = model::partition_id(id);
resp.last_stable_offset = model::offset(0);
return resp;
};

response.partitions.push_back(make_partition("tp-1"));
response.partitions.push_back(make_partition("tp-2"));
response.partitions.push_back(make_partition("tp-3"));

response.partitions[0].responses.push_back(make_partition_response(0));
response.partitions[0].responses.push_back(make_partition_response(1));
response.partitions[0].responses.push_back(make_partition_response(2));
auto make_test_fetch_response = []() {
kafka::fetch_response response;
response.partitions.push_back(make_partition("tp-1"));
response.partitions.push_back(make_partition("tp-2"));
response.partitions.push_back(make_partition("tp-3"));

response.partitions[1].responses.push_back(make_partition_response(0));
response.partitions[0].responses.push_back(make_partition_response(0));
response.partitions[0].responses.push_back(make_partition_response(1));
response.partitions[0].responses.push_back(make_partition_response(2));

response.partitions[2].responses.push_back(make_partition_response(0));
response.partitions[2].responses.push_back(make_partition_response(1));
response.partitions[1].responses.push_back(make_partition_response(0));

response.partitions[2].responses.push_back(make_partition_response(0));
response.partitions[2].responses.push_back(make_partition_response(1));
return response;
};
kafka::op_context ctx(
make_request_context(), ss::default_smp_service_group());
auto response = make_test_fetch_response();
ctx.response = make_test_fetch_response();
auto wrapper_iterator = ctx.response_begin();
int i = 0;

for (auto it = response.begin(); it != response.end(); ++it) {
if (i < 3) {
BOOST_REQUIRE_EQUAL(it->partition->name(), "tp-1");
BOOST_REQUIRE_EQUAL(it->partition_response->id(), i);

} else if (i == 3) {
BOOST_REQUIRE_EQUAL(it->partition->name(), "tp-2");
BOOST_REQUIRE_EQUAL(it->partition_response->id(), 0);
} else {
BOOST_REQUIRE_EQUAL(it->partition->name(), "tp-3");
BOOST_REQUIRE_EQUAL(it->partition_response->id(), i - 4);
}
BOOST_REQUIRE_EQUAL(
it->partition->name, wrapper_iterator->partition->name);
BOOST_REQUIRE_EQUAL(
wrapper_iterator->partition_response->id,
wrapper_iterator->partition_response->id);
++i;
++wrapper_iterator;
}
};

Expand Down Expand Up @@ -465,3 +476,92 @@ FIXTURE_TEST(fetch_one_debounce, redpanda_thread_fixture) {
BOOST_REQUIRE(resp.partitions[0].responses[0].record_set);
BOOST_REQUIRE(resp.partitions[0].responses[0].record_set->size_bytes() > 0);
}

FIXTURE_TEST(fetch_multi_topics, redpanda_thread_fixture) {
// create a topic partition with some data
model::topic topic_1("foo");
model::topic topic_2("bar");
model::offset zero(0);
wait_for_controller_leadership().get0();

add_topic(model::topic_namespace(model::ns("kafka"), topic_1), 6).get();
add_topic(model::topic_namespace(model::ns("kafka"), topic_2), 1).get();

std::vector<model::ntp> ntps = {};
// topic 1
for (int i = 0; i < 6; ++i) {
ntps.push_back(make_default_ntp(topic_1, model::partition_id(i)));
wait_for_partition_offset(ntps.back(), model::offset(0)).get0();
}
// topic 2
ntps.push_back(make_default_ntp(topic_2, model::partition_id(0)));
wait_for_partition_offset(ntps.back(), model::offset(0)).get0();

// request
kafka::fetch_request req;
req.max_bytes = std::numeric_limits<int32_t>::max();
req.min_bytes = 1;
req.max_wait_time = std::chrono::milliseconds(3000);
req.session_id = kafka::invalid_fetch_session_id;
req.topics = {
{
.name = topic_1,
.partitions = {},
},
{
.name = topic_2,
.partitions = {},
}};

for (auto& ntp : ntps) {
kafka::fetch_request::partition p;
p.id = model::partition_id(ntp.tp.partition);
p.log_start_offset = zero;
p.fetch_offset = zero;
p.partition_max_bytes = std::numeric_limits<int32_t>::max();
auto idx = ntp.tp.topic == topic_1 ? 0 : 1;
req.topics[idx].partitions.push_back(p);
}

auto client = make_kafka_client().get0();
client.connect().get();
// add date to all partitions
for (auto& ntp : ntps) {
auto shard = app.shard_table.local().shard_for(ntp);
auto r = app.partition_manager
.invoke_on(
*shard,
[ntp](cluster::partition_manager& mgr) {
auto partition = mgr.get(ntp);
auto batches = storage::test::make_random_batches(
model::offset(0), 5);
auto rdr = model::make_memory_record_batch_reader(
std::move(batches));
return partition->replicate(
std::move(rdr),
raft::replicate_options(
raft::consistency_level::quorum_ack));
})
.get0();
}

auto resp = client.dispatch(req, kafka::api_version(4)).get0();
client.stop().then([&client] { client.shutdown(); }).get();

BOOST_REQUIRE_EQUAL(resp.partitions.size(), 2);
BOOST_REQUIRE_EQUAL(resp.partitions[0].name, topic_1);
BOOST_REQUIRE_EQUAL(resp.partitions[1].name, topic_2);
BOOST_REQUIRE_EQUAL(resp.partitions[0].responses.size(), 6);
BOOST_REQUIRE_EQUAL(resp.partitions[1].responses.size(), 1);
size_t total_size = 0;
for (int i = 0; i < 6; ++i) {
BOOST_REQUIRE_EQUAL(
resp.partitions[0].responses[i].error, kafka::error_code::none);
BOOST_REQUIRE_EQUAL(
resp.partitions[0].responses[i].id, model::partition_id(i));
BOOST_REQUIRE(resp.partitions[0].responses[i].record_set);

total_size += resp.partitions[0].responses[i].record_set->size_bytes();
}
BOOST_REQUIRE_GT(total_size, 0);
}

0 comments on commit f54d86a

Please sign in to comment.