[v23.3.x] Fix some concurrent memory access problems in partition balancer #18450

Merged: 3 commits, May 16, 2024
21 changes: 20 additions & 1 deletion src/v/cluster/scheduling/types.cc
@@ -13,6 +13,7 @@

#include "cluster/logger.h"
#include "cluster/scheduling/allocation_state.h"
#include "utils/exceptions.h"
#include "utils/to_string.h"

#include <fmt/ostream.h>
@@ -76,6 +77,9 @@ allocation_units::allocation_units(

allocation_units::~allocation_units() {
oncore_debug_verify(_oncore);
if (unlikely(!_state)) {
return;
}
for (auto& pas : _assignments) {
for (auto& replica : pas.replicas) {
_state->remove_allocation(replica, _domain);
@@ -96,6 +100,11 @@ allocated_partition::allocated_partition(

std::optional<allocated_partition::previous_replica>
allocated_partition::prepare_move(model::node_id prev_node) {
if (unlikely(!_state)) {
throw concurrent_modification_error(
"allocation_state was concurrently replaced");
}

previous_replica prev;
auto it = std::find_if(
_replicas.begin(), _replicas.end(), [prev_node](const auto& bs) {
@@ -149,6 +158,11 @@ allocated_partition::prepare_move(model::node_id prev_node) {

model::broker_shard allocated_partition::add_replica(
model::node_id node, const std::optional<previous_replica>& prev) {
if (unlikely(!_state)) {
throw concurrent_modification_error(
"allocation_state was concurrently replaced");
}

if (!_original_node2shard) {
_original_node2shard.emplace();
for (const auto& bs : _replicas) {
@@ -225,7 +239,12 @@ bool allocated_partition::is_original(model::node_id node) const {
}

errc allocated_partition::try_revert(const reallocation_step& step) {
if (!_original_node2shard || !_state) {
if (unlikely(!_state)) {
throw concurrent_modification_error(
"allocation_state was concurrently replaced");
}

if (!_original_node2shard) {
return errc::no_update_in_progress;
}

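The pattern across these hunks is uniform: `allocated_partition` and `allocation_units` hold a non-owning handle to the `allocation_state`, and every member function that would dereference it now checks the handle first. A minimal sketch of the idea, assuming a Seastar-style weak pointer that is cleared when the state object dies (class and method names here are illustrative, not the PR's actual declarations):

```cpp
#include "utils/exceptions.h" // concurrent_modification_error (added below)

#include <seastar/core/weak_ptr.hh>

#include <utility>

// Stand-in for cluster::allocation_state: weakly referencable, so handles
// observe its destruction instead of dangling.
struct allocation_state : seastar::weakly_referencable<allocation_state> {
    // ... node allocation bookkeeping ...
};

class allocated_partition_sketch {
public:
    explicit allocated_partition_sketch(
      seastar::weak_ptr<allocation_state> state)
      : _state(std::move(state)) {}

    void prepare_move() {
        // If a concurrent fiber destroyed or replaced the allocation_state,
        // the weak_ptr is now null; fail fast with a retryable error
        // instead of dereferencing freed memory.
        if (!_state) {
            throw concurrent_modification_error(
              "allocation_state was concurrently replaced");
        }
        // ... safe to use *_state from here on ...
    }

private:
    seastar::weak_ptr<allocation_state> _state;
};
```

Note that the destructor hunk takes the quiet branch: `~allocation_units()` simply returns when `_state` is gone, since throwing from a destructor would terminate the process.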
38 changes: 31 additions & 7 deletions src/v/cluster/topic_table.cc
@@ -344,6 +344,8 @@ topic_table::apply(finish_moving_partition_replicas_cmd cmd, model::offset o) {

_updates_in_progress.erase(it);

_topics_map_revision++;

on_partition_move_finish(cmd.key, cmd.value);

// notify backend about finished update
@@ -417,6 +419,8 @@ topic_table::apply(cancel_moving_partition_replicas_cmd cmd, model::offset o) {
current_assignment_it->replicas
= in_progress_it->second.get_previous_replicas();

_topics_map_revision++;

_pending_deltas.emplace_back(
std::move(cmd.key),
model::revision_id(o),
@@ -459,6 +463,11 @@ topic_table::apply(revert_cancel_partition_move_cmd cmd, model::offset o) {
co_return errc::no_update_in_progress;
}

auto p_meta_it = tp->second.partitions.find(ntp.tp.partition);
if (p_meta_it == tp->second.partitions.end()) {
co_return errc::partition_not_exists;
}

// revert replica set update
current_assignment_it->replicas
= in_progress_it->second.get_target_replicas();
@@ -469,11 +478,7 @@ topic_table::apply(revert_cancel_partition_move_cmd cmd, model::offset o) {
current_assignment_it->replicas,
};

// update partition_meta object
auto p_meta_it = tp->second.partitions.find(ntp.tp.partition);
if (p_meta_it == tp->second.partitions.end()) {
co_return errc::partition_not_exists;
}
// update partition_meta object:
// the cancellation was reverted and update went through, we must
// update replicas_revisions.
p_meta_it->second.replicas_revisions = update_replicas_revisions(
@@ -485,6 +490,8 @@
/// Since the update is already finished we drop in_progress state
_updates_in_progress.erase(in_progress_it);

_topics_map_revision++;

// notify backend about finished update
_pending_deltas.emplace_back(
ntp, model::revision_id(o), topic_table_delta_type::replicas_updated);
@@ -664,6 +671,7 @@ topic_table::apply(set_topic_partitions_disabled_cmd cmd, model::offset o) {
}
}

_topics_map_revision++;
notify_waiters();

co_return errc::success;
@@ -988,6 +996,7 @@ class topic_table::snapshot_applier {
disabled_partitions_t& _disabled_partitions;
fragmented_vector<delta>& _pending_deltas;
topic_table_probe& _probe;
model::revision_id& _topics_map_revision;
model::revision_id _snap_revision;

public:
@@ -996,14 +1005,17 @@ class topic_table::snapshot_applier {
, _disabled_partitions(parent._disabled_partitions)
, _pending_deltas(parent._pending_deltas)
, _probe(parent._probe)
, _topics_map_revision(parent._topics_map_revision)
, _snap_revision(snap_revision) {}

void delete_ntp(
const model::topic_namespace& ns_tp, const partition_assignment& p_as) {
auto ntp = model::ntp(ns_tp.ns, ns_tp.tp, p_as.id);
vlog(
clusterlog.trace, "deleting ntp {} not in controller snapshot", ntp);
_updates_in_progress.erase(ntp);
if (_updates_in_progress.erase(ntp)) {
_topics_map_revision++;
};

_pending_deltas.emplace_back(
std::move(ntp), _snap_revision, topic_table_delta_type::removed);
@@ -1022,7 +1034,9 @@ class topic_table::snapshot_applier {
delete_ntp(ns_tp, p_as);
co_await ss::coroutine::maybe_yield();
}
_disabled_partitions.erase(ns_tp);
if (_disabled_partitions.erase(ns_tp)) {
_topics_map_revision++;
};
_probe.handle_topic_deletion(ns_tp);
// topic_metadata_item object is supposed to be removed from _topics by
// the caller
@@ -1037,6 +1051,9 @@
vlog(clusterlog.trace, "adding ntp {} from controller snapshot", ntp);
size_t pending_deltas_start_idx = _pending_deltas.size();

// we are going to modify md_item so increment the revision right away.
_topics_map_revision++;

const model::partition_id p_id = ntp.tp.partition;

// 1. reconcile the _topics state (the md_item object) and generate
@@ -1169,7 +1186,9 @@ class topic_table::snapshot_applier {
topic_metadata_item ret{topic_metadata{topic.metadata, {}}};
if (topic.disabled_set) {
_disabled_partitions[ns_tp] = *topic.disabled_set;
_topics_map_revision++;
}

for (const auto& [p_id, partition] : topic.partitions) {
auto ntp = model::ntp(ns_tp.ns, ns_tp.tp, p_id);
add_ntp(ntp, topic, partition, ret, false);
@@ -1208,6 +1227,7 @@ ss::future<> topic_table::apply_snapshot(
// The topic was re-created, delete and add it anew.
co_await applier.delete_topic(ns_tp, md_item);
md_item = co_await applier.create_topic(ns_tp, topic_snapshot);
_topics_map_revision++;
} else {
// The topic was present in the previous set, now we need to
// reconcile individual partitions.
@@ -1225,10 +1245,12 @@
old_disabled_set = std::exchange(
_disabled_partitions[ns_tp],
*topic_snapshot.disabled_set);
_topics_map_revision++;
} else if (auto it = _disabled_partitions.find(ns_tp);
it != _disabled_partitions.end()) {
old_disabled_set = std::move(it->second);
_disabled_partitions.erase(it);
_topics_map_revision++;
}

// 2. For each partition in the new set, reconcile assignments
@@ -1265,6 +1287,7 @@ ss::future<> topic_table::apply_snapshot(
if (!topic_snapshot.partitions.contains(as_it_copy->id)) {
applier.delete_ntp(ns_tp, *as_it_copy);
md_item.get_assignments().erase(as_it_copy);
_topics_map_revision++;
}
co_await ss::coroutine::maybe_yield();
}
@@ -1642,6 +1665,7 @@ void topic_table::change_partition_replicas(
auto previous_assignment = current_assignment.replicas;
// replace partition replica set
current_assignment.replicas = new_assignment;
_topics_map_revision++;

// calculate delta for backend

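Every hunk in this file enforces one rule: any mutation of `_topics`, `_updates_in_progress`, or `_disabled_partitions` that can invalidate iterators or references must bump `_topics_map_revision`. The real code does this inline at each mutation site; a hypothetical wrapper makes the discipline explicit:

```cpp
#include <cstddef>
#include <cstdint>
#include <unordered_map>
#include <utility>

// Toy illustration only: a map that shares a revision counter and bumps it
// on every operation that may invalidate iterators or references.
template<typename K, typename V>
class revisioned_map {
public:
    explicit revisioned_map(uint64_t& revision)
      : _revision(revision) {}

    void insert_or_assign(const K& k, V v) {
        _revision++; // insertion may rehash and invalidate iterators
        _map.insert_or_assign(k, std::move(v));
    }

    std::size_t erase(const K& k) {
        std::size_t n = _map.erase(k);
        if (n > 0) {
            _revision++; // bump only if something was actually removed
        }
        return n;
    }

    // Read-only lookups never bump the revision.
    const V* find(const K& k) const {
        auto it = _map.find(k);
        return it == _map.end() ? nullptr : &it->second;
    }

private:
    std::unordered_map<K, V> _map;
    uint64_t& _revision;
};
```

The conditional bump in `erase` mirrors the `if (_updates_in_progress.erase(ntp))` hunks above: an erase that removes nothing invalidates nothing, so readers need not be disturbed.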
23 changes: 12 additions & 11 deletions src/v/cluster/topic_table.h
@@ -98,21 +98,17 @@ class topic_table {
// * partition::get_revision_id()
// * raft::group_configuration::revision_id()

class concurrent_modification_error final : public std::exception {
class concurrent_modification_error final
: public ::concurrent_modification_error {
public:
concurrent_modification_error(
model::revision_id initial_revision,
model::revision_id current_revision)
: _msg(ssx::sformat(
"Topic table was modified by concurrent fiber. (initial_revision: "
"{}, current_revision: {}) ",
: ::concurrent_modification_error(ssx::sformat(
"Topic table was modified by concurrent fiber. "
"(initial_revision: {}, current_revision: {}) ",
initial_revision,
current_revision)) {}

const char* what() const noexcept final { return _msg.c_str(); }

private:
ss::sstring _msg;
};

class in_progress_update {
@@ -627,8 +623,13 @@ class topic_table {

updates_t _updates_in_progress;
model::revision_id _last_applied_revision_id;
// Monotonic counter that is bumped for every addition/deletion to topics
// map. Unlike other revisions this does not correspond to the command

// Monotonic counter that is bumped each time _topics, _disabled_partitions,
// or _updates_in_progress are modified in a way that makes iteration over
// them unsafe (i.e. invalidates iterators or references, including
// for nested collections like partition sets and replica sets).
//
// Unlike other revisions this does not correspond to the command
// revision that updated the map.
model::revision_id _topics_map_revision{0};

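The counter pays off on the reader side: a coroutine that walks topic-table state across scheduling points can snapshot the revision up front and re-validate it after every yield before touching its iterators again. Roughly (a hedged sketch; the accessor names and loop body are illustrative):

```cpp
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>

// Assumes topic_table exposes its map and revision counter, e.g. via
// all_topics_metadata() and topics_map_revision() (illustrative here).
ss::future<> scan_topics(topic_table& table) {
    const model::revision_id initial = table.topics_map_revision();
    auto it = table.all_topics_metadata().begin();
    auto end = table.all_topics_metadata().end();
    while (it != end) {
        // ... read *it ...
        co_await ss::coroutine::maybe_yield();
        // Another fiber may have run during the yield; if the revision
        // moved, `it` may be dangling and must not even be incremented.
        if (table.topics_map_revision() != initial) {
            throw topic_table::concurrent_modification_error(
              initial, table.topics_map_revision());
        }
        ++it; // safe: revision unchanged, iterator still valid
    }
}
```

This is exactly the contract the widened comment above spells out: the revision does not track command revisions, only iterator-invalidating shape changes, including in nested partition and replica sets.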
2 changes: 1 addition & 1 deletion src/v/cluster/topics_frontend.cc
@@ -976,7 +976,7 @@ topics_frontend::partitions_with_lost_majority(
co_return errc::concurrent_modification_error;
}
co_return result;
} catch (const topic_table::concurrent_modification_error& e) {
} catch (const concurrent_modification_error& e) {
// state changed while generating the plan, force caller to retry;
vlog(
clusterlog.info,
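With a shared base class, call sites like this can catch every flavor of concurrent-modification failure in one handler, whether it originated in `topic_table` or in a stable iterator, and turn it into a retry. A hedged sketch of the caller-side pattern (the helper is made up for illustration; the real call site is a coroutine and maps the error to `errc::concurrent_modification_error`):

```cpp
#include "utils/exceptions.h" // concurrent_modification_error (added below)

// Run fn(), restarting it on fresh state if a concurrent modification is
// detected; rethrow once the attempt budget is exhausted.
template<typename Fn>
auto with_retry(Fn fn, int max_attempts = 3) {
    for (int attempt = 1;; ++attempt) {
        try {
            return fn();
        } catch (const concurrent_modification_error&) {
            if (attempt == max_attempts) {
                throw; // give up; let the caller surface the error
            }
            // State changed while we were working; loop and recompute.
        }
    }
}
```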
33 changes: 33 additions & 0 deletions src/v/utils/exceptions.h
@@ -0,0 +1,33 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/
#pragma once

#include "seastarx.h"

#include <seastar/core/sstring.hh>

#include <stdexcept>

/// Some objects reference state that changes comparatively rarely (e.g.
/// topic_table state) across yield points and expect these references to remain
/// valid. In case these references are invalidated by a concurrent fiber, this
/// exception is thrown. This is a signal for the caller to restart the
/// computation with up-to-date state.
class concurrent_modification_error : public std::exception {
public:
explicit concurrent_modification_error(ss::sstring s)
: _msg(std::move(s)) {}

const char* what() const noexcept override { return _msg.c_str(); }

private:
ss::sstring _msg;
};
10 changes: 5 additions & 5 deletions src/v/utils/stable_iterator_adaptor.h
@@ -11,20 +11,20 @@
#pragma once

#include "seastarx.h"
#include "utils/exceptions.h"

#include <seastar/util/noncopyable_function.hh>

#include <boost/iterator/iterator_adaptor.hpp>
#include <fmt/format.h>

#include <stdexcept>
#include <string_view>
#include <version>

class iterator_stability_violation : public std::runtime_error {
class iterator_stability_violation final
: public concurrent_modification_error {
public:
explicit iterator_stability_violation(const std::string& why)
: std::runtime_error(why){};
explicit iterator_stability_violation(ss::sstring why)
: concurrent_modification_error(std::move(why)){};
};

/*
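For context on why this subclassing unifies the failure modes: the adaptor pairs an iterator with a revision probe and refuses access once the watched revision moves. A stripped-down model of the idea (not the actual `boost::iterator_adaptor`-based implementation):

```cpp
#include <cstdint>
#include <stdexcept>

// Simplified model: dereferencing or advancing after the watched revision
// has changed throws instead of touching a possibly-dangling iterator.
template<typename Iter>
class checked_iterator {
public:
    checked_iterator(Iter it, const uint64_t* revision)
      : _it(it)
      , _revision(revision)
      , _expected(*revision) {}

    decltype(auto) operator*() const {
        verify();
        return *_it;
    }

    checked_iterator& operator++() {
        verify();
        ++_it;
        return *this;
    }

    bool operator==(const checked_iterator& o) const { return _it == o._it; }

private:
    void verify() const {
        if (*_revision != _expected) {
            // The real adaptor throws iterator_stability_violation, which
            // after this PR is-a concurrent_modification_error.
            throw std::runtime_error(
              "iterator invalidated by concurrent modification");
        }
    }

    Iter _it;
    const uint64_t* _revision;
    uint64_t _expected;
};
```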