diff --git a/src/v/kafka/server/group_metadata_migration.cc b/src/v/kafka/server/group_metadata_migration.cc index 89159df76f71b..522c0b590c914 100644 --- a/src/v/kafka/server/group_metadata_migration.cc +++ b/src/v/kafka/server/group_metadata_migration.cc @@ -526,36 +526,40 @@ ss::future<> group_metadata_migration::activate_feature(ss::abort_source& as) { vlog(mlog.info, "activating consumer offsets feature"); while (!feature_table().is_active(cluster::feature::consumer_offsets) && !as.abort_requested()) { - if (_controller.is_raft0_leader()) { - co_await feature_table().await_feature_preparing( - cluster::feature::consumer_offsets, as); - - auto err = co_await feature_manager().write_action( - cluster::feature_update_action{ - .feature_name = ss::sstring( - _controller.get_feature_table() - .local() - .get_state(cluster::feature::consumer_offsets) - .spec.name), - .action - = cluster::feature_update_action::action_t::complete_preparing, - }); + co_await do_activate_feature(as); + } +} - if ( - err - || !feature_table().is_active( - cluster::feature::consumer_offsets)) { - if (err) { - vlog( - mlog.info, - "error activating consumer offsets feature: {}", - err.message()); - } - co_await ss::sleep_abortable(default_timeout, as); +ss::future<> +group_metadata_migration::do_activate_feature(ss::abort_source& as) { + if (_controller.is_raft0_leader()) { + co_await feature_table().await_feature_preparing( + cluster::feature::consumer_offsets, as); + + auto err = co_await feature_manager().write_action( + cluster::feature_update_action{ + .feature_name = ss::sstring( + _controller.get_feature_table() + .local() + .get_state(cluster::feature::consumer_offsets) + .spec.name), + .action + = cluster::feature_update_action::action_t::complete_preparing, + }); + + if ( + err + || !feature_table().is_active(cluster::feature::consumer_offsets)) { + if (err) { + vlog( + mlog.info, + "error activating consumer offsets feature: {}", + err.message()); } - } else { co_await ss::sleep_abortable(default_timeout, as); } + } else { + co_await ss::sleep_abortable(default_timeout, as); } } @@ -680,23 +684,29 @@ ss::future<> group_metadata_migration::start(ss::abort_source& as) { co_return; } - // if no group topic is present just make feature active - if (!_controller.get_topics_state().local().contains( - model::kafka_group_nt, model::partition_id{0})) { - vlog( - mlog.info, - "kafka_internal/group topic does not exists, activating " - "consumer_offsets feature"); - try { - co_await activate_feature(as); - } catch (const ss::abort_requested_exception&) { - // ignore abort requested exception, we are shutting down - } + _sub = as.subscribe([this]() noexcept { _as.request_abort(); }); + if (!_sub) { + // Smbd finished redpanda co_return; } - // otherwise wait for feature to be preparing and execute migration - ssx::spawn_with_gate( - _background_gate, [this]() -> ss::future<> { return do_apply(); }); + + // Wait for feature to be preparing and execute migration + ssx::spawn_with_gate(_background_gate, [this]() -> ss::future<> { + while (!feature_table().is_active(cluster::feature::consumer_offsets) + && !_as.abort_requested()) { + if (!_controller.get_topics_state().local().contains( + model::kafka_group_nt, model::partition_id{0})) { + vlog( + mlog.info, + "kafka_internal/group topic does not exists, activating " + "consumer_offsets feature"); + co_await do_activate_feature(_as); + } else { + co_return co_await do_apply(); + } + } + }); + co_return; } diff --git a/src/v/kafka/server/group_metadata_migration.h b/src/v/kafka/server/group_metadata_migration.h index 8d7b898fa31b9..d656644c35285 100644 --- a/src/v/kafka/server/group_metadata_migration.h +++ b/src/v/kafka/server/group_metadata_migration.h @@ -51,6 +51,7 @@ struct group_metadata_migration { ss::future<> do_apply(); ss::future<> migrate_metadata(); ss::future<> activate_feature(ss::abort_source&); + ss::future<> do_activate_feature(ss::abort_source&); void dispatch_ntp_migration(model::ntp); @@ -62,6 +63,13 @@ struct group_metadata_migration { ss::sharded& _group_router; ss::gate _partitions_gate; ss::gate _background_gate; + + // We need to subscribe on stop_signal as to stop + // loop inside activate_feature. Because stop_signal + // does not wait background fiber and will dbe deleted + // before fiber will be stopped. + ss::optimized_optional _sub; + ss::abort_source _as; }; } // namespace kafka