diff --git a/src/v/cluster/controller.cc b/src/v/cluster/controller.cc
index 51bf5a2f3ea3..393af0a86812 100644
--- a/src/v/cluster/controller.cc
+++ b/src/v/cluster/controller.cc
@@ -104,6 +104,7 @@ ss::future<> controller::wire_up() {
           config::shard_local_cfg().topic_fds_per_partition.bind(),
           config::shard_local_cfg().topic_partitions_per_shard.bind(),
           config::shard_local_cfg().topic_partitions_reserve_shard0.bind(),
+          config::shard_local_cfg().kafka_nodelete_topics.bind(),
           config::shard_local_cfg().enable_rack_awareness.bind());
       })
       .then([this] { return _credentials.start(); })
diff --git a/src/v/cluster/health_manager.cc b/src/v/cluster/health_manager.cc
index 58e4b0167153..543bcbc30e83 100644
--- a/src/v/cluster/health_manager.cc
+++ b/src/v/cluster/health_manager.cc
@@ -83,7 +83,10 @@ ss::future<bool> health_manager::ensure_partition_replication(model::ntp ntp) {
       ntp.tp.partition,
       _target_replication_factor);
     auto allocation = _allocator.local().reallocate_partition(
-      constraints, *assignment, get_allocation_domain(ntp));
+      model::topic_namespace{ntp.ns, ntp.tp.topic},
+      constraints,
+      *assignment,
+      get_allocation_domain(ntp));
     if (!allocation) {
         vlog(
           clusterlog.warn,
diff --git a/src/v/cluster/members_backend.cc b/src/v/cluster/members_backend.cc
index 1cea91635600..e1a789e34ae3 100644
--- a/src/v/cluster/members_backend.cc
+++ b/src/v/cluster/members_backend.cc
@@ -31,6 +31,7 @@ void reassign_replicas(
   partition_assignment current_assignment,
   members_backend::partition_reallocation& reallocation) {
     auto res = allocator.reallocate_partition(
+      model::topic_namespace{ntp.ns, ntp.tp.topic},
      reallocation.constraints.value(),
      current_assignment,
      get_allocation_domain(ntp),
diff --git a/src/v/cluster/partition_balancer_planner.cc b/src/v/cluster/partition_balancer_planner.cc
index 1068453f7890..9c1ed6522c6b 100644
--- a/src/v/cluster/partition_balancer_planner.cc
+++ b/src/v/cluster/partition_balancer_planner.cc
@@ -40,7 +40,7 @@ distinct_from(const absl::flat_hash_set<model::node_id>& nodes) {
       : _nodes(nodes) {}

     hard_constraint_evaluator
-    make_evaluator(const replicas_t&) const final {
+    make_evaluator(const model::ntp&, const replicas_t&) const final {
         return [this](const allocation_node& node) {
             return !_nodes.contains(node.id());
         };
@@ -828,7 +828,7 @@ partition_balancer_planner::reassignable_partition::move_replica(
     if (!_reallocated) {
         _reallocated
           = _ctx._parent._partition_allocator.make_allocated_partition(
-            replicas(), get_allocation_domain(_ntp));
+            _ntp, replicas(), get_allocation_domain(_ntp));
     }

     // Verify that we are moving only original replicas. This assumption
diff --git a/src/v/cluster/scheduling/allocation_node.cc b/src/v/cluster/scheduling/allocation_node.cc
index 63283d677fa3..1e0452c5d5bf 100644
--- a/src/v/cluster/scheduling/allocation_node.cc
+++ b/src/v/cluster/scheduling/allocation_node.cc
@@ -20,12 +20,14 @@ allocation_node::allocation_node(
   model::node_id id,
   uint32_t cpus,
   config::binding<uint32_t> partitions_per_shard,
-  config::binding<uint32_t> partitions_reserve_shard0)
+  config::binding<uint32_t> partitions_reserve_shard0,
+  config::binding<std::vector<ss::sstring>> internal_kafka_topics)
   : _id(id)
   , _weights(cpus)
   , _max_capacity((cpus * partitions_per_shard()) - partitions_reserve_shard0())
   , _partitions_per_shard(std::move(partitions_per_shard))
   , _partitions_reserve_shard0(std::move(partitions_reserve_shard0))
+  , _internal_kafka_topics(std::move(internal_kafka_topics))
   , _cpus(cpus) {
     // add extra weights to core 0
     _weights[0] = _partitions_reserve_shard0();
@@ -48,6 +50,26 @@ allocation_node::allocation_node(
     });
 }

+bool allocation_node::is_full(const model::ntp& ntp) const {
+    // Internal topics are excluded from checks to prevent allocation failures
+    // when creating them. This is okay because they are fairly small in number
+    // compared to kafka user topic partitions.
+    auto is_internal_ns = ntp.ns == model::redpanda_ns
+                          || ntp.ns == model::kafka_internal_namespace;
+    if (is_internal_ns) {
+        return false;
+    }
+    const auto& internal_topics = _internal_kafka_topics();
+    auto is_internal_topic = ntp.ns == model::kafka_namespace
+                             && std::any_of(
+                               internal_topics.cbegin(),
+                               internal_topics.cend(),
+                               [&ntp](const ss::sstring& topic) {
+                                   return topic == ntp.tp.topic();
+                               });
+    return !is_internal_topic && _allocated_partitions >= _max_capacity;
+}
+
 ss::shard_id
 allocation_node::allocate(const partition_allocation_domain domain) {
     auto it = std::min_element(_weights.begin(), _weights.end());
diff --git a/src/v/cluster/scheduling/allocation_node.h b/src/v/cluster/scheduling/allocation_node.h
index 7e783d58a12a..a740734b2ba6 100644
--- a/src/v/cluster/scheduling/allocation_node.h
+++ b/src/v/cluster/scheduling/allocation_node.h
@@ -33,9 +33,10 @@ class allocation_node {

     allocation_node(
       model::node_id,
-      uint32_t,
-      config::binding<uint32_t>,
-      config::binding<uint32_t>);
+      uint32_t /*cpus*/,
+      config::binding<uint32_t> /*partitions_per_shard*/,
+      config::binding<uint32_t> /*partitions_reserve_shard0*/,
+      config::binding<std::vector<ss::sstring>> /*internal_kafka_topics*/);

     allocation_node(allocation_node&& o) noexcept = default;
     allocation_node& operator=(allocation_node&&) = delete;
@@ -123,7 +124,7 @@ class allocation_node {
     bool empty() const {
         return _allocated_partitions == allocation_capacity{0};
     }
-    bool is_full() const { return _allocated_partitions >= _max_capacity; }
+    bool is_full(const model::ntp&) const;
     ss::shard_id allocate(partition_allocation_domain);

 private:
@@ -150,6 +151,7 @@ class allocation_node {

     config::binding<uint32_t> _partitions_per_shard;
     config::binding<uint32_t> _partitions_reserve_shard0;
+    config::binding<std::vector<ss::sstring>> _internal_kafka_topics;
     // Keep track of how much weight we applied to shard0,
     // to enable runtime updates
     int32_t _shard0_reserved{0};
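[Note] The new allocation_node::is_full() overload above is the heart of the patch: whether a node counts as full now depends on which partition is being placed. A minimal compilable sketch of the same predicate, using plain-C++ stand-ins for model::ntp and the config binding (struct and field names below are illustrative, not the Redpanda types):

    // Sketch of the internal-topic capacity exemption; simplified stand-ins,
    // not the Redpanda API.
    #include <algorithm>
    #include <cassert>
    #include <cstdint>
    #include <string>
    #include <vector>

    struct ntp {
        std::string ns; // "kafka", "redpanda", or "kafka_internal"
        std::string topic;
    };

    struct node {
        uint64_t allocated_partitions{0};
        uint64_t max_capacity{0};
        // runtime-configurable kafka-namespace topics treated as internal,
        // mirroring the config binding wired through the allocator
        std::vector<std::string> internal_kafka_topics;

        bool is_full(const ntp& p) const {
            // partitions in internal namespaces never count against capacity
            if (p.ns == "redpanda" || p.ns == "kafka_internal") {
                return false;
            }
            // kafka-namespace topics explicitly listed as internal are exempt
            bool internal = p.ns == "kafka"
                            && std::find(
                                 internal_kafka_topics.begin(),
                                 internal_kafka_topics.end(),
                                 p.topic)
                                 != internal_kafka_topics.end();
            return !internal && allocated_partitions >= max_capacity;
        }
    };

    int main() {
        node n{10, 10, {"__consumer_offsets"}};
        assert(n.is_full({"kafka", "my_topic"}));            // at capacity
        assert(!n.is_full({"kafka", "__consumer_offsets"})); // exempt by config
        assert(!n.is_full({"redpanda", "controller"}));      // internal namespace
    }

Per the patch's own comment, the exemption is safe because internal topics contribute a small, bounded partition count, so letting them through cannot meaningfully oversubscribe a node.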
diff --git a/src/v/cluster/scheduling/allocation_state.cc b/src/v/cluster/scheduling/allocation_state.cc
index aaacd0ce814d..c44d9d175b30 100644
--- a/src/v/cluster/scheduling/allocation_state.cc
+++ b/src/v/cluster/scheduling/allocation_state.cc
@@ -59,7 +59,8 @@ void allocation_state::register_node(
       broker.id(),
       broker.properties().cores,
       _partitions_per_shard,
-      _partitions_reserve_shard0);
+      _partitions_reserve_shard0,
+      _internal_kafka_topics);
     if (state == allocation_node::state::decommissioned) {
         node->decommission();
@@ -93,7 +94,8 @@ void allocation_state::update_allocation_nodes(
               b.id(),
               b.properties().cores,
               _partitions_per_shard,
-              _partitions_reserve_shard0));
+              _partitions_reserve_shard0,
+              _internal_kafka_topics));
         } else {
             it->second->update_core_count(b.properties().cores);
             // node was added back to the cluster
@@ -114,7 +116,8 @@ void allocation_state::upsert_allocation_node(const model::broker& broker) {
           broker.id(),
           broker.properties().cores,
           _partitions_per_shard,
-          _partitions_reserve_shard0));
+          _partitions_reserve_shard0,
+          _internal_kafka_topics));
     } else {
         it->second->update_core_count(broker.properties().cores);
         // node was added back to the cluster
diff --git a/src/v/cluster/scheduling/allocation_state.h b/src/v/cluster/scheduling/allocation_state.h
index a87a25211011..4d8569fb2546 100644
--- a/src/v/cluster/scheduling/allocation_state.h
+++ b/src/v/cluster/scheduling/allocation_state.h
@@ -30,9 +30,11 @@ class allocation_state : public ss::weakly_referencable {
     allocation_state(
       config::binding<uint32_t> partitions_per_shard,
-      config::binding<uint32_t> partitions_reserve_shard0)
-      : _partitions_per_shard(partitions_per_shard)
-      , _partitions_reserve_shard0(partitions_reserve_shard0) {}
+      config::binding<uint32_t> partitions_reserve_shard0,
+      config::binding<std::vector<ss::sstring>> internal_kafka_topics)
+      : _partitions_per_shard(std::move(partitions_per_shard))
+      , _partitions_reserve_shard0(std::move(partitions_reserve_shard0))
+      , _internal_kafka_topics(std::move(internal_kafka_topics)) {}

     // Allocation nodes
     void register_node(node_ptr);
@@ -84,6 +86,7 @@ class allocation_state : public ss::weakly_referencable {

     config::binding<uint32_t> _partitions_per_shard;
     config::binding<uint32_t> _partitions_reserve_shard0;
+    config::binding<std::vector<ss::sstring>> _internal_kafka_topics;

     raft::group_id _highest_group{0};
     underlying_t _nodes;
diff --git a/src/v/cluster/scheduling/allocation_strategy.cc b/src/v/cluster/scheduling/allocation_strategy.cc
index 04498a2d008e..2243e05dcbef 100644
--- a/src/v/cluster/scheduling/allocation_strategy.cc
+++ b/src/v/cluster/scheduling/allocation_strategy.cc
@@ -42,13 +42,14 @@ inline bool contains_node_already(
 }

 std::vector<model::node_id> solve_hard_constraints(
+  const model::ntp& ntp,
   const std::vector<model::broker_shard>& current_replicas,
   const std::vector<hard_constraint_ptr>& constraints,
   const allocation_state::underlying_t& nodes) {
     std::vector<hard_constraint_evaluator> evaluators;
     evaluators.reserve(constraints.size());
     for (auto& c : constraints) {
-        evaluators.push_back(c->make_evaluator(current_replicas));
+        evaluators.push_back(c->make_evaluator(ntp, current_replicas));
     }

     // empty hard constraints, all nodes are eligible
@@ -160,6 +161,7 @@ allocation_strategy simple_allocation_strategy() {
     class impl : public allocation_strategy::impl {
     public:
         result<model::node_id> choose_node(
+          const model::ntp& ntp,
           const std::vector<model::broker_shard>& current_replicas,
           const allocation_constraints& request,
           allocation_state& state,
@@ -169,6 +171,7 @@ allocation_strategy simple_allocation_strategy() {
             /*
              * evaluate hard constraints
              */
             std::vector<model::node_id> possible_nodes = solve_hard_constraints(
+              ntp,
               current_replicas,
               request.hard_constraints,
               state.allocation_nodes());
diff --git a/src/v/cluster/scheduling/allocation_strategy.h b/src/v/cluster/scheduling/allocation_strategy.h
index 4b6ea3f8676b..c36c5f6f1d53 100644
--- a/src/v/cluster/scheduling/allocation_strategy.h
+++ b/src/v/cluster/scheduling/allocation_strategy.h
@@ -26,6 +26,7 @@ class allocation_strategy {
      * constraints in the specified domain
      */
     virtual result<model::node_id> choose_node(
+      const model::ntp&,
       const replicas_t&,
      const allocation_constraints&,
      allocation_state&,
@@ -39,11 +40,12 @@ class allocation_strategy {
       : _impl(std::move(impl)) {}

     result<model::node_id> choose_node(
+      const model::ntp& ntp,
       const replicas_t& current_replicas,
       const allocation_constraints& ac,
       allocation_state& state,
       const partition_allocation_domain domain) {
-        return _impl->choose_node(current_replicas, ac, state, domain);
+        return _impl->choose_node(ntp, current_replicas, ac, state, domain);
     }

 private:
diff --git a/src/v/cluster/scheduling/constraints.cc b/src/v/cluster/scheduling/constraints.cc
index f69930d9b023..eb0db84774c3 100644
--- a/src/v/cluster/scheduling/constraints.cc
+++ b/src/v/cluster/scheduling/constraints.cc
@@ -29,8 +29,10 @@ hard_constraint not_fully_allocated() {
     class impl : public hard_constraint::impl {
     public:
         hard_constraint_evaluator
-        make_evaluator(const replicas_t&) const final {
-            return [](const allocation_node& node) { return !node.is_full(); };
+        make_evaluator(const model::ntp& ntp, const replicas_t&) const final {
+            return [&ntp](const allocation_node& node) {
+                return !node.is_full(ntp);
+            };
         }

         ss::sstring name() const final {
@@ -45,7 +47,7 @@ hard_constraint is_active() {
     class impl : public hard_constraint::impl {
     public:
         hard_constraint_evaluator
-        make_evaluator(const replicas_t&) const final {
+        make_evaluator(const model::ntp&, const replicas_t&) const final {
             return [](const allocation_node& node) { return node.is_active(); };
         }
@@ -62,7 +64,7 @@ hard_constraint on_node(model::node_id id) {
       : _id(id) {}

     hard_constraint_evaluator
-    make_evaluator(const replicas_t&) const final {
+    make_evaluator(const model::ntp&, const replicas_t&) const final {
         return [this](const allocation_node& node) { return node.id() == _id; };
     }
@@ -89,7 +91,7 @@ hard_constraint on_nodes(const std::vector<model::node_id>& ids) {
         }
     }

     hard_constraint_evaluator
-    make_evaluator(const replicas_t&) const final {
+    make_evaluator(const model::ntp&, const replicas_t&) const final {
         return [this](const allocation_node& node) {
             return _ids.contains(node.id());
         };
@@ -125,7 +127,7 @@ hard_constraint distinct_from(const replicas_t& replicas) {
       : _replicas(r) {}

     hard_constraint_evaluator
-    make_evaluator(const replicas_t&) const final {
+    make_evaluator(const model::ntp&, const replicas_t&) const final {
         return [this](const allocation_node& node) {
             return std::all_of(
               _replicas.begin(),
@@ -150,8 +152,8 @@ hard_constraint distinct_from(const replicas_t& replicas) {
 hard_constraint distinct_nodes() {
     class impl : public hard_constraint::impl {
     public:
-        hard_constraint_evaluator
-        make_evaluator(const replicas_t& current_replicas) const final {
+        hard_constraint_evaluator make_evaluator(
+          const model::ntp&, const replicas_t& current_replicas) const final {
             return [&current_replicas](const allocation_node& node) {
                 return std::all_of(
                   current_replicas.begin(),
@@ -187,7 +189,7 @@ hard_constraint disk_not_overflowed_by_partition(
           , _node_disk_reports(node_disk_reports) {}

         hard_constraint_evaluator
-        make_evaluator(const replicas_t&) const final {
+        make_evaluator(const model::ntp&, const replicas_t&) const final {
             return [this](const allocation_node& node) {
                 auto disk_it = _node_disk_reports.find(node.id());
                 if (disk_it == _node_disk_reports.end()) {
@@ -322,32 +324,6 @@ soft_constraint least_disk_filled(
       std::make_unique<impl>(max_disk_usage_ratio, node_disk_reports));
 }

-soft_constraint make_soft_constraint(hard_constraint constraint) {
-    class impl : public soft_constraint::impl {
-    public:
-        explicit impl(hard_constraint constraint)
-          : _hard_constraint(std::move(constraint)) {}
-
-        soft_constraint_evaluator
-        make_evaluator(const replicas_t& replicas) const final {
-            auto ev = _hard_constraint.make_evaluator(replicas);
-            return
-              [ev = std::move(ev)](const allocation_node& node) -> uint64_t {
-                  return ev(node) ? soft_constraint::max_score : 0;
-              };
-        }
-
-        ss::sstring name() const final {
-            return ssx::sformat(
-              "soft constraint adapter of ({})", _hard_constraint);
-        }
-
-        const hard_constraint _hard_constraint;
-    };
-
-    return soft_constraint(std::make_unique<impl>(std::move(constraint)));
-}
-
 soft_constraint distinct_rack_preferred(const members_table& members) {
     return distinct_labels_preferred(
       rack_label.data(),
diff --git a/src/v/cluster/scheduling/constraints.h b/src/v/cluster/scheduling/constraints.h
index ca89b8b0ed28..7a441c5347e1 100644
--- a/src/v/cluster/scheduling/constraints.h
+++ b/src/v/cluster/scheduling/constraints.h
@@ -23,12 +23,6 @@ namespace cluster {
 class allocation_state;

 static constexpr std::string_view rack_label = "rack";
-/**
- * make_soft_constraint adapts hard constraint to soft one by returning
- * max score for nodes that matches the soft constraint and 0 for
- * the ones that not
- */
-soft_constraint make_soft_constraint(hard_constraint);

 hard_constraint not_fully_allocated();
 hard_constraint is_active();
diff --git a/src/v/cluster/scheduling/partition_allocator.cc b/src/v/cluster/scheduling/partition_allocator.cc
index 362383265b4b..fc0afc09e6de 100644
--- a/src/v/cluster/scheduling/partition_allocator.cc
+++ b/src/v/cluster/scheduling/partition_allocator.cc
@@ -46,16 +46,18 @@ partition_allocator::partition_allocator(
   config::binding<std::optional<int32_t>> fds_per_partition,
   config::binding<uint32_t> partitions_per_shard,
   config::binding<uint32_t> partitions_reserve_shard0,
+  config::binding<std::vector<ss::sstring>> internal_kafka_topics,
   config::binding<bool> enable_rack_awareness)
   : _state(std::make_unique<allocation_state>(
-    partitions_per_shard, partitions_reserve_shard0))
+    partitions_per_shard, partitions_reserve_shard0, internal_kafka_topics))
   , _allocation_strategy(simple_allocation_strategy())
   , _members(members)
-  , _memory_per_partition(memory_per_partition)
-  , _fds_per_partition(fds_per_partition)
-  , _partitions_per_shard(partitions_per_shard)
-  , _partitions_reserve_shard0(partitions_reserve_shard0)
-  , _enable_rack_awareness(enable_rack_awareness) {}
+  , _memory_per_partition(std::move(memory_per_partition))
+  , _fds_per_partition(std::move(fds_per_partition))
+  , _partitions_per_shard(std::move(partitions_per_shard))
+  , _partitions_reserve_shard0(std::move(partitions_reserve_shard0))
+  , _internal_kafka_topics(std::move(internal_kafka_topics))
+  , _enable_rack_awareness(std::move(enable_rack_awareness)) {}

@@ -77,6 +79,7 @@ allocation_constraints partition_allocator::default_constraints(
 }

 result<allocated_partition> partition_allocator::allocate_new_partition(
+  model::topic_namespace nt,
   partition_constraints p_constraints,
   const partition_allocation_domain domain) {
     vlog(
@@ -94,7 +97,9 @@ result<allocated_partition> partition_allocator::allocate_new_partition(
     auto effective_constraints = default_constraints(domain);
     effective_constraints.add(p_constraints.constraints);

-    allocated_partition ret{{}, domain, *_state};
+    model::ntp ntp{
+      std::move(nt.ns), std::move(nt.tp), p_constraints.partition_id};
+    allocated_partition ret{std::move(ntp), {}, domain, *_state};
     for (auto r = 0; r < replicas_to_allocate; ++r) {
         auto replica = do_allocate_replica(
           ret, std::nullopt, effective_constraints);
@@ -271,10 +276,11 @@ partition_allocator::allocate(allocation_request request) {

     intermediate_allocation assignments(
       *_state, request.partitions.size(), request.domain);

+    const auto& nt = request._nt;
     for (auto& p_constraints : request.partitions) {
         auto const partition_id = p_constraints.partition_id;
         auto allocated = allocate_new_partition(
-          std::move(p_constraints), request.domain);
+          nt, std::move(p_constraints), request.domain);
         if (!allocated) {
             co_return allocated.error();
         }
@@ -290,6 +296,7 @@ partition_allocator::allocate(allocation_request request) {
 }

 result<allocated_partition> partition_allocator::reallocate_partition(
+  model::topic_namespace nt,
   partition_constraints p_constraints,
   const partition_assignment& current_assignment,
   const partition_allocation_domain domain,
@@ -309,7 +316,10 @@ result<allocated_partition> partition_allocator::reallocate_partition(
         return errc::topic_invalid_replication_factor;
     }

-    allocated_partition res{current_assignment.replicas, domain, *_state};
+    model::ntp ntp{
+      std::move(nt.ns), std::move(nt.tp), p_constraints.partition_id};
+    allocated_partition res{
+      std::move(ntp), current_assignment.replicas, domain, *_state};

     if (num_new_replicas == 0 && replicas_to_reallocate.empty()) {
         // nothing to do
@@ -340,9 +350,11 @@ result<allocated_partition> partition_allocator::reallocate_partition(
 }

 allocated_partition partition_allocator::make_allocated_partition(
+  model::ntp ntp,
   std::vector<model::broker_shard> replicas,
   partition_allocation_domain domain) const {
-    return allocated_partition{std::move(replicas), domain, *_state};
+    return allocated_partition{
+      std::move(ntp), std::move(replicas), domain, *_state};
 }

 result<reallocation_step> partition_allocator::reallocate_replica(
@@ -381,7 +393,11 @@ result<reallocation_step> partition_allocator::do_allocate_replica(
     });

     auto node = _allocation_strategy.choose_node(
-      partition._replicas, effective_constraints, *_state, partition._domain);
+      partition._ntp,
+      partition._replicas,
+      effective_constraints,
+      *_state,
+      partition._domain);
     if (!node) {
         return node.error();
     }
@@ -430,7 +446,9 @@ void partition_allocator::remove_final_counts(

 ss::future<>
 partition_allocator::apply_snapshot(const controller_snapshot& snap) {
     auto new_state = std::make_unique<allocation_state>(
-      _partitions_per_shard, _partitions_reserve_shard0);
+      _partitions_per_shard,
+      _partitions_reserve_shard0,
+      _internal_kafka_topics);

     for (const auto& [id, node] : snap.members.nodes) {
         allocation_node::state state;
diff --git a/src/v/cluster/scheduling/partition_allocator.h b/src/v/cluster/scheduling/partition_allocator.h
index 9d7b5fd3f44f..1eb975486efb 100644
--- a/src/v/cluster/scheduling/partition_allocator.h
+++ b/src/v/cluster/scheduling/partition_allocator.h
@@ -32,11 +32,13 @@ class partition_allocator {
     static constexpr ss::shard_id shard = 0;

     partition_allocator(
       ss::sharded<members_table>&,
-      config::binding<std::optional<size_t>>,
-      config::binding<std::optional<int32_t>>,
-      config::binding<uint32_t>,
-      config::binding<uint32_t>,
-      config::binding<bool>);
+      config::binding<std::optional<size_t>> memory_per_partition,
+      config::binding<std::optional<int32_t>> fds_per_partition,
+      config::binding<uint32_t> partitions_per_shard,
+      config::binding<uint32_t> partitions_reserve_shard0,
+      config::binding<std::vector<ss::sstring>>
+        kafka_topics_skipping_allocation,
+      config::binding<bool> enable_rack_awareness);

     // Replica placement APIs

@@ -51,6 +53,7 @@ class partition_allocator {
     /// replicas to reach the requested replication factor will be allocated
     /// anew.
     result<allocated_partition> reallocate_partition(
+      model::topic_namespace,
       partition_constraints,
       const partition_assignment&,
       partition_allocation_domain,
@@ -59,6 +62,7 @@ class partition_allocator {
     /// Create allocated_partition object from current replicas for use with the
     /// allocate_replica method.
     allocated_partition make_allocated_partition(
+      model::ntp ntp,
       std::vector<model::broker_shard> replicas,
       partition_allocation_domain) const;

@@ -178,7 +182,9 @@ class partition_allocator {
     check_cluster_limits(allocation_request const& request) const;

     result<allocated_partition> allocate_new_partition(
-      partition_constraints, partition_allocation_domain);
+      model::topic_namespace nt,
+      partition_constraints,
+      partition_allocation_domain);

     result<reallocation_step> do_allocate_replica(
       allocated_partition&,
@@ -196,6 +202,7 @@ class partition_allocator {
     config::binding<std::optional<int32_t>> _fds_per_partition;
     config::binding<uint32_t> _partitions_per_shard;
     config::binding<uint32_t> _partitions_reserve_shard0;
+    config::binding<std::vector<ss::sstring>> _internal_kafka_topics;
     config::binding<bool> _enable_rack_awareness;
 };

 } // namespace cluster
diff --git a/src/v/cluster/scheduling/types.cc b/src/v/cluster/scheduling/types.cc
index 745ab367cf4f..1becc229fea4 100644
--- a/src/v/cluster/scheduling/types.cc
+++ b/src/v/cluster/scheduling/types.cc
@@ -19,9 +19,9 @@

 namespace cluster {

-hard_constraint_evaluator
-hard_constraint::make_evaluator(const replicas_t& current_replicas) const {
-    auto ev = _impl->make_evaluator(current_replicas);
+hard_constraint_evaluator hard_constraint::make_evaluator(
+  const model::ntp& ntp, const replicas_t& current_replicas) const {
+    auto ev = _impl->make_evaluator(ntp, current_replicas);

     return [this, ev = std::move(ev)](const allocation_node& node) {
         auto res = ev(node);
         vlog(clusterlog.trace, "{}({}) = {}", name(), node.id(), res);
@@ -85,10 +85,12 @@ allocation_units::~allocation_units() {
 }

 allocated_partition::allocated_partition(
+  model::ntp ntp,
   std::vector<model::broker_shard> replicas,
   partition_allocation_domain domain,
   allocation_state& state)
-  : _replicas(std::move(replicas))
+  : _ntp(std::move(ntp))
+  , _replicas(std::move(replicas))
   , _domain(domain)
   , _state(state.weak_from_this()) {}
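[Note] The types.cc hunk above keeps an existing detail worth calling out: the public hard_constraint::make_evaluator() wraps the impl's evaluator in a closure that trace-logs every per-node decision. A standalone sketch of that decorator shape (std::printf standing in for vlog/clusterlog; simplified types, not the Redpanda API):

    // Decorator pattern used by hard_constraint::make_evaluator(): the inner
    // evaluator is wrapped so each node decision can be traced.
    #include <cstdio>
    #include <functional>
    #include <string>

    struct allocation_node { int id; };
    using evaluator = std::function<bool(const allocation_node&)>;

    evaluator with_tracing(std::string name, evaluator inner) {
        return [name = std::move(name),
                inner = std::move(inner)](const allocation_node& node) {
            bool res = inner(node);
            // mirrors: vlog(clusterlog.trace, "{}({}) = {}", name(), node.id(), res)
            std::printf("%s(%d) = %d\n", name.c_str(), node.id, static_cast<int>(res));
            return res;
        };
    }

    int main() {
        evaluator not_full = [](const allocation_node&) { return true; };
        auto traced = with_tracing("not_fully_allocated", std::move(not_full));
        traced(allocation_node{3}); // prints: not_fully_allocated(3) = 1
    }

Because the wrapper only forwards the result, adding the ntp parameter changes the factory signature but leaves this logging layer untouched.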
diff --git a/src/v/cluster/scheduling/types.h b/src/v/cluster/scheduling/types.h
index e281bc30ec57..aa9e5ca32967 100644
--- a/src/v/cluster/scheduling/types.h
+++ b/src/v/cluster/scheduling/types.h
@@ -43,8 +43,8 @@ using soft_constraint_evaluator
 class hard_constraint {
 public:
     struct impl {
-        virtual hard_constraint_evaluator
-        make_evaluator(const replicas_t& current_replicas) const
+        virtual hard_constraint_evaluator make_evaluator(
+          const model::ntp&, const replicas_t& current_replicas) const
           = 0;

         virtual ss::sstring name() const = 0;
@@ -63,7 +63,7 @@ class hard_constraint {
     ~hard_constraint() noexcept = default;

     hard_constraint_evaluator
-    make_evaluator(const replicas_t& current_replicas) const;
+    make_evaluator(const model::ntp&, const replicas_t& current_replicas) const;

     ss::sstring name() const { return _impl->name(); }

@@ -243,6 +243,8 @@ class allocated_partition {
         return _replicas;
     }

+    const model::ntp& ntp() const { return _ntp; }
+
     bool has_changes() const;

     bool is_original(model::node_id) const;
@@ -260,6 +262,7 @@ class allocated_partition {

     // construct an object from an original assignment
     allocated_partition(
+      model::ntp,
       std::vector<model::broker_shard>,
       partition_allocation_domain,
       allocation_state&);
@@ -279,6 +282,7 @@ class allocated_partition {
     std::vector<model::broker_shard> release_new_partition();

 private:
+    model::ntp _ntp;
     std::vector<model::broker_shard> _replicas;
     std::optional<absl::flat_hash_map<model::node_id, uint32_t>>
       _original_node2shard;
@@ -309,14 +313,17 @@ struct partition_constraints {

 struct allocation_request {
     allocation_request() = delete;
-    explicit allocation_request(const partition_allocation_domain domain_)
-      : domain(domain_) {}
+    explicit allocation_request(
+      model::topic_namespace nt, const partition_allocation_domain domain_)
+      : _nt(std::move(nt))
+      , domain(domain_) {}
     allocation_request(const allocation_request&) = delete;
     allocation_request(allocation_request&&) = default;
     allocation_request& operator=(const allocation_request&) = delete;
     allocation_request& operator=(allocation_request&&) = default;
     ~allocation_request() = default;

+    model::topic_namespace _nt;
     ss::chunked_fifo<partition_constraints> partitions;
     partition_allocation_domain domain;
diff --git a/src/v/cluster/tests/partition_allocator_fixture.h b/src/v/cluster/tests/partition_allocator_fixture.h
index 835a261269b0..725095ba9765 100644
--- a/src/v/cluster/tests/partition_allocator_fixture.h
+++ b/src/v/cluster/tests/partition_allocator_fixture.h
@@ -17,6 +17,7 @@
 #include "cluster/scheduling/allocation_strategy.h"
 #include "cluster/scheduling/partition_allocator.h"
 #include "config/configuration.h"
+#include "config/mock_property.h"
 #include "model/fundamental.h"
 #include "model/metadata.h"
 #include "net/unresolved_address.h"
@@ -37,6 +38,7 @@ struct partition_allocator_fixture {
         config::mock_binding<std::optional<int32_t>>(std::nullopt),
         config::mock_binding<uint32_t>(uint32_t{partitions_per_shard}),
         config::mock_binding<uint32_t>(uint32_t{partitions_reserve_shard0}),
+        kafka_internal_topics.bind(),
         config::mock_binding<bool>(true)) {
         members.start().get0();
         ss::smp::invoke_on_all([] {
@@ -69,7 +71,8 @@ struct partition_allocator_fixture {
           broker.id(),
           broker.properties().cores,
           config::mock_binding<uint32_t>(uint32_t{partitions_per_shard}),
-          config::mock_binding<uint32_t>(uint32_t{partitions_reserve_shard0})));
+          config::mock_binding<uint32_t>(uint32_t{partitions_reserve_shard0}),
+          kafka_internal_topics.bind()));
     }

     void saturate_all_machines() {
@@ -115,8 +118,13 @@ struct partition_allocator_fixture {

     cluster::allocation_request
     make_allocation_request(int partitions, uint16_t replication_factor) {
+        return make_allocation_request(tn, partitions, replication_factor);
+    }
+
+    cluster::allocation_request make_allocation_request(
+      model::topic_namespace tn, int partitions, uint16_t replication_factor) {
         cluster::allocation_request req(
-          cluster::partition_allocation_domains::common);
+          std::move(tn), cluster::partition_allocation_domains::common);
         req.partitions.reserve(partitions);
         for (int i = 0; i < partitions; ++i) {
             req.partitions.emplace_back(
@@ -125,6 +133,8 @@ struct partition_allocator_fixture {
         return req;
     }

+    config::mock_property<std::vector<ss::sstring>> kafka_internal_topics{{}};
+    model::topic_namespace tn{model::kafka_namespace, model::topic{"test"}};
     ss::sharded<cluster::members_table> members;
     cluster::partition_allocator allocator;
diff --git a/src/v/cluster/tests/partition_allocator_tests.cc b/src/v/cluster/tests/partition_allocator_tests.cc
index 5b3af264f3f9..d6916d6da57a 100644
--- a/src/v/cluster/tests/partition_allocator_tests.cc
+++ b/src/v/cluster/tests/partition_allocator_tests.cc
@@ -85,7 +85,7 @@ FIXTURE_TEST(unregister_node, partition_allocator_fixture) {
     BOOST_REQUIRE_EQUAL(allocator.state().available_nodes(), 2);
 }

-FIXTURE_TEST(invalid_allocation_over_capacity, partition_allocator_fixture) {
+FIXTURE_TEST(allocation_over_capacity, partition_allocator_fixture) {
     register_node(0, 6);
     register_node(1, 6);
     register_node(2, 6);
@@ -96,6 +96,26 @@
       allocator.allocate(make_allocation_request(1, 1)).get().has_error());
     // group id hasn't changed
     BOOST_REQUIRE_EQUAL(allocator.state().last_group_id(), gr);
+
+    // Make the topic internal and retry, should work.
+    kafka_internal_topics.update({tn.tp()});
+    BOOST_REQUIRE(allocator.allocate(make_allocation_request(1, 1)).get());
+    BOOST_REQUIRE_GT(allocator.state().last_group_id(), gr);
+
+    // Undo the configuration, should fail again.
+    kafka_internal_topics.update({});
+    BOOST_REQUIRE(
+      allocator.allocate(make_allocation_request(1, 1)).get().has_error());
+
+    auto int_1 = model::topic_namespace{
+      model::ns{"redpanda"}, model::topic{"controller"}};
+    auto int_2 = model::topic_namespace{
+      model::ns{"kafka_internal"}, model::topic{"controller"}};
+    // Internal namespaces should work too.
+    BOOST_REQUIRE(
+      allocator.allocate(make_allocation_request(int_1, 1, 1)).get());
+    BOOST_REQUIRE(
+      allocator.allocate(make_allocation_request(int_2, 1, 1)).get());
 }

 FIXTURE_TEST(max_allocation, partition_allocator_fixture) {
@@ -294,8 +314,8 @@ FIXTURE_TEST(decommission_node, partition_allocator_fixture) {
 cluster::hard_constraint make_throwing_hard_evaluator() {
     struct impl : cluster::hard_constraint::impl {
-        cluster::hard_constraint_evaluator
-        make_evaluator(const cluster::replicas_t&) const final {
+        cluster::hard_constraint_evaluator make_evaluator(
+          const model::ntp&, const cluster::replicas_t&) const final {
             return [](const cluster::allocation_node&) -> bool {
                 throw std::runtime_error("evaluation exception");
             };
@@ -310,8 +330,8 @@ cluster::hard_constraint make_false_evaluator() {
     struct impl : cluster::hard_constraint::impl {
-        cluster::hard_constraint_evaluator
-        make_evaluator(const cluster::replicas_t&) const final {
+        cluster::hard_constraint_evaluator make_evaluator(
+          const model::ntp&, const cluster::replicas_t&) const final {
             return [](const cluster::allocation_node&) { return true; };
         }
         ss::sstring name() const final {
@@ -324,8 +344,8 @@ cluster::hard_constraint make_nop_evaluator() {
     struct impl : cluster::hard_constraint::impl {
-        cluster::hard_constraint_evaluator
-        make_evaluator(const cluster::replicas_t&) const final {
+        cluster::hard_constraint_evaluator make_evaluator(
+          const model::ntp&, const cluster::replicas_t&) const final {
             return [](const cluster::allocation_node&) { return true; };
         }
         ss::sstring name() const final { return "NOP evaluator"; }
@@ -419,6 +439,7 @@ FIXTURE_TEST(change_replication_factor, partition_allocator_fixture) {
     // try to allocate 3 replicas no 2 nodes - should fail
     auto expected_failure = allocator.reallocate_partition(
+      tn,
       cluster::partition_constraints(model::partition_id(0), 3),
       res.value()->get_assignments().front(),
       cluster::partition_allocation_domains::common);
@@ -429,6 +450,7 @@ FIXTURE_TEST(change_replication_factor, partition_allocator_fixture) {
     register_node(3, 4);
     auto expected_success = allocator.reallocate_partition(
+      tn,
       cluster::partition_constraints(model::partition_id(0), 3),
       res.value()->get_assignments().front(),
       cluster::partition_allocation_domains::common);
@@ -627,8 +649,11 @@ FIXTURE_TEST(incrementally_reallocate_replicas, partition_allocator_fixture) {
     check_final_counts(allocator, {1, 1, 1, 0});

     {
+        auto partition_id = res.value()->get_assignments().front().id;
+        auto ntp = model::ntp{tn.ns, tn.tp, partition_id};
         cluster::allocated_partition reallocated
-          = allocator.make_allocated_partition(original_replicas, domain);
+          = allocator.make_allocated_partition(
+            std::move(ntp), original_replicas, domain);

         cluster::allocation_constraints not_on_old_nodes;
         not_on_old_nodes.add(cluster::distinct_from(original_replicas));
@@ -749,6 +774,7 @@ FIXTURE_TEST(reallocate_partition_with_move, partition_allocator_fixture) {
     {
         auto res = allocator.reallocate_partition(
+          tn,
           cluster::partition_constraints(
             model::partition_id(0), 4, not_on_old_nodes),
           original_assignment,
@@ -770,6 +796,7 @@ FIXTURE_TEST(reallocate_partition_with_move, partition_allocator_fixture) {
     {
         auto res = allocator.reallocate_partition(
+          tn,
           cluster::partition_constraints(
             model::partition_id(0), 5, not_on_old_nodes),
           original_assignment,
@@ -782,6 +809,7 @@ FIXTURE_TEST(reallocate_partition_with_move, partition_allocator_fixture) {
     {
         auto res = allocator.reallocate_partition(
+          tn,
           cluster::partition_constraints(
             model::partition_id(0), 4, not_on_old_nodes),
           original_assignment,
@@ -835,9 +863,13 @@ FIXTURE_TEST(
     check_allocated_counts(allocator, {2, 2, 2, 1});
     check_final_counts(allocator, {2, 2, 2, 1});

+    auto id = partition_2->get_assignments().front().id;
+    auto ntp = model::ntp{tn.ns, tn.tp, id};
     cluster::allocated_partition reallocated
       = allocator.make_allocated_partition(
-          original_replicas, cluster::partition_allocation_domains::common);
+          std::move(ntp),
+          original_replicas,
+          cluster::partition_allocation_domains::common);

     auto moved = allocator.reallocate_replica(
       reallocated, model::node_id{0}, not_on_0);
@@ -892,8 +924,8 @@ static cluster::allocation_constraints on_node(model::node_id id) {
         explicit impl(model::node_id id)
           : _id(id) {}

-        cluster::hard_constraint_evaluator
-        make_evaluator(const cluster::replicas_t&) const final {
+        cluster::hard_constraint_evaluator make_evaluator(
+          const model::ntp&, const cluster::replicas_t&) const final {
             return [this](const cluster::allocation_node& node) {
                 return node.id() == _id;
             };
@@ -930,8 +962,11 @@ FIXTURE_TEST(revert_allocation_step, partition_allocator_fixture) {
     auto n = [](int id) { return model::node_id{id}; };

     {
+        auto partition_id = res.value()->get_assignments().front().id;
+        auto ntp = model::ntp{tn.ns, tn.tp, partition_id};
         cluster::allocated_partition reallocated
-          = allocator.make_allocated_partition(original_replicas, domain);
+          = allocator.make_allocated_partition(
+            std::move(ntp), original_replicas, domain);
         auto step1 = allocator.reallocate_replica(
           reallocated, n(0), on_node(n(3)));
         BOOST_REQUIRE(step1);
diff --git a/src/v/cluster/tests/partition_balancer_planner_fixture.h b/src/v/cluster/tests/partition_balancer_planner_fixture.h
index bf88333352ea..3ee32805489f 100644
--- a/src/v/cluster/tests/partition_balancer_planner_fixture.h
+++ b/src/v/cluster/tests/partition_balancer_planner_fixture.h
@@ -48,7 +48,8 @@ create_allocation_node(model::node_id nid, uint32_t cores) {
       nid,
       cores,
       config::mock_binding<uint32_t>(uint32_t{partitions_per_shard}),
-      config::mock_binding<uint32_t>(uint32_t{partitions_reserve_shard0}));
+      config::mock_binding<uint32_t>(uint32_t{partitions_reserve_shard0}),
+      config::mock_binding<std::vector<ss::sstring>>({}));
 }

 struct controller_workers {
@@ -64,6 +65,9 @@ struct controller_workers {
           config::mock_binding<std::optional<int32_t>>(std::nullopt),
           config::mock_binding<uint32_t>(uint32_t{partitions_per_shard}),
           config::mock_binding<uint32_t>(uint32_t{partitions_reserve_shard0}),
+          config::mock_binding<std::vector<ss::sstring>>(
+            std::vector<ss::sstring>{
+              {"__audit", "__consumer_offsets", "_schemas"}}),
           config::mock_binding<bool>(true))
           .get();
         // use node status that is not used in test as self is always available
@@ -90,7 +94,7 @@ struct controller_workers {
           test_ns, model::topic(topic), partitions, replication_factor);

         cluster::allocation_request req(
-          cluster::partition_allocation_domains::common);
+          cfg.tp_ns, cluster::partition_allocation_domains::common);
         req.partitions.reserve(partitions);
         for (auto p = 0; p < partitions; ++p) {
             req.partitions.emplace_back(
@@ -162,6 +166,28 @@ struct partition_balancer_planner_fixture {
           workers.allocator.local());
     }

+    cluster::topic_configuration_assignment make_tp_configuration(
+      const ss::sstring& topic, int partitions, int16_t replication_factor) {
+        cluster::topic_configuration cfg(
+          test_ns, model::topic(topic), partitions, replication_factor);
+
+        cluster::allocation_request req(
+          cfg.tp_ns, cluster::partition_allocation_domains::common);
+        req.partitions.reserve(partitions);
+        for (auto p = 0; p < partitions; ++p) {
+            req.partitions.emplace_back(
+              model::partition_id(p), replication_factor);
+        }
+
+        auto pas = workers.allocator.local()
+                     .allocate(std::move(req))
+                     .get()
+                     .value()
+                     ->copy_assignments();
+
+        return {cfg, std::move(pas)};
+    }
+
     model::topic_namespace make_tp_ns(const ss::sstring& tp) {
         return {test_ns, model::topic(tp)};
     }
diff --git a/src/v/cluster/tests/partition_balancer_simulator_test.cc b/src/v/cluster/tests/partition_balancer_simulator_test.cc
index f5bd9dbecf3b..744497b42ae0 100644
--- a/src/v/cluster/tests/partition_balancer_simulator_test.cc
+++ b/src/v/cluster/tests/partition_balancer_simulator_test.cc
@@ -59,7 +59,8 @@ class partition_balancer_sim_fixture {
             id,
             n_cores,
             config::mock_binding<uint32_t>(1000),
-            config::mock_binding<uint32_t>(0)));
+            config::mock_binding<uint32_t>(0),
+            config::mock_binding<std::vector<ss::sstring>>({})));

         // add some random initial used space
         size_t initial_used = random_generators::get_int(
diff --git a/src/v/cluster/tests/topic_table_fixture.h b/src/v/cluster/tests/topic_table_fixture.h
index f71f1756ab31..7e5988ce1e8a 100644
--- a/src/v/cluster/tests/topic_table_fixture.h
+++ b/src/v/cluster/tests/topic_table_fixture.h
@@ -54,6 +54,7 @@ struct topic_table_fixture {
           config::mock_binding<std::optional<int32_t>>(std::nullopt),
           config::mock_binding<uint32_t>(uint32_t{partitions_per_shard}),
           config::mock_binding<uint32_t>(uint32_t{partitions_reserve_shard0}),
+          config::mock_binding<std::vector<ss::sstring>>({}),
           config::mock_binding<bool>(false))
           .get0();
         allocator.local().register_node(
@@ -88,7 +89,8 @@ struct topic_table_fixture {
           nid,
           cores,
           config::mock_binding<uint32_t>(uint32_t{partitions_per_shard}),
-          config::mock_binding<uint32_t>(uint32_t{partitions_reserve_shard0}));
+          config::mock_binding<uint32_t>(uint32_t{partitions_reserve_shard0}),
+          config::mock_binding<std::vector<ss::sstring>>({}));
     }

     cluster::topic_configuration_assignment make_tp_configuration(
@@ -97,7 +99,7 @@ struct topic_table_fixture {
           test_ns, model::topic(topic), partitions, replication_factor);

         cluster::allocation_request req(
-          cluster::partition_allocation_domains::common);
+          cfg.tp_ns, cluster::partition_allocation_domains::common);
         req.partitions.reserve(partitions);
         for (auto p = 0; p < partitions; ++p) {
             req.partitions.emplace_back(
diff --git a/src/v/cluster/topics_frontend.cc b/src/v/cluster/topics_frontend.cc
index 8f2cf182bdd3..06daccbe25e2 100644
--- a/src/v/cluster/topics_frontend.cc
+++ b/src/v/cluster/topics_frontend.cc
@@ -304,7 +304,8 @@ make_error_result(const model::topic_namespace& tp_ns, std::error_code ec) {
 allocation_request
 make_allocation_request(const custom_assignable_topic_configuration& ca_cfg) {
     // no custom assignments, lets allocator decide based on partition count
-    allocation_request req(get_allocation_domain(ca_cfg.cfg.tp_ns));
+    const auto& tp_ns = ca_cfg.cfg.tp_ns;
+    allocation_request req(tp_ns, get_allocation_domain(tp_ns));
     if (!ca_cfg.has_custom_assignment()) {
         req.partitions.reserve(ca_cfg.cfg.partition_count);
         for (auto p = 0; p < ca_cfg.cfg.partition_count; ++p) {
@@ -955,7 +956,7 @@ allocation_request make_allocation_request(
   const create_partitions_configuration& cfg) {
     const auto new_partitions_cnt = cfg.new_total_partition_count
                                     - current_partitions_count;
-    allocation_request req(get_allocation_domain(cfg.tp_ns));
+    allocation_request req(cfg.tp_ns, get_allocation_domain(cfg.tp_ns));
     req.partitions.reserve(new_partitions_cnt);
     for (auto p = 0; p < new_partitions_cnt; ++p) {
         req.partitions.emplace_back(model::partition_id(p), replication_factor);
@@ -1287,18 +1288,17 @@ ss::future<std::error_code> topics_frontend::increase_replication_factor(

           partition_constraints.partition_id = p_id;

-          auto ntp = model::ntp(topic.ns, topic.tp, p_id);
-
           return _allocator
             .invoke_on(
               partition_allocator::shard,
-              [partition_constraints,
-               assignment = std::move(assignment),
-               ntp = std::move(ntp)](partition_allocator& al) {
+              [topic,
+               partition_constraints,
+               assignment = std::move(assignment)](partition_allocator& al) {
                   return al.reallocate_partition(
+                    topic,
                     partition_constraints,
                     assignment,
-                    get_allocation_domain(ntp));
+                    get_allocation_domain({topic.ns, topic.tp}));
               })
             .then([&error, &units, &new_assignments, topic, p_id](
                     result<allocated_partition> reallocation) {
@@ -1386,8 +1386,8 @@ allocation_request make_allocation_request(
   model::ntp ntp,
   replication_factor tp_replication_factor,
   const std::vector<model::broker_shard>& new_replicas) {
-    allocation_request req(
-      get_allocation_domain(model::topic_namespace{ntp.ns, ntp.tp.topic}));
+    auto nt = model::topic_namespace(ntp.ns, ntp.tp.topic);
+    allocation_request req(nt, get_allocation_domain(nt));
     req.partitions.reserve(1);
     allocation_constraints constraints;
     constraints.add(on_nodes(new_replicas));
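[Note] Taken together: an allocation_request now carries the topic_namespace, the allocator turns it into a per-partition ntp, and choose_node() threads that ntp into every hard-constraint evaluator. A condensed sketch of that flow (plain-C++ stand-ins; soft-constraint scoring and error codes omitted, and the first eligible node is picked rather than the strategy's real selection):

    // Request -> per-partition ntp -> ntp-aware hard constraints.
    // Simplified illustrative types, not the Redpanda API.
    #include <functional>
    #include <iostream>
    #include <optional>
    #include <string>
    #include <vector>

    struct allocation_node { int id; bool full; };
    struct ntp { std::string ns, topic; int partition; };

    using evaluator = std::function<bool(const allocation_node&)>;
    // a constraint is a factory that now also sees the ntp being placed,
    // mirroring hard_constraint::make_evaluator(const model::ntp&, ...)
    using constraint = std::function<evaluator(ntp)>;

    // analogous to solve_hard_constraints() + choose_node(): build the
    // evaluators once per placement, then pick the first passing node
    std::optional<int> choose_node(
      const ntp& p,
      const std::vector<constraint>& constraints,
      const std::vector<allocation_node>& nodes) {
        std::vector<evaluator> evs;
        evs.reserve(constraints.size());
        for (const auto& c : constraints) {
            evs.push_back(c(p));
        }
        for (const auto& n : nodes) {
            bool eligible = true;
            for (const auto& ev : evs) {
                eligible = eligible && ev(n);
            }
            if (eligible) {
                return n.id;
            }
        }
        return std::nullopt;
    }

    int main() {
        // "not_fully_allocated" analogue: the capacity check is ntp-aware
        constraint not_full = [](ntp p) {
            return [p](const allocation_node& n) {
                return p.ns == "redpanda" || !n.full; // internal ns bypasses
            };
        };
        std::vector<allocation_node> nodes{{0, true}, {1, true}};
        std::cout
          << choose_node({"kafka", "t", 0}, {not_full}, nodes).has_value()
          << choose_node({"redpanda", "c", 0}, {not_full}, nodes).has_value()
          << '\n'; // prints "01": user topic rejected, internal topic placed
    }

This is also why the patch threads a model::ntp (rather than just a topic name) all the way down: the namespace alone decides the redpanda/kafka_internal bypass, while the topic name is needed for the configurable kafka-namespace exemption list.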