partition_allocator: exclude internal topics from per node partition limit checks #11249

Merged
1 change: 1 addition & 0 deletions src/v/cluster/controller.cc
@@ -104,6 +104,7 @@ ss::future<> controller::wire_up() {
config::shard_local_cfg().topic_fds_per_partition.bind(),
config::shard_local_cfg().topic_partitions_per_shard.bind(),
config::shard_local_cfg().topic_partitions_reserve_shard0.bind(),
config::shard_local_cfg().kafka_nodelete_topics.bind(),
config::shard_local_cfg().enable_rack_awareness.bind());
})
.then([this] { return _credentials.start(); })
5 changes: 4 additions & 1 deletion src/v/cluster/health_manager.cc
@@ -83,7 +83,10 @@ ss::future<bool> health_manager::ensure_partition_replication(model::ntp ntp) {
ntp.tp.partition, _target_replication_factor);

auto allocation = _allocator.local().reallocate_partition(
constraints, *assignment, get_allocation_domain(ntp));
model::topic_namespace{ntp.ns, ntp.tp.topic},
constraints,
*assignment,
get_allocation_domain(ntp));
if (!allocation) {
vlog(
clusterlog.warn,
1 change: 1 addition & 0 deletions src/v/cluster/members_backend.cc
@@ -31,6 +31,7 @@ void reassign_replicas(
partition_assignment current_assignment,
members_backend::partition_reallocation& reallocation) {
auto res = allocator.reallocate_partition(
model::topic_namespace{ntp.ns, ntp.tp.topic},
reallocation.constraints.value(),
current_assignment,
get_allocation_domain(ntp),
4 changes: 2 additions & 2 deletions src/v/cluster/partition_balancer_planner.cc
@@ -40,7 +40,7 @@ distinct_from(const absl::flat_hash_set<model::node_id>& nodes) {
: _nodes(nodes) {}

hard_constraint_evaluator
make_evaluator(const replicas_t&) const final {
make_evaluator(const model::ntp&, const replicas_t&) const final {
return [this](const allocation_node& node) {
return !_nodes.contains(node.id());
};
@@ -828,7 +828,7 @@ partition_balancer_planner::reassignable_partition::move_replica(
if (!_reallocated) {
_reallocated
= _ctx._parent._partition_allocator.make_allocated_partition(
replicas(), get_allocation_domain(_ntp));
_ntp, replicas(), get_allocation_domain(_ntp));
}

// Verify that we are moving only original replicas. This assumption
24 changes: 23 additions & 1 deletion src/v/cluster/scheduling/allocation_node.cc
@@ -20,12 +20,14 @@ allocation_node::allocation_node(
model::node_id id,
uint32_t cpus,
config::binding<uint32_t> partitions_per_shard,
config::binding<uint32_t> partitions_reserve_shard0)
config::binding<uint32_t> partitions_reserve_shard0,
config::binding<std::vector<ss::sstring>> internal_kafka_topics)
: _id(id)
, _weights(cpus)
, _max_capacity((cpus * partitions_per_shard()) - partitions_reserve_shard0())
, _partitions_per_shard(std::move(partitions_per_shard))
, _partitions_reserve_shard0(std::move(partitions_reserve_shard0))
, _internal_kafka_topics(std::move(internal_kafka_topics))
, _cpus(cpus) {
// add extra weights to core 0
_weights[0] = _partitions_reserve_shard0();
@@ -48,6 +50,26 @@
});
}

bool allocation_node::is_full(const model::ntp& ntp) const {
// Internal topics are excluded from checks to prevent allocation failures
// when creating them. This is okay because they are fairly small in number
// compared to kafka user topic partitions.
auto is_internal_ns = ntp.ns == model::redpanda_ns
|| ntp.ns == model::kafka_internal_namespace;
if (is_internal_ns) {
return false;
}
const auto& internal_topics = _internal_kafka_topics();
auto is_internal_topic = ntp.ns == model::kafka_namespace
&& std::any_of(
internal_topics.cbegin(),
internal_topics.cend(),
[&ntp](const ss::sstring& topic) {
return topic == ntp.tp.topic();
});
return !is_internal_topic && _allocated_partitions >= _max_capacity;
}

ss::shard_id
allocation_node::allocate(const partition_allocation_domain domain) {
auto it = std::min_element(_weights.begin(), _weights.end());
10 changes: 6 additions & 4 deletions src/v/cluster/scheduling/allocation_node.h
@@ -33,9 +33,10 @@ class allocation_node {

allocation_node(
model::node_id,
uint32_t,
config::binding<uint32_t>,
config::binding<uint32_t>);
uint32_t /*cpus*/,
config::binding<uint32_t> /*partitions_per_shard*/,
config::binding<uint32_t> /*partitions_reserve_shard0*/,
config::binding<std::vector<ss::sstring>> /*internal_kafka_topics*/);

allocation_node(allocation_node&& o) noexcept = default;
allocation_node& operator=(allocation_node&&) = delete;
@@ -123,7 +124,7 @@ class allocation_node {
bool empty() const {
return _allocated_partitions == allocation_capacity{0};
}
bool is_full() const { return _allocated_partitions >= _max_capacity; }
bool is_full(const model::ntp&) const;
ss::shard_id allocate(partition_allocation_domain);

private:
@@ -150,6 +151,7 @@

config::binding<uint32_t> _partitions_per_shard;
config::binding<uint32_t> _partitions_reserve_shard0;
config::binding<std::vector<ss::sstring>> _internal_kafka_topics;
// Keep track of how much weight we applied to shard0,
// to enable runtime updates
int32_t _shard0_reserved{0};
9 changes: 6 additions & 3 deletions src/v/cluster/scheduling/allocation_state.cc
@@ -59,7 +59,8 @@ void allocation_state::register_node(
broker.id(),
broker.properties().cores,
_partitions_per_shard,
_partitions_reserve_shard0);
_partitions_reserve_shard0,
_internal_kafka_topics);

if (state == allocation_node::state::decommissioned) {
node->decommission();
@@ -93,7 +94,8 @@ void allocation_state::update_allocation_nodes(
b.id(),
b.properties().cores,
_partitions_per_shard,
_partitions_reserve_shard0));
_partitions_reserve_shard0,
_internal_kafka_topics));
} else {
it->second->update_core_count(b.properties().cores);
// node was added back to the cluster
@@ -114,7 +116,8 @@ void allocation_state::upsert_allocation_node(const model::broker& broker) {
broker.id(),
broker.properties().cores,
_partitions_per_shard,
_partitions_reserve_shard0));
_partitions_reserve_shard0,
_internal_kafka_topics));
} else {
it->second->update_core_count(broker.properties().cores);
// node was added back to the cluster
9 changes: 6 additions & 3 deletions src/v/cluster/scheduling/allocation_state.h
@@ -30,9 +30,11 @@ class allocation_state : public ss::weakly_referencable<allocation_state> {

allocation_state(
config::binding<uint32_t> partitions_per_shard,
config::binding<uint32_t> partitions_reserve_shard0)
: _partitions_per_shard(partitions_per_shard)
, _partitions_reserve_shard0(partitions_reserve_shard0) {}
config::binding<uint32_t> partitions_reserve_shard0,
config::binding<std::vector<ss::sstring>> internal_kafka_topics)
: _partitions_per_shard(std::move(partitions_per_shard))
, _partitions_reserve_shard0(std::move(partitions_reserve_shard0))
, _internal_kafka_topics(std::move(internal_kafka_topics)) {}

// Allocation nodes
void register_node(node_ptr);
@@ -84,6 +86,7 @@ class allocation_state : public ss::weakly_referencable<allocation_state> {

config::binding<uint32_t> _partitions_per_shard;
config::binding<uint32_t> _partitions_reserve_shard0;
config::binding<std::vector<ss::sstring>> _internal_kafka_topics;

raft::group_id _highest_group{0};
underlying_t _nodes;
5 changes: 4 additions & 1 deletion src/v/cluster/scheduling/allocation_strategy.cc
@@ -42,13 +42,14 @@ inline bool contains_node_already(
}

std::vector<model::node_id> solve_hard_constraints(
const model::ntp& ntp,
const std::vector<model::broker_shard>& current_replicas,
const std::vector<hard_constraint_ptr>& constraints,
const allocation_state::underlying_t& nodes) {
std::vector<hard_constraint_evaluator> evaluators;
evaluators.reserve(constraints.size());
for (auto& c : constraints) {
evaluators.push_back(c->make_evaluator(current_replicas));
evaluators.push_back(c->make_evaluator(ntp, current_replicas));
}

// empty hard constraints, all nodes are eligible
@@ -160,6 +161,7 @@ allocation_strategy simple_allocation_strategy() {
class impl : public allocation_strategy::impl {
public:
result<model::node_id> choose_node(
const model::ntp& ntp,
const std::vector<model::broker_shard>& current_replicas,
const allocation_constraints& request,
allocation_state& state,
@@ -169,6 +171,7 @@
* evaluate hard constraints
*/
std::vector<model::node_id> possible_nodes = solve_hard_constraints(
ntp,
current_replicas,
request.hard_constraints,
state.allocation_nodes());
4 changes: 3 additions & 1 deletion src/v/cluster/scheduling/allocation_strategy.h
@@ -26,6 +26,7 @@ class allocation_strategy {
* constraints in the specified domain
*/
virtual result<model::node_id> choose_node(
const model::ntp&,
const replicas_t&,
const allocation_constraints&,
allocation_state&,
@@ -39,11 +40,12 @@
: _impl(std::move(impl)) {}

result<model::node_id> choose_node(
const model::ntp& ntp,
const replicas_t& current_replicas,
const allocation_constraints& ac,
allocation_state& state,
const partition_allocation_domain domain) {
return _impl->choose_node(current_replicas, ac, state, domain);
return _impl->choose_node(ntp, current_replicas, ac, state, domain);
}

private:
46 changes: 11 additions & 35 deletions src/v/cluster/scheduling/constraints.cc
@@ -29,8 +29,10 @@ hard_constraint not_fully_allocated() {
class impl : public hard_constraint::impl {
public:
hard_constraint_evaluator
make_evaluator(const replicas_t&) const final {
return [](const allocation_node& node) { return !node.is_full(); };
make_evaluator(const model::ntp& ntp, const replicas_t&) const final {
return [&ntp](const allocation_node& node) {
return !node.is_full(ntp);

Review thread on this line:

Contributor:
nit: not necessary to fix right now, but I have a feeling that allocation_state should be just "dumb state" (e.g. for the full-node check it should only provide the number of currently allocated partitions) and all the logic should live in the evaluators. The reasons are that 1) the evaluators already contain some of the logic, so why not all of it, and 2) they have more context available (existing replicas, and the previous node for that replica, which is not available right now but we will have to add it soon), so we would either have to pass all of that into allocation_state or just decide everything in the evaluators.

@bharathv (author), Jun 30, 2023:
Right, this thought did cross my mind and I agree with you. Makes sense to move this into the constraint once the context becomes more complicated.

(A simplified sketch of this idea follows this hunk.)

};
}

ss::sstring name() const final {
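
The review thread above suggests keeping allocation_state/allocation_node as "dumb state" and moving the exemption policy into the constraint evaluators. Below is a minimal, self-contained C++ sketch of that shape. The types ntp_view and node_stats are simplified stand-ins for model::ntp and allocation_node, the namespace strings are illustrative, and the helper is not the code merged in this PR; it only illustrates where the policy could live.

// Standalone sketch (not part of this PR): the internal-topic exemption
// expressed inside a hard-constraint evaluator instead of
// allocation_node::is_full().
#include <algorithm>
#include <cstdint>
#include <functional>
#include <iostream>
#include <string>
#include <vector>

struct ntp_view {        // stand-in for model::ntp
    std::string ns;
    std::string topic;
};

struct node_stats {      // "dumb state": only raw counters, no policy
    uint32_t allocated_partitions;
    uint32_t max_capacity;
};

using hard_constraint_evaluator = std::function<bool(const node_stats&)>;

// Factory mirroring not_fully_allocated(): the decision about which topics
// are exempt lives entirely in the evaluator.
hard_constraint_evaluator not_fully_allocated(
  const ntp_view& ntp, const std::vector<std::string>& internal_kafka_topics) {
    const bool exempt
      = ntp.ns == "redpanda" || ntp.ns == "kafka_internal"
        || (ntp.ns == "kafka"
            && std::find(
                 internal_kafka_topics.begin(),
                 internal_kafka_topics.end(),
                 ntp.topic)
                 != internal_kafka_topics.end());
    return [exempt](const node_stats& node) {
        // Exempt partitions always pass; user partitions respect capacity.
        return exempt || node.allocated_partitions < node.max_capacity;
    };
}

int main() {
    const node_stats full_node{1000, 1000};
    const std::vector<std::string> internal = {"__consumer_offsets"};

    auto user_topic = not_fully_allocated({"kafka", "my_topic"}, internal);
    auto internal_topic
      = not_fully_allocated({"kafka", "__consumer_offsets"}, internal);

    std::cout << "user topic allowed on full node: "
              << user_topic(full_node) << '\n';      // 0
    std::cout << "internal topic allowed on full node: "
              << internal_topic(full_node) << '\n';  // 1
}

The evaluator produced for an exempt ntp always accepts a node, while any other ntp is still bounded by the node's partition capacity, matching the behaviour that allocation_node::is_full(ntp) implements in this PR, just relocated into the constraint.
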
@@ -45,7 +47,7 @@ hard_constraint is_active() {
class impl : public hard_constraint::impl {
public:
hard_constraint_evaluator
make_evaluator(const replicas_t&) const final {
make_evaluator(const model::ntp&, const replicas_t&) const final {
return [](const allocation_node& node) { return node.is_active(); };
}

@@ -62,7 +64,7 @@ hard_constraint on_node(model::node_id id) {
: _id(id) {}

hard_constraint_evaluator
make_evaluator(const replicas_t&) const final {
make_evaluator(const model::ntp&, const replicas_t&) const final {
return
[this](const allocation_node& node) { return node.id() == _id; };
}
@@ -89,7 +91,7 @@ hard_constraint on_nodes(const std::vector<model::node_id>& ids) {
}
}
hard_constraint_evaluator
make_evaluator(const replicas_t&) const final {
make_evaluator(const model::ntp&, const replicas_t&) const final {
return [this](const allocation_node& node) {
return _ids.contains(node.id());
};
@@ -125,7 +127,7 @@ hard_constraint distinct_from(const replicas_t& replicas) {
: _replicas(r) {}

hard_constraint_evaluator
make_evaluator(const replicas_t&) const final {
make_evaluator(const model::ntp&, const replicas_t&) const final {
return [this](const allocation_node& node) {
return std::all_of(
_replicas.begin(),
@@ -150,8 +152,8 @@ hard_constraint distinct_nodes() {
hard_constraint distinct_nodes() {
class impl : public hard_constraint::impl {
public:
hard_constraint_evaluator
make_evaluator(const replicas_t& current_replicas) const final {
hard_constraint_evaluator make_evaluator(
const model::ntp&, const replicas_t& current_replicas) const final {
return [&current_replicas](const allocation_node& node) {
return std::all_of(
current_replicas.begin(),
@@ -187,7 +189,7 @@ hard_constraint disk_not_overflowed_by_partition(
, _node_disk_reports(node_disk_reports) {}

hard_constraint_evaluator
make_evaluator(const replicas_t&) const final {
make_evaluator(const model::ntp&, const replicas_t&) const final {
return [this](const allocation_node& node) {
auto disk_it = _node_disk_reports.find(node.id());
if (disk_it == _node_disk_reports.end()) {
@@ -322,32 +324,6 @@ soft_constraint least_disk_filled(
std::make_unique<impl>(max_disk_usage_ratio, node_disk_reports));
}

soft_constraint make_soft_constraint(hard_constraint constraint) {
class impl : public soft_constraint::impl {
public:
explicit impl(hard_constraint constraint)
: _hard_constraint(std::move(constraint)) {}

soft_constraint_evaluator
make_evaluator(const replicas_t& replicas) const final {
auto ev = _hard_constraint.make_evaluator(replicas);
return
[ev = std::move(ev)](const allocation_node& node) -> uint64_t {
return ev(node) ? soft_constraint::max_score : 0;
};
}

ss::sstring name() const final {
return ssx::sformat(
"soft constraint adapter of ({})", _hard_constraint);
}

const hard_constraint _hard_constraint;
};

return soft_constraint(std::make_unique<impl>(std::move(constraint)));
}

soft_constraint distinct_rack_preferred(const members_table& members) {
return distinct_labels_preferred(
rack_label.data(),
6 changes: 0 additions & 6 deletions src/v/cluster/scheduling/constraints.h
@@ -23,12 +23,6 @@ namespace cluster {
class allocation_state;

static constexpr std::string_view rack_label = "rack";
/**
* make_soft_constraint adapts hard constraint to soft one by returning
* max score for nodes that matches the soft constraint and 0 for
* the ones that not
*/
soft_constraint make_soft_constraint(hard_constraint);

hard_constraint not_fully_allocated();
hard_constraint is_active();