diff --git a/src/v/cluster/members_backend.cc b/src/v/cluster/members_backend.cc index 60d73a08e4d6..6419af8f7a80 100644 --- a/src/v/cluster/members_backend.cc +++ b/src/v/cluster/members_backend.cc @@ -25,7 +25,7 @@ namespace cluster { namespace { struct node_replicas { - size_t allocated_replicas; + size_t final_count; size_t max_capacity; }; @@ -40,15 +40,13 @@ calculate_replicas_per_node( if (!n->is_active()) { continue; } - auto [it, _] = ret.try_emplace( + auto [it, inserted] = ret.try_emplace( id, node_replicas{ - .allocated_replicas = 0, - .max_capacity = n->domain_partition_capacity(domain), + .final_count = n->domain_final_partitions(domain), + .max_capacity = n->max_capacity(), }); - - const auto domain_allocated = n->domain_allocated_partitions(domain); - it->second.allocated_replicas += domain_allocated; + vassert(inserted, "node {} inserted twice", id); } return ret; } @@ -57,7 +55,7 @@ static size_t calculate_total_replicas(const node_replicas_map_t& node_replicas) { size_t total_replicas = 0; for (auto& [_, replicas] : node_replicas) { - total_replicas += replicas.allocated_replicas; + total_replicas += replicas.final_count; } return total_replicas; } @@ -137,10 +135,7 @@ void reassign_replicas( * **/ double calculate_unevenness_error( - const partition_allocator& allocator, - const members_backend::update_meta& update, - const topic_table& topics, - partition_allocation_domain domain) { + const partition_allocator& allocator, partition_allocation_domain domain) { static const std::vector domains{ partition_allocation_domains::consumer_offsets, partition_allocation_domains::common}; @@ -148,53 +143,8 @@ double calculate_unevenness_error( const auto node_cnt = allocator.state().available_nodes(); auto node_replicas = calculate_replicas_per_node(allocator, domain); - /** - * adjust per node replicas with the replicas that are going to be removed - * from the node after successful reallocation - * based on the state of reallocation the following adjustments are made: - * - * reallocation_state::initial - no adjustment required - * reallocation_state::reassigned - allocator already updated, adjusting - * reallocation_state::requested - allocator already updated, adjusting - * reallocation_state::finished - no adjustment required - * - * Do not need to care about the cancel related state here as no - * cancellations are requested when node is added to the cluster. - */ - for (const auto& [ntp, r] : update.partition_reallocations) { - using state = members_backend::reallocation_state; - /** - * In the initial or finished state the adjustment doesn't have - * to be taken into account as partition balancer is already updated. - */ - if ( - r.state == state::initial || r.state == state::finished - || r.state == state::cancelled || r.state == state::request_cancel) { - continue; - } - /** - * if a partition move was already requested it might have already been - * finished, consult topic table to check if the update is still in - * progress. If no move is in progress the adjustment must be skipped as - * allocator state is already up to date. Reallocation will be marked as - * finished in reconciliation loop pass. - */ - if (r.state == state::requested && !topics.is_update_in_progress(ntp)) { - continue; - } - - if (get_allocation_domain(ntp) == domain) { - for (const auto& to_remove : r.replicas_to_remove) { - auto it = node_replicas.find(to_remove); - if (it != node_replicas.end()) { - it->second.allocated_replicas--; - } - } - } - } const auto total_replicas = calculate_total_replicas(node_replicas); - if (total_replicas == 0) { return 0.0; } @@ -213,14 +163,14 @@ double calculate_unevenness_error( double err = 0; for (auto& [id, allocation_info] : node_replicas) { double diff = static_cast(target_replicas_per_node) - - static_cast(allocation_info.allocated_replicas); + - static_cast(allocation_info.final_count); vlog( clusterlog.trace, "node {} has {} replicas allocated in domain {}, requested replicas " "per node {}, difference: {}", id, - allocation_info.allocated_replicas, + allocation_info.final_count, domain, target_replicas_per_node, diff); @@ -485,8 +435,7 @@ void members_backend::default_reallocation_strategy:: } calculate_reallocations_batch( max_batch_size, allocator, topics, meta, domain); - auto current_error = calculate_unevenness_error( - allocator, meta, topics, domain); + auto current_error = calculate_unevenness_error(allocator, domain); auto [it, _] = meta.last_unevenness_error.try_emplace(domain, 1.0); auto improvement = it->second - current_error; @@ -564,9 +513,8 @@ void members_backend::default_reallocation_strategy:: absl::flat_hash_map to_move_from_node; for (auto& [id, info] : node_replicas) { - auto to_move = info.allocated_replicas - - std::min( - target_replicas_per_node, info.allocated_replicas); + auto to_move = info.final_count + - std::min(target_replicas_per_node, info.final_count); if (to_move > 0) { to_move_from_node.emplace(id, to_move); } @@ -582,7 +530,7 @@ void members_backend::default_reallocation_strategy:: cnt, id, domain, - node_replicas[id].allocated_replicas); + node_replicas[id].final_count); } } diff --git a/src/v/cluster/scheduling/allocation_node.cc b/src/v/cluster/scheduling/allocation_node.cc index 505058caeeb8..63283d677fa3 100644 --- a/src/v/cluster/scheduling/allocation_node.cc +++ b/src/v/cluster/scheduling/allocation_node.cc @@ -54,6 +54,8 @@ allocation_node::allocate(const partition_allocation_domain domain) { (*it)++; // increment the weights _allocated_partitions++; ++_allocated_domain_partitions[domain]; + _final_partitions++; + ++_final_domain_partitions[domain]; const ss::shard_id core = std::distance(_weights.begin(), it); vlog( clusterlog.trace, diff --git a/src/v/cluster/scheduling/allocation_node.h b/src/v/cluster/scheduling/allocation_node.h index 82351bd45eef..7e783d58a12a 100644 --- a/src/v/cluster/scheduling/allocation_node.h +++ b/src/v/cluster/scheduling/allocation_node.h @@ -99,6 +99,9 @@ class allocation_node { return _allocated_partitions; } + // number of partitions after all in-progress movements are finished + allocation_capacity final_partitions() const { return _final_partitions; } + allocation_capacity domain_allocated_partitions(partition_allocation_domain domain) const { if (auto it = _allocated_domain_partitions.find(domain); @@ -108,6 +111,15 @@ class allocation_node { return allocation_capacity{0}; } + allocation_capacity + domain_final_partitions(partition_allocation_domain domain) const { + if (auto it = _final_domain_partitions.find(domain); + it != _final_domain_partitions.end()) { + return it->second; + } + return allocation_capacity{0}; + } + bool empty() const { return _allocated_partitions == allocation_capacity{0}; } @@ -129,6 +141,11 @@ class allocation_node { // number of partitions allocated in a specific allocation domain absl::flat_hash_map _allocated_domain_partitions; + // number of partitions after all movements are finished + allocation_capacity _final_partitions{0}; + absl::flat_hash_map + _final_domain_partitions; + state _state = state::active; config::binding _partitions_per_shard; diff --git a/src/v/cluster/scheduling/allocation_state.cc b/src/v/cluster/scheduling/allocation_state.cc index 7d94ae261d3f..30f84bebc330 100644 --- a/src/v/cluster/scheduling/allocation_state.cc +++ b/src/v/cluster/scheduling/allocation_state.cc @@ -22,6 +22,7 @@ void allocation_state::rollback( for (auto& as : v) { for (auto& bs : as.replicas) { remove_allocation(bs, domain); + remove_final_count(bs, domain); } // rollback for each assignment as the groups are distinct _highest_group = raft::group_id(_highest_group() - 1); @@ -190,6 +191,26 @@ uint32_t allocation_state::allocate( return it->second->allocate(domain); } +void allocation_state::add_final_count( + const model::broker_shard& replica, + const partition_allocation_domain domain) { + verify_shard(); + if (auto it = _nodes.find(replica.node_id); it != _nodes.end()) { + ++it->second->_final_partitions; + ++it->second->_final_domain_partitions[domain]; + } +} + +void allocation_state::remove_final_count( + const model::broker_shard& replica, + const partition_allocation_domain domain) { + verify_shard(); + if (auto it = _nodes.find(replica.node_id); it != _nodes.end()) { + --it->second->_final_partitions; + --it->second->_final_domain_partitions[domain]; + } +} + void allocation_state::verify_shard() const { /* This is a consistency check on the use of the allocation state: * it checks that the caller is on the same shard the state was originally diff --git a/src/v/cluster/scheduling/allocation_state.h b/src/v/cluster/scheduling/allocation_state.h index d9f683472b17..5d54df1a7a11 100644 --- a/src/v/cluster/scheduling/allocation_state.h +++ b/src/v/cluster/scheduling/allocation_state.h @@ -57,6 +57,10 @@ class allocation_state : public ss::weakly_referencable { add_allocation(const model::broker_shard&, partition_allocation_domain); void remove_allocation(const model::broker_shard&, partition_allocation_domain); + void + add_final_count(const model::broker_shard&, partition_allocation_domain); + void + remove_final_count(const model::broker_shard&, partition_allocation_domain); void rollback( const ss::chunked_fifo& pa, diff --git a/src/v/cluster/scheduling/constraints.cc b/src/v/cluster/scheduling/constraints.cc index aa32b34482d8..f69930d9b023 100644 --- a/src/v/cluster/scheduling/constraints.cc +++ b/src/v/cluster/scheduling/constraints.cc @@ -219,7 +219,7 @@ hard_constraint disk_not_overflowed_by_partition( max_disk_usage_ratio, partition_size, node_disk_reports)); } -soft_constraint least_allocated() { +soft_constraint max_final_capacity() { class impl : public soft_constraint::impl { public: soft_constraint_evaluator @@ -227,7 +227,10 @@ soft_constraint least_allocated() { return [](const allocation_node& node) { // we return 0 for fully allocated node and 10'000'000 for nodes // with maximum capacity available - return (soft_constraint::max_score * node.partition_capacity()) + auto final_capacity + = node.max_capacity() + - std::min(node.max_capacity(), node.final_partitions()); + return (soft_constraint::max_score * final_capacity) / node.max_capacity(); }; } @@ -239,7 +242,7 @@ soft_constraint least_allocated() { } soft_constraint -least_allocated_in_domain(const partition_allocation_domain domain) { +max_final_capacity_in_domain(const partition_allocation_domain domain) { struct impl : soft_constraint::impl { explicit impl(partition_allocation_domain domain_) : domain(domain_) {} @@ -247,8 +250,11 @@ least_allocated_in_domain(const partition_allocation_domain domain) { soft_constraint_evaluator make_evaluator(const replicas_t&) const final { return [this](const allocation_node& node) { - return (soft_constraint::max_score - * node.domain_partition_capacity(domain)) + auto final_capacity = node.max_capacity() + - std::min( + node.max_capacity(), + node.domain_final_partitions(domain)); + return (soft_constraint::max_score * final_capacity) / node.max_capacity(); }; } diff --git a/src/v/cluster/scheduling/constraints.h b/src/v/cluster/scheduling/constraints.h index 0fa73c12d0f1..ca89b8b0ed28 100644 --- a/src/v/cluster/scheduling/constraints.h +++ b/src/v/cluster/scheduling/constraints.h @@ -54,18 +54,18 @@ hard_constraint disk_not_overflowed_by_partition( node_disk_reports); /* - * scores nodes based on free overall allocation capacity left + * scores nodes based on partition count after all moves have been finished * returning `0` for fully allocated nodes and `max_capacity` for empty nodes */ -soft_constraint least_allocated(); +soft_constraint max_final_capacity(); /* - * scores nodes based on allocation capacity used by priority partitions - * returning `0` for nodes fully allocated for priority partitions - * and `max_capacity` for nodes without any priority partitions - * non-priority partition allocations are ignored + * scores nodes based on partition counts of priority partitions after all moves + * have been finished, returning `0` for nodes fully allocated for priority + * partitions and `max_capacity` for nodes without any priority partitions. + * non-priority partition allocations are ignored. */ -soft_constraint least_allocated_in_domain(partition_allocation_domain); +soft_constraint max_final_capacity_in_domain(partition_allocation_domain); /* * constraint scores nodes on free disk space diff --git a/src/v/cluster/scheduling/partition_allocator.cc b/src/v/cluster/scheduling/partition_allocator.cc index c23b4c9048f4..cc2460a13d16 100644 --- a/src/v/cluster/scheduling/partition_allocator.cc +++ b/src/v/cluster/scheduling/partition_allocator.cc @@ -66,9 +66,9 @@ allocation_constraints partition_allocator::default_constraints( req.add(is_active()); if (domain == partition_allocation_domains::common) { - req.add(least_allocated()); + req.add(max_final_capacity()); } else { - req.add(least_allocated_in_domain(domain)); + req.add(max_final_capacity_in_domain(domain)); } if (_enable_rack_awareness()) { req.add(distinct_rack_preferred(_members.local())); @@ -406,6 +406,22 @@ void partition_allocator::remove_allocations( } } +void partition_allocator::add_final_counts( + const std::vector& to_add, + const partition_allocation_domain domain) { + for (const auto& bs : to_add) { + _state->add_final_count(bs, domain); + } +} + +void partition_allocator::remove_final_counts( + const std::vector& to_remove, + const partition_allocation_domain domain) { + for (const auto& bs : to_remove) { + _state->remove_final_count(bs, domain); + } +} + ss::future<> partition_allocator::apply_snapshot(const controller_snapshot& snap) { auto new_state = std::make_unique( @@ -439,6 +455,8 @@ partition_allocator::apply_snapshot(const controller_snapshot& snap) { new_state->add_allocation(bs, domain); } + const std::vector* final_replicas = nullptr; + if (auto it = topic.updates.find(p_id); it != topic.updates.end()) { const auto& update = it->second; // Both old and new replicas contribute to allocator weights @@ -448,6 +466,24 @@ partition_allocator::apply_snapshot(const controller_snapshot& snap) { for (const auto& bs : additional_replicas) { new_state->add_allocation(bs, domain); } + + // final counts depend on the update state + switch (update.state) { + case reconfiguration_state::in_progress: + case reconfiguration_state::force_update: + final_replicas = &update.target_assignment; + break; + case reconfiguration_state::cancelled: + case reconfiguration_state::force_cancelled: + final_replicas = &partition.replicas; + break; + } + } else { + final_replicas = &partition.replicas; + } + + for (const auto& bs : *final_replicas) { + new_state->add_final_count(bs, domain); } co_await ss::coroutine::maybe_yield(); diff --git a/src/v/cluster/scheduling/partition_allocator.h b/src/v/cluster/scheduling/partition_allocator.h index 175fb9036eb5..56ff72337ebc 100644 --- a/src/v/cluster/scheduling/partition_allocator.h +++ b/src/v/cluster/scheduling/partition_allocator.h @@ -112,12 +112,17 @@ class partition_allocator { const std::vector&, partition_allocation_domain); void remove_allocations( const std::vector&, partition_allocation_domain); + void add_final_counts( + const std::vector&, partition_allocation_domain); + void remove_final_counts( + const std::vector&, partition_allocation_domain); void add_allocations_for_new_partition( const std::vector& replicas, raft::group_id group_id, partition_allocation_domain domain) { add_allocations(replicas, domain); + add_final_counts(replicas, domain); _state->update_highest_group_id(group_id); } diff --git a/src/v/cluster/scheduling/types.cc b/src/v/cluster/scheduling/types.cc index 151a082e981e..377287152b97 100644 --- a/src/v/cluster/scheduling/types.cc +++ b/src/v/cluster/scheduling/types.cc @@ -79,6 +79,7 @@ allocation_units::~allocation_units() { for (auto& pas : _assignments) { for (auto& replica : pas.replicas) { _state->remove_allocation(replica, _domain); + _state->remove_final_count(replica, _domain); } } } @@ -136,9 +137,11 @@ allocated_partition::prepare_move(model::node_id prev_node) { std::swap(_replicas[prev.idx], _replicas.back()); _replicas.pop_back(); _state->remove_allocation(prev.bs, _domain); + _state->remove_final_count(prev.bs, _domain); if (prev.original) { _state->remove_allocation(*prev.original, _domain); } + return prev; } @@ -164,6 +167,7 @@ model::broker_shard allocated_partition::add_replica( it != _original_node2shard->end()) { // this is an original replica, preserve the shard replica.shard = it->second; + _state->add_final_count(replica, _domain); } else { // the replica is new, choose the shard and add allocation replica.shard = _state->allocate(node, _domain); @@ -178,6 +182,7 @@ void allocated_partition::cancel_move(const previous_replica& prev) { _replicas.push_back(prev.bs); std::swap(_replicas[prev.idx], _replicas.back()); _state->add_allocation(prev.bs, _domain); + _state->add_final_count(prev.bs, _domain); if (prev.original) { _state->add_allocation(*prev.original, _domain); } @@ -226,10 +231,22 @@ allocated_partition::~allocated_partition() { } for (const auto& bs : _replicas) { - if (!_original_node2shard->contains(bs.node_id)) { + auto orig_it = _original_node2shard->find(bs.node_id); + if (orig_it == _original_node2shard->end()) { + // new replica _state->remove_allocation(bs, _domain); + _state->remove_final_count(bs, _domain); + } else { + // original replica that didn't change, erase from the map in + // preparation for the loop below + _original_node2shard->erase(orig_it); } } + + for (const auto& kv : *_original_node2shard) { + model::broker_shard bs{kv.first, kv.second}; + _state->add_final_count(bs, _domain); + } } partition_constraints::partition_constraints( diff --git a/src/v/cluster/tests/backend_reallocation_strategy_test.cc b/src/v/cluster/tests/backend_reallocation_strategy_test.cc index 3303008ea743..a50564d766c1 100644 --- a/src/v/cluster/tests/backend_reallocation_strategy_test.cc +++ b/src/v/cluster/tests/backend_reallocation_strategy_test.cc @@ -217,8 +217,12 @@ struct strategy_test_fixture { // update allocator allocator.add_allocations( added, cluster::partition_allocation_domains::common); + allocator.add_final_counts( + added, cluster::partition_allocation_domains::common); allocator.remove_allocations( removed, cluster::partition_allocation_domains::common); + allocator.remove_final_counts( + removed, cluster::partition_allocation_domains::common); auto ec = co_await topics.apply( cluster::move_partition_replicas_cmd(ntp, pr.new_replica_set), diff --git a/src/v/cluster/tests/partition_allocator_tests.cc b/src/v/cluster/tests/partition_allocator_tests.cc index 69162bf719c5..b1f99de3481d 100644 --- a/src/v/cluster/tests/partition_allocator_tests.cc +++ b/src/v/cluster/tests/partition_allocator_tests.cc @@ -23,6 +23,8 @@ #include #include +ss::logger logger{"allocator_test"}; + static void validate_replica_set_diversity( const std::vector& replicas) { if (replicas.size() > 1) { @@ -568,16 +570,32 @@ FIXTURE_TEST(even_distribution_pri_allocation, partition_allocator_fixture) { } } -void check_partition_counts( +void check_allocated_counts( const cluster::partition_allocator& allocator, const std::vector& expected, cluster::partition_allocation_domain domain = cluster::partition_allocation_domains::common) { - for (const auto& node : allocator.state().allocation_nodes()) { - model::node_id node_id = node.first; - auto count = node.second->domain_allocated_partitions(domain); - BOOST_REQUIRE_EQUAL(count(), expected.at(node_id())); + std::vector counts; + for (const auto& [id, node] : allocator.state().allocation_nodes()) { + BOOST_REQUIRE(id() == counts.size()); + counts.push_back(node->domain_allocated_partitions(domain)); } + logger.debug("allocated counts: {}, expected: {}", counts, expected); + BOOST_CHECK_EQUAL(counts, expected); +}; + +void check_final_counts( + const cluster::partition_allocator& allocator, + const std::vector& expected, + cluster::partition_allocation_domain domain + = cluster::partition_allocation_domains::common) { + std::vector counts; + for (const auto& [id, node] : allocator.state().allocation_nodes()) { + BOOST_REQUIRE(id() == counts.size()); + counts.push_back(node->domain_final_partitions(domain)); + } + logger.debug("final counts: {}, expected: {}", counts, expected); + BOOST_CHECK_EQUAL(counts, expected); }; FIXTURE_TEST(incrementally_reallocate_replicas, partition_allocator_fixture) { @@ -594,7 +612,8 @@ FIXTURE_TEST(incrementally_reallocate_replicas, partition_allocator_fixture) { // add another node to move replicas to and from register_node(3, 1); - check_partition_counts(allocator, {1, 1, 1, 0}); + check_allocated_counts(allocator, {1, 1, 1, 0}); + check_final_counts(allocator, {1, 1, 1, 0}); { cluster::allocated_partition reallocated @@ -612,7 +631,8 @@ FIXTURE_TEST(incrementally_reallocate_replicas, partition_allocator_fixture) { BOOST_REQUIRE_EQUAL( reallocated.replicas().at(2).node_id, model::node_id{3}); - check_partition_counts(allocator, {1, 1, 1, 1}); + check_allocated_counts(allocator, {1, 1, 1, 1}); + check_final_counts(allocator, {0, 1, 1, 1}); // there is no replica on node 0 any more auto moved2 = allocator.reallocate_replica( @@ -629,7 +649,8 @@ FIXTURE_TEST(incrementally_reallocate_replicas, partition_allocator_fixture) { BOOST_REQUIRE_EQUAL( moved3.error(), cluster::errc::no_eligible_allocation_nodes); - check_partition_counts(allocator, {1, 1, 1, 1}); + check_allocated_counts(allocator, {1, 1, 1, 1}); + check_final_counts(allocator, {0, 1, 1, 1}); // replicas can move to the same place auto moved4 = allocator.reallocate_replica( @@ -639,16 +660,23 @@ FIXTURE_TEST(incrementally_reallocate_replicas, partition_allocator_fixture) { BOOST_REQUIRE_EQUAL( reallocated.replicas().at(2).node_id, model::node_id{3}); - check_partition_counts(allocator, {1, 1, 1, 1}); + check_allocated_counts(allocator, {1, 1, 1, 1}); + check_final_counts(allocator, {0, 1, 1, 1}); + + std::vector node_0( + {model::broker_shard{.node_id = model::node_id{0}, .shard = 0}}); + cluster::allocation_constraints not_on_node0; + not_on_node0.add(cluster::distinct_from(node_0)); auto moved5 = allocator.reallocate_replica( - reallocated, model::node_id{2}, cluster::allocation_constraints{}); + reallocated, model::node_id{2}, not_on_node0); BOOST_REQUIRE(moved5.has_value()); BOOST_REQUIRE_EQUAL(moved5.value().node_id, model::node_id{2}); BOOST_REQUIRE_EQUAL( reallocated.replicas().at(2).node_id, model::node_id{2}); - check_partition_counts(allocator, {1, 1, 1, 1}); + check_allocated_counts(allocator, {1, 1, 1, 1}); + check_final_counts(allocator, {0, 1, 1, 1}); std::vector new_replicas(reallocated.replicas()); cluster::allocation_constraints not_on_new_nodes; @@ -663,7 +691,8 @@ FIXTURE_TEST(incrementally_reallocate_replicas, partition_allocator_fixture) { BOOST_REQUIRE_EQUAL( reallocated.replicas().at(2).node_id, model::node_id{0}); - check_partition_counts(allocator, {1, 1, 1, 0}); + check_allocated_counts(allocator, {1, 1, 1, 0}); + check_final_counts(allocator, {1, 1, 1, 0}); // do another move so that we have something to revert auto moved7 = allocator.reallocate_replica( @@ -674,10 +703,12 @@ FIXTURE_TEST(incrementally_reallocate_replicas, partition_allocator_fixture) { BOOST_REQUIRE_EQUAL( reallocated.replicas().at(2).node_id, model::node_id{3}); - check_partition_counts(allocator, {1, 1, 1, 1}); + check_allocated_counts(allocator, {1, 1, 1, 1}); + check_final_counts(allocator, {1, 0, 1, 1}); } - check_partition_counts(allocator, {1, 1, 1, 0}); + check_allocated_counts(allocator, {1, 1, 1, 0}); + check_final_counts(allocator, {1, 1, 1, 0}); } FIXTURE_TEST(reallocate_partition_with_move, partition_allocator_fixture) { @@ -695,7 +726,8 @@ FIXTURE_TEST(reallocate_partition_with_move, partition_allocator_fixture) { register_node(3, 1); register_node(4, 1); - check_partition_counts(allocator, {1, 1, 1, 0, 0}); + check_allocated_counts(allocator, {1, 1, 1, 0, 0}); + check_final_counts(allocator, {1, 1, 1, 0, 0}); cluster::allocation_constraints not_on_old_nodes; not_on_old_nodes.add(cluster::distinct_from(original_assignment.replicas)); @@ -717,7 +749,8 @@ FIXTURE_TEST(reallocate_partition_with_move, partition_allocator_fixture) { BOOST_REQUIRE_EQUAL(replicas_set.size(), 4); BOOST_REQUIRE(!replicas_set.contains(model::node_id{0})); - check_partition_counts(allocator, {1, 1, 1, 1, 1}); + check_allocated_counts(allocator, {1, 1, 1, 1, 1}); + check_final_counts(allocator, {0, 1, 1, 1, 1}); } { @@ -784,7 +817,8 @@ FIXTURE_TEST( replica2shard.emplace(bs.node_id, bs.shard); } - check_partition_counts(allocator, {2, 2, 2, 1}); + check_allocated_counts(allocator, {2, 2, 2, 1}); + check_final_counts(allocator, {2, 2, 2, 1}); cluster::allocated_partition reallocated = allocator.make_allocated_partition( @@ -800,7 +834,8 @@ FIXTURE_TEST( // more attractive. But replicas on nodes 0, 1, and 2 should still end up on // shard 2 partition_1.reset(); - check_partition_counts(allocator, {1, 1, 1, 1}); + check_allocated_counts(allocator, {1, 1, 1, 1}); + check_final_counts(allocator, {0, 1, 1, 1}); // Reallocate replica on node 1 to itself. moved = allocator.reallocate_replica( diff --git a/src/v/cluster/tests/topic_updates_dispatcher_test.cc b/src/v/cluster/tests/topic_updates_dispatcher_test.cc index ebe72f06b9f5..73055a72f9ca 100644 --- a/src/v/cluster/tests/topic_updates_dispatcher_test.cc +++ b/src/v/cluster/tests/topic_updates_dispatcher_test.cc @@ -19,10 +19,19 @@ #include using namespace std::chrono_literals; +ss::logger logger{"dispatcher_test"}; + struct topic_table_updates_dispatcher_fixture : topic_table_fixture { topic_table_updates_dispatcher_fixture() : dispatcher(allocator, table, leaders, pb_state) {} + template + void dispatch_command(Cmd cmd) { + auto res + = dispatcher.apply_update(serde_serialize_cmd(std::move(cmd))).get(); + BOOST_REQUIRE_EQUAL(res, cluster::errc::success); + } + void create_topics() { auto cmd_1 = make_create_topic_cmd("test_tp_1", 1, 3); cmd_1.value.cfg.properties.compaction_strategy @@ -37,19 +46,9 @@ struct topic_table_updates_dispatcher_fixture : topic_table_fixture { auto cmd_2 = make_create_topic_cmd("test_tp_2", 12, 3); auto cmd_3 = make_create_topic_cmd("test_tp_3", 8, 1); - auto res_1 = dispatcher - .apply_update(serialize_cmd(std::move(cmd_1)).get0()) - .get0(); - auto res_2 = dispatcher - .apply_update(serialize_cmd(std::move(cmd_2)).get0()) - .get0(); - auto res_3 = dispatcher - .apply_update(serialize_cmd(std::move(cmd_3)).get0()) - .get0(); - - BOOST_REQUIRE_EQUAL(res_1, cluster::errc::success); - BOOST_REQUIRE_EQUAL(res_2, cluster::errc::success); - BOOST_REQUIRE_EQUAL(res_3, cluster::errc::success); + dispatch_command(std::move(cmd_1)); + dispatch_command(std::move(cmd_2)); + dispatch_command(std::move(cmd_3)); } cluster::topic_updates_dispatcher dispatcher; @@ -165,3 +164,190 @@ FIXTURE_TEST( current_cluster_capacity(allocator.local().state().allocation_nodes()), max_cluster_capacity() - (1 * 3 + 12 * 3 + 8 * 1)); } + +FIXTURE_TEST( + allocator_partition_counts, topic_table_updates_dispatcher_fixture) { + const auto& allocation_nodes = allocator.local().state().allocation_nodes(); + + auto check_allocated_counts = [&](std::vector expected) { + std::vector counts; + for (const auto& [id, node] : allocation_nodes) { + BOOST_REQUIRE(id() == counts.size() + 1); // 1-based node ids + counts.push_back(node->allocated_partitions()); + } + logger.debug("allocated counts: {}, expected: {}", counts, expected); + BOOST_CHECK_EQUAL(counts, expected); + }; + + auto check_final_counts = [&](std::vector expected) { + std::vector counts; + for (const auto& [id, node] : allocation_nodes) { + BOOST_REQUIRE(id() == counts.size() + 1); // 1-based node ids + counts.push_back(node->final_partitions()); + } + logger.debug("final counts: {}, expected: {}", counts, expected); + BOOST_CHECK_EQUAL(counts, expected); + }; + + auto create_topic_cmd = make_create_topic_cmd("test_tp_1", 4, 3); + logger.info("create topic {}", create_topic_cmd.key); + dispatch_command(create_topic_cmd); + + // create a node to move replicas to + allocator.local().register_node( + create_allocation_node(model::node_id(4), 4)); + + check_allocated_counts({4, 4, 4, 0}); + check_final_counts({4, 4, 4, 0}); + + // get data needed to move a partition + auto get_partition = [&](size_t id) { + model::ntp ntp{ + create_topic_cmd.key.ns, + create_topic_cmd.key.tp, + model::partition_id{id}}; + auto assignment_it = std::next( + create_topic_cmd.value.assignments.begin(), id); + BOOST_REQUIRE(assignment_it->id() == id); + + auto old_replicas = assignment_it->replicas; + + auto new_replicas = old_replicas; + auto it = std::find_if( + new_replicas.begin(), new_replicas.end(), [](const auto& bs) { + return bs.node_id() == 1; + }); + BOOST_REQUIRE(it != new_replicas.end()); + it->node_id = model::node_id{4}; + + return std::tuple{ + ntp, + old_replicas, + new_replicas, + }; + }; + + // move + finish + { + auto [ntp, old_replicas, new_replicas] = get_partition(0); + + logger.info("move ntp {}", ntp); + dispatch_command( + cluster::move_partition_replicas_cmd{ntp, new_replicas}); + check_allocated_counts({4, 4, 4, 1}); + check_final_counts({3, 4, 4, 1}); + + logger.info("finish move"); + dispatch_command( + cluster::finish_moving_partition_replicas_cmd{ntp, new_replicas}); + check_allocated_counts({3, 4, 4, 1}); + check_final_counts({3, 4, 4, 1}); + } + + // move + cancel + force_cancel + finish + { + auto [ntp, old_replicas, new_replicas] = get_partition(1); + + logger.info("move ntp {}", ntp); + dispatch_command( + cluster::move_partition_replicas_cmd{ntp, new_replicas}); + check_allocated_counts({3, 4, 4, 2}); + check_final_counts({2, 4, 4, 2}); + + logger.info("cancel move"); + dispatch_command(cluster::cancel_moving_partition_replicas_cmd{ + ntp, + cluster::cancel_moving_partition_replicas_cmd_data{ + cluster::force_abort_update{false}}}); + check_allocated_counts({3, 4, 4, 2}); + check_final_counts({3, 4, 4, 1}); + + logger.info("force-cancel move"); + dispatch_command(cluster::cancel_moving_partition_replicas_cmd{ + ntp, + cluster::cancel_moving_partition_replicas_cmd_data{ + cluster::force_abort_update{true}}}); + check_allocated_counts({3, 4, 4, 2}); + check_final_counts({3, 4, 4, 1}); + + logger.info("finish move"); + dispatch_command( + cluster::finish_moving_partition_replicas_cmd{ntp, old_replicas}); + check_allocated_counts({3, 4, 4, 1}); + check_final_counts({3, 4, 4, 1}); + } + + // move + cancel + revert_cancel + { + auto [ntp, old_replicas, new_replicas] = get_partition(2); + + logger.info("move ntp {}", ntp); + dispatch_command( + cluster::move_partition_replicas_cmd{ntp, new_replicas}); + check_allocated_counts({3, 4, 4, 2}); + check_final_counts({2, 4, 4, 2}); + + logger.info("cancel move"); + dispatch_command(cluster::cancel_moving_partition_replicas_cmd{ + ntp, + cluster::cancel_moving_partition_replicas_cmd_data{ + cluster::force_abort_update{false}}}); + check_allocated_counts({3, 4, 4, 2}); + check_final_counts({3, 4, 4, 1}); + + logger.info("revert_cancel move"); + dispatch_command(cluster::revert_cancel_partition_move_cmd( + int8_t{0}, + cluster::revert_cancel_partition_move_cmd_data{.ntp = ntp})); + check_allocated_counts({2, 4, 4, 2}); + check_final_counts({2, 4, 4, 2}); + } + + // force_move + { + auto [ntp, old_replicas, new_replicas] = get_partition(3); + + // for new_replicas choose a proper subset of old replicas, as required + // by force_partition_reconfiguration. + auto repl_it = std::find_if( + old_replicas.begin(), old_replicas.end(), [](const auto& bs) { + return bs.node_id() == 1; + }); + BOOST_REQUIRE(repl_it != old_replicas.end()); + new_replicas = std::vector({*repl_it}); + + logger.info( + "force_partition_reconfiguration ntp {} to {}", ntp, new_replicas); + dispatch_command(cluster::force_partition_reconfiguration_cmd{ + ntp, + cluster::force_partition_reconfiguration_cmd_data(new_replicas)}); + check_allocated_counts({2, 4, 4, 2}); + check_final_counts({2, 3, 3, 2}); + + logger.info("finish move"); + dispatch_command( + cluster::finish_moving_partition_replicas_cmd{ntp, new_replicas}); + check_allocated_counts({2, 3, 3, 2}); + check_final_counts({2, 3, 3, 2}); + } + + // move topic + topic delete + { + // move everything back + logger.info("move topic"); + std::vector cmd_data; + for (const auto& p_as : create_topic_cmd.value.assignments) { + cmd_data.emplace_back(p_as.id, p_as.replicas); + } + dispatch_command( + cluster::move_topic_replicas_cmd(create_topic_cmd.key, cmd_data)); + check_allocated_counts({4, 4, 4, 2}); + check_final_counts({4, 4, 4, 0}); + + logger.info("delete topic"); + dispatch_command(cluster::delete_topic_cmd( + create_topic_cmd.key, create_topic_cmd.key)); + check_allocated_counts({0, 0, 0, 0}); + check_final_counts({0, 0, 0, 0}); + } +} diff --git a/src/v/cluster/topic_updates_dispatcher.cc b/src/v/cluster/topic_updates_dispatcher.cc index 7aecf06da894..25c02ed3cd4f 100644 --- a/src/v/cluster/topic_updates_dispatcher.cc +++ b/src/v/cluster/topic_updates_dispatcher.cc @@ -170,9 +170,9 @@ ss::future topic_updates_dispatcher::apply( "Partition {} have to exist before successful " "partition reallocation", ntp); - auto to_add = subtract_replica_sets(cmd.value, p_as->replicas); - _partition_allocator.local().add_allocations( - to_add, get_allocation_domain(ntp)); + + update_allocations_for_reconfiguration( + p_as->replicas, cmd.value, get_allocation_domain(ntp)); _partition_balancer_state.local().handle_ntp_update( ntp.ns, ntp.tp.topic, ntp.tp.partition, p_as->replicas, cmd.value); @@ -202,6 +202,16 @@ ss::future topic_updates_dispatcher::apply( "currently being updated", ntp); + auto to_add = subtract_replica_sets( + *new_target_replicas, current_assignment->replicas); + _partition_allocator.local().add_final_counts( + to_add, get_allocation_domain(ntp)); + + auto to_remove = subtract_replica_sets( + current_assignment->replicas, *new_target_replicas); + _partition_allocator.local().remove_final_counts( + to_remove, get_allocation_domain(ntp)); + _partition_balancer_state.local().handle_ntp_update( ntp.ns, ntp.tp.topic, @@ -345,10 +355,10 @@ ss::future topic_updates_dispatcher::apply( if (assigment_it == assignments.value().end()) { co_return std::error_code(errc::partition_not_exists); } - auto to_add = subtract_replica_sets( - replicas, assigment_it->replicas); - _partition_allocator.local().add_allocations( - to_add, get_allocation_domain(ntp)); + + update_allocations_for_reconfiguration( + assigment_it->replicas, replicas, get_allocation_domain(ntp)); + _partition_balancer_state.local().handle_ntp_update( ntp.ns, ntp.tp.topic, @@ -406,10 +416,17 @@ ss::future topic_updates_dispatcher::apply( "currently being cancelled", ntp); + auto to_add = subtract_replica_sets( + *target_replicas, *previous_replicas); + _partition_allocator.local().add_final_counts( + to_add, get_allocation_domain(ntp)); + auto to_delete = subtract_replica_sets( *previous_replicas, *target_replicas); _partition_allocator.local().remove_allocations( to_delete, get_allocation_domain(ntp)); + _partition_allocator.local().remove_final_counts( + to_delete, get_allocation_domain(ntp)); _partition_balancer_state.local().handle_ntp_update( ntp.ns, @@ -423,9 +440,29 @@ ss::future topic_updates_dispatcher::apply( ss::future topic_updates_dispatcher::apply( force_partition_reconfiguration_cmd cmd, model::offset base_offset) { - // Post dispatch, allocator updates are skipped because the target - // replica set is a subset of the original replica set. - return dispatch_updates_to_cores(std::move(cmd), base_offset); + auto p_as = _topic_table.local().get_partition_assignment(cmd.key); + auto ec = co_await dispatch_updates_to_cores(cmd, base_offset); + if (ec) { + co_return ec; + } + + const auto& ntp = cmd.key; + vassert( + p_as.has_value(), + "Partition {} have to exist before successful force-reconfiguration", + ntp); + + update_allocations_for_reconfiguration( + p_as->replicas, cmd.value.replicas, get_allocation_domain(ntp)); + + _partition_balancer_state.local().handle_ntp_update( + ntp.ns, + ntp.tp.topic, + ntp.tp.partition, + p_as->replicas, + cmd.value.replicas); + + co_return ec; } topic_updates_dispatcher::in_progress_map @@ -520,6 +557,7 @@ void topic_updates_dispatcher::deallocate_topic( ? p_as.replicas : union_replica_sets(it->second, p_as.replicas); _partition_allocator.local().remove_allocations(to_delete, domain); + _partition_allocator.local().remove_final_counts(p_as.replicas, domain); if (unlikely(clusterlog.is_enabled(ss::log_level::trace))) { vlog( clusterlog.trace, @@ -541,6 +579,18 @@ void topic_updates_dispatcher::add_allocations_for_new_partitions( } } +void topic_updates_dispatcher::update_allocations_for_reconfiguration( + const std::vector& previous, + const std::vector& target, + partition_allocation_domain domain) { + auto to_add = subtract_replica_sets(target, previous); + _partition_allocator.local().add_allocations(to_add, domain); + _partition_allocator.local().add_final_counts(to_add, domain); + + auto to_remove = subtract_replica_sets(previous, target); + _partition_allocator.local().remove_final_counts(to_remove, domain); +} + ss::future<> topic_updates_dispatcher::fill_snapshot(controller_snapshot& snap) const { co_await _topic_table.local().fill_snapshot(snap); diff --git a/src/v/cluster/topic_updates_dispatcher.h b/src/v/cluster/topic_updates_dispatcher.h index 3f59e4aa1ac2..4b7fc0089c93 100644 --- a/src/v/cluster/topic_updates_dispatcher.h +++ b/src/v/cluster/topic_updates_dispatcher.h @@ -115,6 +115,11 @@ class topic_updates_dispatcher { void add_allocations_for_new_partitions(const T&, partition_allocation_domain); + void update_allocations_for_reconfiguration( + const std::vector& previous, + const std::vector& target, + partition_allocation_domain); + void deallocate_topic( const model::topic_namespace&, const assignments_set&,