From 0a9deb51216d66cadfadf9316f147a4ef8ef3260 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Wed, 8 May 2024 18:59:49 +0200 Subject: [PATCH 1/4] c/topic_table: bump _topics_map_revision in more places Partition balancer relies on _topics_map_revision checks to safely iterate over topic table collections with partition granularity (i.e. references to partition data and replica sets are stored and accessed across yield points). To make this safe, increment _topics_map_revision every time _topics, _updates_in_progress, _disabled_partitions or nested collections are modified in a way that invalidates references or iterators. (cherry picked from commit 30bbfb1f24f7e289c6a02956e9718bff4e2f6754) --- src/v/cluster/topic_table.cc | 38 +++++++++++++++++++++++++++++------- src/v/cluster/topic_table.h | 9 +++++++-- 2 files changed, 38 insertions(+), 9 deletions(-) diff --git a/src/v/cluster/topic_table.cc b/src/v/cluster/topic_table.cc index 5dd4b90bebad..878eef885225 100644 --- a/src/v/cluster/topic_table.cc +++ b/src/v/cluster/topic_table.cc @@ -347,6 +347,8 @@ topic_table::apply(finish_moving_partition_replicas_cmd cmd, model::offset o) { _updates_in_progress.erase(it); + _topics_map_revision++; + on_partition_move_finish(cmd.key, cmd.value); // notify backend about finished update @@ -421,6 +423,8 @@ topic_table::apply(cancel_moving_partition_replicas_cmd cmd, model::offset o) { current_assignment_it->replicas = in_progress_it->second.get_previous_replicas(); + _topics_map_revision++; + _pending_deltas.emplace_back( std::move(cmd.key), current_assignment_it->group, @@ -464,6 +468,11 @@ topic_table::apply(revert_cancel_partition_move_cmd cmd, model::offset o) { co_return errc::no_update_in_progress; } + auto p_meta_it = tp->second.partitions.find(ntp.tp.partition); + if (p_meta_it == tp->second.partitions.end()) { + co_return errc::partition_not_exists; + } + // revert replica set update current_assignment_it->replicas = in_progress_it->second.get_target_replicas(); @@ -474,11 +483,7 @@ topic_table::apply(revert_cancel_partition_move_cmd cmd, model::offset o) { current_assignment_it->replicas, }; - // update partition_meta object - auto p_meta_it = tp->second.partitions.find(ntp.tp.partition); - if (p_meta_it == tp->second.partitions.end()) { - co_return errc::partition_not_exists; - } + // update partition_meta object: // the cancellation was reverted and update went through, we must // update replicas_revisions. p_meta_it->second.replicas_revisions = update_replicas_revisions( @@ -490,6 +495,8 @@ topic_table::apply(revert_cancel_partition_move_cmd cmd, model::offset o) { /// Since the update is already finished we drop in_progress state _updates_in_progress.erase(in_progress_it); + _topics_map_revision++; + // notify backend about finished update _pending_deltas.emplace_back( ntp, @@ -675,6 +682,7 @@ topic_table::apply(set_topic_partitions_disabled_cmd cmd, model::offset o) { } } + _topics_map_revision++; notify_waiters(); co_return errc::success; @@ -1003,6 +1011,7 @@ class topic_table::snapshot_applier { disabled_partitions_t& _disabled_partitions; fragmented_vector& _pending_deltas; topic_table_probe& _probe; + model::revision_id& _topics_map_revision; model::revision_id _snap_revision; public: @@ -1011,6 +1020,7 @@ class topic_table::snapshot_applier { , _disabled_partitions(parent._disabled_partitions) , _pending_deltas(parent._pending_deltas) , _probe(parent._probe) + , _topics_map_revision(parent._topics_map_revision) , _snap_revision(snap_revision) {} void delete_ntp( @@ -1018,7 +1028,9 @@ class topic_table::snapshot_applier { auto ntp = model::ntp(ns_tp.ns, ns_tp.tp, p_as.id); vlog( clusterlog.trace, "deleting ntp {} not in controller snapshot", ntp); - _updates_in_progress.erase(ntp); + if (_updates_in_progress.erase(ntp)) { + _topics_map_revision++; + }; _pending_deltas.emplace_back( std::move(ntp), @@ -1040,7 +1052,9 @@ class topic_table::snapshot_applier { delete_ntp(ns_tp, p_as); co_await ss::coroutine::maybe_yield(); } - _disabled_partitions.erase(ns_tp); + if (_disabled_partitions.erase(ns_tp)) { + _topics_map_revision++; + }; _probe.handle_topic_deletion(ns_tp); // topic_metadata_item object is supposed to be removed from _topics by // the caller @@ -1055,6 +1069,9 @@ class topic_table::snapshot_applier { vlog(clusterlog.trace, "adding ntp {} from controller snapshot", ntp); size_t pending_deltas_start_idx = _pending_deltas.size(); + // we are going to modify md_item so increment the revision right away. + _topics_map_revision++; + const model::partition_id p_id = ntp.tp.partition; // 1. reconcile the _topics state (the md_item object) and generate @@ -1196,7 +1213,9 @@ class topic_table::snapshot_applier { topic_metadata_item ret{topic_metadata{topic.metadata, {}}}; if (topic.disabled_set) { _disabled_partitions[ns_tp] = *topic.disabled_set; + _topics_map_revision++; } + for (const auto& [p_id, partition] : topic.partitions) { auto ntp = model::ntp(ns_tp.ns, ns_tp.tp, p_id); add_ntp(ntp, topic, partition, ret, false); @@ -1235,6 +1254,7 @@ ss::future<> topic_table::apply_snapshot( // The topic was re-created, delete and add it anew. co_await applier.delete_topic(ns_tp, md_item); md_item = co_await applier.create_topic(ns_tp, topic_snapshot); + _topics_map_revision++; } else { // The topic was present in the previous set, now we need to // reconcile individual partitions. @@ -1252,10 +1272,12 @@ ss::future<> topic_table::apply_snapshot( old_disabled_set = std::exchange( _disabled_partitions[ns_tp], *topic_snapshot.disabled_set); + _topics_map_revision++; } else if (auto it = _disabled_partitions.find(ns_tp); it != _disabled_partitions.end()) { old_disabled_set = std::move(it->second); _disabled_partitions.erase(it); + _topics_map_revision++; } // 2. For each partition in the new set, reconcile assignments @@ -1293,6 +1315,7 @@ ss::future<> topic_table::apply_snapshot( if (!topic_snapshot.partitions.contains(as_it_copy->id)) { applier.delete_ntp(ns_tp, *as_it_copy); md_item.get_assignments().erase(as_it_copy); + _topics_map_revision++; } co_await ss::coroutine::maybe_yield(); } @@ -1704,6 +1727,7 @@ void topic_table::change_partition_replicas( auto previous_assignment = current_assignment.replicas; // replace partition replica set current_assignment.replicas = new_assignment; + _topics_map_revision++; // calculate delta for backend diff --git a/src/v/cluster/topic_table.h b/src/v/cluster/topic_table.h index 776fcd3d2dca..987778e3b391 100644 --- a/src/v/cluster/topic_table.h +++ b/src/v/cluster/topic_table.h @@ -679,8 +679,13 @@ class topic_table { updates_t _updates_in_progress; model::revision_id _last_applied_revision_id; - // Monotonic counter that is bumped for every addition/deletion to topics - // map. Unlike other revisions this does not correspond to the command + + // Monotonic counter that is bumped each time _topics, _disabled_partitions, + // or _updates_in_progress are modified in a way that makes iteration over + // them unsafe (i.e. invalidates iterators or references, including + // for nested collections like partition sets and replica sets). + // + // Unlike other revisions this does not correspond to the command // revision that updated the map. model::revision_id _topics_map_revision{0}; From bc99aaae49d04dfd76ce1a8d6164d8433e5a48a3 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Fri, 10 May 2024 15:35:27 +0200 Subject: [PATCH 2/4] utils: introduce concurrent_modification_error exception It is handy to have a base class for all instances of concurrent modifications. (cherry picked from commit 627db4d751bc2e583f0800be0856a3cfb2aaf67b) --- src/v/cluster/topic_table.h | 13 ++++------- src/v/cluster/topics_frontend.cc | 2 +- src/v/utils/exceptions.h | 33 +++++++++++++++++++++++++++ src/v/utils/stable_iterator_adaptor.h | 10 ++++---- 4 files changed, 43 insertions(+), 15 deletions(-) create mode 100644 src/v/utils/exceptions.h diff --git a/src/v/cluster/topic_table.h b/src/v/cluster/topic_table.h index 987778e3b391..aac5fd5ad447 100644 --- a/src/v/cluster/topic_table.h +++ b/src/v/cluster/topic_table.h @@ -94,22 +94,17 @@ class topic_table { // * partition::get_revision_id() // * raft::group_configuration::revision_id() - class concurrent_modification_error final : public std::exception { + class concurrent_modification_error final + : public ::concurrent_modification_error { public: concurrent_modification_error( model::revision_id initial_revision, model::revision_id current_revision) - : _msg(ssx::sformat( + : ::concurrent_modification_error(ssx::sformat( "Topic table was modified by concurrent fiber. " - "(initial_revision: " - "{}, current_revision: {}) ", + "(initial_revision: {}, current_revision: {}) ", initial_revision, current_revision)) {} - - const char* what() const noexcept final { return _msg.c_str(); } - - private: - ss::sstring _msg; }; class in_progress_update { diff --git a/src/v/cluster/topics_frontend.cc b/src/v/cluster/topics_frontend.cc index 8d492010fdf8..a540a5934694 100644 --- a/src/v/cluster/topics_frontend.cc +++ b/src/v/cluster/topics_frontend.cc @@ -1014,7 +1014,7 @@ topics_frontend::partitions_with_lost_majority( co_return errc::concurrent_modification_error; } co_return result; - } catch (const topic_table::concurrent_modification_error& e) { + } catch (const concurrent_modification_error& e) { // state changed while generating the plan, force caller to retry; vlog( clusterlog.info, diff --git a/src/v/utils/exceptions.h b/src/v/utils/exceptions.h new file mode 100644 index 000000000000..f8289206d2fa --- /dev/null +++ b/src/v/utils/exceptions.h @@ -0,0 +1,33 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ +#pragma once + +#include "base/seastarx.h" + +#include + +#include + +/// Some objects reference state that changes comparatively rarely (e.g. +/// topic_table state) across yield points and expect these references to remain +/// valid. In case these references are invalidated by a concurrent fiber, this +/// exception is thrown. This is a signal for the caller to restart the +/// computation with up-to-date state. +class concurrent_modification_error : public std::exception { +public: + explicit concurrent_modification_error(ss::sstring s) + : _msg(std::move(s)) {} + + const char* what() const noexcept override { return _msg.c_str(); } + +private: + ss::sstring _msg; +}; diff --git a/src/v/utils/stable_iterator_adaptor.h b/src/v/utils/stable_iterator_adaptor.h index cfde5ec4d47f..0e93cbace122 100644 --- a/src/v/utils/stable_iterator_adaptor.h +++ b/src/v/utils/stable_iterator_adaptor.h @@ -11,20 +11,20 @@ #pragma once #include "base/seastarx.h" +#include "utils/exceptions.h" #include #include #include -#include -#include #include -class iterator_stability_violation : public std::runtime_error { +class iterator_stability_violation final + : public concurrent_modification_error { public: - explicit iterator_stability_violation(const std::string& why) - : std::runtime_error(why){}; + explicit iterator_stability_violation(ss::sstring why) + : concurrent_modification_error(std::move(why)){}; }; /* From 0d0f3ecfc6b2c533b5ea321eebd6301310df4044 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Fri, 10 May 2024 15:52:07 +0200 Subject: [PATCH 3/4] c/scheduling: detect concurrent allocation_state replacements Shard-local allocation_state object is replaced when we are applying a controller snapshot. After this happens, all live allocated_partition objects become invalid. Detect this and throw concurrent_modification_error in case these objects are still used and make destructors no-op. (cherry picked from commit a57a5757e278ac6b7ca0c58de0f1b23075a93402) --- src/v/cluster/scheduling/types.cc | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/v/cluster/scheduling/types.cc b/src/v/cluster/scheduling/types.cc index 5b6bf1a95ca2..55a80dcf4215 100644 --- a/src/v/cluster/scheduling/types.cc +++ b/src/v/cluster/scheduling/types.cc @@ -13,6 +13,7 @@ #include "cluster/logger.h" #include "cluster/scheduling/allocation_state.h" +#include "utils/exceptions.h" #include "utils/to_string.h" #include @@ -47,6 +48,9 @@ allocation_units::allocation_units( allocation_units::~allocation_units() { oncore_debug_verify(_oncore); + if (unlikely(!_state)) { + return; + } for (const auto& replica : _added_replicas) { _state->remove_allocation(replica, _domain); _state->remove_final_count(replica, _domain); @@ -80,6 +84,11 @@ allocated_partition::prepare_move(model::node_id prev_node) const { model::broker_shard allocated_partition::add_replica( model::node_id node, const std::optional& prev) { + if (unlikely(!_state)) { + throw concurrent_modification_error( + "allocation_state was concurrently replaced"); + } + if (!_original_node2shard) { _original_node2shard.emplace(); for (const auto& bs : _replicas) { @@ -155,7 +164,12 @@ bool allocated_partition::is_original(model::node_id node) const { } errc allocated_partition::try_revert(const reallocation_step& step) { - if (!_original_node2shard || !_state) { + if (unlikely(!_state)) { + throw concurrent_modification_error( + "allocation_state was concurrently replaced"); + } + + if (!_original_node2shard) { return errc::no_update_in_progress; } From 9ab30ef0b3ddc4579abd8e67be5fa360b4485bf8 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Wed, 8 May 2024 18:57:56 +0200 Subject: [PATCH 4/4] c/partition_balancer/ut: add concurrent topic_table updates test (cherry picked from commit 1c63990fe32e86e85f8137cb637ab60304fce5ea) --- .../partition_balancer_planner_fixture.h | 4 +- .../tests/partition_balancer_planner_test.cc | 142 +++++++++++++++++- 2 files changed, 144 insertions(+), 2 deletions(-) diff --git a/src/v/cluster/tests/partition_balancer_planner_fixture.h b/src/v/cluster/tests/partition_balancer_planner_fixture.h index 08dc69900886..c216ff51d9eb 100644 --- a/src/v/cluster/tests/partition_balancer_planner_fixture.h +++ b/src/v/cluster/tests/partition_balancer_planner_fixture.h @@ -160,7 +160,8 @@ struct partition_balancer_planner_fixture { cluster::partition_balancer_planner make_planner( model::partition_autobalancing_mode mode = model::partition_autobalancing_mode::continuous, - size_t max_concurrent_actions = 2) { + size_t max_concurrent_actions = 2, + bool request_ondemand_rebalance = false) { return cluster::partition_balancer_planner( cluster::planner_config{ .mode = mode, @@ -168,6 +169,7 @@ struct partition_balancer_planner_fixture { .hard_max_disk_usage_ratio = 0.95, .max_concurrent_actions = max_concurrent_actions, .node_availability_timeout_sec = std::chrono::minutes(1), + .ondemand_rebalance_requested = request_ondemand_rebalance, .segment_fallocation_step = 16, .node_responsiveness_timeout = std::chrono::seconds(10), .topic_aware = true, diff --git a/src/v/cluster/tests/partition_balancer_planner_test.cc b/src/v/cluster/tests/partition_balancer_planner_test.cc index 79737ce4f58f..d8ae7eb02d6d 100644 --- a/src/v/cluster/tests/partition_balancer_planner_test.cc +++ b/src/v/cluster/tests/partition_balancer_planner_test.cc @@ -8,12 +8,15 @@ // by the Apache License, Version 2.0 #include "base/vlog.h" +#include "cluster/controller_snapshot.h" #include "cluster/health_monitor_types.h" #include "cluster/tests/partition_balancer_planner_fixture.h" +#include "utils/stable_iterator_adaptor.h" #include +#include -static ss::logger logger("partition_balancer_planner"); +static ss::logger logger("pb_planner_test"); // a shorthand to avoid spelling out model::node_id static model::node_id n(model::node_id::type id) { return model::node_id{id}; }; @@ -924,3 +927,140 @@ FIXTURE_TEST(balancing_modes, partition_balancer_planner_fixture) { BOOST_REQUIRE_EQUAL(plan_data.cancellations.size(), 0); BOOST_REQUIRE_EQUAL(plan_data.failed_actions_count, 0); } + +FIXTURE_TEST( + concurrent_topic_table_updates, partition_balancer_planner_fixture) { + // Apply lots of topic_table update commands, while concurrently invoking + // the planner. The main goal of this test is to pass ASan checks. + + allocator_register_nodes(5); + config::shard_local_cfg().disable_metrics.set_value(true); + config::shard_local_cfg().disable_public_metrics.set_value(true); + + auto make_create_tp_cmd = [this](ss::sstring name, int partitions) { + int16_t replication_factor = 3; + cluster::topic_configuration cfg( + test_ns, model::topic{name}, partitions, replication_factor); + + ss::chunked_fifo assignments; + for (model::partition_id::type i = 0; i < partitions; ++i) { + std::vector replicas; + for (int r = 0; r < replication_factor; ++r) { + replicas.push_back(model::broker_shard{ + model::node_id{r}, + random_generators::get_int(0, 3)}); + } + std::shuffle( + replicas.begin(), + replicas.end(), + random_generators::internal::gen); + + assignments.push_back(cluster::partition_assignment{ + raft::group_id{1}, model::partition_id{i}, replicas}); + } + return cluster::create_topic_cmd{ + make_tp_ns(name), + cluster::topic_configuration_assignment(cfg, std::move(assignments))}; + }; + + size_t successes = 0; + size_t failures = 0; + size_t reassignments = 0; + bool should_stop = false; + ss::future<> planning_fiber = ss::async([&] { + while (!should_stop) { + vlog(logger.trace, "planning fiber: invoking..."); + auto hr = create_health_report(); + auto planner = make_planner( + model::partition_autobalancing_mode::node_add, 50, true); + + try { + auto plan_data = planner.plan_actions(hr, as).get(); + successes += 1; + reassignments += plan_data.reassignments.size(); + } catch (concurrent_modification_error&) { + failures += 1; + } + vlog(logger.trace, "planning fiber: iteration done"); + } + }); + auto deferred = ss::defer([&] { + if (!should_stop) { + should_stop = true; + planning_fiber.get(); + } + }); + + cluster::topic_table other_tt; + model::offset controller_offset{0}; + std::set cur_topics; + bool node_isolated = false; + + for (size_t iter = 0; iter < 1'000; ++iter) { + int random_val = random_generators::get_int(0, 10); + if (random_val == 10) { + // allow the planner to make some progress + ss::sleep(50ms).get(); + continue; + } + + // randomly create and delete topics + auto topic = ssx::sformat("topic_{}", random_val); + if (!cur_topics.contains(topic)) { + vlog( + logger.trace, + "modifying fiber: creating topic {} (isolated: {})", + topic, + node_isolated); + auto cmd = make_create_tp_cmd( + topic, random_generators::get_int(1, 20)); + other_tt.apply(cmd, controller_offset).get(); + if (!node_isolated) { + workers.dispatch_topic_command(cmd); + } + cur_topics.insert(topic); + } else { + vlog( + logger.trace, + "modifying fiber: deleting topic {} (isolated: {})", + topic, + node_isolated); + cluster::delete_topic_cmd cmd{make_tp_ns(topic), make_tp_ns(topic)}; + other_tt.apply(cmd, controller_offset).get(); + if (!node_isolated) { + workers.dispatch_topic_command(cmd); + } + cur_topics.erase(topic); + } + + if (random_generators::get_int(5) == 0) { + // flip node_isolated flag + + if (node_isolated) { + // simulate node coming back from isolation and recovering + // current controller state from a snapshot. + vlog(logger.trace, "modifying fiber: applying snapshot"); + node_isolated = false; + cluster::controller_snapshot snap; + other_tt.fill_snapshot(snap).get(); + workers.members.local().fill_snapshot(snap); + workers.dispatcher.apply_snapshot(controller_offset, snap) + .get(); + } else { + node_isolated = true; + } + } + + controller_offset += 1; + + vlog(logger.trace, "modifying fiber: iteration done"); + } + + should_stop = true; + planning_fiber.get(); + + // sanity-check that planning made some progress. + BOOST_REQUIRE(successes > 0); + BOOST_REQUIRE(failures > 0); + BOOST_REQUIRE(reassignments > 0); +}