Skip to content

Commit

Permalink
kafka/server: run activate_feature in background
Browse files Browse the repository at this point in the history
After upgrading a node in the cluster to version 22.1.x from 21.11.x, if the
upgraded node isn't the controller leader, it enters group_metadata_migration::start
and hits the "kafka_internal/group topic does not exists, activating" path.
This call waits for activate_feature; activate_feature loops until the feature is active,
but it cannot be activated because only the controller leader runs the feature_manager
logic for activating features, and the controller leader is a 21.11.x node
that doesn't have the code. The node remains in the 'booting' state indefinitely.

Fixes: redpanda-data#4469
  • Loading branch information
VadimPlh committed May 4, 2022
1 parent 5fbe5aa commit aa94f69
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 41 deletions.
92 changes: 51 additions & 41 deletions src/v/kafka/server/group_metadata_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -526,36 +526,40 @@ ss::future<> group_metadata_migration::activate_feature(ss::abort_source& as) {
vlog(mlog.info, "activating consumer offsets feature");
while (!feature_table().is_active(cluster::feature::consumer_offsets)
&& !as.abort_requested()) {
if (_controller.is_raft0_leader()) {
co_await feature_table().await_feature_preparing(
cluster::feature::consumer_offsets, as);

auto err = co_await feature_manager().write_action(
cluster::feature_update_action{
.feature_name = ss::sstring(
_controller.get_feature_table()
.local()
.get_state(cluster::feature::consumer_offsets)
.spec.name),
.action
= cluster::feature_update_action::action_t::complete_preparing,
});
co_await do_activate_feature(as);
}
}

if (
err
|| !feature_table().is_active(
cluster::feature::consumer_offsets)) {
if (err) {
vlog(
mlog.info,
"error activating consumer offsets feature: {}",
err.message());
}
co_await ss::sleep_abortable(default_timeout, as);
/// Performs a single attempt at activating the consumer_offsets feature.
///
/// Callers invoke this repeatedly while the feature is not active, so every
/// path that did not result in an active feature must back off for
/// `default_timeout` before returning; otherwise the caller's retry loop
/// spins without delay.
///
/// - On the raft0 leader: waits for the feature to reach the preparing
///   state, then writes a `complete_preparing` feature update action. If the
///   write reports an error (which is logged) or the feature is still not
///   active afterwards, sleeps before returning.
/// - On any other node: only sleeps, since the action is written solely from
///   the raft0 leader branch.
///
/// @param as abort source passed to the feature-table wait and to the
///           abortable sleeps.
ss::future<>
group_metadata_migration::do_activate_feature(ss::abort_source& as) {
    if (!_controller.is_raft0_leader()) {
        // Not the leader: nothing to write from here, just back off.
        co_await ss::sleep_abortable(default_timeout, as);
        co_return;
    }

    co_await feature_table().await_feature_preparing(
      cluster::feature::consumer_offsets, as);

    auto err = co_await feature_manager().write_action(
      cluster::feature_update_action{
        .feature_name = ss::sstring(
          _controller.get_feature_table()
            .local()
            .get_state(cluster::feature::consumer_offsets)
            .spec.name),
        .action
        = cluster::feature_update_action::action_t::complete_preparing,
      });

    const bool activated = !err
                           && feature_table().is_active(
                             cluster::feature::consumer_offsets);
    if (activated) {
        // Success: return immediately so the caller can observe the active
        // feature without an unnecessary delay.
        co_return;
    }

    if (err) {
        vlog(
          mlog.info,
          "error activating consumer offsets feature: {}",
          err.message());
    }
    // The attempt failed or has not taken effect yet: back off before the
    // caller retries.
    co_await ss::sleep_abortable(default_timeout, as);
}

Expand Down Expand Up @@ -680,23 +684,29 @@ ss::future<> group_metadata_migration::start(ss::abort_source& as) {
co_return;
}

// if no group topic is present just make feature active
if (!_controller.get_topics_state().local().contains(
model::kafka_group_nt, model::partition_id{0})) {
vlog(
mlog.info,
"kafka_internal/group topic does not exists, activating "
"consumer_offsets feature");
try {
co_await activate_feature(as);
} catch (const ss::abort_requested_exception&) {
// ignore abort requested exception, we are shutting down
}
_sub = as.subscribe([this]() noexcept { _as.request_abort(); });
if (!_sub) {
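// No subscription was returned: abort was presumably already requested
// (redpanda is shutting down), so there is nothing to start.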
co_return;
}
// otherwise wait for feature to be preparing and execute migration
ssx::spawn_with_gate(
_background_gate, [this]() -> ss::future<> { return do_apply(); });

// Wait for feature to be preparing and execute migration
ssx::spawn_with_gate(_background_gate, [this]() -> ss::future<> {
while (!feature_table().is_active(cluster::feature::consumer_offsets)
&& !_as.abort_requested()) {
if (!_controller.get_topics_state().local().contains(
model::kafka_group_nt, model::partition_id{0})) {
vlog(
mlog.info,
"kafka_internal/group topic does not exists, activating "
"consumer_offsets feature");
co_await do_activate_feature(_as);
} else {
co_return co_await do_apply();
}
}
});

co_return;
}

Expand Down
8 changes: 8 additions & 0 deletions src/v/kafka/server/group_metadata_migration.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ struct group_metadata_migration {
ss::future<> do_apply();
ss::future<> migrate_metadata();
ss::future<> activate_feature(ss::abort_source&);
ss::future<> do_activate_feature(ss::abort_source&);

void dispatch_ntp_migration(model::ntp);

Expand All @@ -62,6 +63,13 @@ struct group_metadata_migration {
ss::sharded<kafka::group_router>& _group_router;
ss::gate _partitions_gate;
ss::gate _background_gate;

// We need to subscribe to the stop signal so that we can stop the
// loop inside activate_feature: the stop signal does not wait for
// the background fiber and will be deleted before the fiber has
// stopped.
ss::optimized_optional<ss::abort_source::subscription> _sub;
ss::abort_source _as;
};

} // namespace kafka

0 comments on commit aa94f69

Please sign in to comment.