k/group: abort recovery when the read is incomplete
If the storage layer returns early from the reader, the group stm is not
fully recovered. In this case the consumer offset coordinator cannot
start replying to client requests, as the in-memory consumer group
representation may be incomplete. Added a check that prevents this
situation.

When an incomplete read happens, the group manager requests the underlying
partition leader to step down. This triggers another leader election
and a leadership notification, which leads to another group recovery attempt.

Signed-off-by: Michał Maślanka <[email protected]>
mmaslankaprv authored and ballard26 committed Apr 10, 2024
1 parent 61e7cb6 commit 33aeba2
Showing 3 changed files with 21 additions and 3 deletions.
21 changes: 18 additions & 3 deletions src/v/kafka/server/group_manager.cc
@@ -849,16 +849,31 @@ ss::future<> group_manager::handle_partition_leader_change(
 std::nullopt,
 std::nullopt,
 std::nullopt);

+auto expected_to_read = model::prev_offset(
+p->partition->high_watermark());
 return p->partition->make_reader(reader_config)
-.then([this, term, p, timeout](
+.then([this, term, p, timeout, expected_to_read](
 model::record_batch_reader reader) {
 return std::move(reader)
 .consume(
 group_recovery_consumer(_serializer_factory(), p->as),
 timeout)
-.then([this, term, p](
+.then([this, term, p, expected_to_read](
 group_recovery_consumer_state state) {
+if (state.last_read_offset < expected_to_read) {
+vlog(
+klog.error,
+"error recovering group state from {}. "
+"Expected to read up to {} but last offset "
+"consumed is equal to {}",
+p->partition->ntp(),
+expected_to_read,
+state.last_read_offset);
+// force step down to allow other node to
+// recover group
+return p->partition->raft()->step_down(
+"unable to recover group, short read");
+}
 // avoid trying to recover if we stopped the
 // reader because an abort was requested
 if (p->as.abort_requested()) {
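The short-read check in this hunk can be illustrated in isolation. The following is a minimal sketch, not Redpanda code: `offset_t`, `batch`, `recovery_state`, `recovery_result` and `recover` are hypothetical stand-ins, offsets are plain 64-bit integers, and the "nothing read yet" sentinel is assumed to be lower than any valid offset.

```cpp
#include <cstdint>
#include <limits>
#include <vector>

// Hypothetical stand-ins for the types used in the diff; not Redpanda's API.
using offset_t = std::int64_t;

// Assumed sentinel meaning "no batch consumed yet".
constexpr offset_t no_offset = std::numeric_limits<offset_t>::min();

struct batch {
    offset_t base_offset;
    offset_t last_offset;
};

struct recovery_state {
    offset_t last_read_offset = no_offset;
};

enum class recovery_result { recovered, short_read };

// Replays the batches the reader actually returned and reports whether the
// replay reached the last offset below the high watermark.
recovery_result
recover(const std::vector<batch>& read, offset_t high_watermark) {
    // Plays the role of model::prev_offset(high_watermark) in the diff.
    const offset_t expected_to_read = high_watermark - 1;

    recovery_state state;
    for (const batch& b : read) {
        state.last_read_offset = b.last_offset;
    }

    if (state.last_read_offset < expected_to_read) {
        // In the diff, this is where the leader is asked to step down.
        return recovery_result::short_read;
    }
    return recovery_result::recovered;
}
```

With a high watermark of 10, a read that ends at offset 9 counts as complete, while a read that ends at offset 4 is reported as short. Note that in this sketch an empty read also counts as short, because the sentinel is below every expected offset; how the real code treats that case depends on details not shown in the diff.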
1 change: 1 addition & 0 deletions src/v/kafka/server/group_recovery_consumer.cc
@@ -74,6 +74,7 @@ group_recovery_consumer::operator()(model::record_batch batch) {
 if (_as.abort_requested()) {
 co_return ss::stop_iteration::yes;
 }
+_state.last_read_offset = batch.last_offset();
 if (batch.header().type == model::record_batch_type::raft_data) {
 _batch_base_offset = batch.base_offset();
 co_await model::for_each_record(batch, [this](model::record& r) {
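The placement of the new assignment matters: it runs after the abort check but before the batch type filter, so every batch the consumer sees advances `last_read_offset`, not only `raft_data` batches. A simplified synchronous sketch of that ordering follows; `log_batch`, `consumer_state` and `recovery_consumer` are hypothetical names, and the coroutine and record-decoding parts of the real consumer are left out.

```cpp
#include <cstdint>
#include <limits>

// Hypothetical, simplified types; the real consumer is a Seastar coroutine.
enum class batch_type { raft_data, other };
enum class stop_iteration { no, yes };

struct log_batch {
    batch_type type;
    std::int64_t base_offset;
    std::int64_t last_offset;
};

struct consumer_state {
    // Assumed sentinel: lower than any valid offset.
    std::int64_t last_read_offset = std::numeric_limits<std::int64_t>::min();
    int data_batches_applied = 0;
};

class recovery_consumer {
public:
    explicit recovery_consumer(const bool& abort_requested)
      : _abort_requested(abort_requested) {}

    stop_iteration operator()(const log_batch& b) {
        // An aborted read must not advance the offset.
        if (_abort_requested) {
            return stop_iteration::yes;
        }
        // Record progress for every batch type, before any filtering.
        _state.last_read_offset = b.last_offset;
        if (b.type == batch_type::raft_data) {
            // Stand-in for decoding the records of a data batch.
            ++_state.data_batches_applied;
        }
        return stop_iteration::no;
    }

    const consumer_state& state() const { return _state; }

private:
    const bool& _abort_requested;
    consumer_state _state;
};
```

If the offset were updated only inside the `raft_data` branch, a log that ends with a non-data batch would always look like a short read to the check in `group_manager.cc`.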
2 changes: 2 additions & 0 deletions src/v/kafka/server/group_recovery_consumer.h
@@ -12,6 +12,7 @@

 #include "kafka/server/group_metadata.h"
 #include "kafka/server/group_stm.h"
+#include "model/fundamental.h"

 #include <seastar/core/abort_source.hh>
 #include <seastar/core/future.hh>
@@ -30,6 +31,7 @@ struct group_recovery_consumer_state {
 * retention feature is activated. see group::offset_metadata for more info.
 */
 bool has_offset_retention_feature_fence{false};
+model::offset last_read_offset;
 };

 class group_recovery_consumer {