Skip to content

Commit

Permalink
c/topic_table: bump _topics_map_revision in more places
Browse files Browse the repository at this point in the history
Partition balancer relies on _topics_map_revision checks to safely iterate
over topic table collections with partition granularity (i.e. references
to partition data and replica sets are stored and accessed across yield
points). To make this safe, increment _topics_map_revision every time
_topics, _updates_in_progress, _disabled_partitions or nested
collections are modified in a way that invalidates references or
iterators.
  • Loading branch information
ztlpn committed May 8, 2024
1 parent f509e0b commit 30bbfb1
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 9 deletions.
38 changes: 31 additions & 7 deletions src/v/cluster/topic_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,8 @@ topic_table::apply(finish_moving_partition_replicas_cmd cmd, model::offset o) {

_updates_in_progress.erase(it);

_topics_map_revision++;

on_partition_move_finish(cmd.key, cmd.value);

// notify backend about finished update
Expand Down Expand Up @@ -416,6 +418,8 @@ topic_table::apply(cancel_moving_partition_replicas_cmd cmd, model::offset o) {
current_assignment_it->replicas
= in_progress_it->second.get_previous_replicas();

_topics_map_revision++;

_pending_deltas.emplace_back(
std::move(cmd.key),
current_assignment_it->group,
Expand Down Expand Up @@ -459,6 +463,11 @@ topic_table::apply(revert_cancel_partition_move_cmd cmd, model::offset o) {
co_return errc::no_update_in_progress;
}

auto p_meta_it = tp->second.partitions.find(ntp.tp.partition);
if (p_meta_it == tp->second.partitions.end()) {
co_return errc::partition_not_exists;
}

// revert replica set update
current_assignment_it->replicas
= in_progress_it->second.get_target_replicas();
Expand All @@ -469,11 +478,7 @@ topic_table::apply(revert_cancel_partition_move_cmd cmd, model::offset o) {
current_assignment_it->replicas,
};

// update partition_meta object
auto p_meta_it = tp->second.partitions.find(ntp.tp.partition);
if (p_meta_it == tp->second.partitions.end()) {
co_return errc::partition_not_exists;
}
// update partition_meta object:
// the cancellation was reverted and update went through, we must
// update replicas_revisions.
p_meta_it->second.replicas_revisions = update_replicas_revisions(
Expand All @@ -485,6 +490,8 @@ topic_table::apply(revert_cancel_partition_move_cmd cmd, model::offset o) {
/// Since the update is already finished we drop in_progress state
_updates_in_progress.erase(in_progress_it);

_topics_map_revision++;

// notify backend about finished update
_pending_deltas.emplace_back(
ntp,
Expand Down Expand Up @@ -670,6 +677,7 @@ topic_table::apply(set_topic_partitions_disabled_cmd cmd, model::offset o) {
}
}

_topics_map_revision++;
notify_waiters();

co_return errc::success;
Expand Down Expand Up @@ -998,6 +1006,7 @@ class topic_table::snapshot_applier {
disabled_partitions_t& _disabled_partitions;
fragmented_vector<delta>& _pending_deltas;
topic_table_probe& _probe;
model::revision_id& _topics_map_revision;
model::revision_id _snap_revision;

public:
Expand All @@ -1006,14 +1015,17 @@ class topic_table::snapshot_applier {
, _disabled_partitions(parent._disabled_partitions)
, _pending_deltas(parent._pending_deltas)
, _probe(parent._probe)
, _topics_map_revision(parent._topics_map_revision)
, _snap_revision(snap_revision) {}

void delete_ntp(
const model::topic_namespace& ns_tp, const partition_assignment& p_as) {
auto ntp = model::ntp(ns_tp.ns, ns_tp.tp, p_as.id);
vlog(
clusterlog.trace, "deleting ntp {} not in controller snapshot", ntp);
_updates_in_progress.erase(ntp);
if (_updates_in_progress.erase(ntp)) {
_topics_map_revision++;
};

_pending_deltas.emplace_back(
std::move(ntp),
Expand All @@ -1035,7 +1047,9 @@ class topic_table::snapshot_applier {
delete_ntp(ns_tp, p_as);
co_await ss::coroutine::maybe_yield();
}
_disabled_partitions.erase(ns_tp);
if (_disabled_partitions.erase(ns_tp)) {
_topics_map_revision++;
};
_probe.handle_topic_deletion(ns_tp);
// topic_metadata_item object is supposed to be removed from _topics by
// the caller
Expand All @@ -1050,6 +1064,9 @@ class topic_table::snapshot_applier {
vlog(clusterlog.trace, "adding ntp {} from controller snapshot", ntp);
size_t pending_deltas_start_idx = _pending_deltas.size();

// we are going to modify md_item so increment the revision right away.
_topics_map_revision++;

const model::partition_id p_id = ntp.tp.partition;

// 1. reconcile the _topics state (the md_item object) and generate
Expand Down Expand Up @@ -1191,7 +1208,9 @@ class topic_table::snapshot_applier {
topic_metadata_item ret{topic_metadata{topic.metadata, {}}};
if (topic.disabled_set) {
_disabled_partitions[ns_tp] = *topic.disabled_set;
_topics_map_revision++;
}

for (const auto& [p_id, partition] : topic.partitions) {
auto ntp = model::ntp(ns_tp.ns, ns_tp.tp, p_id);
add_ntp(ntp, topic, partition, ret, false);
Expand Down Expand Up @@ -1230,6 +1249,7 @@ ss::future<> topic_table::apply_snapshot(
// The topic was re-created, delete and add it anew.
co_await applier.delete_topic(ns_tp, md_item);
md_item = co_await applier.create_topic(ns_tp, topic_snapshot);
_topics_map_revision++;
} else {
// The topic was present in the previous set, now we need to
// reconcile individual partitions.
Expand All @@ -1247,10 +1267,12 @@ ss::future<> topic_table::apply_snapshot(
old_disabled_set = std::exchange(
_disabled_partitions[ns_tp],
*topic_snapshot.disabled_set);
_topics_map_revision++;
} else if (auto it = _disabled_partitions.find(ns_tp);
it != _disabled_partitions.end()) {
old_disabled_set = std::move(it->second);
_disabled_partitions.erase(it);
_topics_map_revision++;
}

// 2. For each partition in the new set, reconcile assignments
Expand Down Expand Up @@ -1288,6 +1310,7 @@ ss::future<> topic_table::apply_snapshot(
if (!topic_snapshot.partitions.contains(as_it_copy->id)) {
applier.delete_ntp(ns_tp, *as_it_copy);
md_item.get_assignments().erase(as_it_copy);
_topics_map_revision++;
}
co_await ss::coroutine::maybe_yield();
}
Expand Down Expand Up @@ -1633,6 +1656,7 @@ void topic_table::change_partition_replicas(
auto previous_assignment = current_assignment.replicas;
// replace partition replica set
current_assignment.replicas = new_assignment;
_topics_map_revision++;

// calculate delta for backend

Expand Down
9 changes: 7 additions & 2 deletions src/v/cluster/topic_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -668,8 +668,13 @@ class topic_table {

updates_t _updates_in_progress;
model::revision_id _last_applied_revision_id;
// Monotonic counter that is bumped for every addition/deletion to topics
// map. Unlike other revisions this does not correspond to the command

// Monotonic counter that is bumped each time _topics, _disabled_partitions,
// or _updates_in_progress are modified in a way that makes iteration over
// them unsafe (i.e. invalidates iterators or references, including
// for nested collections like partition sets and replica sets).
//
// Unlike other revisions this does not correspond to the command
// revision that updated the map.
model::revision_id _topics_map_revision{0};

Expand Down

0 comments on commit 30bbfb1

Please sign in to comment.