c/partition_allocator: maintain and use final partition counts #11154

Merged
merged 5 commits on Jun 2, 2023
78 changes: 13 additions & 65 deletions src/v/cluster/members_backend.cc
@@ -25,7 +25,7 @@
namespace cluster {
namespace {
struct node_replicas {
size_t allocated_replicas;
size_t final_count;
size_t max_capacity;
};

@@ -40,15 +40,13 @@ calculate_replicas_per_node(
if (!n->is_active()) {
continue;
}
auto [it, _] = ret.try_emplace(
auto [it, inserted] = ret.try_emplace(
id,
node_replicas{
.allocated_replicas = 0,
.max_capacity = n->domain_partition_capacity(domain),
.final_count = n->domain_final_partitions(domain),
.max_capacity = n->max_capacity(),
});

const auto domain_allocated = n->domain_allocated_partitions(domain);
it->second.allocated_replicas += domain_allocated;
vassert(inserted, "node {} inserted twice", id);
}
return ret;
}
@@ -57,7 +55,7 @@ static size_t
calculate_total_replicas(const node_replicas_map_t& node_replicas) {
size_t total_replicas = 0;
for (auto& [_, replicas] : node_replicas) {
total_replicas += replicas.allocated_replicas;
total_replicas += replicas.final_count;
}
return total_replicas;
}
@@ -137,64 +135,16 @@ void reassign_replicas(
*
**/
double calculate_unevenness_error(
const partition_allocator& allocator,
const members_backend::update_meta& update,
const topic_table& topics,
partition_allocation_domain domain) {
const partition_allocator& allocator, partition_allocation_domain domain) {
static const std::vector<partition_allocation_domain> domains{
partition_allocation_domains::consumer_offsets,
partition_allocation_domains::common};

const auto node_cnt = allocator.state().available_nodes();

auto node_replicas = calculate_replicas_per_node(allocator, domain);
/**
* adjust per node replicas with the replicas that are going to be removed
* from the node after successful reallocation
* based on the state of reallocation the following adjustments are made:
*
* reallocation_state::initial - no adjustment required
* reallocation_state::reassigned - allocator already updated, adjusting
* reallocation_state::requested - allocator already updated, adjusting
* reallocation_state::finished - no adjustment required
*
* Do not need to care about the cancel related state here as no
* cancellations are requested when node is added to the cluster.
*/

for (const auto& [ntp, r] : update.partition_reallocations) {
using state = members_backend::reallocation_state;
/**
* In the initial or finished state the adjustment doesn't have
* to be taken into account as partition balancer is already updated.
*/
if (
r.state == state::initial || r.state == state::finished
|| r.state == state::cancelled || r.state == state::request_cancel) {
continue;
}
/**
* if a partition move was already requested it might have already been
* finished, consult topic table to check if the update is still in
* progress. If no move is in progress the adjustment must be skipped as
* allocator state is already up to date. Reallocation will be marked as
* finished in reconciliation loop pass.
*/
if (r.state == state::requested && !topics.is_update_in_progress(ntp)) {
continue;
}

if (get_allocation_domain(ntp) == domain) {
for (const auto& to_remove : r.replicas_to_remove) {
auto it = node_replicas.find(to_remove);
if (it != node_replicas.end()) {
it->second.allocated_replicas--;
}
}
}
}
const auto total_replicas = calculate_total_replicas(node_replicas);

if (total_replicas == 0) {
return 0.0;
}
@@ -213,14 +163,14 @@ double calculate_unevenness_error(
double err = 0;
for (auto& [id, allocation_info] : node_replicas) {
double diff = static_cast<double>(target_replicas_per_node)
- static_cast<double>(allocation_info.allocated_replicas);
- static_cast<double>(allocation_info.final_count);

vlog(
clusterlog.trace,
"node {} has {} replicas allocated in domain {}, requested replicas "
"per node {}, difference: {}",
id,
allocation_info.allocated_replicas,
allocation_info.final_count,
domain,
target_replicas_per_node,
diff);
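
With the pending-move bookkeeping removed, the error now reduces to comparing each node's final replica count against the per-node average. A standalone sketch of that computation follows, assuming an absolute-deviation error normalized to [0, 1] so that 1.0 corresponds to every replica sitting on a single node; the map type and the normalization constant are illustrative, not the exact members_backend definitions, and the real function also emits the trace log shown above.

// --- editorial sketch, not part of the diff ---
#include <cmath>
#include <cstddef>
#include <map>

struct node_replicas {
    size_t final_count;  // replicas on the node once in-flight moves finish
    size_t max_capacity;
};

double unevenness_error(const std::map<int, node_replicas>& nodes) {
    size_t total_replicas = 0;
    for (const auto& [id, r] : nodes) {
        total_replicas += r.final_count;
    }
    if (nodes.empty() || total_replicas == 0) {
        return 0.0;
    }
    const double target_per_node = static_cast<double>(total_replicas)
                                   / static_cast<double>(nodes.size());
    double err = 0;
    for (const auto& [id, r] : nodes) {
        err += std::abs(
          target_per_node - static_cast<double>(r.final_count));
    }
    // worst case used for normalization: every replica on a single node
    const double max_err = 2.0 * static_cast<double>(total_replicas)
                           * (1.0 - 1.0 / static_cast<double>(nodes.size()));
    return max_err == 0 ? 0.0 : err / max_err;
}
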
@@ -485,8 +435,7 @@ void members_backend::default_reallocation_strategy::
}
calculate_reallocations_batch(
max_batch_size, allocator, topics, meta, domain);
auto current_error = calculate_unevenness_error(
allocator, meta, topics, domain);
auto current_error = calculate_unevenness_error(allocator, domain);
auto [it, _] = meta.last_unevenness_error.try_emplace(domain, 1.0);

auto improvement = it->second - current_error;
@@ -564,9 +513,8 @@ void members_backend::default_reallocation_strategy::
absl::flat_hash_map<model::node_id, size_t> to_move_from_node;

for (auto& [id, info] : node_replicas) {
auto to_move = info.allocated_replicas
- std::min(
target_replicas_per_node, info.allocated_replicas);
auto to_move = info.final_count
- std::min(target_replicas_per_node, info.final_count);
if (to_move > 0) {
to_move_from_node.emplace(id, to_move);
}
@@ -582,7 +530,7 @@
cnt,
id,
domain,
node_replicas[id].allocated_replicas);
node_replicas[id].final_count);
}
}

2 changes: 2 additions & 0 deletions src/v/cluster/scheduling/allocation_node.cc
@@ -54,6 +54,8 @@ allocation_node::allocate(const partition_allocation_domain domain) {
(*it)++; // increment the weights
_allocated_partitions++;
++_allocated_domain_partitions[domain];
_final_partitions++;
++_final_domain_partitions[domain];
const ss::shard_id core = std::distance(_weights.begin(), it);
vlog(
clusterlog.trace,
17 changes: 17 additions & 0 deletions src/v/cluster/scheduling/allocation_node.h
@@ -99,6 +99,9 @@ class allocation_node {
return _allocated_partitions;
}

// number of partitions after all in-progress movements are finished
allocation_capacity final_partitions() const { return _final_partitions; }

allocation_capacity
domain_allocated_partitions(partition_allocation_domain domain) const {
if (auto it = _allocated_domain_partitions.find(domain);
@@ -108,6 +111,15 @@
return allocation_capacity{0};
}

allocation_capacity
domain_final_partitions(partition_allocation_domain domain) const {
if (auto it = _final_domain_partitions.find(domain);
it != _final_domain_partitions.end()) {
return it->second;
}
return allocation_capacity{0};
}

bool empty() const {
return _allocated_partitions == allocation_capacity{0};
}
@@ -129,6 +141,11 @@ class allocation_node {
// number of partitions allocated in a specific allocation domain
absl::flat_hash_map<partition_allocation_domain, allocation_capacity>
_allocated_domain_partitions;
// number of partitions after all movements are finished
allocation_capacity _final_partitions{0};
absl::flat_hash_map<partition_allocation_domain, allocation_capacity>
_final_domain_partitions;

state _state = state::active;

config::binding<uint32_t> _partitions_per_shard;
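
The distinction the new members introduce: _allocated_partitions counts every replica a node currently hosts, including both ends of an in-flight move, while _final_partitions counts only the replicas the node will keep once all moves finish. A hypothetical sketch of how the two counters diverge during a single move (the node_counters struct and the update sequence are illustrative; the callers that drive these counters are not part of this diff):

// --- editorial sketch, not part of the diff ---
#include <cstdint>
#include <iostream>

struct node_counters {
    uint32_t allocated;   // replicas currently placed on the node
    uint32_t final_count; // replicas remaining after in-flight moves finish
};

int main() {
    node_counters source{10, 10};
    node_counters target{4, 4};

    // A move of one replica from source to target is requested: the target
    // picks up both counters right away, but the source only gives up its
    // final count, since it keeps hosting the replica until the move is done.
    target.allocated += 1;
    target.final_count += 1;
    source.final_count -= 1;

    std::cout << "in flight: source " << source.allocated << "/"
              << source.final_count << ", target " << target.allocated << "/"
              << target.final_count << "\n"; // source 10/9, target 5/5

    // Once the move finishes, the source releases its allocation and the two
    // counters agree again on every node.
    source.allocated -= 1;
    std::cout << "finished: source " << source.allocated << "/"
              << source.final_count << "\n"; // source 9/9
    return 0;
}
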
21 changes: 21 additions & 0 deletions src/v/cluster/scheduling/allocation_state.cc
@@ -22,6 +22,7 @@ void allocation_state::rollback(
for (auto& as : v) {
for (auto& bs : as.replicas) {
remove_allocation(bs, domain);
remove_final_count(bs, domain);
}
// rollback for each assignment as the groups are distinct
_highest_group = raft::group_id(_highest_group() - 1);
@@ -190,6 +191,26 @@ uint32_t allocation_state::allocate(
return it->second->allocate(domain);
}

void allocation_state::add_final_count(
const model::broker_shard& replica,
const partition_allocation_domain domain) {
verify_shard();
if (auto it = _nodes.find(replica.node_id); it != _nodes.end()) {
++it->second->_final_partitions;
++it->second->_final_domain_partitions[domain];
}
}

void allocation_state::remove_final_count(
const model::broker_shard& replica,
const partition_allocation_domain domain) {
verify_shard();
if (auto it = _nodes.find(replica.node_id); it != _nodes.end()) {
--it->second->_final_partitions;
--it->second->_final_domain_partitions[domain];
}
}

void allocation_state::verify_shard() const {
/* This is a consistency check on the use of the allocation state:
* it checks that the caller is on the same shard the state was originally
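
Both new helpers quietly skip replicas whose node is no longer present in _nodes, so stale replica sets do not break the bookkeeping. A hypothetical caller, not part of this diff, showing the intended use when a reconfiguration from `previous` to `target` replicas is requested (the include path and the function name are illustrative):

// --- editorial sketch, not part of the diff ---
#include "cluster/scheduling/allocation_state.h"

#include <vector>

// When a move is requested the final counts shift to the target replica set
// immediately, even though the previous replicas keep their allocations until
// the move completes. Nodes appearing in both sets are incremented and then
// decremented, so their final count is unchanged.
void on_move_requested(
  cluster::allocation_state& state,
  const std::vector<model::broker_shard>& previous,
  const std::vector<model::broker_shard>& target,
  cluster::partition_allocation_domain domain) {
    for (const auto& bs : target) {
        state.add_final_count(bs, domain);
    }
    for (const auto& bs : previous) {
        state.remove_final_count(bs, domain);
    }
}
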
4 changes: 4 additions & 0 deletions src/v/cluster/scheduling/allocation_state.h
@@ -57,6 +57,10 @@ class allocation_state : public ss::weakly_referencable<allocation_state> {
add_allocation(const model::broker_shard&, partition_allocation_domain);
void
remove_allocation(const model::broker_shard&, partition_allocation_domain);
void
add_final_count(const model::broker_shard&, partition_allocation_domain);
void
remove_final_count(const model::broker_shard&, partition_allocation_domain);

void rollback(
const ss::chunked_fifo<partition_assignment>& pa,
16 changes: 11 additions & 5 deletions src/v/cluster/scheduling/constraints.cc
@@ -219,15 +219,18 @@ hard_constraint disk_not_overflowed_by_partition(
max_disk_usage_ratio, partition_size, node_disk_reports));
}

soft_constraint least_allocated() {
soft_constraint max_final_capacity() {
class impl : public soft_constraint::impl {
public:
soft_constraint_evaluator
make_evaluator(const replicas_t&) const final {
return [](const allocation_node& node) {
// we return 0 for fully allocated node and 10'000'000 for nodes
// with maximum capacity available
return (soft_constraint::max_score * node.partition_capacity())
auto final_capacity
= node.max_capacity()
- std::min(node.max_capacity(), node.final_partitions());
return (soft_constraint::max_score * final_capacity)
/ node.max_capacity();
};
}
@@ -239,16 +242,19 @@ soft_constraint least_allocated() {
}

soft_constraint
least_allocated_in_domain(const partition_allocation_domain domain) {
max_final_capacity_in_domain(const partition_allocation_domain domain) {
struct impl : soft_constraint::impl {
explicit impl(partition_allocation_domain domain_)
: domain(domain_) {}

soft_constraint_evaluator
make_evaluator(const replicas_t&) const final {
return [this](const allocation_node& node) {
return (soft_constraint::max_score
* node.domain_partition_capacity(domain))
auto final_capacity = node.max_capacity()
- std::min(
node.max_capacity(),
node.domain_final_partitions(domain));
return (soft_constraint::max_score * final_capacity)
/ node.max_capacity();
};
}
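
A quick numeric check of the scoring, with round illustrative numbers rather than real configuration values:

// --- editorial sketch, not part of the diff ---
#include <algorithm>
#include <cstdint>
#include <iostream>

constexpr uint64_t max_score = 10'000'000; // stand-in for soft_constraint::max_score

uint64_t final_capacity_score(uint64_t max_capacity, uint64_t final_partitions) {
    // the std::min clamp keeps an over-committed node at score 0 instead of
    // underflowing the unsigned subtraction
    const uint64_t final_capacity
      = max_capacity - std::min(max_capacity, final_partitions);
    return max_score * final_capacity / max_capacity;
}

int main() {
    std::cout << final_capacity_score(1000, 0) << "\n";    // empty node      -> 10'000'000
    std::cout << final_capacity_score(1000, 250) << "\n";  // 25% committed   ->  7'500'000
    std::cout << final_capacity_score(1000, 1000) << "\n"; // fully committed -> 0
    std::cout << final_capacity_score(1000, 1200) << "\n"; // over-committed  -> 0 (clamped)
    return 0;
}
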
14 changes: 7 additions & 7 deletions src/v/cluster/scheduling/constraints.h
@@ -54,18 +54,18 @@ hard_constraint disk_not_overflowed_by_partition(
node_disk_reports);

/*
* scores nodes based on free overall allocation capacity left
* scores nodes based on partition count after all moves have been finished
* returning `0` for fully allocated nodes and `max_capacity` for empty nodes
*/
soft_constraint least_allocated();
soft_constraint max_final_capacity();

/*
* scores nodes based on allocation capacity used by priority partitions
* returning `0` for nodes fully allocated for priority partitions
* and `max_capacity` for nodes without any priority partitions
* non-priority partition allocations are ignored
* scores nodes based on partition counts of priority partitions after all moves
* have been finished, returning `0` for nodes fully allocated for priority
* partitions and `max_capacity` for nodes without any priority partitions.
* non-priority partition allocations are ignored.
*/
soft_constraint least_allocated_in_domain(partition_allocation_domain);
soft_constraint max_final_capacity_in_domain(partition_allocation_domain);

/*
* constraint scores nodes on free disk space
40 changes: 38 additions & 2 deletions src/v/cluster/scheduling/partition_allocator.cc
@@ -66,9 +66,9 @@ allocation_constraints partition_allocator::default_constraints(
req.add(is_active());

if (domain == partition_allocation_domains::common) {
req.add(least_allocated());
req.add(max_final_capacity());
} else {
req.add(least_allocated_in_domain(domain));
req.add(max_final_capacity_in_domain(domain));
}
if (_enable_rack_awareness()) {
req.add(distinct_rack_preferred(_members.local()));
@@ -406,6 +406,22 @@ void partition_allocator::remove_allocations(
}
}

void partition_allocator::add_final_counts(
const std::vector<model::broker_shard>& to_add,
const partition_allocation_domain domain) {
for (const auto& bs : to_add) {
_state->add_final_count(bs, domain);
}
}

void partition_allocator::remove_final_counts(
const std::vector<model::broker_shard>& to_remove,
const partition_allocation_domain domain) {
for (const auto& bs : to_remove) {
_state->remove_final_count(bs, domain);
}
}

ss::future<>
partition_allocator::apply_snapshot(const controller_snapshot& snap) {
auto new_state = std::make_unique<allocation_state>(
@@ -439,6 +455,8 @@ partition_allocator::apply_snapshot(const controller_snapshot& snap) {
new_state->add_allocation(bs, domain);
}

const std::vector<model::broker_shard>* final_replicas = nullptr;

if (auto it = topic.updates.find(p_id); it != topic.updates.end()) {
const auto& update = it->second;
// Both old and new replicas contribute to allocator weights
Expand All @@ -448,6 +466,24 @@ partition_allocator::apply_snapshot(const controller_snapshot& snap) {
for (const auto& bs : additional_replicas) {
new_state->add_allocation(bs, domain);
}

// final counts depend on the update state
switch (update.state) {
case reconfiguration_state::in_progress:
case reconfiguration_state::force_update:
final_replicas = &update.target_assignment;
break;
case reconfiguration_state::cancelled:
case reconfiguration_state::force_cancelled:
final_replicas = &partition.replicas;
break;
}
} else {
final_replicas = &partition.replicas;
}

for (const auto& bs : *final_replicas) {
new_state->add_final_count(bs, domain);
}

co_await ss::coroutine::maybe_yield();
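
The switch above encodes a single rule for rebuilding final counts from a snapshot; rephrased as a standalone helper with simplified stand-in types (not the actual controller_snapshot definitions):

// --- editorial sketch, not part of the diff ---
#include <vector>

enum class reconfiguration_state {
    in_progress,
    force_update,
    cancelled,
    force_cancelled,
};

struct replica {};

struct update_t {
    reconfiguration_state state;
    std::vector<replica> target_assignment;
};

// The final counts follow the move's target while the move is still going
// forward, and fall back to the partition's current replica set when the
// move has been cancelled or when there is no move at all.
const std::vector<replica>& final_replicas(
  const std::vector<replica>& current, const update_t* update) {
    if (update == nullptr) {
        return current;
    }
    switch (update->state) {
    case reconfiguration_state::in_progress:
    case reconfiguration_state::force_update:
        return update->target_assignment;
    case reconfiguration_state::cancelled:
    case reconfiguration_state::force_cancelled:
        return current;
    }
    return current; // unreachable; keeps -Wreturn-type quiet
}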