partition_allocator: preserve shards for replicas on original nodes #11077

Merged · 4 commits · May 30, 2023

1 change: 1 addition & 0 deletions src/v/cluster/members_backend.cc
@@ -8,6 +8,7 @@
 #include "cluster/members_manager.h"
 #include "cluster/members_table.h"
 #include "cluster/scheduling/allocation_strategy.h"
+#include "cluster/scheduling/partition_allocator.h"
 #include "cluster/topic_table.h"
 #include "cluster/topics_frontend.h"
 #include "cluster/types.h"

3 changes: 2 additions & 1 deletion src/v/cluster/node_isolation_watcher.cc
@@ -11,6 +11,7 @@
 
 #include "cluster/node_isolation_watcher.h"
 
+#include "cluster/logger.h"
 #include "cluster/metadata_cache.h"
 #include "config/node_config.h"
 #include "ssx/future-util.h"
@@ -101,4 +102,4 @@ ss::future<bool> node_isolation_watcher::is_node_isolated() {
     co_return true;
 }
 
-} // namespace cluster
\ No newline at end of file
+} // namespace cluster

35 changes: 18 additions & 17 deletions src/v/cluster/partition_balancer_planner.cc
@@ -343,7 +343,7 @@ class partition_balancer_planner::reassignable_partition {
         return (_reallocated ? _reallocated->replicas() : _orig_replicas);
     };
 
-    bool is_original(const model::broker_shard& replica) const {
+    bool is_original(model::node_id replica) const {
         return !_reallocated || _reallocated->is_original(replica);
     }
 
@@ -370,7 +370,7 @@
       , _ctx(ctx) {}
 
     bool has_changes() const {
-        return _reallocated && _reallocated->has_node_changes();
+        return _reallocated && _reallocated->has_changes();
     }
 
     allocation_constraints
@@ -665,7 +665,7 @@ auto partition_balancer_planner::request_context::do_with_partition(
         reassignment_it->second = std::move(*reassignable._reallocated);
     } else if (
       reassignable._reallocated
-      && reassignable._reallocated->has_node_changes()) {
+      && reassignable._reallocated->has_changes()) {
         _reassignments.emplace(ntp, std::move(*reassignable._reallocated));
         _planned_moves_size_bytes += reassignable._size_bytes;
     }
@@ -807,10 +807,10 @@ void partition_balancer_planner::get_node_drain_actions(
     }
 
     ctx.for_each_partition([&](partition& part) {
-        std::vector<model::broker_shard> to_move;
+        std::vector<model::node_id> to_move;
         for (const auto& bs : part.replicas()) {
             if (nodes.contains(bs.node_id)) {
-                to_move.push_back(bs);
+                to_move.push_back(bs.node_id);
             }
         }
 
@@ -820,11 +820,11 @@
 
         part.match_variant(
           [&](reassignable_partition& part) {
-              for (const auto& bs : to_move) {
-                  if (part.is_original(bs)) {
+              for (const auto& replica : to_move) {
+                  if (part.is_original(replica)) {
                       // ignore result
                       (void)part.move_replica(
-                        bs.node_id,
+                        replica,
                         ctx.config().hard_max_disk_usage_ratio,
                         reason);
                   }
@@ -841,7 +841,7 @@
              }
 
              for (const auto& r : to_move) {
-                 if (!previous_replicas_set.contains(r.node_id)) {
+                 if (!previous_replicas_set.contains(r)) {
                      // makes sense to cancel
                      part.request_cancel(reason);
                      break;
@@ -886,14 +886,14 @@ void partition_balancer_planner::get_rack_constraint_repair_actions(
     }
 
     ctx.with_partition(ntp, [&](partition& part) {
-        std::vector<model::broker_shard> to_move;
+        std::vector<model::node_id> to_move;
         absl::flat_hash_set<model::rack_id> cur_racks;
         for (const auto& bs : part.replicas()) {
            auto rack = ctx.state().members().get_node_rack_id(bs.node_id);
            if (rack) {
                auto [it, inserted] = cur_racks.insert(*rack);
                if (!inserted) {
-                   to_move.push_back(bs);
+                   to_move.push_back(bs.node_id);
                }
            }
        }
@@ -910,12 +910,12 @@
 
        part.match_variant(
          [&](reassignable_partition& part) {
-             for (const auto& bs : to_move) {
-                 if (part.is_original(bs)) {
+             for (const auto& replica : to_move) {
+                 if (part.is_original(replica)) {
                      // only move replicas that haven't been moved for
                      // other reasons
                      (void)part.move_replica(
-                       bs.node_id,
+                       replica,
                        ctx.config().hard_max_disk_usage_ratio,
                        "rack constraint repair");
                  }
@@ -982,8 +982,9 @@ void partition_balancer_planner::get_full_node_actions(request_context& ctx) {
          [&](reassignable_partition& part) {
              std::vector<model::node_id> replicas_on_full_nodes;
              for (const auto& bs : part.replicas()) {
-                 if (part.is_original(bs) && find_full_node(bs.node_id)) {
-                     replicas_on_full_nodes.push_back(bs.node_id);
+                 model::node_id replica = bs.node_id;
+                 if (part.is_original(replica) && find_full_node(replica)) {
+                     replicas_on_full_nodes.push_back(replica);
                  }
              }
 
@@ -1032,7 +1033,7 @@ void partition_balancer_planner::get_full_node_actions(request_context& ctx) {
        for (const auto& r : part.replicas()) {
            if (
              ctx.timed_out_unavailable_nodes.contains(r.node_id)
-             || !part.is_original(r)) {
+             || !part.is_original(r.node_id)) {
                continue;
            }
 

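A note for readers skimming this file's changes: the recurring edit is that candidate replicas to move are now tracked as bare model::node_id values instead of model::broker_shard pairs, because the destination shard is no longer decided at planning time. Below is a standalone sketch of that selection pattern, using hypothetical simplified types rather than the real planner interfaces.

#include <cstdint>
#include <iostream>
#include <set>
#include <vector>

using node_id = int32_t;

struct broker_shard {
    node_id node;
    uint32_t shard;
};

int main() {
    std::set<node_id> draining_nodes{2, 5};
    std::vector<broker_shard> replicas{{1, 0}, {2, 3}, {5, 1}};

    // mirror of the new pattern: collect node ids only; the shard a moved
    // replica ends up on is the allocator's decision, made later
    std::vector<node_id> to_move;
    for (const auto& bs : replicas) {
        if (draining_nodes.contains(bs.node)) {
            to_move.push_back(bs.node);
        }
    }

    for (node_id n : to_move) {
        std::cout << "plan: move a replica off node " << n << '\n';
    }
}
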
14 changes: 5 additions & 9 deletions src/v/cluster/scheduling/allocation_state.cc
@@ -181,17 +181,13 @@ void allocation_state::remove_allocation(
     }
 }
 
-result<uint32_t> allocation_state::allocate(
+uint32_t allocation_state::allocate(
   model::node_id id, const partition_allocation_domain domain) {
     verify_shard();
-    if (auto it = _nodes.find(id); it != _nodes.end()) {
-        if (it->second->is_full()) {
-            return errc::invalid_node_operation;
-        }
-        return it->second->allocate(domain);
-    }
-
-    return errc::node_does_not_exists;
+    auto it = _nodes.find(id);
+    vassert(
+      it != _nodes.end(), "allocated node with id {} have to be present", id);
+    return it->second->allocate(domain);
 }
 
 void allocation_state::verify_shard() const {

5 changes: 3 additions & 2 deletions src/v/cluster/scheduling/allocation_state.h
@@ -48,8 +48,9 @@ class allocation_state : public ss::weakly_referencable<allocation_state> {
     const underlying_t& allocation_nodes() const { return _nodes; }
     int16_t available_nodes() const;
 
-    // choose a shard for a replica and add the corresponding allocation.
-    result<uint32_t> allocate(model::node_id id, partition_allocation_domain);
+    // Choose a shard for a replica and add the corresponding allocation.
+    // node_id is required to belong to an existing node.
+    uint32_t allocate(model::node_id id, partition_allocation_domain);
 
     // Operations on state
     void

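The updated comment documents a real contract change: allocate() no longer returns a result<>, because asking for a shard on a missing node is now treated as a caller bug (enforced with vassert in the .cc change above) rather than a runtime error, and the old is_full check disappears here; callers are expected to rule out unusable nodes earlier, e.g. during constraint evaluation. A minimal sketch of this precondition style, with hypothetical simplified types and a plain assert standing in for vassert:

#include <cassert>
#include <cstdint>
#include <map>

using node_id = int32_t;

struct allocation_node_sketch {
    uint32_t next_core = 0;
    // simplified: the real code picks the least-allocated core in the domain
    uint32_t allocate() { return next_core++; }
};

struct allocation_state_sketch {
    std::map<node_id, allocation_node_sketch> nodes;

    // mirrors the new contract: node existence is asserted, not reported
    uint32_t allocate(node_id id) {
        auto it = nodes.find(id);
        assert(it != nodes.end() && "allocated node must be present");
        return it->second.allocate();
    }
};

int main() {
    allocation_state_sketch state;
    state.nodes[0] = {};
    return state.allocate(0) == 0 ? 0 : 1; // first allocation on node 0 -> core 0
}
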
15 changes: 3 additions & 12 deletions src/v/cluster/scheduling/allocation_strategy.cc
@@ -159,11 +159,11 @@ model::node_id find_best_fit(
 allocation_strategy simple_allocation_strategy() {
     class impl : public allocation_strategy::impl {
     public:
-        result<model::broker_shard> allocate_replica(
+        result<model::node_id> choose_node(
           const std::vector<model::broker_shard>& current_replicas,
           const allocation_constraints& request,
           allocation_state& state,
-          const partition_allocation_domain domain) final {
+          const partition_allocation_domain) final {
             const auto& nodes = state.allocation_nodes();
             /**
              * evaluate hard constraints
@@ -186,16 +186,7 @@ allocation_strategy simple_allocation_strategy() {
               possible_nodes,
               nodes);
 
-            auto it = nodes.find(best_fit);
-            vassert(
-              it != nodes.end(),
-              "allocated node with id {} have to be present",
-              best_fit);
-            auto core = (it->second)->allocate(domain);
-            return model::broker_shard{
-              .node_id = it->first,
-              .shard = core,
-            };
+            return best_fit;
         }
     };
     return make_allocation_strategy<impl>();

6 changes: 3 additions & 3 deletions src/v/cluster/scheduling/allocation_strategy.h
@@ -25,7 +25,7 @@ class allocation_strategy {
      * Allocates single replica according to set of given allocation
      * constraints in the specified domain
      */
-    virtual result<model::broker_shard> allocate_replica(
+    virtual result<model::node_id> choose_node(
       const replicas_t&,
       const allocation_constraints&,
       allocation_state&,
@@ -38,12 +38,12 @@ class allocation_strategy {
     explicit allocation_strategy(std::unique_ptr<impl> impl)
       : _impl(std::move(impl)) {}
 
-    result<model::broker_shard> allocate_replica(
+    result<model::node_id> choose_node(
       const replicas_t& current_replicas,
       const allocation_constraints& ac,
       allocation_state& state,
       const partition_allocation_domain domain) {
-        return _impl->allocate_replica(current_replicas, ac, state, domain);
+        return _impl->choose_node(current_replicas, ac, state, domain);
     }
 
 private:

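Seen together with the allocation_state change above, the rename makes the division of labor explicit: the strategy only names a node that satisfies the constraints, and converting that choice into a concrete model::broker_shard is now the allocator's job (see partition_allocator.cc below), which is what creates room for preserving an original shard. A rough sketch of the resulting flow, with std::optional standing in for redpanda's result<> and all other types simplified:

#include <cstdint>
#include <iostream>
#include <optional>

using node_id = int32_t;

struct broker_shard {
    node_id node;
    uint32_t shard;
};

// the strategy's whole job now: pick a node (or nothing if constraints fail)
std::optional<node_id> choose_node_sketch(bool any_node_fits) {
    return any_node_fits ? std::optional<node_id>{2} : std::nullopt;
}

// the allocator's job: commit the allocation and attach a shard
broker_shard add_replica_sketch(node_id node) {
    uint32_t shard = 0; // real code consults allocation_state::allocate()
    return {node, shard};
}

int main() {
    auto node = choose_node_sketch(true);
    if (!node) {
        std::cout << "no node satisfies the constraints\n";
        return 1;
    }
    auto bs = add_replica_sketch(*node);
    std::cout << "replica on node " << bs.node << ", shard " << bs.shard << '\n';
}
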
10 changes: 4 additions & 6 deletions src/v/cluster/scheduling/partition_allocator.cc
@@ -380,16 +380,14 @@ result<model::broker_shard> partition_allocator::do_allocate_replica(
         }
     });
 
-    auto replica = _allocation_strategy.allocate_replica(
+    auto node = _allocation_strategy.choose_node(
       partition._replicas, effective_constraints, *_state, partition._domain);
-    if (!replica) {
-        return replica;
+    if (!node) {
+        return node.error();
     }
 
     revert.cancel();
-    partition.add_replica(replica.value(), prev);
-
-    return replica;
+    return partition.add_replica(node.value(), prev);
 }
 
 void partition_allocator::add_allocations(

4 changes: 4 additions & 0 deletions src/v/cluster/scheduling/partition_allocator.h
@@ -65,6 +65,10 @@ class partition_allocator {
     /// try to substitute an existing replica with a newly allocated one and add
     /// it to the allocated_partition object. If the request fails,
     /// allocated_partition remains unchanged.
+    ///
+    /// Note: if after reallocation the replica ends up on a node from the
+    /// original replica set (doesn't matter if the same as `previous` or a
+    /// different one), its shard id is preserved.
     result<model::broker_shard> reallocate_replica(
       allocated_partition&, model::node_id previous, allocation_constraints);
 

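The added note is the heart of the PR, so a standalone illustration may help. This is a hypothetical sketch, not the real allocated_partition API: a replica reallocated onto any node from the original replica set keeps the shard it had there, while a replica on a brand-new node takes a freshly chosen shard.

#include <cstdint>
#include <iostream>
#include <map>

using node_id = int32_t;
using shard_id = uint32_t;

struct allocated_partition_sketch {
    // shard each original replica occupied, keyed by node
    std::map<node_id, shard_id> original_shards;

    // hypothetical helper mirroring the documented rule
    shard_id choose_shard(node_id target, shard_id freshly_allocated) const {
        if (auto it = original_shards.find(target); it != original_shards.end()) {
            return it->second; // back on an original node: preserve its shard
        }
        return freshly_allocated; // new node: use the allocator's choice
    }
};

int main() {
    allocated_partition_sketch p{.original_shards = {{0, 3}, {1, 5}}};
    std::cout << p.choose_shard(1, 2) << '\n'; // original node 1: prints 5
    std::cout << p.choose_shard(4, 2) << '\n'; // new node 4: prints 2
}
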